You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

181 lines
4.1 KiB

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/go-redis/redis/v8"
  8. "github.com/hashicorp/go-multierror"
  9. )
  10. type ProjectBackfeedManager struct {
  11. Context context.Context
  12. Cancel context.CancelFunc
  13. Done chan bool
  14. C chan *BackfeedItem
  15. Name string
  16. BackfeedRedis *redis.ClusterClient
  17. ProjectRedis *redis.Client
  18. //Lock sync.RWMutex
  19. ProjectConfig ProjectConfig
  20. }
  21. type ProjectRedisConfig struct {
  22. Host string `json:"host"`
  23. Pass string `json:"pass"`
  24. Port int `json:"port"`
  25. }
  26. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  27. if that.ProjectConfig.RedisConfig == nil && new == nil {
  28. return false
  29. }
  30. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  31. }
  32. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  33. //that.Lock.RLock()
  34. //defer that.Lock.RUnlock()
  35. //if that.C == nil {
  36. // return false
  37. //}
  38. select {
  39. case <-ctx.Done():
  40. return ctx.Err()
  41. case <-that.Context.Done():
  42. return fmt.Errorf("backfeed channel closed")
  43. case that.C <- item:
  44. return nil
  45. //default:
  46. // return fmt.Errorf("backfeed channel full")
  47. }
  48. }
  49. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  50. if blocking {
  51. select {
  52. case <-that.Context.Done():
  53. return nil, false
  54. case item, ok := <-that.C:
  55. return item, ok
  56. }
  57. } else {
  58. select {
  59. case <-that.Context.Done():
  60. return nil, false
  61. case item, ok := <-that.C:
  62. return item, ok
  63. default:
  64. return nil, false
  65. }
  66. }
  67. }
  68. func (that *ProjectBackfeedManager) Do() {
  69. defer close(that.Done)
  70. //defer that.CloseItemChannel()
  71. defer that.Cancel()
  72. pipe := that.BackfeedRedis.Pipeline()
  73. for {
  74. select {
  75. case <-that.Context.Done():
  76. break
  77. case <-that.Done:
  78. break
  79. default:
  80. }
  81. item, ok := that.PopItem(true)
  82. if !ok {
  83. break
  84. }
  85. keyMap := map[string][][]byte{}
  86. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  87. keyMap[key] = append(keyMap[key], item.Item)
  88. wrapped := 1
  89. for wrapped < ItemWrapSize {
  90. item, ok := that.PopItem(false)
  91. if !ok {
  92. break
  93. }
  94. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  95. keyMap[key] = append(keyMap[key], item.Item)
  96. wrapped++
  97. }
  98. select {
  99. case <-that.Context.Done():
  100. break
  101. case <-that.Done:
  102. break
  103. default:
  104. }
  105. now := time.Now()
  106. resultMap := map[string]*redis.Cmd{}
  107. lastTS := make([]any, 0, len(keyMap)*2)
  108. for key := range keyMap {
  109. lastTS = append(lastTS, key)
  110. lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
  111. }
  112. pipe.HSet(context.Background(), ":last_ts", lastTS...)
  113. try := 0
  114. var sAddItems []any
  115. for {
  116. for key, items := range keyMap {
  117. args := []any{
  118. "bf.madd",
  119. key,
  120. }
  121. for _, item := range items {
  122. args = append(args, item)
  123. }
  124. resultMap[key] = pipe.Do(context.Background(), args...)
  125. }
  126. if _, err := pipe.Exec(context.Background()); err != nil {
  127. log.Printf("%s", err)
  128. }
  129. var err error
  130. for key, items := range keyMap {
  131. res, cmdErr := resultMap[key].BoolSlice()
  132. if cmdErr != nil {
  133. err = multierror.Append(err, cmdErr)
  134. continue
  135. }
  136. if len(res) != len(keyMap[key]) {
  137. err = multierror.Append(err, fmt.Errorf("invalid response length for %s: %d != %d", key, len(res), len(keyMap[key])))
  138. continue
  139. }
  140. for i, v := range res {
  141. if v {
  142. sAddItems = append(sAddItems, items[i])
  143. }
  144. }
  145. delete(keyMap, key)
  146. }
  147. if err == nil {
  148. break
  149. }
  150. log.Printf("%s", err)
  151. time.Sleep(time.Duration(try) * time.Second)
  152. try++
  153. }
  154. dupes := wrapped - len(sAddItems)
  155. try = 0
  156. if len(sAddItems) != 0 {
  157. for {
  158. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
  159. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  160. time.Sleep(time.Duration(try) * time.Second)
  161. try++
  162. } else {
  163. break
  164. }
  165. }
  166. }
  167. if dupes > 0 {
  168. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  169. }
  170. }
  171. }