You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

200 lines
4.7 KiB

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/go-redis/redis/v8"
  8. "github.com/hashicorp/go-multierror"
  9. )
  10. type ProjectBackfeedManager struct {
  11. Context context.Context
  12. Cancel context.CancelFunc
  13. Done chan bool
  14. C chan *BackfeedItem
  15. Name string
  16. BackfeedRedis *redis.ClusterClient
  17. ProjectRedis *redis.Client
  18. //Lock sync.RWMutex
  19. ProjectConfig ProjectConfig
  20. }
  21. type ProjectRedisConfig struct {
  22. Host string `json:"host"`
  23. Pass string `json:"pass"`
  24. Port int `json:"port"`
  25. }
  26. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  27. if that.ProjectConfig.RedisConfig == nil && new == nil {
  28. return false
  29. }
  30. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  31. }
  32. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  33. //that.Lock.RLock()
  34. //defer that.Lock.RUnlock()
  35. //if that.C == nil {
  36. // return false
  37. //}
  38. select {
  39. case <-ctx.Done():
  40. return ctx.Err()
  41. case <-that.Context.Done():
  42. return fmt.Errorf("backfeed channel closed")
  43. case that.C <- item:
  44. return nil
  45. //default:
  46. // return fmt.Errorf("backfeed channel full")
  47. }
  48. }
  49. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  50. if blocking {
  51. select {
  52. case <-that.Context.Done():
  53. return nil, false
  54. case item, ok := <-that.C:
  55. return item, ok
  56. }
  57. } else {
  58. select {
  59. case <-that.Context.Done():
  60. return nil, false
  61. case item, ok := <-that.C:
  62. return item, ok
  63. default:
  64. return nil, false
  65. }
  66. }
  67. }
  68. var Tag = struct{}{}
  69. func (that *ProjectBackfeedManager) Do() {
  70. defer close(that.Done)
  71. defer that.Cancel()
  72. pipe := that.BackfeedRedis.Pipeline()
  73. for {
  74. select {
  75. case <-that.Context.Done():
  76. break
  77. case <-that.Done:
  78. break
  79. default:
  80. }
  81. item, ok := that.PopItem(true)
  82. if !ok {
  83. break
  84. }
  85. keyMap := map[string][][]byte{}
  86. var sAddItems []any
  87. if item.SkipBloom {
  88. sAddItems = append(sAddItems, item.Item)
  89. continue
  90. } else {
  91. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  92. keyMap[key] = append(keyMap[key], item.Item)
  93. }
  94. wrapped := 1
  95. skipFeedItems := map[string]struct{}{}
  96. for wrapped < ItemWrapSize {
  97. item, ok := that.PopItem(false)
  98. if !ok {
  99. break
  100. }
  101. if item.SkipBloom {
  102. sAddItems = append(sAddItems, item.Item)
  103. continue
  104. } else {
  105. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  106. keyMap[key] = append(keyMap[key], item.Item)
  107. }
  108. if item.SkipFeed {
  109. skipFeedItems[string(item.Item)] = Tag
  110. }
  111. wrapped++
  112. }
  113. if len(keyMap) > 0 {
  114. try := 0
  115. lastTS := make([]any, 0, len(keyMap)*2)
  116. now := time.Now()
  117. for key := range keyMap {
  118. lastTS = append(lastTS, key)
  119. lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
  120. }
  121. resultMap := map[string]*redis.Cmd{}
  122. pipe.HSet(context.Background(), ":last_ts", lastTS...)
  123. for {
  124. for key, items := range keyMap {
  125. args := []any{
  126. "bf.madd",
  127. key,
  128. }
  129. for _, item := range items {
  130. args = append(args, item)
  131. }
  132. resultMap[key] = pipe.Do(context.Background(), args...)
  133. }
  134. if _, err := pipe.Exec(context.Background()); err != nil {
  135. log.Printf("%s", err)
  136. }
  137. var err error
  138. for key, items := range keyMap {
  139. res, cmdErr := resultMap[key].BoolSlice()
  140. if cmdErr != nil {
  141. err = multierror.Append(err, cmdErr)
  142. continue
  143. }
  144. if len(res) != len(keyMap[key]) {
  145. err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key])))
  146. continue
  147. }
  148. for i, v := range res {
  149. if v {
  150. sAddItems = append(sAddItems, items[i])
  151. }
  152. }
  153. delete(keyMap, key)
  154. }
  155. if err == nil {
  156. break
  157. }
  158. log.Printf("%s", err)
  159. time.Sleep(time.Duration(try) * time.Second)
  160. try++
  161. }
  162. }
  163. dupes := wrapped - len(sAddItems)
  164. if len(sAddItems) > 0 && len(skipFeedItems) > 0 {
  165. sAddItemsFiltered := make([]any, 0, len(sAddItems))
  166. for _, item := range sAddItems {
  167. if _, exists := skipFeedItems[item.(string)]; !exists {
  168. sAddItemsFiltered = append(sAddItemsFiltered, item)
  169. }
  170. }
  171. sAddItems = sAddItemsFiltered
  172. }
  173. if len(sAddItems) > 0 {
  174. try := 0
  175. for {
  176. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
  177. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  178. time.Sleep(time.Duration(try) * time.Second)
  179. try++
  180. } else {
  181. break
  182. }
  183. }
  184. }
  185. if dupes > 0 {
  186. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  187. }
  188. }
  189. }