You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

193 lines
4.5 KiB

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/go-redis/redis/v8"
  8. "github.com/hashicorp/go-multierror"
  9. )
  10. type ProjectBackfeedManager struct {
  11. Context context.Context
  12. Cancel context.CancelFunc
  13. Done chan bool
  14. C chan *BackfeedItem
  15. Name string
  16. BackfeedRedis *redis.ClusterClient
  17. ProjectRedis *redis.Client
  18. //Lock sync.RWMutex
  19. ProjectConfig ProjectConfig
  20. }
  21. type ProjectRedisConfig struct {
  22. Host string `json:"host"`
  23. Pass string `json:"pass"`
  24. Port int `json:"port"`
  25. }
  26. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  27. if that.ProjectConfig.RedisConfig == nil && new == nil {
  28. return false
  29. }
  30. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  31. }
  32. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  33. //that.Lock.RLock()
  34. //defer that.Lock.RUnlock()
  35. //if that.C == nil {
  36. // return false
  37. //}
  38. select {
  39. case <-ctx.Done():
  40. return ctx.Err()
  41. case <-that.Context.Done():
  42. return fmt.Errorf("backfeed channel closed")
  43. case that.C <- item:
  44. return nil
  45. //default:
  46. // return fmt.Errorf("backfeed channel full")
  47. }
  48. }
  49. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  50. if blocking {
  51. select {
  52. case <-that.Context.Done():
  53. return nil, false
  54. case item, ok := <-that.C:
  55. return item, ok
  56. }
  57. } else {
  58. select {
  59. case <-that.Context.Done():
  60. return nil, false
  61. case item, ok := <-that.C:
  62. return item, ok
  63. default:
  64. return nil, false
  65. }
  66. }
  67. }
  68. var Tag = struct{}{}
  69. func (that *ProjectBackfeedManager) Do() {
  70. defer close(that.Done)
  71. defer that.Cancel()
  72. pipe := that.BackfeedRedis.Pipeline()
  73. for {
  74. select {
  75. case <-that.Context.Done():
  76. break
  77. case <-that.Done:
  78. break
  79. default:
  80. }
  81. keyMap := map[string][][]byte{}
  82. var sAddItems []any
  83. skipFeedItems := map[string]struct{}{}
  84. wrapped := 0
  85. for wrapped < ItemWrapSize {
  86. item, ok := that.PopItem(wrapped == 0)
  87. if !ok {
  88. break
  89. }
  90. if item.SkipBloom {
  91. sAddItems = append(sAddItems, item.Item)
  92. } else {
  93. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  94. keyMap[key] = append(keyMap[key], item.Item)
  95. if item.SkipFeed {
  96. skipFeedItems[string(item.Item)] = Tag
  97. }
  98. }
  99. wrapped++
  100. }
  101. if wrapped == 0 {
  102. break
  103. }
  104. if len(keyMap) > 0 {
  105. try := 0
  106. lastTS := make([]any, 0, len(keyMap)*2)
  107. now := time.Now()
  108. for key := range keyMap {
  109. lastTS = append(lastTS, key)
  110. lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
  111. }
  112. resultMap := map[string]*redis.Cmd{}
  113. pipe.HSet(context.Background(), ":last_ts", lastTS...)
  114. for {
  115. for key, items := range keyMap {
  116. args := []any{
  117. "bf.madd",
  118. key,
  119. }
  120. for _, item := range items {
  121. args = append(args, item)
  122. }
  123. resultMap[key] = pipe.Do(context.Background(), args...)
  124. }
  125. if _, err := pipe.Exec(context.Background()); err != nil {
  126. log.Printf("%s", err)
  127. }
  128. var err error
  129. for key, items := range keyMap {
  130. res, cmdErr := resultMap[key].BoolSlice()
  131. if cmdErr != nil {
  132. err = multierror.Append(err, cmdErr)
  133. continue
  134. }
  135. if len(res) != len(keyMap[key]) {
  136. err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key])))
  137. continue
  138. }
  139. for i, v := range res {
  140. if v {
  141. sAddItems = append(sAddItems, items[i])
  142. }
  143. }
  144. delete(keyMap, key)
  145. }
  146. if err == nil {
  147. break
  148. }
  149. log.Printf("%s", err)
  150. time.Sleep(time.Duration(try) * time.Second)
  151. try++
  152. }
  153. }
  154. dupes := wrapped - len(sAddItems)
  155. if len(sAddItems) > 0 && len(skipFeedItems) > 0 {
  156. sAddItemsFiltered := make([]any, 0, len(sAddItems))
  157. for _, item := range sAddItems {
  158. itemBytes := item.([]byte)
  159. itemString := string(itemBytes)
  160. if _, exists := skipFeedItems[itemString]; !exists {
  161. sAddItemsFiltered = append(sAddItemsFiltered, item)
  162. }
  163. }
  164. sAddItems = sAddItemsFiltered
  165. }
  166. if len(sAddItems) > 0 {
  167. try := 0
  168. for {
  169. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
  170. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  171. time.Sleep(time.Duration(try) * time.Second)
  172. try++
  173. } else {
  174. break
  175. }
  176. }
  177. }
  178. if dupes > 0 {
  179. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  180. }
  181. }
  182. }