#2 Support stash:* queues registered in stashes set.

Отворено
arkiver wants to merge 2 commits from stashes into master
  1. +9
    -0
      main.go

+ 9
- 0
main.go Прегледај датотеку

@@ -86,6 +86,7 @@ func (that *Offloader) RefreshQueues() error {
pipe := that.RedisClient.Pipeline()
prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
stashesCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:stashes", that.Name))
_, err := pipe.Exec(that.Context)
if err != nil {
return err
@@ -98,6 +99,10 @@ func (that *Offloader) RefreshQueues() error {
if err != nil {
return err
}
stashes, err := stashesCmdRes.Result()
if err != nil {
return err
}
setQueueMap := map[string]string{
"todo": "todo",
"todo:secondary": "todo:secondary",
@@ -105,6 +110,7 @@ func (that *Offloader) RefreshQueues() error {
"todo:backfeed": "todo:backfeed",
"done": "done",
"unretrievable": "unretrievable",
"backfeed_retry": "backfeed_retry",
}
for _, filter := range filters {
setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
@@ -112,6 +118,9 @@ func (that *Offloader) RefreshQueues() error {
for _, priority := range priorities {
setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
}
for _, stash := range stashes {
setQueueMap[fmt.Sprintf("stash:%s", stash)] = fmt.Sprintf("stash:%s", stash)
}
needQueueMap := map[string]bool{}
for setName, queueName := range setQueueMap {
needQueueMap[queueName] = true


Loading…
Откажи
Сачувај