diff --git a/main.go b/main.go index 90c80f0..0349f5a 100644 --- a/main.go +++ b/main.go @@ -34,10 +34,9 @@ type ProjectRedisConfig struct { } type ProjectOffloadConfig struct { - WatermarkHigh int64 `json:"high"` - WatermarkMiddle int64 `json:"middle"` - WatermarkLow int64 `json:"low"` - BatchSize int64 `json:"batchsize"` + WatermarkHigh int64 `json:"high"` + WatermarkLow int64 `json:"low"` + BatchSize int64 `json:"batchsize"` } type ProjectConfig struct { @@ -48,7 +47,6 @@ type ProjectConfig struct { type Offloader struct { RedisClient *redis.Client ProjectConfig ProjectConfig - OffloadConfig ProjectOffloadConfig Context context.Context Cancel context.CancelFunc Done chan bool @@ -72,32 +70,30 @@ func (that *Offloader) RedisConfigDiffers(new *ProjectRedisConfig) bool { } func (that *Offloader) OffloadConfigDiffers(new ProjectOffloadConfig) bool { - return that.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.OffloadConfig.WatermarkMiddle != new.WatermarkMiddle || that.OffloadConfig.WatermarkLow != new.WatermarkLow || that.OffloadConfig.BatchSize != new.BatchSize + return that.ProjectConfig.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.ProjectConfig.OffloadConfig.WatermarkLow != new.WatermarkLow || that.ProjectConfig.OffloadConfig.BatchSize != new.BatchSize } -func (that *Offloader) RefreshQueues() { +func (that *Offloader) RefreshQueues() error { pipe := that.RedisClient.Pipeline() prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1) filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name)) _, err := pipe.Exec(that.Context) if err != nil { - log.Printf("unable to refresh queues for offloader %s: %s", that.Name, err) - return + return err } priorities, err := prioritiesCmdRes.Result() if err != nil { - log.Printf("unable to refresh queues for offloader %s: %s", that.Name, err) - return + return err } filters, err := filtersCmdRes.Result() if err != nil { - log.Printf("unable to refresh queues for offloader %s: %s", that.Name, err) - return + return err } setQueueMap := map[string]string{ "todo": "todo", "todo:secondary": "todo:secondary", "todo:redo": "todo:redo", + "todo:backfeed": "todo:backfeed", "done": "done", "unretrievable": "unretrievable", } @@ -112,7 +108,11 @@ func (that *Offloader) RefreshQueues() { needQueueMap[queueName] = true if _, has := that.Queues[queueName]; !has { log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName) - that.Queues[queueName] = dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), dataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l) + queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), dataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l) + if queue == nil { + return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName)) + } + that.Queues[queueName] = queue } that.Sets[setName] = queueName } @@ -122,6 +122,7 @@ func (that *Offloader) RefreshQueues() { delete(that.Queues, k) } } + return nil } func (that *Offloader) CloseQueues() { @@ -163,23 +164,26 @@ func (that *Offloader) Do() { refreshTicker := time.NewTicker(5 * time.Minute) defer refreshTicker.Stop() - that.RefreshQueues() + if err := that.RefreshQueues(); err != nil { + log.Printf("unable to refresh queues for %s: %s", that.Name, err) + return + } that.UpdateStats() skipSleepChan := make(chan bool, 1) defer close(skipSleepChan) - watermarkHigh := that.OffloadConfig.WatermarkHigh + watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh if watermarkHigh == 0 { watermarkHigh = DefaultWatermarkHigh } - watermarkLow := that.OffloadConfig.WatermarkLow + watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow if watermarkLow == 0 { watermarkLow = DefaultWatermarkLow } - batchSize := that.OffloadConfig.BatchSize + batchSize := that.ProjectConfig.OffloadConfig.BatchSize if batchSize == 0 { batchSize = DefaultBatchSize } @@ -334,7 +338,10 @@ func (that *Offloader) Do() { case <-that.Context.Done(): return case <-refreshTicker.C: - that.RefreshQueues() + if err := that.RefreshQueues(); err != nil { + log.Printf("unable to refresh queues for %s: %s", that.Name, err) + return + } that.UpdateStats() case <-ticker.C: that.UpdateStats() @@ -449,6 +456,7 @@ func main() { sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill) ticker := time.NewTicker(1 * time.Minute) + defer StopProjects() for { RefreshProjects(mainClient) select {