|
|
@@ -86,6 +86,7 @@ func (that *Offloader) RefreshQueues() error { |
|
|
|
pipe := that.RedisClient.Pipeline() |
|
|
|
prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1) |
|
|
|
filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name)) |
|
|
|
stashesCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:stashes", that.Name)) |
|
|
|
_, err := pipe.Exec(that.Context) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
@@ -98,6 +99,10 @@ func (that *Offloader) RefreshQueues() error { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
stashes, err := stashesCmdRes.Result() |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
setQueueMap := map[string]string{ |
|
|
|
"todo": "todo", |
|
|
|
"todo:secondary": "todo:secondary", |
|
|
@@ -105,6 +110,7 @@ func (that *Offloader) RefreshQueues() error { |
|
|
|
"todo:backfeed": "todo:backfeed", |
|
|
|
"done": "done", |
|
|
|
"unretrievable": "unretrievable", |
|
|
|
"backfeed_retry": "backfeed_retry", |
|
|
|
} |
|
|
|
for _, filter := range filters { |
|
|
|
setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered" |
|
|
@@ -112,6 +118,9 @@ func (that *Offloader) RefreshQueues() error { |
|
|
|
for _, priority := range priorities { |
|
|
|
setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority) |
|
|
|
} |
|
|
|
for _, stash := range stashes { |
|
|
|
setQueueMap[fmt.Sprintf("stash:%s", stash)] = fmt.Sprintf("stash:%s", stash) |
|
|
|
} |
|
|
|
needQueueMap := map[string]bool{} |
|
|
|
for setName, queueName := range setQueueMap { |
|
|
|
needQueueMap[queueName] = true |
|
|
|