#2 Support stash:* queues registered in stashes set.

Offen
arkiver möchte 2 Commits von stashes nach master zusammenführen
  1. +9
    -0
      main.go

+ 9
- 0
main.go Datei anzeigen

@@ -86,6 +86,7 @@ func (that *Offloader) RefreshQueues() error {
pipe := that.RedisClient.Pipeline() pipe := that.RedisClient.Pipeline()
prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1) prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name)) filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
stashesCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:stashes", that.Name))
_, err := pipe.Exec(that.Context) _, err := pipe.Exec(that.Context)
if err != nil { if err != nil {
return err return err
@@ -98,6 +99,10 @@ func (that *Offloader) RefreshQueues() error {
if err != nil { if err != nil {
return err return err
} }
stashes, err := stashesCmdRes.Result()
if err != nil {
return err
}
setQueueMap := map[string]string{ setQueueMap := map[string]string{
"todo": "todo", "todo": "todo",
"todo:secondary": "todo:secondary", "todo:secondary": "todo:secondary",
@@ -105,6 +110,7 @@ func (that *Offloader) RefreshQueues() error {
"todo:backfeed": "todo:backfeed", "todo:backfeed": "todo:backfeed",
"done": "done", "done": "done",
"unretrievable": "unretrievable", "unretrievable": "unretrievable",
"backfeed_retry": "backfeed_retry",
} }
for _, filter := range filters { for _, filter := range filters {
setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered" setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
@@ -112,6 +118,9 @@ func (that *Offloader) RefreshQueues() error {
for _, priority := range priorities { for _, priority := range priorities {
setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority) setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
} }
for _, stash := range stashes {
setQueueMap[fmt.Sprintf("stash:%s", stash)] = fmt.Sprintf("stash:%s", stash)
}
needQueueMap := map[string]bool{} needQueueMap := map[string]bool{}
for setName, queueName := range setQueueMap { for setName, queueName := range setQueueMap {
needQueueMap[queueName] = true needQueueMap[queueName] = true


Laden…
Abbrechen
Speichern