You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

615 lines
16 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. redisprom "github.com/globocom/go-redis-prometheus"
  19. "github.com/go-redis/redis/v8"
  20. "github.com/gorilla/mux"
  21. "github.com/prometheus/client_golang/prometheus/promhttp"
  22. "github.com/tevino/abool/v2"
  23. )
const (
	// ItemChannelBuffer is the capacity of each project's backfeed item channel.
	ItemChannelBuffer = 100000
	// ItemWrapSize caps how many items are batched together before being
	// flushed through one redis pipeline in Do.
	ItemWrapSize = 100000
)
// ProjectRedisConfig is the optional per-project redis connection info
// embedded in a tracker project's config JSON.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
// ProjectConfig is the subset of a tracker project's JSON config that the
// backfeed uses. A nil RedisConfig means the tracker redis is used instead
// of a project-specific one (see RefreshFeeds).
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
// BackfeedItem is a single deduplication candidate queued for a project.
type BackfeedItem struct {
	// PrimaryShard is GenShardHash(Item), used in the bloom-filter key.
	PrimaryShard byte
	// SecondaryShard comes from the request's "shard" query parameter
	// (may be empty).
	SecondaryShard string
	// Item is the raw item payload.
	Item []byte
}
// ProjectBackfeedManager owns the dedupe pipeline of one project: HTTP
// handlers push items onto C, and the Do worker goroutine drains them.
type ProjectBackfeedManager struct {
	Context context.Context
	Cancel  context.CancelFunc
	// Done is closed by Do once the worker goroutine has fully stopped.
	Done chan bool
	// C buffers incoming items (capacity ItemChannelBuffer).
	C    chan *BackfeedItem
	Name string
	// BackfeedRedis holds the per-project bloom filters (bf.madd).
	BackfeedRedis *redis.ClusterClient
	// ProjectRedis receives deduplicated items via SADD.
	ProjectRedis *redis.Client
	// LegacyRedis is consulted (bf.mexists) as a second dedupe pass.
	LegacyRedis *redis.Client
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig
}
  53. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  54. if that.ProjectConfig.RedisConfig == nil && new == nil {
  55. return false
  56. }
  57. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  58. }
  59. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  60. //that.Lock.RLock()
  61. //defer that.Lock.RUnlock()
  62. //if that.C == nil {
  63. // return false
  64. //}
  65. select {
  66. case <-ctx.Done():
  67. return ctx.Err()
  68. case <-that.Context.Done():
  69. return fmt.Errorf("backfeed channel closed")
  70. case that.C <- item:
  71. return nil
  72. default:
  73. return fmt.Errorf("backfeed channel full")
  74. }
  75. }
  76. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  77. if blocking {
  78. select {
  79. case <-that.Context.Done():
  80. return nil, false
  81. case item, ok := <-that.C:
  82. return item, ok
  83. }
  84. } else {
  85. select {
  86. case <-that.Context.Done():
  87. return nil, false
  88. case item, ok := <-that.C:
  89. return item, ok
  90. default:
  91. return nil, false
  92. }
  93. }
  94. }
  95. //func (that *ProjectBackfeedManager) CloseItemChannel() {
  96. // log.Printf("closing item channel for %s", that.Name)
  97. // that.Lock.Lock()
  98. // defer that.Lock.Unlock()
  99. // if that.C == nil {
  100. // return
  101. // }
  102. // close(that.C)
  103. // that.C = nil
  104. //}
  105. func (that *ProjectBackfeedManager) Do() {
  106. defer close(that.Done)
  107. //defer that.CloseItemChannel()
  108. defer that.Cancel()
  109. for {
  110. select {
  111. case <-that.Context.Done():
  112. break
  113. case <-that.Done:
  114. break
  115. default:
  116. }
  117. item, ok := that.PopItem(true)
  118. if !ok {
  119. break
  120. }
  121. keyMap := map[string][][]byte{}
  122. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  123. keyMap[key] = append(keyMap[key], item.Item)
  124. wrapped := 1
  125. for wrapped < ItemWrapSize {
  126. item, ok := that.PopItem(false)
  127. if !ok {
  128. break
  129. }
  130. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  131. keyMap[key] = append(keyMap[key], item.Item)
  132. wrapped++
  133. }
  134. select {
  135. case <-that.Context.Done():
  136. break
  137. case <-that.Done:
  138. break
  139. default:
  140. }
  141. resultMap := map[string]*redis.Cmd{}
  142. pipe := that.BackfeedRedis.Pipeline()
  143. for key, items := range keyMap {
  144. args := []interface{}{
  145. "bf.madd",
  146. key,
  147. }
  148. for _, item := range items {
  149. args = append(args, item)
  150. }
  151. resultMap[key] = pipe.Do(context.Background(), args...)
  152. }
  153. _, err := pipe.Exec(context.Background())
  154. if err != nil {
  155. log.Printf("%s", err)
  156. }
  157. var sAddItems []interface{}
  158. for key, items := range keyMap {
  159. res, err := resultMap[key].BoolSlice()
  160. if err != nil {
  161. log.Printf("%s", err)
  162. continue
  163. }
  164. if len(res) != len(keyMap[key]) {
  165. continue
  166. }
  167. for i, v := range res {
  168. if v {
  169. sAddItems = append(sAddItems, items[i])
  170. }
  171. }
  172. }
  173. dupes := wrapped - len(sAddItems)
  174. if len(sAddItems) != 0 {
  175. args := []interface{}{
  176. "bf.mexists",
  177. that.Name,
  178. }
  179. args = append(args, sAddItems...)
  180. res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice()
  181. if err != nil {
  182. log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
  183. } else if len(res) == len(sAddItems) {
  184. var filteredSAddItems []interface{}
  185. for i, v := range res {
  186. if !v {
  187. filteredSAddItems = append(filteredSAddItems, sAddItems[i])
  188. }
  189. }
  190. sAddItems = filteredSAddItems
  191. }
  192. }
  193. if len(sAddItems) != 0 {
  194. err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
  195. if err != nil {
  196. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  197. }
  198. }
  199. if dupes > 0 {
  200. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  201. }
  202. }
  203. }
