Compare commits

...

2 Commits

1 changed files with 18 additions and 2 deletions
Unified View
  1. +18
    -2
      main.go

+ 18
- 2
main.go View File

@@ -77,8 +77,8 @@ func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *Backfeed
return fmt.Errorf("backfeed channel closed") return fmt.Errorf("backfeed channel closed")
case that.C <- item: case that.C <- item:
return nil return nil
//default:
// return fmt.Errorf("backfeed channel full")
default:
return fmt.Errorf("backfeed channel full")
} }
} }


@@ -169,14 +169,30 @@ func (that *ProjectBackfeedManager) Do() {
} }
resultMap[key] = pipe.Do(context.Background(), args...) resultMap[key] = pipe.Do(context.Background(), args...)
} }
var sAddErrItems []any
if _, err := pipe.Exec(context.Background()); err != nil { if _, err := pipe.Exec(context.Background()); err != nil {
log.Printf("%s", err) log.Printf("%s", err)
for _, items := range keyMap {
for _, item := range items {
sAddErrItems = append(sAddErrItems, item)
}
}
if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:backfeed_retry", that.Name), sAddErrItems...).Err(); err != nil {
log.Printf("failed to sadd failed items for %s: %s", that.Name, err)
}
continue
} }
var sAddItems []any var sAddItems []any
for key, items := range keyMap { for key, items := range keyMap {
res, err := resultMap[key].BoolSlice() res, err := resultMap[key].BoolSlice()
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
for _, item := range items {
sAddErrItems = append(sAddErrItems, item)
}
if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:backfeed_retry", that.Name), sAddErrItems...).Err(); err != nil {
log.Printf("failed to sadd failed items for %s: %s", that.Name, err)
}
continue continue
} }
if len(res) != len(keyMap[key]) { if len(res) != len(keyMap[key]) {


Loading…
Cancel
Save