package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
	dq "github.com/nsqio/go-diskqueue"
)

const (
	DefaultWatermarkHigh int64 = 100000
	DefaultWatermarkLow  int64 = 100000
	DefaultBatchSize     int64 = 10000
)

// l adapts the standard library logger to go-diskqueue's log function.
func l(_ dq.LogLevel, f string, args ...interface{}) {
	log.Printf(f, args...)
}

type Core struct {
	DataDir       string
	MainClient    *redis.Client
	Offloaders    map[string]*Offloader
	MigrationMode bool
}

type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

type ProjectOffloadConfig struct {
	WatermarkHigh int64 `json:"high"`
	WatermarkLow  int64 `json:"low"`
	BatchSize     int64 `json:"batchsize"`
}

type ProjectConfig struct {
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"`
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}

type Offloader struct {
	RedisClient   *redis.Client
	ProjectConfig ProjectConfig
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	Queues        map[string]dq.Interface
	Sets          map[string]string
	Name          string
	Core          *Core
}

// CleanName makes s safe to use in a diskqueue file name.
func (that *Offloader) CleanName(s string) string {
	return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
}

func (that *Offloader) RedisConfigDiffers(other *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && other == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || other == nil ||
		that.ProjectConfig.RedisConfig.Host != other.Host ||
		that.ProjectConfig.RedisConfig.Port != other.Port ||
		that.ProjectConfig.RedisConfig.Pass != other.Pass
}

func (that *Offloader) OffloadConfigDiffers(other ProjectOffloadConfig) bool {
	return that.ProjectConfig.OffloadConfig.WatermarkHigh != other.WatermarkHigh ||
		that.ProjectConfig.OffloadConfig.WatermarkLow != other.WatermarkLow ||
		that.ProjectConfig.OffloadConfig.BatchSize != other.BatchSize
}

// RefreshQueues synchronizes the open disk queues with the sets currently
// advertised in redis: the static sets plus one set per filter and one per
// priority.
func (that *Offloader) RefreshQueues() error {
	pipe := that.RedisClient.Pipeline()
	prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
	filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
	_, err := pipe.Exec(that.Context)
	if err != nil {
		return err
	}
	priorities, err := prioritiesCmdRes.Result()
	if err != nil {
		return err
	}
	filters, err := filtersCmdRes.Result()
	if err != nil {
		return err
	}
	setQueueMap := map[string]string{
		"todo":           "todo",
		"todo:secondary": "todo:secondary",
		"todo:redo":      "todo:redo",
		"todo:backfeed":  "todo:backfeed",
		"done":           "done",
		"unretrievable":  "unretrievable",
	}
	// All filtered:* sets share a single "filtered" queue; each priority set
	// gets its own queue.
	for _, filter := range filters {
		setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
	}
	for _, priority := range priorities {
		setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
	}
	needQueueMap := map[string]bool{}
	for setName, queueName := range setQueueMap {
		needQueueMap[queueName] = true
		if _, has := that.Queues[queueName]; !has {
			log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
			queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
			if queue == nil {
				return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
			}
			that.Queues[queueName] = queue
		}
		that.Sets[setName] = queueName
	}
	// Close queues that no longer back any set.
	for k, v := range that.Queues {
		if _, has := needQueueMap[k]; !has {
			v.Close()
			delete(that.Queues, k)
		}
	}
	return nil
}
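
// To illustrate the mapping RefreshQueues builds (project, filter, and
// priority names here are made up): a project "example" with priority "2"
// and filter "spam" ends up with
//
//	Sets["todo"]          = "todo"
//	Sets["todo:prio:2"]   = "todo:prio:2"
//	Sets["filtered:spam"] = "filtered"
//
// so every filtered:* set drains into the shared "filtered" queue, while
// each priority level is backed by its own disk queue.
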
func (that *Offloader) CloseQueues() {
	for k, q := range that.Queues {
		log.Printf("closing queue %s for %s", k, that.Name)
		q.Close()
	}
}

// UpdateStats publishes the disk queue depths to the <project>:offloaded hash.
func (that *Offloader) UpdateStats() {
	hset := map[string]interface{}{}
	for k, q := range that.Sets {
		if k != q {
			// Report only sets that map 1:1 to a queue; sets sharing a queue
			// (e.g. filtered:*) would otherwise all report the same depth.
			continue
		}
		queue := that.Queues[q]
		if queue != nil {
			hset[k] = fmt.Sprintf("%d", queue.Depth())
		}
	}
	_, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
	if err != nil {
		log.Printf("unable to hset %s:offloaded: %s", that.Name, err)
	}
}

// Do runs the offload loop for one project until its context is cancelled.
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()
	if that.ProjectConfig.RedisConfig != nil {
		// This client was created for this project only, so close it when
		// the offloader stops.
		defer that.RedisClient.Close()
	}
	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()
	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}
	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}
	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}
	for {
		// Fetch all set cardinalities in one pipelined round trip.
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			rerun := false
			for k, q := range that.Sets {
				if that.Queues[q] == nil {
					continue
				}
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) {
					// Offload: pop up to one batch from redis onto disk. Sets
					// that feed a shared queue (k != q) are drained entirely.
					spopLimit := scard - watermarkHigh
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					for _, entry := range entries {
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 {
					// Reload: move up to one batch from disk back into redis.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize || that.Core.MigrationMode {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						log.Printf("unable to sadd %s: %s", key, err)
						// Diskqueue reads are destructive, so try to put the
						// entries back before bailing out.
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			if rerun {
				// More work is probably pending; skip the sleep below.
				select {
				case skipSleepChan <- true:
				default:
				}
			}
			that.UpdateStats()
		}
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
		}
	}
}
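
// A rough sketch of the loop above with the defaults (high = low = 100000,
// batchsize = 10000) and a made-up cardinality: if "example:todo" holds
// 125000 members, one pass pops min(125000-100000, 10000) = 10000 of them
// onto disk and sets rerun, so skipSleepChan makes the next pass start
// immediately; after three passes (10000 + 10000 + 5000) the set sits at the
// high watermark. Should the set later drop below the low watermark while
// the disk queue is non-empty, entries are SAdd'ed back in batches of up to
// 10000.
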
// StopProjects cancels all offloaders and waits for each to finish.
func (that *Core) StopProjects() {
	var doneChans []chan bool
	for project, offloader := range that.Offloaders {
		log.Printf("stopping offloader %s", project)
		offloader.Cancel()
		doneChans = append(doneChans, offloader.Done)
	}
	for _, c := range doneChans {
		<-c
	}
}

// RefreshProjects reconciles the running offloaders with the "trackers" hash:
// offloaders whose project disappeared, whose config changed, or which died
// on their own are stopped, and offloaders for new projects are started.
func (that *Core) RefreshProjects(redisClient *redis.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	res, err := redisClient.HGetAll(ctx, "trackers").Result()
	cancel()
	if err != nil {
		log.Printf("unable to refresh trackers table: %s", err)
		return
	}
	updatedProjects := map[string]ProjectConfig{}
	for project, configString := range res {
		config := ProjectConfig{}
		err := json.Unmarshal([]byte(configString), &config)
		if err != nil {
			log.Printf("unable to decode project %s config: %s", project, err)
			continue
		}
		updatedProjects[project] = config
	}
	for project, offloader := range that.Offloaders {
		updatedConfig, stillExists := updatedProjects[project]
		stopRequired := !stillExists ||
			offloader.OffloadConfigDiffers(updatedConfig.OffloadConfig) ||
			offloader.RedisConfigDiffers(updatedConfig.RedisConfig)
		if !stopRequired {
			// Also restart offloaders that stopped on their own.
			select {
			case <-offloader.Context.Done():
				stopRequired = true
			case <-offloader.Done:
				stopRequired = true
			default:
			}
		}
		if stopRequired {
			log.Printf("stopping offloader %s", project)
			offloader.Cancel()
			<-offloader.Done
			delete(that.Offloaders, project)
		}
	}
	for project, config := range updatedProjects {
		if _, has := that.Offloaders[project]; !has {
			log.Printf("starting offloader %s", project)
			offloader := &Offloader{}
			offloader.Name = project
			offloader.ProjectConfig = config
			offloader.Core = that
			if config.RedisConfig != nil {
				offloader.RedisClient = redis.NewClient(&redis.Options{
					Addr:        fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
					Username:    "default",
					Password:    config.RedisConfig.Pass,
					ReadTimeout: 15 * time.Minute,
				})
			} else {
				offloader.RedisClient = redisClient
			}
			offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
			offloader.Done = make(chan bool)
			that.Offloaders[project] = offloader
			go offloader.Do()
		}
	}
}
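
// For reference, a "trackers" hash entry in the shape RefreshProjects
// expects (all values below are illustrative, not real credentials):
//
//	HSET trackers example '{"redis":{"host":"127.0.0.1","pass":"hunter2","port":6379},"offload":{"high":200000,"low":50000,"batchsize":5000}}'
//
// Omitting the "redis" object makes the offloader reuse the main client.
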
func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(log.Flags() | log.Lshortfile)
	// Expose net/http/pprof on a random localhost port, retrying a few times
	// in case the chosen port is already taken.
	go func() {
		var err error
		for i := 0; i < 10; i++ {
			port := rand.Intn(65535-1024) + 1024
			err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
			if err != nil && err != http.ErrServerClosed {
				log.Printf("unable to listen on port %d: %s", port, err)
				continue
			}
			break
		}
	}()
	dataDir := os.Getenv("DATA_DIR")
	if dataDir == "" {
		log.Panicf("no DATA_DIR specified")
	}
	// Check that dataDir exists and is a directory.
	if stat, err := os.Stat(dataDir); os.IsNotExist(err) {
		log.Panicf("DATA_DIR %s does not exist", dataDir)
	} else if err != nil {
		log.Panicf("unable to stat DATA_DIR %s: %s", dataDir, err)
	} else if !stat.IsDir() {
		log.Panicf("DATA_DIR %s is not a directory", dataDir)
	}
	mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
	if err != nil {
		log.Panicf("%s", err)
	}
	mainOptions.ReadTimeout = 15 * time.Minute
	mainClient := redis.NewClient(mainOptions)
	sc := make(chan os.Signal, 1)
	// SIGKILL cannot be caught, so only SIGINT and SIGTERM are requested.
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)
	ticker := time.NewTicker(1 * time.Minute)
	core := &Core{
		DataDir:       dataDir,
		MainClient:    mainClient,
		Offloaders:    map[string]*Offloader{},
		MigrationMode: os.Getenv("MIGRATION_MODE") != "",
	}
	defer core.StopProjects()
	for {
		core.RefreshProjects(mainClient)
		select {
		case <-sc:
			core.StopProjects()
			return
		case <-ticker.C:
		}
	}
}
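
// Example invocation (paths and URL are illustrative assumptions):
//
//	DATA_DIR=/var/lib/offloader \
//	REDIS_URL=redis://:password@127.0.0.1:6379/0 \
//	./offloader
//
// Setting MIGRATION_MODE to any non-empty value disables offloading and
// instead refills each set from disk up to twice its high watermark.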