|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package main
-
- import (
- "context"
- "fmt"
- "log"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/hashicorp/go-multierror"
- )
-
- type ProjectBackfeedManager struct {
- Context context.Context
- Cancel context.CancelFunc
- Done chan bool
- C chan *BackfeedItem
- Name string
- BackfeedRedis *redis.ClusterClient
- ProjectRedis *redis.Client
- //Lock sync.RWMutex
- ProjectConfig ProjectConfig
- }
-
// ProjectRedisConfig is the JSON-serialisable connection description of a
// project's Redis instance. It contains only comparable fields, so two values
// can be compared directly with == / !=.
type ProjectRedisConfig struct {
	// Host is encoded under the JSON key "host".
	Host string `json:"host"`
	// Pass is encoded under the JSON key "pass".
	Pass string `json:"pass"`
	// Port is encoded under the JSON key "port".
	Port int `json:"port"`
}
-
- func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
- if that.ProjectConfig.RedisConfig == nil && new == nil {
- return false
- }
- return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
- }
-
- func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
- //that.Lock.RLock()
- //defer that.Lock.RUnlock()
- //if that.C == nil {
- // return false
- //}
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-that.Context.Done():
- return fmt.Errorf("backfeed channel closed")
- case that.C <- item:
- return nil
- //default:
- // return fmt.Errorf("backfeed channel full")
- }
- }
-
- func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
- if blocking {
- select {
- case <-that.Context.Done():
- return nil, false
- case item, ok := <-that.C:
- return item, ok
- }
- } else {
- select {
- case <-that.Context.Done():
- return nil, false
- case item, ok := <-that.C:
- return item, ok
- default:
- return nil, false
- }
- }
- }
-
// Tag is the zero-size placeholder stored as the value of set-like maps
// (map[string]struct{}).
var Tag struct{}
-
- func (that *ProjectBackfeedManager) Do() {
- defer close(that.Done)
- defer that.Cancel()
-
- pipe := that.BackfeedRedis.Pipeline()
- for {
- select {
- case <-that.Context.Done():
- break
- case <-that.Done:
- break
- default:
- }
- keyMap := map[string][][]byte{}
- var sAddItems []any
- skipFeedItems := map[string]struct{}{}
- wrapped := 0
- for wrapped < ItemWrapSize {
- item, ok := that.PopItem(wrapped == 0)
- if !ok {
- break
- }
- if item.SkipBloom {
- sAddItems = append(sAddItems, item.Item)
- } else {
- key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
- keyMap[key] = append(keyMap[key], item.Item)
- if item.SkipFeed {
- skipFeedItems[string(item.Item)] = Tag
- }
- }
- wrapped++
- }
- if wrapped == 0 {
- break
- }
- if len(keyMap) > 0 {
- try := 0
- lastTS := make([]any, 0, len(keyMap)*2)
- now := time.Now()
- for key := range keyMap {
- lastTS = append(lastTS, key)
- lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
- }
- resultMap := map[string]*redis.Cmd{}
- pipe.HSet(context.Background(), ":last_ts", lastTS...)
- for {
- for key, items := range keyMap {
- args := []any{
- "bf.madd",
- key,
- }
- for _, item := range items {
- args = append(args, item)
- }
- resultMap[key] = pipe.Do(context.Background(), args...)
- }
- if _, err := pipe.Exec(context.Background()); err != nil {
- log.Printf("%s", err)
- }
- var err error
- for key, items := range keyMap {
- res, cmdErr := resultMap[key].BoolSlice()
- if cmdErr != nil {
- err = multierror.Append(err, cmdErr)
- continue
- }
- if len(res) != len(keyMap[key]) {
- err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key])))
- continue
- }
- for i, v := range res {
- if v {
- sAddItems = append(sAddItems, items[i])
- }
- }
- delete(keyMap, key)
- }
- if err == nil {
- break
- }
- log.Printf("%s", err)
- time.Sleep(time.Duration(try) * time.Second)
- try++
- }
- }
- dupes := wrapped - len(sAddItems)
- if len(sAddItems) > 0 && len(skipFeedItems) > 0 {
- sAddItemsFiltered := make([]any, 0, len(sAddItems))
- for _, item := range sAddItems {
- itemBytes := item.([]byte)
- itemString := string(itemBytes)
- if _, exists := skipFeedItems[itemString]; !exists {
- sAddItemsFiltered = append(sAddItemsFiltered, item)
- }
- }
- sAddItems = sAddItemsFiltered
- }
- if len(sAddItems) > 0 {
- try := 0
- for {
- if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
- log.Printf("failed to sadd items for %s: %s", that.Name, err)
- time.Sleep(time.Duration(try) * time.Second)
- try++
- } else {
- break
- }
- }
- }
- if dupes > 0 {
- that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
- }
- }
- }
|