redis.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "git.bvbej.com/bvbej/base-golang/pkg/time_parse"
  8. "git.bvbej.com/bvbej/base-golang/pkg/trace"
  9. "github.com/redis/go-redis/v9"
  10. )
  11. type Option func(*option)
  12. type Trace = trace.T
  13. type option struct {
  14. Trace *trace.Trace
  15. Redis *trace.Redis
  16. }
  17. type RedisConfig struct {
  18. Addr string `yaml:"addr"`
  19. Pass string `yaml:"pass"`
  20. DB int `yaml:"db"`
  21. MaxRetries int `yaml:"maxRetries"` // 最大重试次数
  22. PoolSize int `yaml:"poolSize"` // Redis连接池大小
  23. MinIdleConn int `yaml:"minIdleConn"` // 最小空闲连接数
  24. }
  25. func newOption() *option {
  26. return &option{}
  27. }
  28. var _ Repo = (*cacheRepo)(nil)
  29. type Repo interface {
  30. i()
  31. Client() *redis.Client
  32. Set(key, value string, ttl time.Duration, options ...Option) error
  33. Get(key string, options ...Option) (string, error)
  34. TTL(key string) (time.Duration, error)
  35. Expire(key string, ttl time.Duration) bool
  36. ExpireAt(key string, ttl time.Time) bool
  37. Del(key string, options ...Option) bool
  38. Exists(keys ...string) bool
  39. Incr(key string, options ...Option) (int64, error)
  40. Decr(key string, options ...Option) (int64, error)
  41. HGet(key, field string, options ...Option) (string, error)
  42. HSet(key, field, value string, options ...Option) error
  43. HDel(key, field string, options ...Option) error
  44. HGetAll(key string, options ...Option) (map[string]string, error)
  45. HIncrBy(key, field string, incr int64, options ...Option) (int64, error)
  46. HIncrByFloat(key, field string, incr float64, options ...Option) (float64, error)
  47. LPush(key, value string, options ...Option) error
  48. LLen(key string, options ...Option) (int64, error)
  49. BRPop(key string, timeout time.Duration, options ...Option) (string, error)
  50. Close() error
  51. }
  52. type cacheRepo struct {
  53. client *redis.Client
  54. ctx context.Context
  55. }
  56. func New(cfg RedisConfig) (Repo, error) {
  57. client := redis.NewClient(&redis.Options{
  58. Addr: cfg.Addr,
  59. Password: cfg.Pass,
  60. DB: cfg.DB,
  61. MaxRetries: cfg.MaxRetries,
  62. PoolSize: cfg.PoolSize,
  63. MinIdleConns: cfg.MinIdleConn,
  64. })
  65. ctx := context.TODO()
  66. if err := client.Ping(ctx).Err(); err != nil {
  67. return nil, errors.Join(err, errors.New("ping redis err"))
  68. }
  69. return &cacheRepo{
  70. client: client,
  71. ctx: ctx,
  72. }, nil
  73. }
  74. func WithTrace(t Trace) Option {
  75. return func(opt *option) {
  76. if t != nil {
  77. opt.Trace = t.(*trace.Trace)
  78. opt.Redis = new(trace.Redis)
  79. }
  80. }
  81. }
  82. func (c *cacheRepo) i() {}
  83. func (c *cacheRepo) Client() *redis.Client {
  84. return c.client
  85. }
  86. func (c *cacheRepo) Set(key, value string, ttl time.Duration, options ...Option) error {
  87. ts := time.Now()
  88. opt := newOption()
  89. defer func() {
  90. if opt.Trace != nil {
  91. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  92. opt.Redis.Handle = "set"
  93. opt.Redis.Key = key
  94. opt.Redis.Value = value
  95. opt.Redis.TTL = ttl.Minutes()
  96. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  97. opt.Trace.AppendRedis(opt.Redis)
  98. }
  99. }()
  100. for _, f := range options {
  101. f(opt)
  102. }
  103. if err := c.client.Set(c.ctx, key, value, ttl).Err(); err != nil {
  104. return errors.Join(err, fmt.Errorf("redis set key: %s err", key))
  105. }
  106. return nil
  107. }
  108. func (c *cacheRepo) Get(key string, options ...Option) (string, error) {
  109. ts := time.Now()
  110. opt := newOption()
  111. defer func() {
  112. if opt.Trace != nil {
  113. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  114. opt.Redis.Handle = "get"
  115. opt.Redis.Key = key
  116. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  117. opt.Trace.AppendRedis(opt.Redis)
  118. }
  119. }()
  120. for _, f := range options {
  121. f(opt)
  122. }
  123. value, err := c.client.Get(c.ctx, key).Result()
  124. if err != nil {
  125. return "", errors.Join(err, fmt.Errorf("redis get key: %s err", key))
  126. }
  127. return value, nil
  128. }
  129. func (c *cacheRepo) TTL(key string) (time.Duration, error) {
  130. ttl, err := c.client.TTL(c.ctx, key).Result()
  131. if err != nil {
  132. return -1, errors.Join(err, fmt.Errorf("redis get key: %s err", key))
  133. }
  134. return ttl, nil
  135. }
  136. func (c *cacheRepo) Expire(key string, ttl time.Duration) bool {
  137. ok, _ := c.client.Expire(c.ctx, key, ttl).Result()
  138. return ok
  139. }
  140. func (c *cacheRepo) ExpireAt(key string, ttl time.Time) bool {
  141. ok, _ := c.client.ExpireAt(c.ctx, key, ttl).Result()
  142. return ok
  143. }
  144. func (c *cacheRepo) Exists(keys ...string) bool {
  145. if len(keys) == 0 {
  146. return true
  147. }
  148. value, _ := c.client.Exists(c.ctx, keys...).Result()
  149. return value > 0
  150. }
  151. func (c *cacheRepo) Del(key string, options ...Option) bool {
  152. ts := time.Now()
  153. opt := newOption()
  154. defer func() {
  155. if opt.Trace != nil {
  156. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  157. opt.Redis.Handle = "del"
  158. opt.Redis.Key = key
  159. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  160. opt.Trace.AppendRedis(opt.Redis)
  161. }
  162. }()
  163. for _, f := range options {
  164. f(opt)
  165. }
  166. if key == "" {
  167. return true
  168. }
  169. value, _ := c.client.Del(c.ctx, key).Result()
  170. return value > 0
  171. }
  172. func (c *cacheRepo) Incr(key string, options ...Option) (int64, error) {
  173. ts := time.Now()
  174. opt := newOption()
  175. defer func() {
  176. if opt.Trace != nil {
  177. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  178. opt.Redis.Handle = "incr"
  179. opt.Redis.Key = key
  180. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  181. opt.Trace.AppendRedis(opt.Redis)
  182. }
  183. }()
  184. for _, f := range options {
  185. f(opt)
  186. }
  187. value, err := c.client.Incr(c.ctx, key).Result()
  188. if err != nil {
  189. return 0, errors.Join(err, fmt.Errorf("redis incr key: %s err", key))
  190. }
  191. return value, nil
  192. }
  193. func (c *cacheRepo) Decr(key string, options ...Option) (int64, error) {
  194. ts := time.Now()
  195. opt := newOption()
  196. defer func() {
  197. if opt.Trace != nil {
  198. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  199. opt.Redis.Handle = "decr"
  200. opt.Redis.Key = key
  201. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  202. opt.Trace.AppendRedis(opt.Redis)
  203. }
  204. }()
  205. for _, f := range options {
  206. f(opt)
  207. }
  208. value, err := c.client.Decr(c.ctx, key).Result()
  209. if err != nil {
  210. return 0, errors.Join(err, fmt.Errorf("redis decr key: %s err", key))
  211. }
  212. return value, nil
  213. }
  214. func (c *cacheRepo) HGet(key, field string, options ...Option) (string, error) {
  215. ts := time.Now()
  216. opt := newOption()
  217. defer func() {
  218. if opt.Trace != nil {
  219. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  220. opt.Redis.Handle = "hash get"
  221. opt.Redis.Key = key
  222. opt.Redis.Value = field
  223. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  224. opt.Trace.AppendRedis(opt.Redis)
  225. }
  226. }()
  227. for _, f := range options {
  228. f(opt)
  229. }
  230. value, err := c.client.HGet(c.ctx, key, field).Result()
  231. if err != nil {
  232. return "", errors.Join(err, fmt.Errorf("redis hget key: %s field: %s err", key, field))
  233. }
  234. return value, nil
  235. }
  236. func (c *cacheRepo) HSet(key, field, value string, options ...Option) error {
  237. ts := time.Now()
  238. opt := newOption()
  239. defer func() {
  240. if opt.Trace != nil {
  241. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  242. opt.Redis.Handle = "hash set"
  243. opt.Redis.Key = key
  244. opt.Redis.Value = field + "/" + value
  245. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  246. opt.Trace.AppendRedis(opt.Redis)
  247. }
  248. }()
  249. for _, f := range options {
  250. f(opt)
  251. }
  252. if err := c.client.HSet(c.ctx, key, field, value).Err(); err != nil {
  253. return errors.Join(err, fmt.Errorf("redis hset key: %s field: %s err", key, field))
  254. }
  255. return nil
  256. }
  257. func (c *cacheRepo) HDel(key, field string, options ...Option) error {
  258. ts := time.Now()
  259. opt := newOption()
  260. defer func() {
  261. if opt.Trace != nil {
  262. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  263. opt.Redis.Handle = "hash del"
  264. opt.Redis.Key = key
  265. opt.Redis.Value = field
  266. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  267. opt.Trace.AppendRedis(opt.Redis)
  268. }
  269. }()
  270. for _, f := range options {
  271. f(opt)
  272. }
  273. if err := c.client.HDel(c.ctx, key, field).Err(); err != nil {
  274. return errors.Join(err, fmt.Errorf("redis hdel key: %s field: %s err", key, field))
  275. }
  276. return nil
  277. }
  278. func (c *cacheRepo) HGetAll(key string, options ...Option) (map[string]string, error) {
  279. ts := time.Now()
  280. opt := newOption()
  281. defer func() {
  282. if opt.Trace != nil {
  283. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  284. opt.Redis.Handle = "hash get all"
  285. opt.Redis.Key = key
  286. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  287. opt.Trace.AppendRedis(opt.Redis)
  288. }
  289. }()
  290. for _, f := range options {
  291. f(opt)
  292. }
  293. value, err := c.client.HGetAll(c.ctx, key).Result()
  294. if err != nil {
  295. return nil, errors.Join(err, fmt.Errorf("redis hget all key: %s err", key))
  296. }
  297. return value, nil
  298. }
  299. func (c *cacheRepo) HIncrBy(key, field string, incr int64, options ...Option) (int64, error) {
  300. ts := time.Now()
  301. opt := newOption()
  302. defer func() {
  303. if opt.Trace != nil {
  304. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  305. opt.Redis.Handle = "hash incr int64"
  306. opt.Redis.Key = key
  307. opt.Redis.Value = fmt.Sprintf("field:%s incr:%d", field, incr)
  308. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  309. opt.Trace.AppendRedis(opt.Redis)
  310. }
  311. }()
  312. for _, f := range options {
  313. f(opt)
  314. }
  315. value, err := c.client.HIncrBy(c.ctx, key, field, incr).Result()
  316. if err != nil {
  317. return 0, errors.Join(err, fmt.Errorf("redis hash incr int64 key: %s err", key))
  318. }
  319. return value, nil
  320. }
  321. func (c *cacheRepo) HIncrByFloat(key, field string, incr float64, options ...Option) (float64, error) {
  322. ts := time.Now()
  323. opt := newOption()
  324. defer func() {
  325. if opt.Trace != nil {
  326. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  327. opt.Redis.Handle = "hash incr float64"
  328. opt.Redis.Key = key
  329. opt.Redis.Value = fmt.Sprintf("field:%s incr:%d", field, incr)
  330. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  331. opt.Trace.AppendRedis(opt.Redis)
  332. }
  333. }()
  334. for _, f := range options {
  335. f(opt)
  336. }
  337. value, err := c.client.HIncrByFloat(c.ctx, key, field, incr).Result()
  338. if err != nil {
  339. return 0, errors.Join(err, fmt.Errorf("redis hash incr float64 key: %s err", key))
  340. }
  341. return value, nil
  342. }
  343. func (c *cacheRepo) LPush(key, value string, options ...Option) error {
  344. ts := time.Now()
  345. opt := newOption()
  346. defer func() {
  347. if opt.Trace != nil {
  348. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  349. opt.Redis.Handle = "list push"
  350. opt.Redis.Key = key
  351. opt.Redis.Value = value
  352. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  353. opt.Trace.AppendRedis(opt.Redis)
  354. }
  355. }()
  356. for _, f := range options {
  357. f(opt)
  358. }
  359. _, err := c.client.LPush(c.ctx, key, value).Result()
  360. if err != nil {
  361. return errors.Join(err, fmt.Errorf("redis list push key: %s value: %s err", key, value))
  362. }
  363. return nil
  364. }
  365. func (c *cacheRepo) LLen(key string, options ...Option) (int64, error) {
  366. ts := time.Now()
  367. opt := newOption()
  368. defer func() {
  369. if opt.Trace != nil {
  370. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  371. opt.Redis.Handle = "list len"
  372. opt.Redis.Key = key
  373. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  374. opt.Trace.AppendRedis(opt.Redis)
  375. }
  376. }()
  377. for _, f := range options {
  378. f(opt)
  379. }
  380. value, err := c.client.LLen(c.ctx, key).Result()
  381. if err != nil {
  382. return 0, errors.Join(err, fmt.Errorf("redis list len key: %s err", key))
  383. }
  384. return value, nil
  385. }
  386. func (c *cacheRepo) BRPop(key string, timeout time.Duration, options ...Option) (string, error) {
  387. ts := time.Now()
  388. opt := newOption()
  389. defer func() {
  390. if opt.Trace != nil {
  391. opt.Redis.Timestamp = time_parse.CSTLayoutString()
  392. opt.Redis.Handle = "list brpop"
  393. opt.Redis.Key = key
  394. opt.Redis.TTL = timeout.Seconds()
  395. opt.Redis.CostSeconds = time.Since(ts).Seconds()
  396. opt.Trace.AppendRedis(opt.Redis)
  397. }
  398. }()
  399. for _, f := range options {
  400. f(opt)
  401. }
  402. value, err := c.client.BRPop(c.ctx, timeout, key).Result()
  403. if err != nil {
  404. return "", errors.Join(err, fmt.Errorf("redis list len key: %s err", key))
  405. }
  406. return value[1], nil
  407. }
  408. func (c *cacheRepo) Close() error {
  409. return c.client.Close()
  410. }