diff --git a/globalbackfeedmanager.go b/globalbackfeedmanager.go index 6c9f14e..025bb51 100644 --- a/globalbackfeedmanager.go +++ b/globalbackfeedmanager.go @@ -168,6 +168,13 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht return } + skipBloom := req.URL.Query().Get("skipbloom") != "" + skipFeed := req.URL.Query().Get("skipfeed") != "" + if skipBloom && skipFeed { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("skipbloom and skipfeed are mutually exclusive")) + return + } + projectBackfeedManager := that.GetFeed(slug) if projectBackfeedManager == nil { WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel")) @@ -214,6 +221,8 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht PrimaryShard: GenShardHash(bcopy), SecondaryShard: secondaryShard, Item: bcopy, + SkipBloom: skipBloom, + SkipFeed: skipFeed, } if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil { WriteResponse(res, http.StatusInternalServerError, err) diff --git a/projectbackfeedmanager.go b/projectbackfeedmanager.go index 003f61b..023c43a 100644 --- a/projectbackfeedmanager.go +++ b/projectbackfeedmanager.go @@ -73,9 +73,10 @@ func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) } } +var Tag = struct{}{} + func (that *ProjectBackfeedManager) Do() { defer close(that.Done) - //defer that.CloseItemChannel() defer that.Cancel() pipe := that.BackfeedRedis.Pipeline() @@ -92,77 +93,95 @@ func (that *ProjectBackfeedManager) Do() { break } keyMap := map[string][][]byte{} - key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) - keyMap[key] = append(keyMap[key], item.Item) + var sAddItems []any + if item.SkipBloom { + sAddItems = append(sAddItems, item.Item) + continue + } else { + key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) + keyMap[key] = append(keyMap[key], item.Item) + } wrapped := 1 + skipFeedItems := map[string]struct{}{} for wrapped < ItemWrapSize { item, ok := that.PopItem(false) if !ok { break } - key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) - keyMap[key] = append(keyMap[key], item.Item) + if item.SkipBloom { + sAddItems = append(sAddItems, item.Item) + continue + } else { + key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) + keyMap[key] = append(keyMap[key], item.Item) + } + if item.SkipFeed { + skipFeedItems[string(item.Item)] = Tag + } wrapped++ } - select { - case <-that.Context.Done(): - break - case <-that.Done: - break - default: - } - now := time.Now() - resultMap := map[string]*redis.Cmd{} - lastTS := make([]any, 0, len(keyMap)*2) - for key := range keyMap { - lastTS = append(lastTS, key) - lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) - } - pipe.HSet(context.Background(), ":last_ts", lastTS...) - try := 0 - var sAddItems []any - for { - for key, items := range keyMap { - args := []any{ - "bf.madd", - key, - } - for _, item := range items { - args = append(args, item) - } - resultMap[key] = pipe.Do(context.Background(), args...) + if len(keyMap) > 0 { + try := 0 + lastTS := make([]any, 0, len(keyMap)*2) + now := time.Now() + for key := range keyMap { + lastTS = append(lastTS, key) + lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) } - if _, err := pipe.Exec(context.Background()); err != nil { - log.Printf("%s", err) - } - var err error - for key, items := range keyMap { - res, cmdErr := resultMap[key].BoolSlice() - if cmdErr != nil { - err = multierror.Append(err, cmdErr) - continue + resultMap := map[string]*redis.Cmd{} + pipe.HSet(context.Background(), ":last_ts", lastTS...) + for { + for key, items := range keyMap { + args := []any{ + "bf.madd", + key, + } + for _, item := range items { + args = append(args, item) + } + resultMap[key] = pipe.Do(context.Background(), args...) } - if len(res) != len(keyMap[key]) { - err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key]))) - continue + if _, err := pipe.Exec(context.Background()); err != nil { + log.Printf("%s", err) } - for i, v := range res { - if v { - sAddItems = append(sAddItems, items[i]) + var err error + for key, items := range keyMap { + res, cmdErr := resultMap[key].BoolSlice() + if cmdErr != nil { + err = multierror.Append(err, cmdErr) + continue + } + if len(res) != len(keyMap[key]) { + err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key]))) + continue + } + for i, v := range res { + if v { + sAddItems = append(sAddItems, items[i]) + } } + delete(keyMap, key) } - delete(keyMap, key) - } - if err == nil { - break + if err == nil { + break + } + log.Printf("%s", err) + time.Sleep(time.Duration(try) * time.Second) + try++ } - log.Printf("%s", err) - time.Sleep(time.Duration(try) * time.Second) - try++ } dupes := wrapped - len(sAddItems) - try = 0 - if len(sAddItems) != 0 { + if len(sAddItems) > 0 && len(skipFeedItems) > 0 { + sAddItemsFiltered := make([]any, 0, len(sAddItems)) + for _, item := range sAddItems { + if _, exists := skipFeedItems[item.(string)]; !exists { + sAddItemsFiltered = append(sAddItemsFiltered, item) + } + } + sAddItems = sAddItemsFiltered + } + if len(sAddItems) > 0 { + try := 0 for { if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { log.Printf("failed to sadd items for %s: %s", that.Name, err)