|
- package main
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "log"
- "math/rand"
- "net/http"
- _ "net/http/pprof"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "time"
-
- "github.com/go-redis/redis/v8"
- dq "github.com/nsqio/go-diskqueue"
- )
-
// Default offload tuning values, applied by an Offloader whenever the
// corresponding per-project config field is left at zero.
const (
	DefaultWatermarkHigh int64 = 100000
	DefaultWatermarkLow  int64 = 100000
	DefaultBatchSize     int64 = 10000
)
-
- func l(_ dq.LogLevel, f string, args ...interface{}) {
- log.Printf(f, args...)
- }
-
// Core holds process-wide state shared by all offloaders.
type Core struct {
	DataDir       string                // base directory for on-disk queues
	MainClient    *redis.Client         // connection to the main tracker Redis
	Offloaders    map[string]*Offloader // running offloaders, keyed by project name
	MigrationMode bool                  // set via MIGRATION_MODE env; disables offloading and refills sets more aggressively (see Do)
}
-
// ProjectRedisConfig is an optional per-project Redis endpoint override;
// when present, the offloader connects here instead of the main client.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
-
// ProjectOffloadConfig carries per-project offload tuning. Zero values
// fall back to the package-level defaults (DefaultWatermarkHigh etc.).
type ProjectOffloadConfig struct {
	WatermarkHigh int64 `json:"high"`      // offload to disk when a set grows beyond this
	WatermarkLow  int64 `json:"low"`       // refill from disk when a set shrinks below this
	BatchSize     int64 `json:"batchsize"` // max entries moved per SPopN/SAdd round trip
}
-
// ProjectConfig is one project's entry in the "trackers" Redis hash,
// stored there as a JSON string.
type ProjectConfig struct {
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"` // nil means "use the main Redis client"
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}
-
// Offloader moves entries between one project's Redis sets and its
// on-disk queues. One instance (with one Do goroutine) exists per project.
type Offloader struct {
	RedisClient   *redis.Client // project's Redis connection (owned/closed by Do only when ProjectConfig.RedisConfig != nil)
	ProjectConfig ProjectConfig // config snapshot this offloader was started with
	Context       context.Context
	Cancel        context.CancelFunc // cancels Context to request shutdown
	Done          chan bool          // closed when the Do goroutine exits
	Queues        map[string]dq.Interface // open disk queues, keyed by queue name
	Sets          map[string]string       // Redis set name -> backing queue name
	Name          string                  // project name, used as the Redis key prefix
	Core          *Core
}
-
- func (that *Offloader) CleanName(s string) string {
- return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
- }
-
- func (that *Offloader) RedisConfigDiffers(new *ProjectRedisConfig) bool {
- if that.ProjectConfig.RedisConfig == nil && new == nil {
- return false
- }
- if that.ProjectConfig.RedisConfig == nil || new == nil || that.ProjectConfig.RedisConfig.Host != new.Host || that.ProjectConfig.RedisConfig.Port != new.Port || that.ProjectConfig.RedisConfig.Pass != new.Pass {
- return true
- }
- return false
- }
-
- func (that *Offloader) OffloadConfigDiffers(new ProjectOffloadConfig) bool {
- return that.ProjectConfig.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.ProjectConfig.OffloadConfig.WatermarkLow != new.WatermarkLow || that.ProjectConfig.OffloadConfig.BatchSize != new.BatchSize
- }
-
- func (that *Offloader) RefreshQueues() error {
- pipe := that.RedisClient.Pipeline()
- prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
- filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
- stashesCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:stashes", that.Name))
- _, err := pipe.Exec(that.Context)
- if err != nil {
- return err
- }
- priorities, err := prioritiesCmdRes.Result()
- if err != nil {
- return err
- }
- filters, err := filtersCmdRes.Result()
- if err != nil {
- return err
- }
- stashes, err := stashesCmdRes.Result()
- if err != nil {
- return err
- }
- setQueueMap := map[string]string{
- "todo": "todo",
- "todo:secondary": "todo:secondary",
- "todo:redo": "todo:redo",
- "todo:backfeed": "todo:backfeed",
- "done": "done",
- "unretrievable": "unretrievable",
- "backfeed_retry": "backfeed_retry",
- }
- for _, filter := range filters {
- setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
- }
- for _, priority := range priorities {
- setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
- }
- for _, stash := range stashes {
- setQueueMap[fmt.Sprintf("stash:%s", stash)] = fmt.Sprintf("stash:%s", stash)
- }
- needQueueMap := map[string]bool{}
- for setName, queueName := range setQueueMap {
- needQueueMap[queueName] = true
- if _, has := that.Queues[queueName]; !has {
- log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
- queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
- if queue == nil {
- return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
- }
- that.Queues[queueName] = queue
- }
- that.Sets[setName] = queueName
- }
- for k, v := range that.Queues {
- if _, has := needQueueMap[k]; !has {
- v.Close()
- delete(that.Queues, k)
- }
- }
- return nil
- }
-
- func (that *Offloader) CloseQueues() {
- for k, q := range that.Queues {
- log.Printf("closing queue %s for %s", k, that.Name)
- q.Close()
- }
- }
-
- func (that *Offloader) UpdateStats() {
- hset := map[string]interface{}{}
- for k, q := range that.Sets {
- if k != q {
- continue
- }
- queue := that.Queues[q]
- if queue != nil {
- hset[k] = fmt.Sprintf("%d", queue.Depth())
- }
- }
- _, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
- if err != nil {
- log.Printf("unable to hmset %s:offloaded: %s", that.Name, err)
- }
- }
-
// Do is the offloader's main loop; it runs in its own goroutine until
// that.Context is cancelled (or an unrecoverable queue error occurs) and
// signals completion by closing that.Done.
//
// Each iteration pipelines an SCARD for every tracked Redis set, then per
// set either:
//   - pops entries from Redis into the backing disk queue when the set is
//     above the high watermark (aliased sets, where set != queue, are
//     drained whenever non-empty), or
//   - pushes entries from the disk queue back into Redis when the primary
//     set is below the low watermark (in migration mode: below twice the
//     high watermark) and the queue has depth.
//
// If any transfer happened, the next iteration starts immediately via
// skipSleepChan; otherwise the loop sleeps on the stats/refresh tickers.
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()

	// The Redis client is owned (and must be closed) by this goroutine
	// only when the project has its own Redis config; otherwise it is the
	// shared main client.
	if that.ProjectConfig.RedisConfig != nil {
		defer that.RedisClient.Close()
	}

	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()

	// ticker drives periodic stats publication; refreshTicker re-reads
	// the set/queue topology from Redis.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()

	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()

	// Buffered so a send never blocks; a pending token makes the select
	// below fall through instead of sleeping.
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)

	// Resolve tuning values, falling back to package defaults for any
	// field left at zero.
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}

	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}

	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}

	for {
		// Legacy per-queue implementation, kept for reference; replaced
		// by the pipelined version below.
		//for k, q := range that.Queues {
		//	key := fmt.Sprintf("%s:%s", that.Name, k)
		//	scard, err := that.RedisClient.SCard(that.Context, key).Result()
		//	if err != nil {
		//		log.Printf("unable to scard %s: %s", key, err)
		//		continue
		//	}
		//	for scard > watermarkHigh || scard < watermarkLow {
		//		select {
		//		case <-that.Context.Done():
		//			return
		//		case <-refreshTicker.C:
		//			that.RefreshQueues()
		//			that.UpdateStats()
		//		default:
		//		}
		//		if scard > watermarkHigh {
		//			spopLimit := scard - watermarkHigh
		//			if spopLimit > batchSize {
		//				spopLimit = batchSize
		//			}
		//			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
		//			entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
		//			cancel()
		//			if err != nil {
		//				log.Printf("unable to spop %s: %s", key, err)
		//			}
		//			scard = scard - int64(len(entries))
		//			for _, entry := range entries {
		//				err := q.Put([]byte(entry))
		//				if err != nil {
		//					log.Printf("unable to q.Put %s: %s", key, err)
		//					return
		//				}
		//			}
		//		} else if scard < watermarkLow {
		//			spopLimit := watermarkLow - scard
		//			if spopLimit > batchSize {
		//				spopLimit = batchSize
		//			}
		//			var entries []interface{}
		//			for q.Depth() > 0 && int64(len(entries)) < spopLimit {
		//				entry := <-q.ReadChan()
		//				entries = append(entries, string(entry))
		//			}
		//			if len(entries) == 0 {
		//				break
		//			}
		//			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
		//			_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
		//			cancel()
		//			if err != nil {
		//				log.Printf("unable to sadd %s %#v: %s", key, entries, err)
		//				for _, entry := range entries {
		//					err := q.Put([]byte(entry.(string)))
		//					if err != nil {
		//						log.Printf("unable to q.Put %s: %s", key, err)
		//					}
		//				}
		//				return
		//			}
		//			scard = scard + int64(len(entries))
		//		}
		//	}
		//}

		// Fetch all set cardinalities in one pipelined round trip.
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			rerun := false
			for k, q := range that.Sets {
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				// Offload branch: set is over the high watermark, or an
				// aliased set (k != q) holds anything at all. Disabled
				// entirely in migration mode.
				if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) {
					// Aliased sets are drained a full batch at a time;
					// primary sets pop only the excess, capped at one batch.
					spopLimit := scard - watermarkHigh
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					for _, entry := range entries {
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							// Entries already popped from Redis may be lost
							// at this point; exit so the failure is visible.
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 {
					// Refill branch: primary set is below its threshold and
					// the disk queue has entries to push back into Redis.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize || that.Core.MigrationMode {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						log.Printf("unable to sadd %s: %s", key, err)
						// Best effort to preserve the batch: write it back
						// to the disk queue before giving up.
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			if rerun {
				// Work remains; queue a token so the select below does not
				// sleep. Non-blocking: a pending token is sufficient.
				select {
				case skipSleepChan <- true:
				default:
				}
			}

			that.UpdateStats()
		}

		// Sleep until cancellation, a ticker fires, or a rerun was
		// requested above.
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
		}
	}
}
-
- func (that *Core) StopProjects() {
- var doneChans []chan bool
- for project, offloader := range that.Offloaders {
- log.Printf("stopping offloader %s", project)
- offloader.Cancel()
- doneChans = append(doneChans, offloader.Done)
- }
- for _, c := range doneChans {
- <-c
- }
- }
-
// RefreshProjects reloads per-project configuration from the "trackers"
// Redis hash (each value is a JSON-encoded ProjectConfig) and reconciles
// the running offloaders with it: offloaders whose project disappeared,
// whose config changed, or whose goroutine already exited are stopped;
// projects without a running offloader get one started.
//
// redisClient is the shared main connection, used to read the trackers
// table and as the client for projects without their own Redis config.
func (that *Core) RefreshProjects(redisClient *redis.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	res, err := redisClient.HGetAll(ctx, "trackers").Result()
	cancel()
	if err != nil {
		log.Printf("unable to refresh trackers table: %s", err)
		return
	}
	updatedProjects := map[string]ProjectConfig{}
	for project, configString := range res {
		//if project != "ua" && project != "ukr-net" && project != "ua-urls" {
		//	continue
		//}
		config := ProjectConfig{}
		err := json.Unmarshal([]byte(configString), &config)
		if err != nil {
			// Skip malformed entries; a running offloader for this project
			// is then stopped below since the project drops out of the map.
			log.Printf("unable to decode project %s config: %s", project, err)
			continue
		}
		updatedProjects[project] = config
	}
	// Stop pass: decide per offloader whether it must be torn down.
	for project, offloader := range that.Offloaders {
		_, stopRequired := updatedProjects[project]
		stopRequired = !stopRequired // stop when the project vanished
		if !stopRequired {
			stopRequired = offloader.OffloadConfigDiffers(updatedProjects[project].OffloadConfig)
			if !stopRequired {
				stopRequired = offloader.RedisConfigDiffers(updatedProjects[project].RedisConfig)
				if !stopRequired {
					// Non-blocking probe: a done context or a closed Done
					// channel means the goroutine already exited on its own,
					// so it must be cleaned up and restarted.
					select {
					case <-offloader.Context.Done():
						stopRequired = true
					case <-offloader.Done:
						stopRequired = true
					default:
					}
				}
			}
		}
		if stopRequired {
			log.Printf("stopping offloader %s", project)
			offloader.Cancel()
			<-offloader.Done
			delete(that.Offloaders, project)
		}
	}
	// Start pass: launch an offloader for every configured project that
	// lacks one (including any stopped just above, which restart with the
	// new config).
	for project, config := range updatedProjects {
		if _, has := that.Offloaders[project]; !has {
			log.Printf("starting offloader %s", project)
			offloader := &Offloader{}
			offloader.Name = project
			offloader.ProjectConfig = config
			offloader.Core = that
			if config.RedisConfig != nil {
				// Project-specific endpoint; this client is owned and
				// closed by the offloader's Do goroutine.
				offloader.RedisClient = redis.NewClient(&redis.Options{
					Addr:        fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
					Username:    "default",
					Password:    config.RedisConfig.Pass,
					ReadTimeout: 15 * time.Minute,
				})
			} else {
				offloader.RedisClient = redisClient
			}
			offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
			offloader.Done = make(chan bool)
			that.Offloaders[project] = offloader
			go offloader.Do()
		}
	}
}
-
- func main() {
- rand.Seed(time.Now().UnixNano())
- log.SetFlags(log.Flags() | log.Lshortfile)
- go func() {
- var err error
- for i := 0; i < 10; i++ {
- port := rand.Intn(65535-1024) + 1024
- err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
- if err != nil && err != http.ErrServerClosed {
- log.Printf("unable to listen on port %d: %s", port, err)
- continue
- }
- break
- }
- }()
-
- dataDir := os.Getenv("DATA_DIR")
- if dataDir == "" {
- log.Panicf("no DATA_DIR specified")
- }
- // check if dataDir exists and is a directory
- if stat, err := os.Stat(dataDir); os.IsNotExist(err) {
- log.Panicf("DATA_DIR %s does not exist", dataDir)
- } else if !stat.IsDir() {
- log.Panicf("DATA_DIR %s is not a directory", dataDir)
- }
-
- mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
- if err != nil {
- log.Panicf("%s", err)
- }
- mainOptions.ReadTimeout = 15 * time.Minute
- mainClient := redis.NewClient(mainOptions)
-
- sc := make(chan os.Signal, 1)
- signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
- ticker := time.NewTicker(1 * time.Minute)
- core := &Core{
- DataDir: dataDir,
- MainClient: mainClient,
- Offloaders: map[string]*Offloader{},
- MigrationMode: os.Getenv("MIGRATION_MODE") != "",
- }
- defer core.StopProjects()
- for {
- core.RefreshProjects(mainClient)
- select {
- case <-sc:
- core.StopProjects()
- return
- case <-ticker.C:
- }
- }
- }
|