package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
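
// ItemChannelBuffer is the per-project queue capacity; ItemWrapSize caps how
// many items are batched into a single Redis pipeline round trip.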
const (
	ItemChannelBuffer = 100000
	ItemWrapSize      = 100000
)
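
// ProjectRedisConfig holds the optional project-specific Redis connection
// settings as stored in the tracker's "trackers" hash.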
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
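
// BackfeedItem is a single deduplication candidate. PrimaryShard is derived
// from the item bytes via GenShardHash; SecondaryShard comes from the
// requester's "shard" query parameter.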
type BackfeedItem struct {
	PrimaryShard   byte
	SecondaryShard string
	Item           []byte
}
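
// ProjectBackfeedManager owns the backfeed queue for one project: items pushed
// onto C are deduplicated and, if new, added to the project's todo set.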
type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex
	ProjectConfig ProjectConfig
}
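
// RedisConfigDiffers reports whether new differs from the manager's current
// project Redis configuration.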
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && new == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
}
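
// PushItem queues an item for deduplication. It returns false if the channel
// has been closed or either context is cancelled.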
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	if that.C == nil {
		return false
	}
	select {
	case <-ctx.Done():
		return false
	case <-that.Context.Done():
		return false
	case that.C <- item:
		return true
	}
}
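
// PopItem dequeues the next item, optionally blocking until one is available
// or the manager's context is cancelled.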
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		default:
			return nil, false
		}
	}
}
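
// CloseItemChannel closes and nils the item channel so that subsequent
// PushItem calls fail cleanly instead of sending on a closed channel.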
func (that *ProjectBackfeedManager) CloseItemChannel() {
	that.Lock.Lock()
	defer that.Lock.Unlock()
	if that.C == nil {
		return
	}
	close(that.C)
	that.C = nil
}
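
// Do is the per-project worker loop: it drains up to ItemWrapSize items,
// checks them against the backfeed bloom filters (BF.MADD) and the legacy
// bloom filter (BF.MEXISTS), pushes genuinely new items to the project's
// <name>:todo:backfeed set, and tallies duplicates.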
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	defer that.CloseItemChannel()
	defer that.Cancel()
mainLoop:
	for {
		// "break" inside a select only leaves the select, so the loop is
		// labeled to make these cancellation checks actually exit it.
		select {
		case <-that.Context.Done():
			break mainLoop
		case <-that.Done:
			break mainLoop
		default:
		}
		item, ok := that.PopItem(true)
		if !ok {
			break
		}
		// Group items by backfeed key and batch up to ItemWrapSize of them.
		keyMap := map[string][][]byte{}
		key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
		keyMap[key] = append(keyMap[key], item.Item)
		wrapped := 1
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(false)
			if !ok {
				break
			}
			key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
			keyMap[key] = append(keyMap[key], item.Item)
			wrapped++
		}
		select {
		case <-that.Context.Done():
			break mainLoop
		case <-that.Done:
			break mainLoop
		default:
		}
		// BF.MADD each key's items in one pipeline; a true result means the
		// item was not yet in that bloom filter.
		resultMap := map[string]*redis.Cmd{}
		pipe := that.BackfeedRedis.Pipeline()
		for key, items := range keyMap {
			args := []interface{}{
				"bf.madd",
				key,
			}
			for _, item := range items {
				args = append(args, item)
			}
			resultMap[key] = pipe.Do(context.Background(), args...)
		}
		if _, err := pipe.Exec(context.Background()); err != nil {
			log.Printf("%s", err)
		}
		var sAddItems []interface{}
		for key, items := range keyMap {
			res, err := resultMap[key].BoolSlice()
			if err != nil {
				log.Printf("%s", err)
				continue
			}
			if len(res) != len(items) {
				continue
			}
			for i, v := range res {
				if v {
					sAddItems = append(sAddItems, items[i])
				}
			}
		}
		dupes := wrapped - len(sAddItems)
		// Drop anything the legacy backfeed bloom filter has already seen.
		if len(sAddItems) != 0 {
			args := []interface{}{
				"bf.mexists",
				that.Name,
			}
			args = append(args, sAddItems...)
			res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice()
			if err != nil {
				log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
			} else if len(res) == len(sAddItems) {
				var filteredSAddItems []interface{}
				for i, v := range res {
					if !v {
						filteredSAddItems = append(filteredSAddItems, sAddItems[i])
					}
				}
				sAddItems = filteredSAddItems
			}
		}
		if len(sAddItems) != 0 {
			err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
			if err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}
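
// GlobalBackfeedManager tracks the active per-project feeds and the
// slug-to-project mapping, both refreshed from the tracker Redis.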
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex
}
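
// RefreshFeeds reconciles ActiveFeeds and ActiveSlugs with the tracker's
// "backfeed" and "trackers" hashes: it starts managers for new or
// reconfigured projects and shuts down managers for removed ones.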
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			err := json.Unmarshal([]byte(configString), &config)
			if err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if projectBackfeedManager.RedisConfigDiffers(projectConfig.RedisConfig) {
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
			LegacyRedis:   that.LegacyRedis,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	return nil
}
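
// Splitter provides a bufio.SplitFunc that tokenizes the request body on an
// arbitrary delimiter; IgnoreEOF accepts a trailing unterminated item.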
type Splitter struct {
	Delimiter []byte
	IgnoreEOF bool
}

func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	// Only check offsets where a full delimiter can still fit, so the slice
	// below never reaches past the buffered data.
	for i := 0; i+len(that.Delimiter) <= len(data); i++ {
		if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
			return i + len(that.Delimiter), data[:i], nil
		}
	}
	if len(data) == 0 || !atEOF {
		return 0, nil, nil
	}
	if atEOF && that.IgnoreEOF {
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}
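
// GenShardHash folds the item bytes into the single-byte primary shard.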
func GenShardHash(b []byte) (final byte) {
	for i, b := range b {
		final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}
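
// WriteResponse writes a JSON envelope containing either "data" or "error"
// along with the HTTP status code.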
func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]interface{}{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		v = map[string]interface{}{
			"data":        v,
			"status_code": statusCode,
		}
	}
	json.NewEncoder(res).Encode(v)
}
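
// GetFeed resolves a backfeed slug to its project's manager, or nil if the
// slug or project is unknown.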
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}
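
// Handle serves POST /legacy/{slug}: it splits the request body into items
// using the optional "delimiter" query parameter and queues each item for
// deduplication.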
func (that *GlobalBackfeedManager) Handle(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	vars := mux.Vars(req)
	slug := vars["slug"]
	secondaryShard := req.URL.Query().Get("shard")
	projectBackfeedManager := that.GetFeed(slug)
	if projectBackfeedManager == nil {
		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
		return
	}
	splitter := &Splitter{
		Delimiter: []byte(req.URL.Query().Get("delimiter")),
		IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
	}
	if len(splitter.Delimiter) == 0 {
		splitter.Delimiter = []byte{0x00}
	}
	scanner := bufio.NewScanner(req.Body)
	scanner.Split(splitter.Split)
	var err error
	statusCode := http.StatusNoContent
	n := 0
	for scanner.Scan() {
		b := scanner.Bytes()
		if len(b) == 0 {
			continue
		}
		bcopy := make([]byte, len(b))
		copy(bcopy, b)
		item := &BackfeedItem{
			PrimaryShard:   GenShardHash(bcopy),
			SecondaryShard: secondaryShard,
			Item:           bcopy,
		}
		ok := projectBackfeedManager.PushItem(req.Context(), item)
		if !ok {
			err = fmt.Errorf("channel closed")
			statusCode = http.StatusServiceUnavailable
			break
		}
		n++
	}
	if err == nil {
		err = scanner.Err()
		if err != nil {
			statusCode = http.StatusBadRequest
		}
	}
	if err != nil {
		WriteResponse(res, statusCode, err)
	} else {
		WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
	}
}
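
// CancelAllFeeds cancels the global context and waits for every project
// worker to finish.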
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}
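
// main wires up the tracker, legacy, and backfeed Redis clients from the
// REDIS_* environment variables, starts the HTTP and metrics servers, and
// refreshes the project feeds once per second until interrupted.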
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)
	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)
	legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
	if err != nil {
		log.Panicf("invalid REDIS_LEGACY url: %s", err)
	}
	legacyRedisOptions.ReadTimeout = 15 * time.Minute
	legacyRedisClient := redis.NewClient(legacyRedisOptions)
	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
	})
	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping legacy redis: %s", err)
	}
	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		LegacyRedis:   legacyRedisClient,
	}
	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()
	err = globalBackfeedManager.RefreshFeeds()
	if err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}
	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.Handle)
	rMetrics := mux.NewRouter()
	rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
	rMetrics.Path("/metrics").Handler(promhttp.Handler())
	doneChan := make(chan bool)
	serveErrChan := make(chan error)
	go func() {
		s := &http.Server{
			Addr:           os.Getenv("HTTP_ADDR"),
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()
	metricsErrChan := make(chan error)
	go func() {
		if os.Getenv("METRICS_ADDR") != "" {
			s := &http.Server{
				Addr:           os.Getenv("METRICS_ADDR"),
				IdleTimeout:    1 * time.Hour,
				MaxHeaderBytes: 1 * 1024 * 1024,
				Handler:        rMetrics,
			}
			metricsErrChan <- s.ListenAndServe()
		} else {
			<-doneChan
			metricsErrChan <- nil
		}
	}()
	log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
	if os.Getenv("METRICS_ADDR") != "" {
		log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
	}
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-sc:
			return
		case err := <-serveErrChan:
			// ListenAndServe only returns on failure; bail out instead of
			// leaving the error channel unread.
			log.Panicf("backfeed http server failed: %s", err)
		case err := <-metricsErrChan:
			if err != nil {
				log.Panicf("metrics http server failed: %s", err)
			}
		case <-ticker.C:
		}
		err = globalBackfeedManager.RefreshFeeds()
		if err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}