Jelajahi Sumber

[🚀] cmap

bvbej 4 bulan lalu
induk
melakukan
d8e12bd4f7
3 mengubah file dengan 365 tambahan dan 3 penghapusan
  1. 2 1
      pkg/ants/pool.go
  2. 3 2
      pkg/auth/store.go
  3. 360 0
      pkg/cmap/cmap.go

+ 2 - 1
pkg/ants/pool.go

@@ -1,6 +1,7 @@
 package ants
 
 import (
+	"errors"
 	"fmt"
 	"git.bvbej.com/bvbej/base-golang/pkg/ticker"
 	"github.com/panjf2000/ants/v2"
@@ -83,7 +84,7 @@ func (p *goroutinePool) Submit(task func()) {
 		return
 	}
 	err := p.pool.Submit(task)
-	if err == ants.ErrPoolOverload {
+	if errors.Is(err, ants.ErrPoolOverload) {
 		p.pool.Tune(p.Size() + p.step)
 		p.Submit(task)
 	}

+ 3 - 2
pkg/auth/store.go

@@ -2,10 +2,11 @@ package auth
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"github.com/google/uuid"
 	jsonIterator "github.com/json-iterator/go"
-	redis "github.com/redis/go-redis/v9"
+	"github.com/redis/go-redis/v9"
 	"github.com/tidwall/buntdb"
 	"time"
 )
@@ -47,7 +48,7 @@ func (ts *buntStore) remove(key string) error {
 		_, err := tx.Delete(key)
 		return err
 	})
