Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

618 righe
16 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. redisprom "github.com/globocom/go-redis-prometheus"
  19. "github.com/go-redis/redis/v8"
  20. "github.com/gorilla/mux"
  21. "github.com/prometheus/client_golang/prometheus/promhttp"
  22. "github.com/tevino/abool/v2"
  23. )
const (
	// ItemChannelBuffer is the capacity of each project manager's item
	// channel; producers (HTTP handlers) block once this many items are queued.
	ItemChannelBuffer = 100000
	// ItemWrapSize caps how many items Do() batches into a single redis
	// pipeline per iteration.
	ItemWrapSize = 100000
)
// ProjectRedisConfig describes an optional project-specific redis instance,
// as embedded in the tracker's per-project configuration JSON.
type ProjectRedisConfig struct {
	Host string `json:"host"` // redis host name or IP
	Pass string `json:"pass"` // password for the "default" user
	Port int    `json:"port"` // TCP port
}
// ProjectConfig is the subset of a tracker project's JSON configuration that
// the backfeed service cares about.
type ProjectConfig struct {
	// RedisConfig, when non-nil, directs deduplicated items to a dedicated
	// per-project redis; when nil the shared tracker redis is used instead.
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
// BackfeedItem is a single deduplication candidate received over HTTP.
type BackfeedItem struct {
	// PrimaryShard is a one-byte hash of Item (see GenShardHash) used as
	// part of the bloom-filter key.
	PrimaryShard byte
	// SecondaryShard is the caller-supplied "shard" query parameter
	// (may be empty).
	SecondaryShard string
	// Item is the raw item payload.
	Item []byte
}
// ProjectBackfeedManager deduplicates and forwards backfeed items for a
// single project.  HTTP handlers enqueue items via PushItem; the Do loop
// drains, deduplicates, and forwards them.
type ProjectBackfeedManager struct {
	Context       context.Context      // cancelled to shut this manager down
	Cancel        context.CancelFunc   // cancels Context
	Done          chan bool            // closed when Do has fully exited
	C             chan *BackfeedItem   // buffered item queue (ItemChannelBuffer)
	Name          string               // project name, used as redis key prefix
	BackfeedRedis *redis.ClusterClient // bloom-filter cluster (first dedupe pass)
	ProjectRedis  *redis.Client        // destination for deduplicated todo items
	LegacyRedis   *redis.Client        // legacy bloom filter (second dedupe pass)
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig // config snapshot, used to detect changes on refresh
}
  53. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  54. if that.ProjectConfig.RedisConfig == nil && new == nil {
  55. return false
  56. }
  57. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  58. }
  59. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
  60. //that.Lock.RLock()
  61. //defer that.Lock.RUnlock()
  62. //if that.C == nil {
  63. // return false
  64. //}
  65. select {
  66. case <-ctx.Done():
  67. return false
  68. case <-that.Context.Done():
  69. return false
  70. case that.C <- item:
  71. return true
  72. }
  73. }
  74. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  75. if blocking {
  76. select {
  77. case <-that.Context.Done():
  78. return nil, false
  79. case item := <-that.C:
  80. return item, true
  81. }
  82. } else {
  83. select {
  84. case <-that.Context.Done():
  85. return nil, false
  86. case item := <-that.C:
  87. return item, true
  88. default:
  89. return nil, false
  90. }
  91. }
  92. }
  93. //func (that *ProjectBackfeedManager) CloseItemChannel() {
  94. // log.Printf("closing item channel for %s", that.Name)
  95. // that.Lock.Lock()
  96. // defer that.Lock.Unlock()
  97. // if that.C == nil {
  98. // return
  99. // }
  100. // close(that.C)
  101. // that.C = nil
  102. //}
  103. func (that *ProjectBackfeedManager) Do() {
  104. defer close(that.Done)
  105. //defer that.CloseItemChannel()
  106. defer that.Cancel()
  107. for {
  108. select {
  109. case <-that.Context.Done():
  110. break
  111. case <-that.Done:
  112. break
  113. default:
  114. }
  115. item, ok := that.PopItem(true)
  116. if !ok {
  117. break
  118. }
  119. keyMap := map[string][][]byte{}
  120. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  121. keyMap[key] = append(keyMap[key], item.Item)
  122. wrapped := 1
  123. for wrapped < ItemWrapSize {
  124. item, ok := that.PopItem(false)
  125. if !ok {
  126. break
  127. }
  128. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  129. keyMap[key] = append(keyMap[key], item.Item)
  130. wrapped++
  131. }
  132. select {
  133. case <-that.Context.Done():
  134. break
  135. case <-that.Done:
  136. break
  137. default:
  138. }
  139. resultMap := map[string]*redis.Cmd{}
  140. pipe := that.BackfeedRedis.Pipeline()
  141. for key, items := range keyMap {
  142. args := []interface{}{
  143. "bf.madd",
  144. key,
  145. }
  146. for _, item := range items {
  147. args = append(args, item)
  148. }
  149. resultMap[key] = pipe.Do(context.Background(), args...)
  150. }
  151. _, err := pipe.Exec(context.Background())
  152. if err != nil {
  153. log.Printf("%s", err)
  154. }
  155. var sAddItems []interface{}
  156. for key, items := range keyMap {
  157. res, err := resultMap[key].BoolSlice()
  158. if err != nil {
  159. log.Printf("%s", err)
  160. continue
  161. }
  162. if len(res) != len(keyMap[key]) {
  163. continue
  164. }
  165. for i, v := range res {
  166. if v {
  167. sAddItems = append(sAddItems, items[i])
  168. }
  169. }
  170. }
  171. dupes := wrapped - len(sAddItems)
  172. if len(sAddItems) != 0 {
  173. args := []interface{}{
  174. "bf.mexists",
  175. that.Name,
  176. }
  177. args = append(args, sAddItems...)
  178. res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice()
  179. if err != nil {
  180. log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
  181. } else if len(res) == len(sAddItems) {
  182. var filteredSAddItems []interface{}
  183. for i, v := range res {
  184. if !v {
  185. filteredSAddItems = append(filteredSAddItems, sAddItems[i])
  186. }
  187. }
  188. sAddItems = filteredSAddItems
  189. }
  190. }
  191. if len(sAddItems) != 0 {
  192. err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
  193. if err != nil {
  194. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  195. }
  196. }
  197. if dupes > 0 {
  198. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  199. }
  200. }
  201. }
