|
- package main
-
- import (
- "context"
- "log"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- "github.com/BurntSushi/toml"
- "github.com/go-redis/redis/v8"
- )
-
// BatchSize is the maximum number of set members moved per iteration: it is
// the count passed to SPOP by StartShovelWorker, and a batch of exactly this
// size is treated as a sign that more data is waiting in the source set.
const BatchSize = 10000
-
- var RedisClients map[string]*redis.Client
-
- type Config struct {
- RedisServerConfigs map[string]RedisServerConfig `toml:"servers"`
- ShovelConfigs []ShovelConfig `toml:"shovels"`
- }
-
// ShovelConfig describes a single shovel: which set to drain, from which
// server, and where to put its members.
type ShovelConfig struct {
	// Key is the name of the set to drain on the source server.
	Key string `toml:"key"`

	// Src is the name of the source server; it must be a key of
	// Config.RedisServerConfigs.
	Src string `toml:"src"`

	// Dst is the name of the destination server; it must be a key of
	// Config.RedisServerConfigs.
	Dst string `toml:"dst"`

	// DstKey is the name of the set to fill on the destination server.
	// When empty, Key is used.
	DstKey string `toml:"dstkey"`
}
-
// RedisServerConfig is the TOML representation of the connection settings for
// one Redis server. RedisConfigToRedisOptions copies each field to the
// redis.Options field of the same name; every float64 field is a duration in
// (possibly fractional) seconds and is converted to a time.Duration there.
type RedisServerConfig struct {
	// Connection target and credentials.
	Network  string `toml:"network"`
	Addr     string `toml:"addr"`
	Username string `toml:"username"`
	Password string `toml:"password"`
	DB       int    `toml:"db"`

	// Retry policy; the backoff bounds are in seconds.
	MaxRetries      int     `toml:"maxretries"`
	MinRetryBackoff float64 `toml:"minretrybackoff"`
	MaxRetryBackoff float64 `toml:"maxretrybackoff"`

	// Socket timeouts, in seconds. A ReadTimeout of 0 is replaced by
	// 15 minutes in RedisConfigToRedisOptions.
	DialTimeout  float64 `toml:"dialtimeout"`
	ReadTimeout  float64 `toml:"readtimeout"`
	WriteTimeout float64 `toml:"writetimeout"`

	// Connection pool settings; the float64 fields are in seconds.
	PoolFIFO           bool    `toml:"poolfifo"`
	PoolSize           int     `toml:"poolsize"`
	MinIdleConns       int     `toml:"minidleconns"`
	MaxConnAge         float64 `toml:"maxconnage"`
	PoolTimeout        float64 `toml:"pooltimeout"`
	IdleTimeout        float64 `toml:"idletimeout"`
	IdleCheckFrequency float64 `toml:"idlecheckfrequency"`
}
-
- func RedisConfigToRedisOptions(config RedisServerConfig) *redis.Options {
- nano := float64(time.Second.Nanoseconds())
- if config.ReadTimeout == 0 {
- config.ReadTimeout = 15 * time.Minute.Seconds()
- }
- return &redis.Options{
- Network: config.Network,
- Addr: config.Addr,
- Username: config.Username,
- Password: config.Password,
- DB: config.DB,
- MaxRetries: config.MaxRetries,
- MinRetryBackoff: time.Duration(config.MinRetryBackoff * nano),
- MaxRetryBackoff: time.Duration(config.MaxRetryBackoff * nano),
- DialTimeout: time.Duration(config.DialTimeout * nano),
- ReadTimeout: time.Duration(config.ReadTimeout * nano),
- WriteTimeout: time.Duration(config.WriteTimeout * nano),
- PoolFIFO: config.PoolFIFO,
- PoolSize: config.PoolSize,
- MinIdleConns: config.MinIdleConns,
- MaxConnAge: time.Duration(config.MaxConnAge * nano),
- PoolTimeout: time.Duration(config.PoolTimeout * nano),
- IdleTimeout: time.Duration(config.IdleTimeout * nano),
- IdleCheckFrequency: time.Duration(config.IdleCheckFrequency * nano),
- }
- }
-
- func StartShovelWorker(c context.Context, dc chan bool, s *redis.Client, d *redis.Client, sk string, dk string) {
- defer close(dc)
- if dk == "" {
- dk = sk
- }
- var m time.Duration = 0
- for {
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
- items, err := s.SPopN(ctx, sk, BatchSize).Result()
- cancel()
- if err != nil {
- log.Printf("unable to spop %s: %s", sk, err)
- } else if len(items) != 0 {
- var iitems []interface{}
- for _, item := range items {
- iitems = append(iitems, item)
- }
- ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
- err = d.SAdd(ctx, dk, iitems...).Err()
- cancel()
- if err != nil {
- log.Printf("unable to sadd %s: %s", dk, err)
- ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
- err = s.SAdd(ctx, sk, iitems...).Err()
- cancel()
- if err != nil {
- log.Printf("unable to revert spop %s: %s", sk, err)
- }
- } else if len(items) >= BatchSize {
- m = 0
- }
- }
- t := time.NewTimer(m * time.Second)
- select {
- case <-c.Done():
- if !t.Stop() {
- <-t.C
- }
- return
- case <-t.C:
- }
- if m < 60 {
- m++
- }
- }
- }
-
- func main() {
- var config Config
- _, err := toml.DecodeFile("./config.toml", &config)
- if err != nil {
- log.Panicf("error parsing config.toml: %s", err)
- }
- RedisClients = map[string]*redis.Client{}
- for i, c := range config.ShovelConfigs {
- if _, has := config.RedisServerConfigs[c.Src]; !has {
- log.Panicf("invalid redis source: %s", c.Src)
- }
- if _, has := config.RedisServerConfigs[c.Dst]; !has {
- log.Panicf("invalid redis destination: %s", c.Dst)
- }
- if c.DstKey == "" {
- config.ShovelConfigs[i].DstKey = c.Key
- }
- }
- for n, c := range config.RedisServerConfigs {
- RedisClients[n] = redis.NewClient(RedisConfigToRedisOptions(c))
- }
- ctx, cancel := context.WithCancel(context.Background())
- var doneChans []chan bool
- for _, c := range config.ShovelConfigs {
- log.Printf("starting shovel worker for %s/%s -> %s/%s", c.Src, c.Key, c.Dst, c.DstKey)
- doneChan := make(chan bool)
- go StartShovelWorker(ctx, doneChan, RedisClients[c.Src], RedisClients[c.Dst], c.Key, c.DstKey)
- doneChans = append(doneChans, doneChan)
- }
-
- sc := make(chan os.Signal, 1)
- signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
- <-sc
- cancel()
- log.Printf("waiting for %d workers to shut down...", len(doneChans))
- for _, c := range doneChans {
- <-c
- }
- }
|