You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
4.9 KiB

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. "time"
  9. "github.com/BurntSushi/toml"
  10. "github.com/go-redis/redis/v8"
  11. )
  12. const BatchSize = 10000
  13. var RedisClients map[string]*redis.Client
  14. type Config struct {
  15. RedisServerConfigs map[string]RedisServerConfig `toml:"servers"`
  16. ShovelConfigs []ShovelConfig `toml:"shovels"`
  17. }
  18. type ShovelConfig struct {
  19. Key string `toml:"key"`
  20. Src string `toml:"src"`
  21. Dst string `toml:"dst"`
  22. DstKey string `toml:"dstkey"`
  23. }
  24. type RedisServerConfig struct {
  25. Network string `toml:"network"`
  26. Addr string `toml:"addr"`
  27. Username string `toml:"username"`
  28. Password string `toml:"password"`
  29. DB int `toml:"db"`
  30. MaxRetries int `toml:"maxretries"`
  31. MinRetryBackoff float64 `toml:"minretrybackoff"`
  32. MaxRetryBackoff float64 `toml:"maxretrybackoff"`
  33. DialTimeout float64 `toml:"dialtimeout"`
  34. ReadTimeout float64 `toml:"readtimeout"`
  35. WriteTimeout float64 `toml:"writetimeout"`
  36. PoolFIFO bool `toml:"poolfifo"`
  37. PoolSize int `toml:"poolsize"`
  38. MinIdleConns int `toml:"minidleconns"`
  39. MaxConnAge float64 `toml:"maxconnage"`
  40. PoolTimeout float64 `toml:"pooltimeout"`
  41. IdleTimeout float64 `toml:"idletimeout"`
  42. IdleCheckFrequency float64 `toml:"idlecheckfrequency"`
  43. }
  44. func RedisConfigToRedisOptions(config RedisServerConfig) *redis.Options {
  45. nano := float64(time.Second.Nanoseconds())
  46. if config.ReadTimeout == 0 {
  47. config.ReadTimeout = 15 * time.Minute.Seconds()
  48. }
  49. return &redis.Options{
  50. Network: config.Network,
  51. Addr: config.Addr,
  52. Username: config.Username,
  53. Password: config.Password,
  54. DB: config.DB,
  55. MaxRetries: config.MaxRetries,
  56. MinRetryBackoff: time.Duration(config.MinRetryBackoff * nano),
  57. MaxRetryBackoff: time.Duration(config.MaxRetryBackoff * nano),
  58. DialTimeout: time.Duration(config.DialTimeout * nano),
  59. ReadTimeout: time.Duration(config.ReadTimeout * nano),
  60. WriteTimeout: time.Duration(config.WriteTimeout * nano),
  61. PoolFIFO: config.PoolFIFO,
  62. PoolSize: config.PoolSize,
  63. MinIdleConns: config.MinIdleConns,
  64. MaxConnAge: time.Duration(config.MaxConnAge * nano),
  65. PoolTimeout: time.Duration(config.PoolTimeout * nano),
  66. IdleTimeout: time.Duration(config.IdleTimeout * nano),
  67. IdleCheckFrequency: time.Duration(config.IdleCheckFrequency * nano),
  68. }
  69. }
  70. func StartShovelWorker(c context.Context, dc chan bool, s *redis.Client, d *redis.Client, sk string, dk string) {
  71. defer close(dc)
  72. if dk == "" {
  73. dk = sk
  74. }
  75. var m time.Duration = 0
  76. for {
  77. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
  78. items, err := s.SPopN(ctx, sk, BatchSize).Result()
  79. cancel()
  80. if err != nil {
  81. log.Printf("unable to spop %s: %s", sk, err)
  82. } else if len(items) != 0 {
  83. var iitems []interface{}
  84. for _, item := range items {
  85. iitems = append(iitems, item)
  86. }
  87. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
  88. err = d.SAdd(ctx, dk, iitems...).Err()
  89. cancel()
  90. if err != nil {
  91. log.Printf("unable to sadd %s: %s", dk, err)
  92. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
  93. err = s.SAdd(ctx, sk, iitems...).Err()
  94. cancel()
  95. if err != nil {
  96. log.Printf("unable to revert spop %s: %s", sk, err)
  97. }
  98. } else if len(items) >= BatchSize {
  99. m = 0
  100. }
  101. }
  102. t := time.NewTimer(m * time.Second)
  103. select {
  104. case <-c.Done():
  105. if !t.Stop() {
  106. <-t.C
  107. }
  108. return
  109. case <-t.C:
  110. }
  111. if m < 60 {
  112. m++
  113. }
  114. }
  115. }
  116. func main() {
  117. var config Config
  118. _, err := toml.DecodeFile("./config.toml", &config)
  119. if err != nil {
  120. log.Panicf("error parsing config.toml: %s", err)
  121. }
  122. RedisClients = map[string]*redis.Client{}
  123. for i, c := range config.ShovelConfigs {
  124. if _, has := config.RedisServerConfigs[c.Src]; !has {
  125. log.Panicf("invalid redis source: %s", c.Src)
  126. }
  127. if _, has := config.RedisServerConfigs[c.Dst]; !has {
  128. log.Panicf("invalid redis destination: %s", c.Dst)
  129. }
  130. if c.DstKey == "" {
  131. config.ShovelConfigs[i].DstKey = c.Key
  132. }
  133. }
  134. for n, c := range config.RedisServerConfigs {
  135. RedisClients[n] = redis.NewClient(RedisConfigToRedisOptions(c))
  136. }
  137. ctx, cancel := context.WithCancel(context.Background())
  138. var doneChans []chan bool
  139. for _, c := range config.ShovelConfigs {
  140. log.Printf("starting shovel worker for %s/%s -> %s/%s", c.Src, c.Key, c.Dst, c.DstKey)
  141. doneChan := make(chan bool)
  142. go StartShovelWorker(ctx, doneChan, RedisClients[c.Src], RedisClients[c.Dst], c.Key, c.DstKey)
  143. doneChans = append(doneChans, doneChan)
  144. }
  145. sc := make(chan os.Signal, 1)
  146. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  147. <-sc
  148. cancel()
  149. log.Printf("waiting for %d workers to shut down...", len(doneChans))
  150. for _, c := range doneChans {
  151. <-c
  152. }
  153. }