// GlobalBackfeedManager tracks one ProjectBackfeedManager per active project
// and maps request slugs to project names. Lock guards ActiveFeeds and
// ActiveSlugs, which are read by HTTP handlers while RefreshFeeds mutates
// them.
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex
	// Populated flips to true after the first successful RefreshFeeds;
	// HandleHealth reports unavailable until then.
	Populated *abool.AtomicBool
}
// RefreshFeeds reconciles the set of running project backfeed managers with
// the tracker's current state: the "backfeed" hash (slug -> project) and the
// "trackers" hash (project -> config JSON). New projects get a manager and
// worker goroutine; projects whose redis config changed are replaced;
// projects no longer listed are shut down and removed.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	// slug -> project name, as registered in the tracker.
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	// Invert to project -> slugs.
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	// Fetch and parse each project's JSON config from the "trackers" hash.
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				// Missing or non-string config; the project is dropped below.
				continue
			}
			config := ProjectConfig{}
			err := json.Unmarshal([]byte(configString), &config)
			if err != nil {
				// Unparseable config is treated like a missing one.
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Drop projects (and their slugs) that have no usable config.
	// Deleting from a map while ranging over it is permitted in Go.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				// Redis config changed: start a replacement manager, then
				// retire this one once the replacement is registered.
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				// Unchanged and already running: nothing to do.
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
			LegacyRedis:   that.LegacyRedis,
		}
		if projectConfig.RedisConfig != nil {
			// Project-specific redis; otherwise fall back to the tracker's.
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			// Stop the stale manager and wait for its worker to drain.
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
  328. type Splitter struct {
  329. Delimiter []byte
  330. IgnoreEOF bool
  331. }
  332. func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  333. for i := 0; i < len(data); i++ {
  334. if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
  335. return i + len(that.Delimiter), data[:i], nil
  336. }
  337. }
  338. if len(data) == 0 || !atEOF {
  339. return 0, nil, nil
  340. }
  341. if atEOF && that.IgnoreEOF {
  342. return len(data), data, nil
  343. }
  344. return 0, data, io.ErrUnexpectedEOF
  345. }
  346. func GenShardHash(b []byte) (final byte) {
  347. for i, b := range b {
  348. final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
  349. }
  350. return final
  351. }
  352. func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
  353. res.Header().Set("Content-Type", "application/json")
  354. res.WriteHeader(statusCode)
  355. if statusCode == http.StatusNoContent {
  356. return
  357. }
  358. if err, isError := v.(error); isError {
  359. v = map[string]interface{}{
  360. "error": fmt.Sprintf("%v", err),
  361. "status_code": statusCode,
  362. }
  363. } else {
  364. v = map[string]interface{}{
  365. "data": v,
  366. "status_code": statusCode,
  367. }
  368. }
  369. json.NewEncoder(res).Encode(v)
  370. }
  371. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  372. that.Lock.RLock()
  373. defer that.Lock.RUnlock()
  374. project, has := that.ActiveSlugs[slug]
  375. if !has {
  376. return nil
  377. }
  378. projectBackfeedManager, has := that.ActiveFeeds[project]
  379. if !has {
  380. return nil
  381. }
  382. return projectBackfeedManager
  383. }
  384. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  385. defer req.Body.Close()
  386. vars := mux.Vars(req)
  387. slug := vars["slug"]
  388. secondaryShard := req.URL.Query().Get("shard")
  389. projectBackfeedManager := that.GetFeed(slug)
  390. if projectBackfeedManager == nil {
  391. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  392. return
  393. }
  394. splitter := &Splitter{
  395. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  396. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  397. }
  398. if len(splitter.Delimiter) == 0 {
  399. splitter.Delimiter = []byte{0x00}
  400. }
  401. scanner := bufio.NewScanner(req.Body)
  402. scanner.Split(splitter.Split)
  403. statusCode := http.StatusNoContent
  404. n := 0
  405. for scanner.Scan() {
  406. b := scanner.Bytes()
  407. if len(b) == 0 {
  408. continue
  409. }
  410. bcopy := make([]byte, len(b))
  411. copy(bcopy, b)
  412. item := &BackfeedItem{
  413. PrimaryShard: GenShardHash(bcopy),
  414. SecondaryShard: secondaryShard,
  415. Item: bcopy,
  416. }
  417. err := projectBackfeedManager.PushItem(req.Context(), item)
  418. if err != nil {
  419. WriteResponse(res, http.StatusServiceUnavailable, err)
  420. return
  421. }
  422. n++
  423. }
  424. err := scanner.Err()
  425. if err != nil {
  426. WriteResponse(res, statusCode, err)
  427. return
  428. }
  429. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  430. return
  431. }
  432. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  433. if that.Populated.IsNotSet() {
  434. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  435. return
  436. }
  437. err := that.LegacyRedis.Ping(req.Context()).Err()
  438. if err != nil {
  439. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping legacy redis: %s", err))
  440. return
  441. }
  442. err = that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  443. client.ClientGetName(ctx)
  444. return client.Ping(ctx).Err()
  445. })
  446. if err != nil {
  447. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  448. return
  449. }
  450. WriteResponse(res, http.StatusOK, "ok")
  451. }
