package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/hashicorp/go-multierror"
)

// ProjectBackfeedManager deduplicates incoming items for a single project
// against sharded bloom filters on the backfeed Redis cluster and queues
// previously unseen items on the project's own Redis instance.
type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig
}

type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

// RedisConfigDiffers reports whether the given Redis configuration differs
// from the one this manager was started with.
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && new == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
}

// PushItem queues an item for deduplication. It blocks until the item is
// accepted or until either the caller's context or the manager's context is done.
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
	//that.Lock.RLock()
	//defer that.Lock.RUnlock()
	//if that.C == nil {
	//	return false
	//}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-that.Context.Done():
		return fmt.Errorf("backfeed channel closed")
	case that.C <- item:
		return nil
		//default:
		//	return fmt.Errorf("backfeed channel full")
	}
}

// PopItem takes the next item off the queue. With blocking set it waits until
// an item arrives or the manager's context is done; otherwise it returns
// immediately when the queue is empty.
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item, ok := <-that.C:
			return item, ok
		default:
			return nil, false
		}
	}
}

// Tag is the zero-size value used for set-like maps.
var Tag = struct{}{}

// Do is the manager's main loop. It drains the item channel in batches of up
// to ItemWrapSize, checks each batch against the per-shard bloom filters with
// BF.MADD on the backfeed cluster, pushes items that were not seen before to
// the project's <name>:todo:backfeed set, and counts the duplicates.
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	defer that.Cancel()

	pipe := that.BackfeedRedis.Pipeline()
outer:
	for {
		select {
		case <-that.Context.Done():
			// A bare "break" here would only leave the select, not the loop,
			// so a label is needed to stop the batching loop on shutdown.
			break outer
		case <-that.Done:
			break outer
		default:
		}

		// Collect a batch. The first PopItem blocks so the loop does not spin
		// on an empty channel; the remaining pops are non-blocking.
		keyMap := map[string][][]byte{}
		var sAddItems []any
		skipFeedItems := map[string]struct{}{}
		wrapped := 0
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(wrapped == 0)
			if !ok {
				break
			}
			if item.SkipBloom {
				sAddItems = append(sAddItems, item.Item)
			} else {
				key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
				keyMap[key] = append(keyMap[key], item.Item)
				if item.SkipFeed {
					skipFeedItems[string(item.Item)] = Tag
				}
			}
			wrapped++
		}
		if wrapped == 0 {
			break
		}

		if len(keyMap) > 0 {
			// Record when each shard was last touched, then run BF.MADD for
			// every shard in a single pipeline. Shards that fail are retried
			// with a growing delay until they succeed.
			try := 0
			lastTS := make([]any, 0, len(keyMap)*2)
			now := time.Now()
			for key := range keyMap {
				lastTS = append(lastTS, key)
				lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
			}
			resultMap := map[string]*redis.Cmd{}
			pipe.HSet(context.Background(), ":last_ts", lastTS...)
			for {
				for key, items := range keyMap {
					args := []any{
						"bf.madd",
						key,
					}
					for _, item := range items {
						args = append(args, item)
					}
					resultMap[key] = pipe.Do(context.Background(), args...)
				}
				if _, err := pipe.Exec(context.Background()); err != nil {
					log.Printf("%s", err)
				}
				var err error
				for key, items := range keyMap {
					res, cmdErr := resultMap[key].BoolSlice()
					if cmdErr != nil {
						err = multierror.Append(err, cmdErr)
						continue
					}
					if len(res) != len(keyMap[key]) {
						err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key])))
						continue
					}
					// BF.MADD returns true for items that were newly added to
					// the filter, i.e. not seen before; only those get queued.
					for i, v := range res {
						if v {
							sAddItems = append(sAddItems, items[i])
						}
					}
					delete(keyMap, key)
				}
				if err == nil {
					break
				}
				log.Printf("%s", err)
				time.Sleep(time.Duration(try) * time.Second)
				try++
			}
		}

		// Items that did not make it into sAddItems were already in a bloom
		// filter; count them as duplicates. This is computed before the
		// SkipFeed filtering so skipped-but-new items are not counted.
		dupes := wrapped - len(sAddItems)

		// Items flagged SkipFeed were only meant to seed the bloom filter,
		// so drop them before queueing.
		if len(sAddItems) > 0 && len(skipFeedItems) > 0 {
			sAddItemsFiltered := make([]any, 0, len(sAddItems))
			for _, item := range sAddItems {
				itemBytes := item.([]byte)
				itemString := string(itemBytes)
				if _, exists := skipFeedItems[itemString]; !exists {
					sAddItemsFiltered = append(sAddItemsFiltered, item)
				}
			}
			sAddItems = sAddItemsFiltered
		}

		// Queue the new items on the project's Redis, retrying until it works.
		if len(sAddItems) > 0 {
			try := 0
			for {
				if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
					log.Printf("failed to sadd items for %s: %s", that.Name, err)
					time.Sleep(time.Duration(try) * time.Second)
					try++
				} else {
					break
				}
			}
		}
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}
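// ----------------------------------------------------------------------------
// The code above references a few identifiers that are defined elsewhere in
// this package: BackfeedItem, ProjectConfig, and ItemWrapSize. The
// declarations below are a minimal sketch inferred from how those names are
// used in this file, so the excerpt can be read (and compiled) on its own.
// The field types and the ItemWrapSize value are assumptions, not the real
// definitions.

// BackfeedItem is a single deduplication request, sketched from its use in
// PushItem and Do.
type BackfeedItem struct {
	PrimaryShard   byte   // formatted as %02x into the bloom-filter key (type assumed)
	SecondaryShard string // appended to the bloom-filter key
	Item           []byte // raw item payload
	SkipBloom      bool   // bypass the bloom filter and queue directly
	SkipFeed       bool   // seed the bloom filter only; do not queue for the project
}

// ProjectConfig is sketched here with only the field this file touches.
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig
}

// ItemWrapSize caps how many items Do batches into one pipeline round trip.
// The value below is a placeholder; the real constant lives elsewhere.
const ItemWrapSize = 250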