25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 

586 satır
16 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. redisprom "github.com/globocom/go-redis-prometheus"
  19. "github.com/go-redis/redis/v8"
  20. "github.com/gorilla/mux"
  21. "github.com/prometheus/client_golang/prometheus/promhttp"
  22. "github.com/tevino/abool/v2"
  23. )
  24. const (
  25. ItemChannelBuffer = 100000
  26. ItemWrapSize = 100000
  27. )
  28. type ProjectRedisConfig struct {
  29. Host string `json:"host"`
  30. Pass string `json:"pass"`
  31. Port int `json:"port"`
  32. }
  33. type ProjectConfig struct {
  34. RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
  35. }
  36. type BackfeedItem struct {
  37. PrimaryShard byte
  38. SecondaryShard string
  39. Item []byte
  40. }
  41. type ProjectBackfeedManager struct {
  42. Context context.Context
  43. Cancel context.CancelFunc
  44. Done chan bool
  45. C chan *BackfeedItem
  46. Name string
  47. BackfeedRedis *redis.ClusterClient
  48. ProjectRedis *redis.Client
  49. //Lock sync.RWMutex
  50. ProjectConfig ProjectConfig
  51. }
  52. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  53. if that.ProjectConfig.RedisConfig == nil && new == nil {
  54. return false
  55. }
  56. return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
  57. }
  58. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error {
  59. //that.Lock.RLock()
  60. //defer that.Lock.RUnlock()
  61. //if that.C == nil {
  62. // return false
  63. //}
  64. select {
  65. case <-ctx.Done():
  66. return ctx.Err()
  67. case <-that.Context.Done():
  68. return fmt.Errorf("backfeed channel closed")
  69. case that.C <- item:
  70. return nil
  71. default:
  72. return fmt.Errorf("backfeed channel full")
  73. }
  74. }
  75. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  76. if blocking {
  77. select {
  78. case <-that.Context.Done():
  79. return nil, false
  80. case item, ok := <-that.C:
  81. return item, ok
  82. }
  83. } else {
  84. select {
  85. case <-that.Context.Done():
  86. return nil, false
  87. case item, ok := <-that.C:
  88. return item, ok
  89. default:
  90. return nil, false
  91. }
  92. }
  93. }
  94. //func (that *ProjectBackfeedManager) CloseItemChannel() {
  95. // log.Printf("closing item channel for %s", that.Name)
  96. // that.Lock.Lock()
  97. // defer that.Lock.Unlock()
  98. // if that.C == nil {
  99. // return
  100. // }
  101. // close(that.C)
  102. // that.C = nil
  103. //}
  104. func (that *ProjectBackfeedManager) Do() {
  105. defer close(that.Done)
  106. //defer that.CloseItemChannel()
  107. defer that.Cancel()
  108. for {
  109. select {
  110. case <-that.Context.Done():
  111. break
  112. case <-that.Done:
  113. break
  114. default:
  115. }
  116. item, ok := that.PopItem(true)
  117. if !ok {
  118. break
  119. }
  120. keyMap := map[string][][]byte{}
  121. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  122. keyMap[key] = append(keyMap[key], item.Item)
  123. wrapped := 1
  124. for wrapped < ItemWrapSize {
  125. item, ok := that.PopItem(false)
  126. if !ok {
  127. break
  128. }
  129. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  130. keyMap[key] = append(keyMap[key], item.Item)
  131. wrapped++
  132. }
  133. select {
  134. case <-that.Context.Done():
  135. break
  136. case <-that.Done:
  137. break
  138. default:
  139. }
  140. now := time.Now()
  141. resultMap := map[string]*redis.Cmd{}
  142. pipe := that.BackfeedRedis.Pipeline()
  143. lastTS := make([]any, 0, len(keyMap)*2)
  144. for key := range keyMap {
  145. lastTS = append(lastTS, key)
  146. lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
  147. }
  148. pipe.HSet(context.Background(), ":last_ts", lastTS...)
  149. for key, items := range keyMap {
  150. args := []any{
  151. "bf.madd",
  152. key,
  153. }
  154. for _, item := range items {
  155. args = append(args, item)
  156. }
  157. resultMap[key] = pipe.Do(context.Background(), args...)
  158. }
  159. var sAddErrItems []any
  160. if _, err := pipe.Exec(context.Background()); err != nil {
  161. log.Printf("%s", err)
  162. for _, items := range keyMap {
  163. for _, item := range items {
  164. sAddErrItems = append(sAddErrItems, item)
  165. }
  166. }
  167. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:backfeed_retry", that.Name), sAddErrItems...).Err(); err != nil {
  168. log.Printf("failed to sadd failed items for %s: %s", that.Name, err)
  169. }
  170. continue
  171. }
  172. var sAddItems []any
  173. for key, items := range keyMap {
  174. res, err := resultMap[key].BoolSlice()
  175. if err != nil {
  176. log.Printf("%s", err)
  177. for _, item := range items {
  178. sAddErrItems = append(sAddErrItems, item)
  179. }
  180. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:backfeed_retry", that.Name), sAddErrItems...).Err(); err != nil {
  181. log.Printf("failed to sadd failed items for %s: %s", that.Name, err)
  182. }
  183. continue
  184. }
  185. if len(res) != len(keyMap[key]) {
  186. continue
  187. }
  188. for i, v := range res {
  189. if v {
  190. sAddItems = append(sAddItems, items[i])
  191. }
  192. }
  193. }
  194. dupes := wrapped - len(sAddItems)
  195. if len(sAddItems) != 0 {
  196. if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
  197. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  198. }
  199. }
  200. if dupes > 0 {
  201. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  202. }
  203. }
  204. }
  205. type GlobalBackfeedManager struct {
  206. Context context.Context
  207. Cancel context.CancelFunc
  208. ActiveFeeds map[string]*ProjectBackfeedManager
  209. ActiveSlugs map[string]string
  210. TrackerRedis *redis.Client
  211. BackfeedRedis *redis.ClusterClient
  212. Lock sync.RWMutex
  213. Populated *abool.AtomicBool
  214. }
  215. func (that *GlobalBackfeedManager) RefreshFeeds() error {
  216. slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
  217. if err != nil {
  218. return err
  219. }
  220. var projects []string
  221. projectSlugMap := map[string][]string{}
  222. for slug, project := range slugProjectMap {
  223. projectSlugMap[project] = append(projectSlugMap[project], slug)
  224. }
  225. for project := range projectSlugMap {
  226. projects = append(projects, project)
  227. }
  228. projectConfigs := map[string]ProjectConfig{}
  229. if len(projects) != 0 {
  230. cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
  231. if err != nil {
  232. return err
  233. }
  234. if len(projects) != len(cfgi) {
  235. return fmt.Errorf("hmget result had unexpected length")
  236. }
  237. for i, project := range projects {
  238. configString, ok := cfgi[i].(string)
  239. if !ok {
  240. continue
  241. }
  242. config := ProjectConfig{}
  243. if err := json.Unmarshal([]byte(configString), &config); err != nil {
  244. continue
  245. }
  246. projectConfigs[project] = config
  247. }
  248. }
  249. projects = nil
  250. for project := range projectSlugMap {
  251. if _, has := projectConfigs[project]; !has {
  252. delete(projectSlugMap, project)
  253. continue
  254. }
  255. projects = append(projects, project)
  256. }
  257. for slug, project := range slugProjectMap {
  258. if _, has := projectConfigs[project]; !has {
  259. delete(slugProjectMap, slug)
  260. }
  261. }
  262. // add feeds for new projects
  263. for _, project := range projects {
  264. projectConfig := projectConfigs[project]
  265. var outdatedProjectBackfeedManager *ProjectBackfeedManager
  266. if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
  267. if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
  268. outdatedProjectBackfeedManager = projectBackfeedManager
  269. } else {
  270. continue
  271. }
  272. }
  273. ctx, cancel := context.WithCancel(that.Context)
  274. projectBackfeedManager := &ProjectBackfeedManager{
  275. Context: ctx,
  276. Cancel: cancel,
  277. Done: make(chan bool),
  278. C: make(chan *BackfeedItem, ItemChannelBuffer),
  279. BackfeedRedis: that.BackfeedRedis,
  280. Name: project,
  281. ProjectConfig: projectConfig,
  282. }
  283. if projectConfig.RedisConfig != nil {
  284. projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
  285. Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
  286. Username: "default",
  287. Password: projectConfig.RedisConfig.Pass,
  288. ReadTimeout: 15 * time.Minute,
  289. })
  290. } else {
  291. projectBackfeedManager.ProjectRedis = that.TrackerRedis
  292. }
  293. go projectBackfeedManager.Do()
  294. that.Lock.Lock()
  295. that.ActiveFeeds[project] = projectBackfeedManager
  296. that.Lock.Unlock()
  297. if outdatedProjectBackfeedManager != nil {
  298. outdatedProjectBackfeedManager.Cancel()
  299. <-outdatedProjectBackfeedManager.Done
  300. log.Printf("updated project: %s", project)
  301. } else {
  302. log.Printf("added project: %s", project)
  303. }
  304. }
  305. that.Lock.Lock()
  306. that.ActiveSlugs = slugProjectMap
  307. that.Lock.Unlock()
  308. // remove feeds for old projects
  309. for project, projectBackfeedManager := range that.ActiveFeeds {
  310. if _, has := projectSlugMap[project]; has {
  311. continue
  312. }
  313. log.Printf("removing project: %s", project)
  314. that.Lock.Lock()
  315. delete(that.ActiveFeeds, project)
  316. that.Lock.Unlock()
  317. projectBackfeedManager.Cancel()
  318. <-projectBackfeedManager.Done
  319. log.Printf("removed project: %s", project)
  320. }
  321. if !that.Populated.IsSet() {
  322. that.Populated.Set()
  323. }
  324. return nil
  325. }
  326. type Splitter struct {
  327. Delimiter []byte
  328. IgnoreEOF bool
  329. }
  330. func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  331. for i := 0; i < len(data); i++ {
  332. if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
  333. return i + len(that.Delimiter), data[:i], nil
  334. }
  335. }
  336. if len(data) == 0 || !atEOF {
  337. return 0, nil, nil
  338. }
  339. if atEOF && that.IgnoreEOF {
  340. return len(data), data, nil
  341. }
  342. return 0, data, io.ErrUnexpectedEOF
  343. }
  344. func GenShardHash(b []byte) (final byte) {
  345. for i, b := range b {
  346. final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
  347. }
  348. return final
  349. }
  350. func WriteResponse(res http.ResponseWriter, statusCode int, v any) {
  351. res.Header().Set("Content-Type", "application/json")
  352. res.WriteHeader(statusCode)
  353. if statusCode == http.StatusNoContent {
  354. return
  355. }
  356. if err, isError := v.(error); isError {
  357. v = map[string]any{
  358. "error": fmt.Sprintf("%v", err),
  359. "status_code": statusCode,
  360. }
  361. } else {
  362. v = map[string]any{
  363. "data": v,
  364. "status_code": statusCode,
  365. }
  366. }
  367. json.NewEncoder(res).Encode(v)
  368. }
  369. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  370. that.Lock.RLock()
  371. defer that.Lock.RUnlock()
  372. project, has := that.ActiveSlugs[slug]
  373. if !has {
  374. return nil
  375. }
  376. projectBackfeedManager, has := that.ActiveFeeds[project]
  377. if !has {
  378. return nil
  379. }
  380. return projectBackfeedManager
  381. }
  382. func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
  383. defer req.Body.Close()
  384. vars := mux.Vars(req)
  385. slug := vars["slug"]
  386. secondaryShard := req.URL.Query().Get("shard")
  387. projectBackfeedManager := that.GetFeed(slug)
  388. if projectBackfeedManager == nil {
  389. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  390. return
  391. }
  392. splitter := &Splitter{
  393. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  394. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  395. }
  396. if len(splitter.Delimiter) == 0 {
  397. splitter.Delimiter = []byte{0x00}
  398. }
  399. scanner := bufio.NewScanner(req.Body)
  400. scanner.Split(splitter.Split)
  401. statusCode := http.StatusNoContent
  402. n := 0
  403. for scanner.Scan() {
  404. b := scanner.Bytes()
  405. if len(b) == 0 {
  406. continue
  407. }
  408. bcopy := make([]byte, len(b))
  409. copy(bcopy, b)
  410. item := &BackfeedItem{
  411. PrimaryShard: GenShardHash(bcopy),
  412. SecondaryShard: secondaryShard,
  413. Item: bcopy,
  414. }
  415. if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
  416. WriteResponse(res, http.StatusServiceUnavailable, err)
  417. return
  418. }
  419. n++
  420. }
  421. if err := scanner.Err(); err != nil {
  422. WriteResponse(res, statusCode, err)
  423. return
  424. }
  425. WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
  426. return
  427. }
  428. func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
  429. if that.Populated.IsNotSet() {
  430. WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
  431. return
  432. }
  433. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  434. client.ClientGetName(ctx)
  435. return client.Ping(ctx).Err()
  436. }); err != nil {
  437. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
  438. return
  439. }
  440. WriteResponse(res, http.StatusOK, "ok")
  441. }
  442. func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
  443. WriteResponse(res, http.StatusOK, "pong")
  444. }
  445. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  446. that.Populated.UnSet()
  447. that.Cancel()
  448. for project, projectBackfeedManager := range that.ActiveFeeds {
  449. log.Printf("waiting for %s channel to shut down...", project)
  450. <-projectBackfeedManager.Done
  451. delete(that.ActiveFeeds, project)
  452. }
  453. }
  454. func main() {
  455. log.SetFlags(log.Flags() | log.Lshortfile)
  456. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  457. if err != nil {
  458. log.Panicf("invalid REDIS_TRACKER url: %s", err)
  459. }
  460. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  461. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  462. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  463. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  464. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  465. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  466. ReadTimeout: 15 * time.Minute,
  467. PoolSize: 256,
  468. })
  469. backfeedRedisMetricsHook := redisprom.NewHook(
  470. redisprom.WithInstanceName("backfeed"),
  471. )
  472. backfeedRedisClient.AddHook(backfeedRedisMetricsHook)
  473. trackerRedisMetricsHook := redisprom.NewHook(
  474. redisprom.WithInstanceName("tracker"),
  475. )
  476. trackerRedisClient.AddHook(trackerRedisMetricsHook)
  477. if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
  478. log.Panicf("unable to ping tracker redis: %s", err)
  479. }
  480. if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
  481. log.Panicf("unable to ping backfeed redis: %s", err)
  482. }
  483. err = backfeedRedisClient.ForEachShard(context.Background(), func(ctx context.Context, client *redis.Client) error {
  484. client.ClientGetName(ctx)
  485. return client.Ping(ctx).Err()
  486. })
  487. globalBackfeedManager := &GlobalBackfeedManager{
  488. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  489. ActiveSlugs: map[string]string{},
  490. TrackerRedis: trackerRedisClient,
  491. BackfeedRedis: backfeedRedisClient,
  492. Populated: abool.New(),
  493. }
  494. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  495. defer globalBackfeedManager.CancelAllFeeds()
  496. if err := globalBackfeedManager.RefreshFeeds(); err != nil {
  497. log.Panicf("unable to set up backfeed projects: %s", err)
  498. }
  499. r := mux.NewRouter()
  500. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
  501. r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
  502. r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
  503. rMetrics := mux.NewRouter()
  504. rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  505. rMetrics.Path("/metrics").Handler(promhttp.Handler())
  506. doneChan := make(chan bool)
  507. serveErrChan := make(chan error)
  508. go func() {
  509. s := &http.Server{
  510. Addr: os.Getenv("HTTP_ADDR"),
  511. IdleTimeout: 1 * time.Hour,
  512. MaxHeaderBytes: 1 * 1024 * 1024,
  513. Handler: r,
  514. }
  515. serveErrChan <- s.ListenAndServe()
  516. }()
  517. metricsErrChan := make(chan error)
  518. go func() {
  519. if os.Getenv("METRICS_ADDR") != "" {
  520. s := &http.Server{
  521. Addr: os.Getenv("METRICS_ADDR"),
  522. IdleTimeout: 1 * time.Hour,
  523. MaxHeaderBytes: 1 * 1024 * 1024,
  524. Handler: rMetrics,
  525. }
  526. metricsErrChan <- s.ListenAndServe()
  527. } else {
  528. <-doneChan
  529. metricsErrChan <- nil
  530. }
  531. }()
  532. log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
  533. if os.Getenv("METRICS_ADDR") != "" {
  534. log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
  535. }
  536. sc := make(chan os.Signal, 1)
  537. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  538. ticker := time.NewTicker(1 * time.Second)
  539. for {
  540. select {
  541. case <-sc:
  542. return
  543. case <-ticker.C:
  544. }
  545. if err := globalBackfeedManager.RefreshFeeds(); err != nil {
  546. log.Printf("unable to refresh backfeed projects: %s", err)
  547. }
  548. }
  549. }