// HandlePing responds 200 "pong" unconditionally; a liveness check with no
// backend dependencies (contrast HandleHealth).
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
  455. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  456. that.Populated.UnSet()
  457. that.Cancel()
  458. for project, projectBackfeedManager := range that.ActiveFeeds {
  459. log.Printf("waiting for %s channel to shut down...", project)
  460. <-projectBackfeedManager.Done
  461. delete(that.ActiveFeeds, project)
  462. }
  463. }
  464. func main() {
  465. log.SetFlags(log.Flags() | log.Lshortfile)
  466. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  467. if err != nil {
  468. log.Panicf("invalid REDIS_TRACKER url: %s", err)
  469. }
  470. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  471. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  472. legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
  473. if err != nil {
  474. log.Panicf("invalid REDIS_LEGACY url: %s", err)
  475. }
  476. legacyRedisOptions.ReadTimeout = 15 * time.Minute
  477. legacyRedisOptions.PoolSize = 128
  478. legacyRedisClient := redis.NewClient(legacyRedisOptions)
  479. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  480. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  481. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  482. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  483. ReadTimeout: 15 * time.Minute,
  484. PoolSize: 256,
  485. })
  486. legacyRedisMetricsHook := redisprom.NewHook(
  487. redisprom.WithInstanceName("legacy"),
  488. )
  489. legacyRedisClient.AddHook(legacyRedisMetricsHook)
  490. backfeedRedisMetricsHook := redisprom.NewHook(
  491. redisprom.WithInstanceName("backfeed"),
  492. )
  493. backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
  494. trackerRedisMetricsHook := redisprom.NewHook(
  495. redisprom.WithInstanceName("tracker"),
  496. )
  497. trackerRedisClient.AddHook(trackerRedisMetricsHook)
  498. if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
  499. log.Panicf("unable to ping tracker redis: %s", err)
  500. }
  501. if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
  502. log.Panicf("unable to ping backfeed redis: %s", err)
  503. }
  504. err = backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
  505. client.ClientGetName(ctx)
  506. return client.Ping(ctx).Err()
  507. })
  508. if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
  509. log.Panicf("unable to ping legacy redis: %s", err)
  510. }
  511. globalBackfeedManager := &GlobalBackfeedManager{
  512. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  513. ActiveSlugs: map[string]string{},
  514. TrackerRedis: trackerRedisClient,
  515. BackfeedRedis: backfeedRedisClient,
  516. LegacyRedis: legacyRedisClient,
  517. Populated: abool.New(),
  518. }
  519. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  520. defer globalBackfeedManager.CancelAllFeeds()
  521. err = globalBackfeedManager.RefreshFeeds()
  522. if err != nil {
  523. log.Panicf("unable to set up backfeed projects: %s", err)
  524. }
  525. r := mux.NewRouter()
  526. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
  527. r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
  528. r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
  529. rMetrics := mux.NewRouter()
  530. rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  531. rMetrics.Path("/metrics").Handler(promhttp.Handler())
  532. doneChan := make(chan bool)
  533. serveErrChan := make(chan error)
  534. go func() {
  535. s := &http.Server{
  536. Addr: os.Getenv("HTTP_ADDR"),
  537. IdleTimeout: 1 * time.Hour,
  538. MaxHeaderBytes: 1 * 1024 * 1024,
  539. Handler: r,
  540. }
  541. serveErrChan <- s.ListenAndServe()
  542. }()
  543. metricsErrChan := make(chan error)
  544. go func() {
  545. if os.Getenv("METRICS_ADDR") != "" {
  546. s := &http.Server{
  547. Addr: os.Getenv("METRICS_ADDR"),
  548. IdleTimeout: 1 * time.Hour,
  549. MaxHeaderBytes: 1 * 1024 * 1024,
  550. Handler: rMetrics,
  551. }
  552. metricsErrChan <- s.ListenAndServe()
  553. } else {
  554. <-doneChan
  555. metricsErrChan <- nil
  556. }
  557. }()
  558. log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
  559. if os.Getenv("METRICS_ADDR") != "" {
  560. log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
  561. }
  562. sc := make(chan os.Signal, 1)
  563. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  564. ticker := time.NewTicker(1 * time.Second)
  565. for {
  566. select {
  567. case <-sc:
  568. return
  569. case <-ticker.C:
  570. }
  571. err = globalBackfeedManager.RefreshFeeds()
  572. if err != nil {
  573. log.Printf("unable to refresh backfeed projects: %s", err)
  574. }
  575. }
  576. }