// This program continuously moves members of Redis sets from a source server
// to a destination server. Servers and shovels are read from ./config.toml;
// one worker goroutine runs per configured shovel until SIGINT or SIGTERM is
// received.
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/go-redis/redis/v8"
)

// BatchSize is the maximum number of set members popped from the source and
// added to the destination in a single round.
const BatchSize = 10000

// RedisClients holds one client per configured server, keyed by the server's
// name in the "servers" table of the configuration file. It is populated by
// main before any worker is started.
var RedisClients map[string]*redis.Client

// Config is the top-level structure decoded from config.toml.
type Config struct {
	// RedisServerConfigs maps a server name to its connection settings.
	RedisServerConfigs map[string]RedisServerConfig `toml:"servers"`
	// ShovelConfigs lists the set transfers to run.
	ShovelConfigs []ShovelConfig `toml:"shovels"`
}

// ShovelConfig describes one transfer of a set from a source server to a
// destination server.
type ShovelConfig struct {
	// Key is the set key read on the source server.
	Key string `toml:"key"`
	// Src and Dst are server names; both must exist in Config.RedisServerConfigs.
	Src string `toml:"src"`
	Dst string `toml:"dst"`
	// DstKey is the set key written on the destination server. When empty,
	// Key is used.
	DstKey string `toml:"dstkey"`
}

// RedisServerConfig holds the connection settings for one Redis server.
//
// All float64 fields are durations expressed in (possibly fractional) seconds;
// RedisConfigToRedisOptions converts them to time.Duration.
type RedisServerConfig struct {
	Network            string  `toml:"network"`
	Addr               string  `toml:"addr"`
	Username           string  `toml:"username"`
	Password           string  `toml:"password"`
	DB                 int     `toml:"db"`
	MaxRetries         int     `toml:"maxretries"`
	MinRetryBackoff    float64 `toml:"minretrybackoff"`
	MaxRetryBackoff    float64 `toml:"maxretrybackoff"`
	DialTimeout        float64 `toml:"dialtimeout"`
	ReadTimeout        float64 `toml:"readtimeout"`
	WriteTimeout       float64 `toml:"writetimeout"`
	PoolFIFO           bool    `toml:"poolfifo"`
	PoolSize           int     `toml:"poolsize"`
	MinIdleConns       int     `toml:"minidleconns"`
	MaxConnAge         float64 `toml:"maxconnage"`
	PoolTimeout        float64 `toml:"pooltimeout"`
	IdleTimeout        float64 `toml:"idletimeout"`
	IdleCheckFrequency float64 `toml:"idlecheckfrequency"`
}

// RedisConfigToRedisOptions converts a RedisServerConfig into the options
// structure expected by redis.NewClient.
//
// Duration fields are converted from seconds to time.Duration. A ReadTimeout
// of zero is replaced by 15 minutes; every other field is passed through
// unchanged, so zero values are left for the redis package to interpret.
func RedisConfigToRedisOptions(config RedisServerConfig) *redis.Options {
	// seconds converts a fractional number of seconds into a time.Duration.
	seconds := func(v float64) time.Duration {
		return time.Duration(v * float64(time.Second))
	}

	readTimeout := config.ReadTimeout
	if readTimeout == 0 {
		readTimeout = 15 * time.Minute.Seconds()
	}

	return &redis.Options{
		Network:            config.Network,
		Addr:               config.Addr,
		Username:           config.Username,
		Password:           config.Password,
		DB:                 config.DB,
		MaxRetries:         config.MaxRetries,
		MinRetryBackoff:    seconds(config.MinRetryBackoff),
		MaxRetryBackoff:    seconds(config.MaxRetryBackoff),
		DialTimeout:        seconds(config.DialTimeout),
		ReadTimeout:        seconds(readTimeout),
		WriteTimeout:       seconds(config.WriteTimeout),
		PoolFIFO:           config.PoolFIFO,
		PoolSize:           config.PoolSize,
		MinIdleConns:       config.MinIdleConns,
		MaxConnAge:         seconds(config.MaxConnAge),
		PoolTimeout:        seconds(config.PoolTimeout),
		IdleTimeout:        seconds(config.IdleTimeout),
		IdleCheckFrequency: seconds(config.IdleCheckFrequency),
	}
}