// GlobalBackfeedManager owns every per-project manager plus the shared redis
// clients.  Lock guards ActiveFeeds and ActiveSlugs, which are written by
// RefreshFeeds and read concurrently by the HTTP handlers.
type GlobalBackfeedManager struct {
	Context       context.Context                    // root context; cancelling shuts everything down
	Cancel        context.CancelFunc                 // cancels Context
	ActiveFeeds   map[string]*ProjectBackfeedManager // project name -> running manager
	ActiveSlugs   map[string]string                  // URL slug -> project name
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex
	Populated     *abool.AtomicBool // set after the first successful RefreshFeeds; gates /health
}
// RefreshFeeds reconciles the set of running project managers with tracker
// state: the "backfeed" hash (slug -> project) and the "trackers" hash
// (project -> JSON config).  It starts managers for new projects, replaces
// managers whose redis config changed, and tears down managers for projects
// that disappeared.  Map writes are guarded by that.Lock for the concurrent
// HTTP readers; this method itself is only ever called from the main
// goroutine, so unlocked reads of ActiveFeeds here are single-writer safe.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	// Invert slug->project into project->[]slug.
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	// Fetch and parse each project's config; projects whose config is
	// missing or unparseable are silently skipped below.
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			err := json.Unmarshal([]byte(configString), &config)
			if err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Drop projects (and their slugs) that have no usable config.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				// Config changed: start a replacement, retire the old one below.
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
			LegacyRedis:   that.LegacyRedis,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			// No dedicated redis configured: fall back to the tracker redis.
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			// New manager is already serving; drain and wait out the old one.
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
  326. type Splitter struct {
  327. Delimiter []byte
  328. IgnoreEOF bool
  329. }
  330. func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  331. for i := 0; i < len(data); i++ {
  332. if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
  333. return i + len(that.Delimiter), data[:i], nil
  334. }
  335. }
  336. if len(data) == 0 || !atEOF {
  337. return 0, nil, nil
  338. }
  339. if atEOF && that.IgnoreEOF {
  340. return len(data), data, nil
  341. }
  342. return 0, data, io.ErrUnexpectedEOF
  343. }
  344. func GenShardHash(b []byte) (final byte) {
  345. for i, b := range b {
  346. final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
  347. }
  348. return final
  349. }
  350. func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
  351. res.Header().Set("Content-Type", "application/json")
  352. res.WriteHeader(statusCode)
  353. if statusCode == http.StatusNoContent {
  354. return
  355. }
  356. if err, isError := v.(error); isError {
  357. v = map[string]interface{}{
  358. "error": fmt.Sprintf("%v", err),
  359. "status_code": statusCode,
  360. }
  361. } else {
  362. v = map[string]interface{}{
  363. "data": v,
  364. "status_code": statusCode,
  365. }
  366. }
  367. json.NewEncoder(res).Encode(v)
  368. }
  369. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  370. that.Lock.RLock()
  371. defer that.Lock.RUnlock()
  372. project, has := that.ActiveSlugs[slug]
  373. if !has {
  374. return nil
  375. }
  376. projectBackfeedManager, has := that.ActiveFeeds[project]
  377. if !has {
  378. return nil
  379. }
  380. return projectBackfeedManager
  381. }
  382. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  383. defer req.Body.Close()
  384. vars := mux.Vars(req)
  385. slug := vars["slug"]
  386. secondaryShard := req.URL.Query().Get("shard")
  387. projectBackfeedManager := that.GetFeed(slug)
  388. if projectBackfeedManager == nil {
  389. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  390. return
  391. }
  392. splitter := &Splitter{
  393. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  394. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  395. }
  396. if len(splitter.Delimiter) == 0 {
  397. splitter.Delimiter = []byte{0x00}
  398. }
  399. scanner := bufio.NewScanner(req.Body)
  400. scanner.Split(splitter.Split)
  401. var err error
  402. statusCode := http.StatusNoContent
  403. n := 0
  404. for scanner.Scan() {
  405. b := scanner.Bytes()
  406. if len(b) == 0 {
  407. continue
  408. }
  409. bcopy := make([]byte, len(b))
  410. copy(bcopy, b)
  411. item := &BackfeedItem{
  412. PrimaryShard: GenShardHash(bcopy),
  413. SecondaryShard: secondaryShard,
  414. Item: bcopy,
  415. }
  416. ok := projectBackfeedManager.PushItem(req.Context(), item)
  417. if !ok {
  418. err = fmt.Errorf("channel closed")
  419. statusCode = http.StatusServiceUnavailable
  420. break
  421. }
  422. n++
  423. }
  424. if err == nil {
  425. err = scanner.Err()
  426. if err != nil {
  427. statusCode = http.StatusBadRequest
  428. }
  429. }
  430. if err != nil {
  431. WriteResponse(res, statusCode, err)
  432. } else {
  433. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  434. }
  435. return
  436. }
