Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

617 lignes
16 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. redisprom "github.com/globocom/go-redis-prometheus"
  19. "github.com/go-redis/redis/v8"
  20. "github.com/gorilla/mux"
  21. "github.com/prometheus/client_golang/prometheus/promhttp"
  22. "github.com/tevino/abool/v2"
  23. )
// Sizing limits for the per-project item pipeline.
const (
	// ItemChannelBuffer is the capacity of each ProjectBackfeedManager.C channel.
	ItemChannelBuffer = 100000
	// ItemWrapSize is the maximum number of queued items the project worker
	// loop gathers into one batch before issuing Redis commands.
	ItemWrapSize = 100000
)
// ProjectRedisConfig describes a dedicated Redis instance for one project.
// It is decoded from the "redis" object of a project's JSON configuration and
// is compared by value to detect configuration changes.
type ProjectRedisConfig struct {
	// Host and Port are joined as "host:port" to form the client address.
	Host string `json:"host"`
	// Pass is used as the Redis password.
	Pass string `json:"pass"`
	Port int `json:"port"`
}
// ProjectConfig is the per-project configuration decoded from the JSON values
// stored in the tracker's "trackers" hash.
type ProjectConfig struct {
	// RedisConfig is optional; when nil the project's items are written to
	// the shared tracker Redis instead of a dedicated instance.
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
// BackfeedItem is one submitted item waiting to be deduplicated.
type BackfeedItem struct {
	// PrimaryShard is the one-byte hash of Item (see GenShardHash); it is
	// rendered as two hex digits in the filter key.
	PrimaryShard byte
	// SecondaryShard is the caller-supplied "shard" query parameter and forms
	// the last component of the filter key.
	SecondaryShard string
	// Item is the raw item payload.
	Item []byte
}
// ProjectBackfeedManager owns the item queue and worker state of a single
// project feed.
type ProjectBackfeedManager struct {
	// Context is cancelled (via Cancel) to stop the worker and to unblock
	// producers waiting in PushItem.
	Context context.Context
	Cancel context.CancelFunc
	// Done is closed when Do returns.
	Done chan bool
	// C is the item queue; CloseItemChannel closes it and sets it to nil.
	C chan *BackfeedItem
	// Name is the project name; it prefixes every Redis key the worker uses.
	Name string
	// BackfeedRedis receives the "bf.madd" commands and the duplicate counter.
	BackfeedRedis *redis.ClusterClient
	// ProjectRedis receives the SADD of new items into "<Name>:todo:backfeed".
	ProjectRedis *redis.Client
	// LegacyRedis is queried with "bf.mexists" to drop already-known items.
	LegacyRedis *redis.Client
	// Lock guards C between PushItem (read lock) and CloseItemChannel
	// (write lock).
	Lock sync.RWMutex
	// ProjectConfig is the configuration this manager was created with.
	ProjectConfig ProjectConfig
}
  53. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  54. if that.ProjectConfig.RedisConfig == nil && new == nil {
  55. return false
  56. }
  57. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  58. }
  59. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
  60. that.Lock.RLock()
  61. defer that.Lock.RUnlock()
  62. if that.C == nil {
  63. return false
  64. }
  65. select {
  66. case <-ctx.Done():
  67. return false
  68. case <-that.Context.Done():
  69. return false
  70. case that.C <- item:
  71. return true
  72. }
  73. }
  74. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  75. if blocking {
  76. select {
  77. case <-that.Context.Done():
  78. return nil, false
  79. case item := <-that.C:
  80. return item, true
  81. }
  82. } else {
  83. select {
  84. case <-that.Context.Done():
  85. return nil, false
  86. case item := <-that.C:
  87. return item, true
  88. default:
  89. return nil, false
  90. }
  91. }
  92. }
  93. func (that *ProjectBackfeedManager) CloseItemChannel() {
  94. that.Lock.Lock()
  95. defer that.Lock.Unlock()
  96. if that.C == nil {
  97. return
  98. }
  99. close(that.C)
  100. that.C = nil
  101. }
  102. func (that *ProjectBackfeedManager) Do() {
  103. defer close(that.Done)
  104. defer that.CloseItemChannel()
  105. defer that.Cancel()
  106. for {
  107. select {
  108. case <-that.Context.Done():
  109. break
  110. case <-that.Done:
  111. break
  112. default:
  113. }
  114. item, ok := that.PopItem(true)
  115. if !ok {
  116. break
  117. }
  118. keyMap := map[string][][]byte{}
  119. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  120. keyMap[key] = append(keyMap[key], item.Item)
  121. wrapped := 1
  122. for wrapped < ItemWrapSize {
  123. item, ok := that.PopItem(false)
  124. if !ok {
  125. break
  126. }
  127. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  128. keyMap[key] = append(keyMap[key], item.Item)
  129. wrapped++
  130. }
  131. select {
  132. case <-that.Context.Done():
  133. break
  134. case <-that.Done:
  135. break
  136. default:
  137. }
  138. resultMap := map[string]*redis.Cmd{}
  139. pipe := that.BackfeedRedis.Pipeline()
  140. for key, items := range keyMap {
  141. args := []interface{}{
  142. "bf.madd",
  143. key,
  144. }
  145. for _, item := range items {
  146. args = append(args, item)
  147. }
  148. resultMap[key] = pipe.Do(context.Background(), args...)
  149. }
  150. _, err := pipe.Exec(context.Background())
  151. if err != nil {
  152. log.Printf("%s", err)
  153. }
  154. var sAddItems []interface{}
  155. for key, items := range keyMap {
  156. res, err := resultMap[key].BoolSlice()
  157. if err != nil {
  158. log.Printf("%s", err)
  159. continue
  160. }
  161. if len(res) != len(keyMap[key]) {
  162. continue
  163. }
  164. for i, v := range res {
  165. if v {
  166. sAddItems = append(sAddItems, items[i])
  167. }
  168. }
  169. }
  170. dupes := wrapped - len(sAddItems)
  171. if len(sAddItems) != 0 {
  172. args := []interface{}{
  173. "bf.mexists",
  174. that.Name,
  175. }
  176. args = append(args, sAddItems...)
  177. res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice()
  178. if err != nil {
  179. log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
  180. } else if len(res) == len(sAddItems) {
  181. var filteredSAddItems []interface{}
  182. for i, v := range res {
  183. if !v {
  184. filteredSAddItems = append(filteredSAddItems, sAddItems[i])
  185. }
  186. }
  187. sAddItems = filteredSAddItems
  188. }
  189. }
  190. if len(sAddItems) != 0 {
  191. err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
  192. if err != nil {
  193. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  194. }
  195. }
  196. if dupes > 0 {
  197. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  198. }
  199. }
  200. }