-	if err == buntdb.ErrNotFound {
+	if errors.Is(err, buntdb.ErrNotFound) {
 		return nil
 	}
 	return err

+ 360 - 0
pkg/cmap/cmap.go

@@ -0,0 +1,360 @@
+package cmap
+
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+)
+
+var ShardCount = 32
+
+type Stringer interface {
+	fmt.Stringer
+	comparable
+}
+
+// ConcurrentMap A "thread" safe map of type string:Anything.
+// To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.
+type ConcurrentMap[K comparable, V any] struct {
+	shards   []*ConcurrentMapShared[K, V]
+	sharding func(key K) uint32
+}
+
+// ConcurrentMapShared A "thread" safe string to anything map.
+type ConcurrentMapShared[K comparable, V any] struct {
+	items        map[K]V
+	sync.RWMutex // Read Write mutex, guards access to internal map.
+}
+
+func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
+	m := ConcurrentMap[K, V]{
+		sharding: sharding,
+		shards:   make([]*ConcurrentMapShared[K, V], ShardCount),
+	}
+	for i := 0; i < ShardCount; i++ {
+		m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
+	}
+	return m
+}
+
+// New Creates a new concurrent map.
+func New[V any]() ConcurrentMap[string, V] {
+	return create[string, V](fnv32)
+}
+
+// NewStringer Creates a new concurrent map.
+func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
+	return create[K, V](strfnv32[K])
+}
+
+// NewWithCustomShardingFunction Creates a new concurrent map.
+func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
+	return create[K, V](sharding)
+}
+
+// GetShard returns shard under given key
+func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
+	return m.shards[uint(m.sharding(key))%uint(ShardCount)]
+}
+
+func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
+	for key, value := range data {
+		shard := m.GetShard(key)
+		shard.Lock()
+		shard.items[key] = value
+		shard.Unlock()
+	}
+}
+
+// Set Sets the given value under the specified key.
+func (m ConcurrentMap[K, V]) Set(key K, value V) {
+	// Get map shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	shard.items[key] = value
+	shard.Unlock()
+}
+
+// UpsertCb Callback to return new element to be inserted into the map
+// It is called while lock is held, therefore it MUST NOT
+// try to access other keys in same map, as it can lead to deadlock since
+// Go sync.RWLock is not reentrant
+type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V
+
+// Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb
+func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
+	shard := m.GetShard(key)
+	shard.Lock()
+	v, ok := shard.items[key]
+	res = cb(ok, v, value)
+	shard.items[key] = res
+	shard.Unlock()
+	return res
+}
+
+// SetIfAbsent Sets the given value under the specified key if no value was associated with it.
+func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
+	// Get map shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	_, ok := shard.items[key]
+	if !ok {
+		shard.items[key] = value
+	}
+	shard.Unlock()
+	return !ok
+}
+
+// Get retrieves an element from map under given key.
+func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
+	// Get shard
+	shard := m.GetShard(key)
+	shard.RLock()
+	// Get item from shard.
+	val, ok := shard.items[key]
+	shard.RUnlock()
+	return val, ok
+}
+
+// Count returns the number of elements within the map.
+func (m ConcurrentMap[K, V]) Count() int {
+	count := 0
+	for i := 0; i < ShardCount; i++ {
+		shard := m.shards[i]
+		shard.RLock()
+		count += len(shard.items)
+		shard.RUnlock()
+	}
+	return count
+}
+
+// Has Looks up an item under specified key
+func (m ConcurrentMap[K, V]) Has(key K) bool {
+	// Get shard
+	shard := m.GetShard(key)
+	shard.RLock()
+	// See if element is within shard.
+	_, ok := shard.items[key]
+	shard.RUnlock()
+	return ok
+}
+
+// Remove removes an element from the map.
+func (m ConcurrentMap[K, V]) Remove(key K) {
+	// Try to get shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	delete(shard.items, key)
+	shard.Unlock()
+}
+
+// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
+// If returns true, the element will be removed from the map
+type RemoveCb[K any, V any] func(key K, v V, exists bool) bool
+
+// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
+// If callback returns true and element exists, it will remove it from the map
+// Returns the value returned by the callback (even if element was not present in the map)
+func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
+	// Try to get shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	v, ok := shard.items[key]
+	remove := cb(key, v, ok)
+	if remove && ok {
+		delete(shard.items, key)
+	}
+	shard.Unlock()
+	return remove
+}
+
+// Pop removes an element from the map and returns it
+func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
+	// Try to get shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	v, exists = shard.items[key]
+	delete(shard.items, key)
+	shard.Unlock()
+	return v, exists
+}
+
+// IsEmpty checks if map is empty.
+func (m ConcurrentMap[K, V]) IsEmpty() bool {
+	return m.Count() == 0
+}
+
+// Tuple Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
+type Tuple[K comparable, V any] struct {
+	Key K
+	Val V
+}
+
+// IterBuffered returns a buffered iterator which could be used in a for range loop.
+func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
+	chans := snapshot(m)
+	total := 0
+	for _, c := range chans {
+		total += cap(c)
+	}
+	ch := make(chan Tuple[K, V], total)
+	go fanIn(chans, ch)
+	return ch
+}
+
+// Clear removes all items from map.
+func (m ConcurrentMap[K, V]) Clear() {
+	for item := range m.IterBuffered() {
+		m.Remove(item.Key)
+	}
+}
+
+// Returns an array of channels that contains elements in each shard,
+// which likely takes a snapshot of `m`.
+// It returns once the size of each buffered channel is determined,
+// before all the channels are populated using goroutines.
+func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
+	//When you access map items before initializing.
+	if len(m.shards) == 0 {
+		panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
+	}
+	chans = make([]chan Tuple[K, V], ShardCount)
+	wg := sync.WaitGroup{}
+	wg.Add(ShardCount)
+	// Foreach shard.
+	for index, shard := range m.shards {
+		go func(index int, shard *ConcurrentMapShared[K, V]) {
+			// Foreach key, value pair.
+			shard.RLock()
+			chans[index] = make(chan Tuple[K, V], len(shard.items))
+			wg.Done()
+			for key, val := range shard.items {
+				chans[index] <- Tuple[K, V]{key, val}
+			}
+			shard.RUnlock()
+			close(chans[index])
+		}(index, shard)
+	}
+	wg.Wait()
+	return chans
+}
+
+// fanIn reads elements from channels `chans` into channel `out`
+func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
+	wg := sync.WaitGroup{}
+	wg.Add(len(chans))
+	for _, ch := range chans {
+		go func(ch chan Tuple[K, V]) {
+			for t := range ch {
+				out <- t
+			}
+			wg.Done()
+		}(ch)
+	}
+	wg.Wait()
+	close(out)
+}
+
+// Items returns all items as map[string]V
+func (m ConcurrentMap[K, V]) Items() map[K]V {
+	tmp := make(map[K]V)
+
+	// Insert items to temporary map.
+	for item := range m.IterBuffered() {
+		tmp[item.Key] = item.Val
+	}
+
+	return tmp
+}
+
+// IterCb Iterator callbacalled for every key,value found in
+// maps. RLock is held for all calls for a given shard
+// therefore callback sess consistent view of a shard,
+// but not across the shards
+type IterCb[K comparable, V any] func(key K, v V)
+
+// IterCb Callback based iterator, cheapest way to read
+// all elements in a map.
+func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
+	for idx := range m.shards {
+		shard := (m.shards)[idx]
+		shard.RLock()
+		for key, value := range shard.items {
+			fn(key, value)
+		}
+		shard.RUnlock()
+	}
+}
+
+// Keys returns all keys as []string
+func (m ConcurrentMap[K, V]) Keys() []K {
+	count := m.Count()
+	ch := make(chan K, count)
+	go func() {
+		// Foreach shard.
+		wg := sync.WaitGroup{}
+		wg.Add(ShardCount)
+		for _, shard := range m.shards {
+			go func(shard *ConcurrentMapShared[K, V]) {
+				// Foreach key, value pair.
+				shard.RLock()
+				for key := range shard.items {
+					ch <- key
+				}
+				shard.RUnlock()
+				wg.Done()
+			}(shard)
+		}
+		wg.Wait()
+		close(ch)
+	}()
+
+	// Generate keys
+	keys := make([]K, 0, count)
+	for k := range ch {
+		keys = append(keys, k)
+	}
+	return keys
+}
+
+// MarshalJSON Reviles ConcurrentMap "private" variables to json marshal.
+func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
+	// Create a temporary map, which will hold all item spread across shards.
+	tmp := make(map[K]V)
+
+	// Insert items to temporary map.
+	for item := range m.IterBuffered() {
+		tmp[item.Key] = item.Val
+	}
+	return json.Marshal(tmp)
+}
+func strfnv32[K fmt.Stringer](key K) uint32 {
+	return fnv32(key.String())
+}
+
+func fnv32(key string) uint32 {
+	hash := uint32(2166136261)
+	const prime32 = uint32(16777619)
+	keyLength := len(key)
+	for i := 0; i < keyLength; i++ {
+		hash *= prime32
+		hash ^= uint32(key[i])
+	}
+	return hash
+}
+
+// UnmarshalJSON Reverse process of Marshal.
+func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
+	tmp := make(map[K]V)
+
+	// Unmarshal into a single map.
+	if err := json.Unmarshal(b, &tmp); err != nil {
+		return err
+	}
+
+	// foreach key,value pair in temporary map insert into our concurrent map.
+	for key, val := range tmp {
+		m.Set(key, val)
+	}
+	return nil
+}