25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 

597 satır
15 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. "github.com/go-redis/redis/v8"
  19. "github.com/gorilla/mux"
  20. "github.com/prometheus/client_golang/prometheus/promhttp"
  21. "github.com/tevino/abool/v2"
  22. )
  23. const (
  24. ItemChannelBuffer = 100000
  25. ItemWrapSize = 100000
  26. )
  27. type ProjectRedisConfig struct {
  28. Host string `json:"host"`
  29. Pass string `json:"pass"`
  30. Port int `json:"port"`
  31. }
  32. type ProjectConfig struct {
  33. RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
  34. }
  35. type BackfeedItem struct {
  36. PrimaryShard byte
  37. SecondaryShard string
  38. Item []byte
  39. }
  40. type ProjectBackfeedManager struct {
  41. Context context.Context
  42. Cancel context.CancelFunc
  43. Done chan bool
  44. C chan *BackfeedItem
  45. Name string
  46. BackfeedRedis *redis.ClusterClient
  47. ProjectRedis *redis.Client
  48. LegacyRedis *redis.Client
  49. Lock sync.RWMutex
  50. ProjectConfig ProjectConfig
  51. }
  52. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  53. if that.ProjectConfig.RedisConfig == nil && new == nil {
  54. return false
  55. }
  56. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  57. }
  58. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
  59. that.Lock.RLock()
  60. defer that.Lock.RUnlock()
  61. if that.C == nil {
  62. return false
  63. }
  64. select {
  65. case <-ctx.Done():
  66. return false
  67. case <-that.Context.Done():
  68. return false
  69. case that.C <- item:
  70. return true
  71. }
  72. }
  73. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  74. if blocking {
  75. select {
  76. case <-that.Context.Done():
  77. return nil, false
  78. case item := <-that.C:
  79. return item, true
  80. }
  81. } else {
  82. select {
  83. case <-that.Context.Done():
  84. return nil, false
  85. case item := <-that.C:
  86. return item, true
  87. default:
  88. return nil, false
  89. }
  90. }
  91. }
  92. func (that *ProjectBackfeedManager) CloseItemChannel() {
  93. that.Lock.Lock()
  94. defer that.Lock.Unlock()
  95. if that.C == nil {
  96. return
  97. }
  98. close(that.C)
  99. that.C = nil
  100. }
  101. func (that *ProjectBackfeedManager) Do() {
  102. defer close(that.Done)
  103. defer that.CloseItemChannel()
  104. defer that.Cancel()
  105. for {
  106. select {
  107. case <-that.Context.Done():
  108. break
  109. case <-that.Done:
  110. break
  111. default:
  112. }
  113. item, ok := that.PopItem(true)
  114. if !ok {
  115. break
  116. }
  117. keyMap := map[string][][]byte{}
  118. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  119. keyMap[key] = append(keyMap[key], item.Item)
  120. wrapped := 1
  121. for wrapped < ItemWrapSize {
  122. item, ok := that.PopItem(false)
  123. if !ok {
  124. break
  125. }
  126. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  127. keyMap[key] = append(keyMap[key], item.Item)
  128. wrapped++
  129. }
  130. select {
  131. case <-that.Context.Done():
  132. break
  133. case <-that.Done:
  134. break
  135. default:
  136. }
  137. resultMap := map[string]*redis.Cmd{}
  138. pipe := that.BackfeedRedis.Pipeline()
  139. for key, items := range keyMap {
  140. args := []interface{}{
  141. "bf.madd",
  142. key,
  143. }
  144. for _, item := range items {
  145. args = append(args, item)
  146. }
  147. resultMap[key] = pipe.Do(context.Background(), args...)
  148. }
  149. _, err := pipe.Exec(context.Background())
  150. if err != nil {
  151. log.Printf("%s", err)
  152. }
  153. var sAddItems []interface{}
  154. for key, items := range keyMap {
  155. res, err := resultMap[key].BoolSlice()
  156. if err != nil {
  157. log.Printf("%s", err)
  158. continue
  159. }
  160. if len(res) != len(keyMap[key]) {
  161. continue
  162. }
  163. for i, v := range res {
  164. if v {
  165. sAddItems = append(sAddItems, items[i])
  166. }
  167. }
  168. }
  169. dupes := wrapped - len(sAddItems)
  170. if len(sAddItems) != 0 {
  171. args := []interface{}{
  172. "bf.mexists",
  173. that.Name,
  174. }
  175. args = append(args, sAddItems...)
  176. res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice()
  177. if err != nil {
  178. log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
  179. } else if len(res) == len(sAddItems) {
  180. var filteredSAddItems []interface{}
  181. for i, v := range res {
  182. if !v {
  183. filteredSAddItems = append(filteredSAddItems, sAddItems[i])
  184. }
  185. }
  186. sAddItems = filteredSAddItems
  187. }
  188. }
  189. if len(sAddItems) != 0 {
  190. err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
  191. if err != nil {
  192. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  193. }
  194. }
  195. if dupes > 0 {
  196. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  197. }
  198. }
  199. }
  200. type GlobalBackfeedManager struct {
  201. Context context.Context
  202. Cancel context.CancelFunc
  203. ActiveFeeds map[string]*ProjectBackfeedManager
  204. ActiveSlugs map[string]string
  205. TrackerRedis *redis.Client
  206. BackfeedRedis *redis.ClusterClient
  207. LegacyRedis *redis.Client
  208. Lock sync.RWMutex
  209. Populated *abool.AtomicBool
  210. }
  211. func (that *GlobalBackfeedManager) RefreshFeeds() error {
  212. slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
  213. if err != nil {
  214. return err
  215. }
  216. var projects []string
  217. projectSlugMap := map[string][]string{}
  218. for slug, project := range slugProjectMap {
  219. projectSlugMap[project] = append(projectSlugMap[project], slug)
  220. }
  221. for project := range projectSlugMap {
  222. projects = append(projects, project)
  223. }
  224. projectConfigs := map[string]ProjectConfig{}
  225. if len(projects) != 0 {
  226. cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
  227. if err != nil {
  228. return err
  229. }
  230. if len(projects) != len(cfgi) {
  231. return fmt.Errorf("hmget result had unexpected length")
  232. }
  233. for i, project := range projects {
  234. configString, ok := cfgi[i].(string)
  235. if !ok {
  236. continue
  237. }
  238. config := ProjectConfig{}
  239. err := json.Unmarshal([]byte(configString), &config)
  240. if err != nil {
  241. continue
  242. }
  243. projectConfigs[project] = config
  244. }
  245. }
  246. projects = nil
  247. for project := range projectSlugMap {
  248. if _, has := projectConfigs[project]; !has {
  249. delete(projectSlugMap, project)
  250. continue
  251. }
  252. projects = append(projects, project)
  253. }
  254. for slug, project := range slugProjectMap {
  255. if _, has := projectConfigs[project]; !has {
  256. delete(slugProjectMap, slug)
  257. }
  258. }
  259. // add feeds for new projects
  260. for _, project := range projects {
  261. projectConfig := projectConfigs[project]
  262. var outdatedProjectBackfeedManager *ProjectBackfeedManager
  263. if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
  264. if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
  265. outdatedProjectBackfeedManager = projectBackfeedManager
  266. } else {
  267. continue
  268. }
  269. }
  270. ctx, cancel := context.WithCancel(that.Context)
  271. projectBackfeedManager := &ProjectBackfeedManager{
  272. Context: ctx,
  273. Cancel: cancel,
  274. Done: make(chan bool),
  275. C: make(chan *BackfeedItem, ItemChannelBuffer),
  276. BackfeedRedis: that.BackfeedRedis,
  277. Name: project,
  278. ProjectConfig: projectConfig,
  279. LegacyRedis: that.LegacyRedis,
  280. }
  281. if projectConfig.RedisConfig != nil {
  282. projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
  283. Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
  284. Username: "default",
  285. Password: projectConfig.RedisConfig.Pass,
  286. ReadTimeout: 15 * time.Minute,
  287. })
  288. } else {
  289. projectBackfeedManager.ProjectRedis = that.TrackerRedis
  290. }
  291. go projectBackfeedManager.Do()
  292. that.Lock.Lock()
  293. that.ActiveFeeds[project] = projectBackfeedManager
  294. that.Lock.Unlock()
  295. if outdatedProjectBackfeedManager != nil {
  296. outdatedProjectBackfeedManager.Cancel()
  297. <-outdatedProjectBackfeedManager.Done
  298. log.Printf("updated project: %s", project)
  299. } else {
  300. log.Printf("added project: %s", project)
  301. }
  302. }
  303. that.Lock.Lock()
  304. that.ActiveSlugs = slugProjectMap
  305. that.Lock.Unlock()
  306. // remove feeds for old projects
  307. for project, projectBackfeedManager := range that.ActiveFeeds {
  308. if _, has := projectSlugMap[project]; has {
  309. continue
  310. }
  311. log.Printf("removing project: %s", project)
  312. that.Lock.Lock()
  313. delete(that.ActiveFeeds, project)
  314. that.Lock.Unlock()
  315. projectBackfeedManager.Cancel()
  316. <-projectBackfeedManager.Done
  317. log.Printf("removed project: %s", project)
  318. }
  319. if !that.Populated.IsSet() {
  320. that.Populated.Set()
  321. }
  322. return nil
  323. }
  324. type Splitter struct {
  325. Delimiter []byte
  326. IgnoreEOF bool
  327. }
  328. func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  329. for i := 0; i < len(data); i++ {
  330. if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
  331. return i + len(that.Delimiter), data[:i], nil
  332. }
  333. }
  334. if len(data) == 0 || !atEOF {
  335. return 0, nil, nil
  336. }
  337. if atEOF && that.IgnoreEOF {
  338. return len(data), data, nil
  339. }
  340. return 0, data, io.ErrUnexpectedEOF
  341. }
  342. func GenShardHash(b []byte) (final byte) {
  343. for i, b := range b {
  344. final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
  345. }
  346. return final
  347. }
  348. func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
  349. res.Header().Set("Content-Type", "application/json")
  350. res.WriteHeader(statusCode)
  351. if statusCode == http.StatusNoContent {
  352. return
  353. }
  354. if err, isError := v.(error); isError {
  355. v = map[string]interface{}{
  356. "error": fmt.Sprintf("%v", err),
  357. "status_code": statusCode,
  358. }
  359. } else {
  360. v = map[string]interface{}{
  361. "data": v,
  362. "status_code": statusCode,
  363. }
  364. }
  365. json.NewEncoder(res).Encode(v)
  366. }
  367. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  368. that.Lock.RLock()
  369. defer that.Lock.RUnlock()
  370. project, has := that.ActiveSlugs[slug]
  371. if !has {
  372. return nil
  373. }
  374. projectBackfeedManager, has := that.ActiveFeeds[project]
  375. if !has {
  376. return nil
  377. }
  378. return projectBackfeedManager
  379. }
  380. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  381. defer req.Body.Close()
  382. vars := mux.Vars(req)
  383. slug := vars["slug"]
  384. secondaryShard := req.URL.Query().Get("shard")
  385. projectBackfeedManager := that.GetFeed(slug)
  386. if projectBackfeedManager == nil {
  387. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  388. return
  389. }
  390. splitter := &Splitter{
  391. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  392. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  393. }
  394. if len(splitter.Delimiter) == 0 {
  395. splitter.Delimiter = []byte{0x00}
  396. }
  397. scanner := bufio.NewScanner(req.Body)
  398. scanner.Split(splitter.Split)
  399. var err error
  400. statusCode := http.StatusNoContent
  401. n := 0
  402. for scanner.Scan() {
  403. b := scanner.Bytes()
  404. if len(b) == 0 {
  405. continue
  406. }
  407. bcopy := make([]byte, len(b))
  408. copy(bcopy, b)
  409. item := &BackfeedItem{
  410. PrimaryShard: GenShardHash(bcopy),
  411. SecondaryShard: secondaryShard,
  412. Item: bcopy,
  413. }
  414. ok := projectBackfeedManager.PushItem(req.Context(), item)
  415. if !ok {
  416. err = fmt.Errorf("channel closed")
  417. statusCode = http.StatusServiceUnavailable
  418. break
  419. }
  420. n++
  421. }
  422. if err == nil {
  423. err = scanner.Err()
  424. if err != nil {
  425. statusCode = http.StatusBadRequest
  426. }
  427. }
  428. if err != nil {
  429. WriteResponse(res, statusCode, err)
  430. } else {
  431. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  432. }
  433. return
  434. }
  435. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  436. if that.Populated.IsNotSet() {
  437. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  438. return
  439. }
  440. err := that.LegacyRedis.Ping(req.Context()).Err()
  441. if err != nil {
  442. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping legacy redis: %s", err))
  443. return
  444. }
  445. err = that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  446. return client.Ping(ctx).Err()
  447. })
  448. if err != nil {
  449. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  450. return
  451. }
  452. WriteResponse(res, http.StatusOK, "ok")
  453. }
  454. func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
  455. WriteResponse(res, http.StatusOK, "pong")
  456. }
  457. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  458. that.Populated.UnSet()
  459. that.Cancel()
  460. for project, projectBackfeedManager := range that.ActiveFeeds {
  461. log.Printf("waiting for %s channel to shut down...", project)
  462. <-projectBackfeedManager.Done
  463. delete(that.ActiveFeeds, project)
  464. }
  465. }
  466. func main() {
  467. log.SetFlags(log.Flags() | log.Lshortfile)
  468. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  469. if err != nil {
  470. log.Panicf("invalid REDIS_TRACKER url: %s", err)
  471. }
  472. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  473. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  474. legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
  475. if err != nil {
  476. log.Panicf("invalid REDIS_LEGACY url: %s", err)
  477. }
  478. legacyRedisOptions.ReadTimeout = 15 * time.Minute
  479. legacyRedisClient := redis.NewClient(legacyRedisOptions)
  480. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  481. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  482. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  483. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  484. ReadTimeout: 15 * time.Minute,
  485. })
  486. if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
  487. log.Panicf("unable to ping tracker redis: %s", err)
  488. }
  489. if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
  490. log.Panicf("unable to ping backfeed redis: %s", err)
  491. }
  492. if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
  493. log.Panicf("unable to ping legacy redis: %s", err)
  494. }
  495. globalBackfeedManager := &GlobalBackfeedManager{
  496. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  497. ActiveSlugs: map[string]string{},
  498. TrackerRedis: trackerRedisClient,
  499. BackfeedRedis: backfeedRedisClient,
  500. LegacyRedis: legacyRedisClient,
  501. }
  502. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  503. defer globalBackfeedManager.CancelAllFeeds()
  504. err = globalBackfeedManager.RefreshFeeds()
  505. if err != nil {
  506. log.Panicf("unable to set up backfeed projects: %s", err)
  507. }
  508. r := mux.NewRouter()
  509. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
  510. r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
  511. r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
  512. rMetrics := mux.NewRouter()
  513. rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  514. rMetrics.Path("/metrics").Handler(promhttp.Handler())
  515. doneChan := make(chan bool)
  516. serveErrChan := make(chan error)
  517. go func() {
  518. s := &http.Server{
  519. Addr: os.Getenv("HTTP_ADDR"),
  520. IdleTimeout: 1 * time.Hour,
  521. MaxHeaderBytes: 1 * 1024 * 1024,
  522. Handler: r,
  523. }
  524. serveErrChan <- s.ListenAndServe()
  525. }()
  526. metricsErrChan := make(chan error)
  527. go func() {
  528. if os.Getenv("METRICS_ADDR") != "" {
  529. s := &http.Server{
  530. Addr: os.Getenv("METRICS_ADDR"),
  531. IdleTimeout: 1 * time.Hour,
  532. MaxHeaderBytes: 1 * 1024 * 1024,
  533. Handler: rMetrics,
  534. }
  535. metricsErrChan <- s.ListenAndServe()
  536. } else {
  537. <-doneChan
  538. metricsErrChan <- nil
  539. }
  540. }()
  541. log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
  542. if os.Getenv("METRICS_ADDR") != "" {
  543. log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
  544. }
  545. sc := make(chan os.Signal, 1)
  546. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  547. ticker := time.NewTicker(1 * time.Second)
  548. for {
  549. select {
  550. case <-sc:
  551. return
  552. case <-ticker.C:
  553. }
  554. err = globalBackfeedManager.RefreshFeeds()
  555. if err != nil {
  556. log.Printf("unable to refresh backfeed projects: %s", err)
  557. }
  558. }
  559. }