package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
)

const (
	// ItemChannelBuffer is the capacity of each project's buffered item channel.
	ItemChannelBuffer = 100000
	// ItemWrapSize caps how many items are pulled into a single Redis pipeline batch.
	ItemWrapSize = 100000
)

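// ProjectRedisConfig holds the optional per-project Redis connection
// settings delivered as part of the tracker's project configuration.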
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}

type BackfeedItem struct {
	PrimaryShard   byte
	SecondaryShard string
	Item           []byte
}

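// ProjectBackfeedManager receives items for a single project, deduplicates
// them, and queues the new ones into that project's todo:backfeed set.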
type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex
	ProjectConfig ProjectConfig
}

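// RedisConfigDiffers reports whether the given Redis settings differ from the
// ones this manager was started with.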
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && new == nil {
		return false
	}
	return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
}

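// PushItem queues an item for this project. It returns false if the channel
// has been closed or if either the request or the manager context is done.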
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	if that.C == nil {
		return false
	}
	select {
	case <-ctx.Done():
		return false
	case <-that.Context.Done():
		return false
	case that.C <- item:
		return true
	}
}

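// PopItem takes the next item off the channel. In blocking mode it waits for
// an item or for the manager context to be cancelled; otherwise it returns
// immediately.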
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		default:
			return nil, false
		}
	}
}

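// CloseItemChannel closes and clears the item channel so that later PushItem
// calls fail cleanly instead of sending on a closed channel.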
func (that *ProjectBackfeedManager) CloseItemChannel() {
	that.Lock.Lock()
	defer that.Lock.Unlock()
	if that.C == nil {
		return
	}
	close(that.C)
	that.C = nil
}

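// Do is the per-project worker loop: it drains the item channel in batches,
// deduplicates each batch against the sharded backfeed Bloom filters and the
// legacy backfeed filter, and pushes the remaining items into the project's
// <name>:todo:backfeed set.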
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	defer that.CloseItemChannel()
	defer that.Cancel()
loop:
	for {
		// stop promptly once the manager has been cancelled
		select {
		case <-that.Context.Done():
			break loop
		case <-that.Done:
			break loop
		default:
		}
		item, ok := that.PopItem(true)
		if !ok {
			break
		}
		// batch items per "<project>:<primary shard>:<secondary shard>" key
		keyMap := map[string][][]byte{}
		key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
		keyMap[key] = append(keyMap[key], item.Item)
		wrapped := 1
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(false)
			if !ok {
				break
			}
			key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
			keyMap[key] = append(keyMap[key], item.Item)
			wrapped++
		}
		select {
		case <-that.Context.Done():
			break loop
		case <-that.Done:
			break loop
		default:
		}
		// add the batch to the sharded Bloom filters in a single pipeline
		resultMap := map[string]*redis.Cmd{}
		pipe := that.BackfeedRedis.Pipeline()
		for key, items := range keyMap {
			args := []interface{}{
				"bf.madd",
				key,
			}
			for _, item := range items {
				args = append(args, item)
			}
			resultMap[key] = pipe.Do(context.Background(), args...)
		}
		_, err := pipe.Exec(context.Background())
		if err != nil {
			log.Printf("%s", err)
			continue
		}
		// keep only the items that bf.madd reports as newly added
		var sAddItems []interface{}
		for key, items := range keyMap {
			rawRes, err := resultMap[key].Result()
			if err != nil {
				log.Printf("%s", err)
				continue
			}
			rawResArray, ok := rawRes.([]interface{})
			if !ok || len(keyMap[key]) != len(rawResArray) {
				continue
			}
			for i, vi := range rawResArray {
				v, ok := vi.(int64)
				if !ok || v != 1 {
					continue
				}
				sAddItems = append(sAddItems, items[i])
			}
		}
		dupes := wrapped - len(sAddItems)
		// drop anything the legacy backfeed Bloom filter has already seen
		if len(sAddItems) != 0 {
			args := []interface{}{
				"bf.mexists",
				that.Name,
			}
			args = append(args, sAddItems...)
			rawRes, err := that.LegacyRedis.Do(context.Background(), args...).Result()
			if err != nil {
				log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
			} else {
				rawResArray, ok := rawRes.([]interface{})
				if ok && len(sAddItems) == len(rawResArray) {
					var filteredSAddItems []interface{}
					for i, vi := range rawResArray {
						v, ok := vi.(int64)
						if !ok || v != 0 {
							continue
						}
						filteredSAddItems = append(filteredSAddItems, sAddItems[i])
					}
					sAddItems = filteredSAddItems
				}
			}
		}
		// queue the surviving items for the project and count the duplicates
		if len(sAddItems) != 0 {
			err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
			if err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}

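// GlobalBackfeedManager owns the shared Redis clients, the set of running
// per-project managers, and the slug-to-project mapping used for routing.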
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex
}

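// RefreshFeeds reconciles the running project managers with the tracker's
// current backfeed and project configuration, starting, updating, and
// stopping managers as needed.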
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			err := json.Unmarshal([]byte(configString), &config)
			if err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
			LegacyRedis:   that.LegacyRedis,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	return nil
}

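// Splitter provides a bufio.SplitFunc that splits input on an arbitrary
// delimiter. With IgnoreEOF set, trailing data without a final delimiter is
// returned as a token instead of being treated as an error.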
type Splitter struct {
	Delimiter []byte
	IgnoreEOF bool
}

func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	// never compare past the end of the data we were given
	for i := 0; i+len(that.Delimiter) <= len(data); i++ {
		if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
			return i + len(that.Delimiter), data[:i], nil
		}
	}
	if len(data) == 0 || !atEOF {
		return 0, nil, nil
	}
	if atEOF && that.IgnoreEOF {
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}

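// GenShardHash derives a one-byte hash from an item; it is used as the
// primary shard in the backfeed Bloom filter key.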
func GenShardHash(b []byte) (final byte) {
	for i, b := range b {
		final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}

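// WriteResponse writes a JSON envelope containing either an error message or
// the given data, together with the HTTP status code.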
func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]interface{}{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		log.Printf("%#v", v)
		v = map[string]interface{}{
			"data":        v,
			"status_code": statusCode,
		}
	}
	json.NewEncoder(res).Encode(v)
}

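// GetFeed resolves a slug to its running project backfeed manager, if any.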
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}

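// Handle accepts POSTed items for a slug, splits the request body on the
// requested delimiter, and pushes every non-empty item into the project's
// backfeed channel.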
func (that *GlobalBackfeedManager) Handle(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	vars := mux.Vars(req)
	slug := vars["slug"]
	secondaryShard := req.URL.Query().Get("shard")
	projectBackfeedManager := that.GetFeed(slug)
	if projectBackfeedManager == nil {
		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
		return
	}
	splitter := &Splitter{
		Delimiter: []byte(req.URL.Query().Get("delimiter")),
		IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
	}
	if len(splitter.Delimiter) == 0 {
		splitter.Delimiter = []byte{0x00}
	}
	scanner := bufio.NewScanner(req.Body)
	scanner.Split(splitter.Split)
	var err error
	statusCode := http.StatusNoContent
	for scanner.Scan() {
		b := scanner.Bytes()
		if len(b) == 0 {
			continue
		}
		// copy the token: the scanner may overwrite its buffer on the next Scan,
		// while the item is consumed asynchronously by the project worker
		item := &BackfeedItem{
			PrimaryShard:   GenShardHash(b),
			SecondaryShard: secondaryShard,
			Item:           append([]byte(nil), b...),
		}
		ok := projectBackfeedManager.PushItem(req.Context(), item)
		if !ok {
			err = fmt.Errorf("channel closed")
			statusCode = http.StatusServiceUnavailable
			break
		}
	}
	if err == nil {
		err = scanner.Err()
		if err != nil {
			statusCode = http.StatusBadRequest
		}
	}
	WriteResponse(res, statusCode, err)
}

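// CancelAllFeeds cancels the global context and waits for every project
// manager to shut down.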
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}

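// main connects to the tracker, legacy, and backfeed Redis instances from the
// REDIS_TRACKER, REDIS_LEGACY, and REDIS_BACKFEED_* environment variables,
// serves POST /legacy/{slug} on HTTP_ADDR, and refreshes the project list
// once per second until it receives a shutdown signal.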
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)
	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)
	legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
	if err != nil {
		log.Panicf("invalid REDIS_LEGACY url: %s", err)
	}
	legacyRedisOptions.ReadTimeout = 15 * time.Minute
	legacyRedisClient := redis.NewClient(legacyRedisOptions)
	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
	})
	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping legacy redis: %s", err)
	}
	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		LegacyRedis:   legacyRedisClient,
	}
	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()
	err = globalBackfeedManager.RefreshFeeds()
	if err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}
	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.Handle)
	serveErrChan := make(chan error)
	go func() {
		s := &http.Server{
			Addr:           os.Getenv("HTTP_ADDR"),
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	// refresh the project list every second until a shutdown signal arrives
	// or the HTTP server dies
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-sc:
			return
		case err := <-serveErrChan:
			log.Panicf("http server died: %s", err)
		case <-ticker.C:
		}
		err = globalBackfeedManager.RefreshFeeds()
		if err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}