// shovelBatch pops up to BatchSize members from set sk on s and adds them to
// set dk on d. If the add fails, it tries to put the members back into sk on
// s. Failures are logged, not returned.
//
// It reports whether a full batch (at least BatchSize members) was moved
// successfully, which the caller uses as a hint that more work is waiting.
//
// Each Redis call runs under its own timeout derived from
// context.Background(), so the worker's cancellation context never interrupts
// a batch that is already in flight.
func shovelBatch(s, d *redis.Client, sk, dk string) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	items, err := s.SPopN(ctx, sk, BatchSize).Result()
	cancel()
	if err != nil {
		log.Printf("unable to spop %s: %s", sk, err)
		return false
	}
	if len(items) == 0 {
		return false
	}

	// SAdd takes ...interface{}, so the popped strings must be re-boxed.
	members := make([]interface{}, len(items))
	for i, item := range items {
		members[i] = item
	}

	ctx, cancel = context.WithTimeout(context.Background(), 15*time.Minute)
	err = d.SAdd(ctx, dk, members...).Err()
	cancel()
	if err == nil {
		return len(items) >= BatchSize
	}
	log.Printf("unable to sadd %s: %s", dk, err)

	// The members were already removed from the source; try to restore them.
	ctx, cancel = context.WithTimeout(context.Background(), 15*time.Minute)
	err = s.SAdd(ctx, sk, members...).Err()
	cancel()
	if err != nil {
		log.Printf("unable to revert spop %s: %s", sk, err)
	}
	return false
}

// StartShovelWorker repeatedly moves members of set sk on client s to set dk
// on client d until c is cancelled. It blocks, so it is meant to be run in its
// own goroutine.
//
// dc is closed when the worker returns, which lets the caller wait for
// shutdown. If dk is empty, sk is used as the destination key.
//
// Between batches the worker sleeps m seconds. m grows by one per round up to
// 60 and is reset to zero whenever a full batch was moved, so a busy set is
// polled quickly and an idle one backs off. Cancellation is only checked
// during this sleep, never in the middle of a batch.
func StartShovelWorker(c context.Context, dc chan bool, s *redis.Client, d *redis.Client, sk string, dk string) {
	defer close(dc)
	if dk == "" {
		dk = sk
	}

	var m time.Duration // back-off before the next batch, in seconds
	for {
		if shovelBatch(s, d, sk, dk) {
			m = 0
		}

		t := time.NewTimer(m * time.Second)
		select {
		case <-c.Done():
			// The timer is discarded right away, so there is no need to
			// drain its channel after stopping it.
			t.Stop()
			return
		case <-t.C:
		}
		if m < 60 {
			m++
		}
	}
}

// main loads ./config.toml, validates the shovel definitions, creates one
// Redis client per configured server, starts one worker per shovel and then
// waits for SIGINT or SIGTERM before shutting the workers down.
func main() {
	var config Config
	_, err := toml.DecodeFile("./config.toml", &config)
	if err != nil {
		log.Panicf("error parsing config.toml: %s", err)
	}

	// Every shovel must reference known servers; an empty destination key
	// defaults to the source key.
	for i, c := range config.ShovelConfigs {
		if _, has := config.RedisServerConfigs[c.Src]; !has {
			log.Panicf("invalid redis source: %s", c.Src)
		}
		if _, has := config.RedisServerConfigs[c.Dst]; !has {
			log.Panicf("invalid redis destination: %s", c.Dst)
		}
		if c.DstKey == "" {
			config.ShovelConfigs[i].DstKey = c.Key
		}
	}

	RedisClients = make(map[string]*redis.Client, len(config.RedisServerConfigs))
	for n, c := range config.RedisServerConfigs {
		RedisClients[n] = redis.NewClient(RedisConfigToRedisOptions(c))
	}

	ctx, cancel := context.WithCancel(context.Background())
	doneChans := make([]chan bool, 0, len(config.ShovelConfigs))
	for _, c := range config.ShovelConfigs {
		log.Printf("starting shovel worker for %s/%s -> %s/%s", c.Src, c.Key, c.Dst, c.DstKey)
		doneChan := make(chan bool)
		go StartShovelWorker(ctx, doneChan, RedisClients[c.Src], RedisClients[c.Dst], c.Key, c.DstKey)
		doneChans = append(doneChans, doneChan)
	}

	// SIGKILL cannot be trapped and os.Interrupt is the same signal as
	// SIGINT, so only SIGINT and SIGTERM are registered.
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)
	<-sc
	cancel()

	log.Printf("waiting for %d workers to shut down...", len(doneChans))
	for _, c := range doneChans {
		<-c
	}
}