You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

323 lines
8.9 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "compress/flate"
  5. "compress/gzip"
  6. "context"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. "github.com/gorilla/mux"
  17. "github.com/tevino/abool/v2"
  18. )
  19. type GlobalBackfeedManager struct {
  20. Context context.Context
  21. Cancel context.CancelFunc
  22. ActiveFeeds map[string]*ProjectBackfeedManager
  23. ActiveSlugs map[string]string
  24. TrackerRedis *redis.Client
  25. BackfeedRedis *redis.ClusterClient
  26. Lock sync.RWMutex
  27. Populated *abool.AtomicBool
  28. }
  29. func (that *GlobalBackfeedManager) RefreshFeeds() error {
  30. slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
  31. if err != nil {
  32. return err
  33. }
  34. var projects []string
  35. projectSlugMap := map[string][]string{}
  36. for slug, project := range slugProjectMap {
  37. projectSlugMap[project] = append(projectSlugMap[project], slug)
  38. }
  39. for project := range projectSlugMap {
  40. projects = append(projects, project)
  41. }
  42. projectConfigs := map[string]ProjectConfig{}
  43. if len(projects) != 0 {
  44. cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
  45. if err != nil {
  46. return err
  47. }
  48. if len(projects) != len(cfgi) {
  49. return fmt.Errorf("hmget result had unexpected length")
  50. }
  51. for i, project := range projects {
  52. configString, ok := cfgi[i].(string)
  53. if !ok {
  54. continue
  55. }
  56. config := ProjectConfig{}
  57. if err := json.Unmarshal([]byte(configString), &config); err != nil {
  58. continue
  59. }
  60. projectConfigs[project] = config
  61. }
  62. }
  63. projects = nil
  64. for project := range projectSlugMap {
  65. if _, has := projectConfigs[project]; !has {
  66. delete(projectSlugMap, project)
  67. continue
  68. }
  69. projects = append(projects, project)
  70. }
  71. for slug, project := range slugProjectMap {
  72. if _, has := projectConfigs[project]; !has {
  73. delete(slugProjectMap, slug)
  74. }
  75. }
  76. // add feeds for new projects
  77. for _, project := range projects {
  78. projectConfig := projectConfigs[project]
  79. var outdatedProjectBackfeedManager *ProjectBackfeedManager
  80. if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
  81. if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
  82. outdatedProjectBackfeedManager = projectBackfeedManager
  83. } else {
  84. continue
  85. }
  86. }
  87. ctx, cancel := context.WithCancel(that.Context)
  88. projectBackfeedManager := &ProjectBackfeedManager{
  89. Context: ctx,
  90. Cancel: cancel,
  91. Done: make(chan bool),
  92. C: make(chan *BackfeedItem, ItemChannelBuffer),
  93. BackfeedRedis: that.BackfeedRedis,
  94. Name: project,
  95. ProjectConfig: projectConfig,
  96. }
  97. if projectConfig.RedisConfig != nil {
  98. projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
  99. Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
  100. Username: "default",
  101. Password: projectConfig.RedisConfig.Pass,
  102. ReadTimeout: 15 * time.Minute,
  103. })
  104. } else {
  105. projectBackfeedManager.ProjectRedis = that.TrackerRedis
  106. }
  107. go projectBackfeedManager.Do()
  108. that.Lock.Lock()
  109. that.ActiveFeeds[project] = projectBackfeedManager
  110. that.Lock.Unlock()
  111. if outdatedProjectBackfeedManager != nil {
  112. outdatedProjectBackfeedManager.Cancel()
  113. <-outdatedProjectBackfeedManager.Done
  114. log.Printf("updated project: %s", project)
  115. } else {
  116. log.Printf("added project: %s", project)
  117. }
  118. }
  119. that.Lock.Lock()
  120. that.ActiveSlugs = slugProjectMap
  121. that.Lock.Unlock()
  122. // remove feeds for old projects
  123. for project, projectBackfeedManager := range that.ActiveFeeds {
  124. if _, has := projectSlugMap[project]; has {
  125. continue
  126. }
  127. log.Printf("removing project: %s", project)
  128. that.Lock.Lock()
  129. delete(that.ActiveFeeds, project)
  130. that.Lock.Unlock()
  131. projectBackfeedManager.Cancel()
  132. <-projectBackfeedManager.Done
  133. log.Printf("removed project: %s", project)
  134. }
  135. if !that.Populated.IsSet() {
  136. that.Populated.Set()
  137. }
  138. return nil
  139. }
  140. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  141. that.Lock.RLock()
  142. defer that.Lock.RUnlock()
  143. project, has := that.ActiveSlugs[slug]
  144. if !has {
  145. return nil
  146. }
  147. projectBackfeedManager, has := that.ActiveFeeds[project]
  148. if !has {
  149. return nil
  150. }
  151. return projectBackfeedManager
  152. }
  153. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  154. defer req.Body.Close()
  155. vars := mux.Vars(req)
  156. slug := vars["slug"]
  157. secondaryShard := req.URL.Query().Get("shard")
  158. if strings.ContainsAny(secondaryShard, ":/") {
  159. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid shard name"))
  160. return
  161. }
  162. projectBackfeedManager := that.GetFeed(slug)
  163. if projectBackfeedManager == nil {
  164. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  165. return
  166. }
  167. splitter := &Splitter{
  168. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  169. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  170. }
  171. if len(splitter.Delimiter) == 0 {
  172. splitter.Delimiter = []byte{0x00}
  173. }
  174. var body io.ReadCloser
  175. switch req.Header.Get("Content-Encoding") {
  176. case "":
  177. body = req.Body
  178. case "gzip":
  179. var err error
  180. body, err = gzip.NewReader(req.Body)
  181. if err != nil {
  182. WriteResponse(res, http.StatusBadRequest, err)
  183. return
  184. }
  185. defer body.Close()
  186. case "deflate":
  187. body = flate.NewReader(req.Body)
  188. defer body.Close()
  189. default:
  190. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding")))
  191. }
  192. scanner := bufio.NewScanner(body)
  193. scanner.Split(splitter.Split)
  194. n := 0
  195. for scanner.Scan() {
  196. b := scanner.Bytes()
  197. if len(b) == 0 {
  198. continue
  199. }
  200. bcopy := make([]byte, len(b))
  201. copy(bcopy, b)
  202. item := &BackfeedItem{
  203. PrimaryShard: GenShardHash(bcopy),
  204. SecondaryShard: secondaryShard,
  205. Item: bcopy,
  206. }
  207. if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
  208. WriteResponse(res, http.StatusInternalServerError, err)
  209. return
  210. }
  211. n++
  212. }
  213. if err := scanner.Err(); err != nil {
  214. WriteResponse(res, http.StatusBadRequest, err)
  215. return
  216. }
  217. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  218. return
  219. }
  220. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  221. if that.Populated.IsNotSet() {
  222. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  223. return
  224. }
  225. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  226. client.ClientGetName(ctx)
  227. return client.Ping(ctx).Err()
  228. }); err != nil {
  229. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  230. return
  231. }
  232. WriteResponse(res, http.StatusOK, "ok")
  233. }
  234. func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
  235. WriteResponse(res, http.StatusOK, "pong")
  236. }
  237. func (that *GlobalBackfeedManager) HandleUnlink(res http.ResponseWriter, req *http.Request) {
  238. vars := mux.Vars(req)
  239. key := vars["key"]
  240. if strings.Count(key, ":") < 2 {
  241. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
  242. return
  243. }
  244. lock := sync.Mutex{}
  245. keys := []string{}
  246. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  247. cursor := uint64(0)
  248. var shardKeys []string
  249. for {
  250. var err error
  251. var keysBatch []string
  252. keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
  253. if err != nil && err != redis.Nil {
  254. return err
  255. }
  256. shardKeys = append(shardKeys, keysBatch...)
  257. if cursor == 0 {
  258. break
  259. }
  260. }
  261. lock.Lock()
  262. defer lock.Unlock()
  263. keys = append(keys, shardKeys...)
  264. return nil
  265. }); err != nil && err != redis.Nil {
  266. WriteResponse(res, http.StatusInternalServerError, err)
  267. return
  268. }
  269. pipe := that.BackfeedRedis.Pipeline()
  270. pipe.HDel(req.Context(), ":last_ts", keys...)
  271. for _, key := range keys {
  272. pipe.Unlink(req.Context(), key)
  273. }
  274. if _, err := pipe.Exec(req.Context()); err != nil && err != redis.Nil {
  275. WriteResponse(res, http.StatusInternalServerError, err)
  276. return
  277. }
  278. WriteResponse(res, http.StatusOK, keys)
  279. }
  280. func (that *GlobalBackfeedManager) HandleRedisInfo(res http.ResponseWriter, req *http.Request) {
  281. infos := map[string]string{}
  282. lock := sync.Mutex{}
  283. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  284. if info, err := client.Info(ctx, "all").Result(); err != nil && err != redis.Nil {
  285. return err
  286. } else {
  287. lock.Lock()
  288. defer lock.Unlock()
  289. infos[client.String()] = info
  290. }
  291. return nil
  292. }); err != nil {
  293. WriteResponse(res, http.StatusInternalServerError, err)
  294. return
  295. }
  296. WriteResponse(res, http.StatusOK, infos)
  297. }
  298. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  299. that.Populated.UnSet()
  300. that.Cancel()
  301. for project, projectBackfeedManager := range that.ActiveFeeds {
  302. log.Printf("waiting for %s channel to shut down...", project)
  303. <-projectBackfeedManager.Done
  304. delete(that.ActiveFeeds, project)
  305. }
  306. }