// HandleHealth serves GET /health: 503 until the first successful
// RefreshFeeds has populated the feeds, then it pings the legacy redis and
// every backfeed cluster shard, returning 500 on any failure and 200 "ok"
// otherwise.
func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
	if that.Populated.IsNotSet() {
		WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
		return
	}
	err := that.LegacyRedis.Ping(req.Context()).Err()
	if err != nil {
		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping legacy redis: %s", err))
		return
	}
	err = that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
		// NOTE(review): result deliberately(?) ignored — the same pattern
		// appears in main(); confirm whether this call is needed at all.
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	})
	if err != nil {
		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
		return
	}
	WriteResponse(res, http.StatusOK, "ok")
}
// HandlePing serves GET /ping: unconditionally responds 200 with "pong".
// Usable as a liveness probe that skips the redis checks (cf. HandleHealth).
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
// CancelAllFeeds marks the manager as unpopulated (failing /health), cancels
// the root context, then waits for every project manager to drain before
// removing it.  Deleting while ranging over the map is safe in Go.
// NOTE(review): ActiveFeeds is read without Lock here — safe as called
// (main's shutdown defer, after the refresh loop has stopped); confirm no
// other callers appear.
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Populated.UnSet()
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}
  469. func main() {
  470. log.SetFlags(log.Flags() | log.Lshortfile)
  471. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  472. if err != nil {
  473. log.Panicf("invalid REDIS_TRACKER url: %s", err)
  474. }
  475. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  476. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  477. legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
  478. if err != nil {
  479. log.Panicf("invalid REDIS_LEGACY url: %s", err)
  480. }
  481. legacyRedisOptions.ReadTimeout = 15 * time.Minute
  482. legacyRedisClient := redis.NewClient(legacyRedisOptions)
  483. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  484. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  485. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  486. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  487. ReadTimeout: 15 * time.Minute,
  488. })
  489. legacyRedisMetricsHook := redisprom.NewHook(
  490. redisprom.WithInstanceName("legacy"),
  491. )
  492. legacyRedisClient.AddHook(legacyRedisMetricsHook)
  493. backfeedRedisMetricsHook := redisprom.NewHook(
  494. redisprom.WithInstanceName("backfeed"),
  495. )
  496. backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
  497. trackerRedisMetricsHook := redisprom.NewHook(
  498. redisprom.WithInstanceName("tracker"),
  499. )
  500. trackerRedisClient.AddHook(trackerRedisMetricsHook)
  501. if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
  502. log.Panicf("unable to ping tracker redis: %s", err)
  503. }
  504. if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
  505. log.Panicf("unable to ping backfeed redis: %s", err)
  506. }
  507. err = backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
  508. client.ClientGetName(ctx)
  509. return client.Ping(ctx).Err()
  510. })
  511. if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
  512. log.Panicf("unable to ping legacy redis: %s", err)
  513. }
  514. globalBackfeedManager := &GlobalBackfeedManager{
  515. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  516. ActiveSlugs: map[string]string{},
  517. TrackerRedis: trackerRedisClient,
  518. BackfeedRedis: backfeedRedisClient,
  519. LegacyRedis: legacyRedisClient,
  520. Populated: abool.New(),
  521. }
  522. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  523. defer globalBackfeedManager.CancelAllFeeds()
  524. err = globalBackfeedManager.RefreshFeeds()
  525. if err != nil {
  526. log.Panicf("unable to set up backfeed projects: %s", err)
  527. }
  528. r := mux.NewRouter()
  529. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
  530. r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
  531. r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
  532. rMetrics := mux.NewRouter()
  533. rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  534. rMetrics.Path("/metrics").Handler(promhttp.Handler())
  535. doneChan := make(chan bool)
  536. serveErrChan := make(chan error)
  537. go func() {
  538. s := &http.Server{
  539. Addr: os.Getenv("HTTP_ADDR"),
  540. IdleTimeout: 1 * time.Hour,
  541. MaxHeaderBytes: 1 * 1024 * 1024,
  542. Handler: r,
  543. }
  544. serveErrChan <- s.ListenAndServe()
  545. }()
  546. metricsErrChan := make(chan error)
  547. go func() {
  548. if os.Getenv("METRICS_ADDR") != "" {
  549. s := &http.Server{
  550. Addr: os.Getenv("METRICS_ADDR"),
  551. IdleTimeout: 1 * time.Hour,
  552. MaxHeaderBytes: 1 * 1024 * 1024,
  553. Handler: rMetrics,
  554. }
  555. metricsErrChan <- s.ListenAndServe()
  556. } else {
  557. <-doneChan
  558. metricsErrChan <- nil
  559. }
  560. }()
  561. log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
  562. if os.Getenv("METRICS_ADDR") != "" {
  563. log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
  564. }
  565. sc := make(chan os.Signal, 1)
  566. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  567. ticker := time.NewTicker(1 * time.Second)
  568. for {
  569. select {
  570. case <-sc:
  571. return
  572. case <-ticker.C:
  573. }
  574. err = globalBackfeedManager.RefreshFeeds()
  575. if err != nil {
  576. log.Printf("unable to refresh backfeed projects: %s", err)
  577. }
  578. }
  579. }