cmap.go

package cmap

import (
	"encoding/json"
	"fmt"
	"sync"
)

// ShardCount is the number of shards the map is divided into.
var ShardCount = 32

// Stringer is a constraint for keys that are comparable and provide their
// own string representation.
type Stringer interface {
	fmt.Stringer
	comparable
}

// ConcurrentMap is a "thread" safe map of type K:V.
// To avoid lock bottlenecks this map is divided into several (ShardCount) map shards.
type ConcurrentMap[K comparable, V any] struct {
	shards   []*ConcurrentMapShared[K, V]
	sharding func(key K) uint32
}

// ConcurrentMapShared is a single "thread" safe shard of a ConcurrentMap.
type ConcurrentMapShared[K comparable, V any] struct {
	items        map[K]V
	sync.RWMutex // Read Write mutex, guards access to internal map.
}
// create builds a ConcurrentMap with ShardCount shards, distributing keys
// with the given sharding function.
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
	m := ConcurrentMap[K, V]{
		sharding: sharding,
		shards:   make([]*ConcurrentMapShared[K, V], ShardCount),
	}
	for i := 0; i < ShardCount; i++ {
		m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
	}
	return m
}
// New creates a new concurrent map with string keys.
func New[V any]() ConcurrentMap[string, V] {
	return create[string, V](fnv32)
}

// NewStringer creates a new concurrent map keyed by any type that
// implements fmt.Stringer.
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
	return create[K, V](strfnv32[K])
}

// NewWithCustomShardingFunction creates a new concurrent map using the given
// sharding function to distribute keys across shards.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
	return create[K, V](sharding)
}
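
// A minimal usage sketch from a client package (illustrative only, not part
// of the original file):
//
//	m := cmap.New[int]()
//	m.Set("answer", 42)
//	if v, ok := m.Get("answer"); ok {
//		fmt.Println(v) // 42
//	}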
// GetShard returns the shard for the given key.
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
	return m.shards[uint(m.sharding(key))%uint(ShardCount)]
}
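
// A hedged sketch of a custom sharding function for integer keys
// (hypothetical example; any func(K) uint32 works, and its quality only
// affects how evenly keys spread across shards):
//
//	shardByInt := func(key int) uint32 { return uint32(key) }
//	im := cmap.NewWithCustomShardingFunction[int, string](shardByInt)
//	im.Set(7, "seven")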
// MSet sets all key/value pairs from the given map.
func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
	for key, value := range data {
		shard := m.GetShard(key)
		shard.Lock()
		shard.items[key] = value
		shard.Unlock()
	}
}
// Set sets the given value under the specified key.
func (m ConcurrentMap[K, V]) Set(key K, value V) {
	// Get map shard.
	shard := m.GetShard(key)
	shard.Lock()
	shard.items[key] = value
	shard.Unlock()
}
// UpsertCb is a callback that returns the new element to be inserted into the map.
// It is called while the lock is held, therefore it MUST NOT
// try to access other keys in the same map, as that can lead to deadlock since
// Go's sync.RWMutex is not reentrant.
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

// Upsert (insert or update) updates an existing element or inserts a new one using UpsertCb.
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
	shard := m.GetShard(key)
	shard.Lock()
	v, ok := shard.items[key]
	res = cb(ok, v, value)
	shard.items[key] = res
	shard.Unlock()
	return res
}
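
// A sketch of Upsert as an atomic counter increment (illustrative; assumes a
// ConcurrentMap[string, int] named counts):
//
//	counts.Upsert("hits", 1, func(exist bool, cur, n int) int {
//		if exist {
//			return cur + n // key present: add to the stored count
//		}
//		return n // key absent: start from the new value
//	})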
// SetIfAbsent sets the given value under the specified key if no value was
// associated with it. It reports whether the value was set.
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
	// Get map shard.
	shard := m.GetShard(key)
	shard.Lock()
	_, ok := shard.items[key]
	if !ok {
		shard.items[key] = value
	}
	shard.Unlock()
	return !ok
}
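
// SetIfAbsent sketch (illustrative; defaultConfig and otherConfig are
// hypothetical values): only the first writer for a key wins.
//
//	m.SetIfAbsent("config", defaultConfig) // true: key was absent, value stored
//	m.SetIfAbsent("config", otherConfig)   // false: key already present, no-op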
// Get retrieves an element from map under given key.
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
	// Get shard
	shard := m.GetShard(key)
	shard.RLock()
	// Get item from shard.
	val, ok := shard.items[key]
	shard.RUnlock()
	return val, ok
}
// Count returns the number of elements within the map.
func (m ConcurrentMap[K, V]) Count() int {
	count := 0
	for i := 0; i < ShardCount; i++ {
		shard := m.shards[i]
		shard.RLock()
		count += len(shard.items)
		shard.RUnlock()
	}
	return count
}
// Has reports whether an item exists under the specified key.
func (m ConcurrentMap[K, V]) Has(key K) bool {
	// Get shard
	shard := m.GetShard(key)
	shard.RLock()
	// See if element is within shard.
	_, ok := shard.items[key]
	shard.RUnlock()
	return ok
}
// Remove removes an element from the map.
func (m ConcurrentMap[K, V]) Remove(key K) {
	// Try to get shard.
	shard := m.GetShard(key)
	shard.Lock()
	delete(shard.items, key)
	shard.Unlock()
}
// RemoveCb is a callback executed in a map.RemoveCb() call while the Lock is held.
// If it returns true, the element will be removed from the map.
type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value
// and calls the callback with those params.
// If the callback returns true and the element exists, it will remove it from the map.
// Returns the value returned by the callback (even if the element was not present in the map).
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
	// Try to get shard.
	shard := m.GetShard(key)
	shard.Lock()
	v, ok := shard.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(shard.items, key)
	}
	shard.Unlock()
	return remove
}
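
// A sketch of conditional removal with RemoveCb (illustrative; assumes a
// ConcurrentMap[string, int] named m): the check and the delete happen under
// one lock, so no other goroutine can modify the entry in between.
//
//	removed := m.RemoveCb("hits", func(key string, v int, exists bool) bool {
//		return exists && v == 0 // delete only if present and zero
//	})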
// Pop removes an element from the map and returns it.
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
	// Try to get shard.
	shard := m.GetShard(key)
	shard.Lock()
	v, exists = shard.items[key]
	delete(shard.items, key)
	shard.Unlock()
	return v, exists
}
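
// Pop sketch (illustrative; process is a hypothetical handler): read and
// delete in one locked step, so exactly one caller receives the value.
//
//	if v, ok := m.Pop("job-1"); ok {
//		process(v)
//	}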
// IsEmpty checks if map is empty.
func (m ConcurrentMap[K, V]) IsEmpty() bool {
	return m.Count() == 0
}
// Tuple is used by the Iter & IterBuffered functions to wrap two variables
// together over a channel.
type Tuple[K comparable, V any] struct {
	Key K
	Val V
}

// IterBuffered returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
	chans := snapshot(m)
	total := 0
	for _, c := range chans {
		total += cap(c)
	}
	ch := make(chan Tuple[K, V], total)
	go fanIn(chans, ch)
	return ch
}
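
// Iterating a snapshot with IterBuffered (illustrative sketch); the channel
// is closed when all shards have been drained, so range terminates:
//
//	for item := range m.IterBuffered() {
//		fmt.Println(item.Key, item.Val)
//	}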
// Clear removes all items from map.
func (m ConcurrentMap[K, V]) Clear() {
	for item := range m.IterBuffered() {
		m.Remove(item.Key)
	}
}
// snapshot returns a slice of channels that contain the elements of each
// shard, which together form a per-shard snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
	// Panic when map items are accessed before initialization.
	if len(m.shards) == 0 {
		panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
	}
	chans = make([]chan Tuple[K, V], ShardCount)
	wg := sync.WaitGroup{}
	wg.Add(ShardCount)
	// Foreach shard.
	for index, shard := range m.shards {
		go func(index int, shard *ConcurrentMapShared[K, V]) {
			// Foreach key, value pair.
			shard.RLock()
			chans[index] = make(chan Tuple[K, V], len(shard.items))
			wg.Done()
			for key, val := range shard.items {
				chans[index] <- Tuple[K, V]{key, val}
			}
			shard.RUnlock()
			close(chans[index])
		}(index, shard)
	}
	wg.Wait()
	return chans
}
// fanIn reads elements from channels `chans` into channel `out`.
func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
	wg := sync.WaitGroup{}
	wg.Add(len(chans))
	for _, ch := range chans {
		go func(ch chan Tuple[K, V]) {
			for t := range ch {
				out <- t
			}
			wg.Done()
		}(ch)
	}
	wg.Wait()
	close(out)
}
// Items returns all items as map[K]V.
func (m ConcurrentMap[K, V]) Items() map[K]V {
	tmp := make(map[K]V)
	// Insert items into temporary map.
	for item := range m.IterBuffered() {
		tmp[item.Key] = item.Val
	}
	return tmp
}
// IterCb is an iterator callback, called for every key/value pair found in
// the map. The RLock is held for all calls within a given shard, therefore
// the callback sees a consistent view of a shard, but not across shards.
type IterCb[K comparable, V any] func(key K, v V)

// IterCb is a callback-based iterator, the cheapest way to read
// all elements in a map.
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
	for idx := range m.shards {
		shard := (m.shards)[idx]
		shard.RLock()
		for key, value := range shard.items {
			fn(key, value)
		}
		shard.RUnlock()
	}
}
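
// Summing all values with IterCb (illustrative; assumes a
// ConcurrentMap[string, int] named m). Unlike IterBuffered, no intermediate
// channels or copies are allocated:
//
//	total := 0
//	m.IterCb(func(key string, v int) {
//		total += v
//	})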
// Keys returns all keys as []K.
func (m ConcurrentMap[K, V]) Keys() []K {
	count := m.Count()
	ch := make(chan K, count)
	go func() {
		// Foreach shard.
		wg := sync.WaitGroup{}
		wg.Add(ShardCount)
		for _, shard := range m.shards {
			go func(shard *ConcurrentMapShared[K, V]) {
				// Foreach key, value pair.
				shard.RLock()
				for key := range shard.items {
					ch <- key
				}
				shard.RUnlock()
				wg.Done()
			}(shard)
		}
		wg.Wait()
		close(ch)
	}()
	// Generate keys
	keys := make([]K, 0, count)
	for k := range ch {
		keys = append(keys, k)
	}
	return keys
}
// MarshalJSON reveals ConcurrentMap "private" variables to JSON marshaling.
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
	// Create a temporary map, which will hold all items spread across shards.
	tmp := make(map[K]V)
	// Insert items into temporary map.
	for item := range m.IterBuffered() {
		tmp[item.Key] = item.Val
	}
	return json.Marshal(tmp)
}
// strfnv32 hashes a fmt.Stringer key with fnv32.
func strfnv32[K fmt.Stringer](key K) uint32 {
	return fnv32(key.String())
}

// fnv32 implements the 32-bit FNV-1 hash.
func fnv32(key string) uint32 {
	hash := uint32(2166136261)
	const prime32 = uint32(16777619)
	keyLength := len(key)
	for i := 0; i < keyLength; i++ {
		hash *= prime32
		hash ^= uint32(key[i])
	}
	return hash
}
// UnmarshalJSON is the reverse process of Marshal.
func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
	tmp := make(map[K]V)
	// Unmarshal into a single map.
	if err := json.Unmarshal(b, &tmp); err != nil {
		return err
	}
	// For each key/value pair in the temporary map, insert into our concurrent map.
	for key, val := range tmp {
		m.Set(key, val)
	}
	return nil
}
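
// JSON round-trip sketch (illustrative client-side usage; keys must be
// encodable as JSON object keys, e.g. strings, and the destination map must
// be initialized with New before unmarshaling):
//
//	m := cmap.New[int]()
//	m.Set("a", 1)
//	b, _ := json.Marshal(m) // {"a":1}
//
//	m2 := cmap.New[int]()
//	_ = json.Unmarshal(b, &m2)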