// GlobalBackfeedManager tracks every active project feed and maps request
// slugs to them.
type GlobalBackfeedManager struct {
	// Context is the parent of every project feed's context; Cancel stops
	// all of them.
	Context context.Context
	Cancel context.CancelFunc
	// ActiveFeeds maps project name to its running manager.
	ActiveFeeds map[string]*ProjectBackfeedManager
	// ActiveSlugs maps slug to project name.
	ActiveSlugs map[string]string
	// TrackerRedis holds the "backfeed" (slug -> project) and "trackers"
	// (project -> JSON config) hashes, and serves as ProjectRedis for
	// projects without a dedicated Redis configuration.
	TrackerRedis *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis *redis.Client
	// Lock guards ActiveFeeds and ActiveSlugs between RefreshFeeds and GetFeed.
	Lock sync.RWMutex
	// Populated is set after the first successful RefreshFeeds and is
	// checked by the health endpoint.
	Populated *abool.AtomicBool
}
  212. func (that *GlobalBackfeedManager) RefreshFeeds() error {
  213. slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
  214. if err != nil {
  215. return err
  216. }
  217. var projects []string
  218. projectSlugMap := map[string][]string{}
  219. for slug, project := range slugProjectMap {
  220. projectSlugMap[project] = append(projectSlugMap[project], slug)
  221. }
  222. for project := range projectSlugMap {
  223. projects = append(projects, project)
  224. }
  225. projectConfigs := map[string]ProjectConfig{}
  226. if len(projects) != 0 {
  227. cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
  228. if err != nil {
  229. return err
  230. }
  231. if len(projects) != len(cfgi) {
  232. return fmt.Errorf("hmget result had unexpected length")
  233. }
  234. for i, project := range projects {
  235. configString, ok := cfgi[i].(string)
  236. if !ok {
  237. continue
  238. }
  239. config := ProjectConfig{}
  240. err := json.Unmarshal([]byte(configString), &config)
  241. if err != nil {
  242. continue
  243. }
  244. projectConfigs[project] = config
  245. }
  246. }
  247. projects = nil
  248. for project := range projectSlugMap {
  249. if _, has := projectConfigs[project]; !has {
  250. delete(projectSlugMap, project)
  251. continue
  252. }
  253. projects = append(projects, project)
  254. }
  255. for slug, project := range slugProjectMap {
  256. if _, has := projectConfigs[project]; !has {
  257. delete(slugProjectMap, slug)
  258. }
  259. }
  260. // add feeds for new projects
  261. for _, project := range projects {
  262. projectConfig := projectConfigs[project]
  263. var outdatedProjectBackfeedManager *ProjectBackfeedManager
  264. if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
  265. if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
  266. outdatedProjectBackfeedManager = projectBackfeedManager
  267. } else {
  268. continue
  269. }
  270. }
  271. ctx, cancel := context.WithCancel(that.Context)
  272. projectBackfeedManager := &ProjectBackfeedManager{
  273. Context: ctx,
  274. Cancel: cancel,
  275. Done: make(chan bool),
  276. C: make(chan *BackfeedItem, ItemChannelBuffer),
  277. BackfeedRedis: that.BackfeedRedis,
  278. Name: project,
  279. ProjectConfig: projectConfig,
  280. LegacyRedis: that.LegacyRedis,
  281. }
  282. if projectConfig.RedisConfig != nil {
  283. projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
  284. Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
  285. Username: "default",
  286. Password: projectConfig.RedisConfig.Pass,
  287. ReadTimeout: 15 * time.Minute,
  288. })
  289. } else {
  290. projectBackfeedManager.ProjectRedis = that.TrackerRedis
  291. }
  292. go projectBackfeedManager.Do()
  293. that.Lock.Lock()
  294. that.ActiveFeeds[project] = projectBackfeedManager
  295. that.Lock.Unlock()
  296. if outdatedProjectBackfeedManager != nil {
  297. outdatedProjectBackfeedManager.Cancel()
  298. <-outdatedProjectBackfeedManager.Done
  299. log.Printf("updated project: %s", project)
  300. } else {
  301. log.Printf("added project: %s", project)
  302. }
  303. }
  304. that.Lock.Lock()
  305. that.ActiveSlugs = slugProjectMap
  306. that.Lock.Unlock()
  307. // remove feeds for old projects
  308. for project, projectBackfeedManager := range that.ActiveFeeds {
  309. if _, has := projectSlugMap[project]; has {
  310. continue
  311. }
  312. log.Printf("removing project: %s", project)
  313. that.Lock.Lock()
  314. delete(that.ActiveFeeds, project)
  315. that.Lock.Unlock()
  316. projectBackfeedManager.Cancel()
  317. <-projectBackfeedManager.Done
  318. log.Printf("removed project: %s", project)
  319. }
  320. if !that.Populated.IsSet() {
  321. that.Populated.Set()
  322. }
  323. return nil
  324. }
  325. type Splitter struct {
  326. Delimiter []byte
  327. IgnoreEOF bool
  328. }
  329. func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  330. for i := 0; i < len(data); i++ {
  331. if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
  332. return i + len(that.Delimiter), data[:i], nil
  333. }
  334. }
  335. if len(data) == 0 || !atEOF {
  336. return 0, nil, nil
  337. }
  338. if atEOF && that.IgnoreEOF {
  339. return len(data), data, nil
  340. }
  341. return 0, data, io.ErrUnexpectedEOF
  342. }
  343. func GenShardHash(b []byte) (final byte) {
  344. for i, b := range b {
  345. final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
  346. }
  347. return final
  348. }
  349. func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
  350. res.Header().Set("Content-Type", "application/json")
  351. res.WriteHeader(statusCode)
  352. if statusCode == http.StatusNoContent {
  353. return
  354. }
  355. if err, isError := v.(error); isError {
  356. v = map[string]interface{}{
  357. "error": fmt.Sprintf("%v", err),
  358. "status_code": statusCode,
  359. }
  360. } else {
  361. v = map[string]interface{}{
  362. "data": v,
  363. "status_code": statusCode,
  364. }
  365. }
  366. json.NewEncoder(res).Encode(v)
  367. }
  368. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  369. that.Lock.RLock()
  370. defer that.Lock.RUnlock()
  371. project, has := that.ActiveSlugs[slug]
  372. if !has {
  373. return nil
  374. }
  375. projectBackfeedManager, has := that.ActiveFeeds[project]
  376. if !has {
  377. return nil
  378. }
  379. return projectBackfeedManager
  380. }
  381. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  382. defer req.Body.Close()
  383. vars := mux.Vars(req)
  384. slug := vars["slug"]
  385. secondaryShard := req.URL.Query().Get("shard")
  386. projectBackfeedManager := that.GetFeed(slug)
  387. if projectBackfeedManager == nil {
  388. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  389. return
  390. }
  391. splitter := &Splitter{
  392. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  393. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  394. }
  395. if len(splitter.Delimiter) == 0 {
  396. splitter.Delimiter = []byte{0x00}
  397. }
  398. scanner := bufio.NewScanner(req.Body)
  399. scanner.Split(splitter.Split)
  400. var err error
  401. statusCode := http.StatusNoContent
  402. n := 0
  403. for scanner.Scan() {
  404. b := scanner.Bytes()
  405. if len(b) == 0 {
  406. continue
  407. }
  408. bcopy := make([]byte, len(b))
  409. copy(bcopy, b)
  410. item := &BackfeedItem{
  411. PrimaryShard: GenShardHash(bcopy),
  412. SecondaryShard: secondaryShard,
  413. Item: bcopy,
  414. }
  415. ok := projectBackfeedManager.PushItem(req.Context(), item)
  416. if !ok {
  417. err = fmt.Errorf("channel closed")
  418. statusCode = http.StatusServiceUnavailable
  419. break
  420. }
  421. n++
  422. }
  423. if err == nil {
  424. err = scanner.Err()
  425. if err != nil {
  426. statusCode = http.StatusBadRequest
  427. }
  428. }
  429. if err != nil {
  430. WriteResponse(res, statusCode, err)
  431. } else {
  432. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  433. }
  434. return
  435. }
  436. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  437. if that.Populated.IsNotSet() {
  438. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  439. return
  440. }
  441. err := that.LegacyRedis.Ping(req.Context()).Err()
  442. if err != nil {
  443. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping legacy redis: %s", err))
  444. return
  445. }
  446. err = that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  447. client.ClientGetName(ctx)
  448. return client.Ping(ctx).Err()
  449. })
  450. if err != nil {
  451. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  452. return
  453. }
  454. WriteResponse(res, http.StatusOK, "ok")
  455. }
  456. func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
  457. WriteResponse(res, http.StatusOK, "pong")
  458. }
  459. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  460. that.Populated.UnSet()
  461. that.Cancel()
  462. for project, projectBackfeedManager := range that.ActiveFeeds {
  463. log.Printf("waiting for %s channel to shut down...", project)
  464. <-projectBackfeedManager.Done
  465. delete(that.ActiveFeeds, project)
  466. }
  467. }
  468. func main() {
  469. log.SetFlags(log.Flags() | log.Lshortfile)
  470. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  471. if err != nil {
  472. log.Panicf("invalid REDIS_TRACKER url: %s", err)
  473. }
  474. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  475. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  476. legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
  477. if err != nil {
  478. log.Panicf("invalid REDIS_LEGACY url: %s", err)
  479. }
  480. legacyRedisOptions.ReadTimeout = 15 * time.Minute
  481. legacyRedisClient := redis.NewClient(legacyRedisOptions)
  482. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  483. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  484. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  485. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  486. ReadTimeout: 15 * time.Minute,
  487. })
  488. legacyRedisMetricsHook := redisprom.NewHook(
  489. redisprom.WithInstanceName("legacy"),
  490. )
  491. legacyRedisClient.AddHook(legacyRedisMetricsHook)
  492. backfeedRedisMetricsHook := redisprom.NewHook(
  493. redisprom.WithInstanceName("backfeed"),
  494. )
  495. backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
  496. trackerRedisMetricsHook := redisprom.NewHook(
  497. redisprom.WithInstanceName("tracker"),
  498. )
  499. trackerRedisClient.AddHook(trackerRedisMetricsHook)
  500. if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
  501. log.Panicf("unable to ping tracker redis: %s", err)
  502. }
  503. if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
  504. log.Panicf("unable to ping backfeed redis: %s", err)
  505. }
  506. err = backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
  507. client.ClientGetName(ctx)
  508. return client.Ping(ctx).Err()
  509. })
  510. if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
  511. log.Panicf("unable to ping legacy redis: %s", err)
  512. }
  513. globalBackfeedManager := &GlobalBackfeedManager{
  514. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  515. ActiveSlugs: map[string]string{},
  516. TrackerRedis: trackerRedisClient,
  517. BackfeedRedis: backfeedRedisClient,
  518. LegacyRedis: legacyRedisClient,
  519. Populated: abool.New(),
  520. }
  521. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  522. defer globalBackfeedManager.CancelAllFeeds()
  523. err = globalBackfeedManager.RefreshFeeds()
  524. if err != nil {
  525. log.Panicf("unable to set up backfeed projects: %s", err)
  526. }
  527. r := mux.NewRouter()
  528. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
  529. r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
  530. r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
  531. rMetrics := mux.NewRouter()
  532. rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  533. rMetrics.Path("/metrics").Handler(promhttp.Handler())
  534. doneChan := make(chan bool)
  535. serveErrChan := make(chan error)
  536. go func() {
  537. s := &http.Server{
  538. Addr: os.Getenv("HTTP_ADDR"),
  539. IdleTimeout: 1 * time.Hour,
  540. MaxHeaderBytes: 1 * 1024 * 1024,
  541. Handler: r,
  542. }
  543. serveErrChan <- s.ListenAndServe()
  544. }()
  545. metricsErrChan := make(chan error)
  546. go func() {
  547. if os.Getenv("METRICS_ADDR") != "" {
  548. s := &http.Server{
  549. Addr: os.Getenv("METRICS_ADDR"),
  550. IdleTimeout: 1 * time.Hour,
  551. MaxHeaderBytes: 1 * 1024 * 1024,
  552. Handler: rMetrics,
  553. }
  554. metricsErrChan <- s.ListenAndServe()
  555. } else {
  556. <-doneChan
  557. metricsErrChan <- nil
  558. }
  559. }()
  560. log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
  561. if os.Getenv("METRICS_ADDR") != "" {
  562. log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
  563. }
  564. sc := make(chan os.Signal, 1)
  565. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  566. ticker := time.NewTicker(1 * time.Second)
  567. for {
  568. select {
  569. case <-sc:
  570. return
  571. case <-ticker.C:
  572. }
  573. err = globalBackfeedManager.RefreshFeeds()
  574. if err != nil {
  575. log.Printf("unable to refresh backfeed projects: %s", err)
  576. }
  577. }
  578. }