123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- package cmap
- import (
- "encoding/json"
- "fmt"
- "sync"
- )
- var ShardCount = 32
- type Stringer interface {
- fmt.Stringer
- comparable
- }
- // ConcurrentMap A "thread" safe map of type string:Anything.
- // To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.
- type ConcurrentMap[K comparable, V any] struct {
- shards []*ConcurrentMapShared[K, V]
- sharding func(key K) uint32
- }
- // ConcurrentMapShared A "thread" safe string to anything map.
- type ConcurrentMapShared[K comparable, V any] struct {
- items map[K]V
- sync.RWMutex // Read Write mutex, guards access to internal map.
- }
- func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
- m := ConcurrentMap[K, V]{
- sharding: sharding,
- shards: make([]*ConcurrentMapShared[K, V], ShardCount),
- }
- for i := 0; i < ShardCount; i++ {
- m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
- }
- return m
- }
- // New Creates a new concurrent map.
- func New[V any]() ConcurrentMap[string, V] {
- return create[string, V](fnv32)
- }
- // NewStringer Creates a new concurrent map.
- func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
- return create[K, V](strfnv32[K])
- }
- // NewWithCustomShardingFunction Creates a new concurrent map.
- func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
- return create[K, V](sharding)
- }
- // GetShard returns shard under given key
- func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
- return m.shards[uint(m.sharding(key))%uint(ShardCount)]
- }
- func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
- for key, value := range data {
- shard := m.GetShard(key)
- shard.Lock()
- shard.items[key] = value
- shard.Unlock()
- }
- }
- // Set Sets the given value under the specified key.
- func (m ConcurrentMap[K, V]) Set(key K, value V) {
- // Get map shard.
- shard := m.GetShard(key)
- shard.Lock()
- shard.items[key] = value
- shard.Unlock()
- }
- // UpsertCb Callback to return new element to be inserted into the map
- // It is called while lock is held, therefore it MUST NOT
- // try to access other keys in same map, as it can lead to deadlock since
- // Go sync.RWLock is not reentrant
- type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V
- // Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb
- func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
- shard := m.GetShard(key)
- shard.Lock()
- v, ok := shard.items[key]
- res = cb(ok, v, value)
- shard.items[key] = res
- shard.Unlock()
- return res
- }
- // SetIfAbsent Sets the given value under the specified key if no value was associated with it.
- func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
- // Get map shard.
- shard := m.GetShard(key)
- shard.Lock()
- _, ok := shard.items[key]
- if !ok {
- shard.items[key] = value
- }
- shard.Unlock()
- return !ok
- }
- // Get retrieves an element from map under given key.
- func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
- // Get shard
- shard := m.GetShard(key)
- shard.RLock()
- // Get item from shard.
- val, ok := shard.items[key]
- shard.RUnlock()
- return val, ok
- }
- // Count returns the number of elements within the map.
- func (m ConcurrentMap[K, V]) Count() int {
- count := 0
- for i := 0; i < ShardCount; i++ {
- shard := m.shards[i]
- shard.RLock()
- count += len(shard.items)
- shard.RUnlock()
- }
- return count
- }
- // Has Looks up an item under specified key
- func (m ConcurrentMap[K, V]) Has(key K) bool {
- // Get shard
- shard := m.GetShard(key)
- shard.RLock()
- // See if element is within shard.
- _, ok := shard.items[key]
- shard.RUnlock()
- return ok
- }
- // Remove removes an element from the map.
- func (m ConcurrentMap[K, V]) Remove(key K) {
- // Try to get shard.
- shard := m.GetShard(key)
- shard.Lock()
- delete(shard.items, key)
- shard.Unlock()
- }
- // RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
- // If returns true, the element will be removed from the map
- type RemoveCb[K any, V any] func(key K, v V, exists bool) bool
- // RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
- // If callback returns true and element exists, it will remove it from the map
- // Returns the value returned by the callback (even if element was not present in the map)
- func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
- // Try to get shard.
- shard := m.GetShard(key)
- shard.Lock()
- v, ok := shard.items[key]
- remove := cb(key, v, ok)
- if remove && ok {
- delete(shard.items, key)
- }
- shard.Unlock()
- return remove
- }
- // Pop removes an element from the map and returns it
- func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
- // Try to get shard.
- shard := m.GetShard(key)
- shard.Lock()
- v, exists = shard.items[key]
- delete(shard.items, key)
- shard.Unlock()
- return v, exists
- }
- // IsEmpty checks if map is empty.
- func (m ConcurrentMap[K, V]) IsEmpty() bool {
- return m.Count() == 0
- }
- // Tuple Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
- type Tuple[K comparable, V any] struct {
- Key K
- Val V
- }
- // IterBuffered returns a buffered iterator which could be used in a for range loop.
- func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
- chans := snapshot(m)
- total := 0
- for _, c := range chans {
- total += cap(c)
- }
- ch := make(chan Tuple[K, V], total)
- go fanIn(chans, ch)
- return ch
- }
- // Clear removes all items from map.
- func (m ConcurrentMap[K, V]) Clear() {
- for item := range m.IterBuffered() {
- m.Remove(item.Key)
- }
- }
- // Returns an array of channels that contains elements in each shard,
- // which likely takes a snapshot of `m`.
- // It returns once the size of each buffered channel is determined,
- // before all the channels are populated using goroutines.
- func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
- //When you access map items before initializing.
- if len(m.shards) == 0 {
- panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
- }
- chans = make([]chan Tuple[K, V], ShardCount)
- wg := sync.WaitGroup{}
- wg.Add(ShardCount)
- // Foreach shard.
- for index, shard := range m.shards {
- go func(index int, shard *ConcurrentMapShared[K, V]) {
- // Foreach key, value pair.
- shard.RLock()
- chans[index] = make(chan Tuple[K, V], len(shard.items))
- wg.Done()
- for key, val := range shard.items {
- chans[index] <- Tuple[K, V]{key, val}
- }
- shard.RUnlock()
- close(chans[index])
- }(index, shard)
- }
- wg.Wait()
- return chans
- }
- // fanIn reads elements from channels `chans` into channel `out`
- func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
- wg := sync.WaitGroup{}
- wg.Add(len(chans))
- for _, ch := range chans {
- go func(ch chan Tuple[K, V]) {
- for t := range ch {
- out <- t
- }
- wg.Done()
- }(ch)
- }
- wg.Wait()
- close(out)
- }
- // Items returns all items as map[string]V
- func (m ConcurrentMap[K, V]) Items() map[K]V {
- tmp := make(map[K]V)
- // Insert items to temporary map.
- for item := range m.IterBuffered() {
- tmp[item.Key] = item.Val
- }
- return tmp
- }
- // IterCb Iterator callbacalled for every key,value found in
- // maps. RLock is held for all calls for a given shard
- // therefore callback sess consistent view of a shard,
- // but not across the shards
- type IterCb[K comparable, V any] func(key K, v V)
- // IterCb Callback based iterator, cheapest way to read
- // all elements in a map.
- func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
- for idx := range m.shards {
- shard := (m.shards)[idx]
- shard.RLock()
- for key, value := range shard.items {
- fn(key, value)
- }
- shard.RUnlock()
- }
- }
- // Keys returns all keys as []string
- func (m ConcurrentMap[K, V]) Keys() []K {
- count := m.Count()
- ch := make(chan K, count)
- go func() {
- // Foreach shard.
- wg := sync.WaitGroup{}
- wg.Add(ShardCount)
- for _, shard := range m.shards {
- go func(shard *ConcurrentMapShared[K, V]) {
- // Foreach key, value pair.
- shard.RLock()
- for key := range shard.items {
- ch <- key
- }
- shard.RUnlock()
- wg.Done()
- }(shard)
- }
- wg.Wait()
- close(ch)
- }()
- // Generate keys
- keys := make([]K, 0, count)
- for k := range ch {
- keys = append(keys, k)
- }
- return keys
- }
- // MarshalJSON Reviles ConcurrentMap "private" variables to json marshal.
- func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
- // Create a temporary map, which will hold all item spread across shards.
- tmp := make(map[K]V)
- // Insert items to temporary map.
- for item := range m.IterBuffered() {
- tmp[item.Key] = item.Val
- }
- return json.Marshal(tmp)
- }
- func strfnv32[K fmt.Stringer](key K) uint32 {
- return fnv32(key.String())
- }
- func fnv32(key string) uint32 {
- hash := uint32(2166136261)
- const prime32 = uint32(16777619)
- keyLength := len(key)
- for i := 0; i < keyLength; i++ {
- hash *= prime32
- hash ^= uint32(key[i])
- }
- return hash
- }
- // UnmarshalJSON Reverse process of Marshal.
- func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
- tmp := make(map[K]V)
- // Unmarshal into a single map.
- if err := json.Unmarshal(b, &tmp); err != nil {
- return err
- }
- // foreach key,value pair in temporary map insert into our concurrent map.
- for key, val := range tmp {
- m.Set(key, val)
- }
- return nil
- }
|