|
- package main
-
- import (
- "bufio"
- "compress/flate"
- "compress/gzip"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "strings"
- "sync"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/gorilla/mux"
- "github.com/tevino/abool/v2"
- )
-
// GlobalBackfeedManager owns the set of running per-project backfeed managers
// and the slug→project routing table, both refreshed from tracker Redis.
type GlobalBackfeedManager struct {
	Context       context.Context    // root context; cancelled via Cancel to stop everything
	Cancel        context.CancelFunc // cancels Context (and thus all per-project contexts)
	ActiveFeeds   map[string]*ProjectBackfeedManager // project name → running feed
	ActiveSlugs   map[string]string                  // slug → project name
	TrackerRedis  *redis.Client        // tracker metadata (the "backfeed" and "trackers" hashes)
	BackfeedRedis *redis.ClusterClient // dedup cluster shared by all project feeds
	Lock          sync.RWMutex         // guards ActiveFeeds and ActiveSlugs
	Populated     *abool.AtomicBool    // set after the first successful RefreshFeeds
}
-
- func (that *GlobalBackfeedManager) RefreshFeeds() error {
- slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
- if err != nil {
- return err
- }
- var projects []string
- projectSlugMap := map[string][]string{}
- for slug, project := range slugProjectMap {
- projectSlugMap[project] = append(projectSlugMap[project], slug)
- }
- for project := range projectSlugMap {
- projects = append(projects, project)
- }
- projectConfigs := map[string]ProjectConfig{}
- if len(projects) != 0 {
- cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
- if err != nil {
- return err
- }
- if len(projects) != len(cfgi) {
- return fmt.Errorf("hmget result had unexpected length")
- }
- for i, project := range projects {
- configString, ok := cfgi[i].(string)
- if !ok {
- continue
- }
- config := ProjectConfig{}
- if err := json.Unmarshal([]byte(configString), &config); err != nil {
- continue
- }
- projectConfigs[project] = config
- }
- }
- projects = nil
- for project := range projectSlugMap {
- if _, has := projectConfigs[project]; !has {
- delete(projectSlugMap, project)
- continue
- }
- projects = append(projects, project)
- }
- for slug, project := range slugProjectMap {
- if _, has := projectConfigs[project]; !has {
- delete(slugProjectMap, slug)
- }
- }
- // add feeds for new projects
- for _, project := range projects {
- projectConfig := projectConfigs[project]
- var outdatedProjectBackfeedManager *ProjectBackfeedManager
- if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
- if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
- outdatedProjectBackfeedManager = projectBackfeedManager
- } else {
- continue
- }
- }
- ctx, cancel := context.WithCancel(that.Context)
- projectBackfeedManager := &ProjectBackfeedManager{
- Context: ctx,
- Cancel: cancel,
- Done: make(chan bool),
- C: make(chan *BackfeedItem, ItemChannelBuffer),
- BackfeedRedis: that.BackfeedRedis,
- Name: project,
- ProjectConfig: projectConfig,
- }
- if projectConfig.RedisConfig != nil {
- projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
- Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
- Username: "default",
- Password: projectConfig.RedisConfig.Pass,
- ReadTimeout: 15 * time.Minute,
- })
- } else {
- projectBackfeedManager.ProjectRedis = that.TrackerRedis
- }
- go projectBackfeedManager.Do()
- that.Lock.Lock()
- that.ActiveFeeds[project] = projectBackfeedManager
- that.Lock.Unlock()
- if outdatedProjectBackfeedManager != nil {
- outdatedProjectBackfeedManager.Cancel()
- <-outdatedProjectBackfeedManager.Done
- log.Printf("updated project: %s", project)
- } else {
- log.Printf("added project: %s", project)
- }
- }
- that.Lock.Lock()
- that.ActiveSlugs = slugProjectMap
- that.Lock.Unlock()
- // remove feeds for old projects
- for project, projectBackfeedManager := range that.ActiveFeeds {
- if _, has := projectSlugMap[project]; has {
- continue
- }
- log.Printf("removing project: %s", project)
- that.Lock.Lock()
- delete(that.ActiveFeeds, project)
- that.Lock.Unlock()
- projectBackfeedManager.Cancel()
- <-projectBackfeedManager.Done
- log.Printf("removed project: %s", project)
- }
- if !that.Populated.IsSet() {
- that.Populated.Set()
- }
- return nil
- }
-
- func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
- that.Lock.RLock()
- defer that.Lock.RUnlock()
- project, has := that.ActiveSlugs[slug]
- if !has {
- return nil
- }
- projectBackfeedManager, has := that.ActiveFeeds[project]
- if !has {
- return nil
- }
- return projectBackfeedManager
- }
-
- func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
- defer req.Body.Close()
-
- vars := mux.Vars(req)
- slug := vars["slug"]
- secondaryShard := req.URL.Query().Get("shard")
- queue := req.URL.Query().Get("queue")
-
-
-
- if strings.ContainsAny(secondaryShard, ":/") {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid shard name"))
- return
- }
-
- if strings.ContainsAny(queue, "/") {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid queue name"))
- return
- }
- if queue == "" {
- queue = "todo:backfeed"
- }
-
- skipBloom := req.URL.Query().Get("skipbloom") != ""
- skipFeed := req.URL.Query().Get("skipfeed") != ""
- if skipBloom && skipFeed {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("skipbloom and skipfeed are mutually exclusive"))
- return
- }
-
- projectBackfeedManager := that.GetFeed(slug)
- if projectBackfeedManager == nil {
- WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
- return
- }
-
- splitter := &Splitter{
- Delimiter: []byte(req.URL.Query().Get("delimiter")),
- IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
- }
- if len(splitter.Delimiter) == 0 {
- splitter.Delimiter = []byte{0x00}
- }
- var body io.ReadCloser
- switch req.Header.Get("Content-Encoding") {
- case "":
- body = req.Body
- case "gzip":
- var err error
- body, err = gzip.NewReader(req.Body)
- if err != nil {
- WriteResponse(res, http.StatusBadRequest, err)
- return
- }
- defer body.Close()
- case "deflate":
- body = flate.NewReader(req.Body)
- defer body.Close()
- default:
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding")))
- }
- scanner := bufio.NewScanner(body)
- scanner.Split(splitter.Split)
-
- n := 0
- for scanner.Scan() {
- b := scanner.Bytes()
- if len(b) == 0 {
- continue
- }
- bcopy := make([]byte, len(b))
- copy(bcopy, b)
- item := &BackfeedItem{
- PrimaryShard: GenShardHash(bcopy),
- SecondaryShard: secondaryShard,
- Item: bcopy,
- SkipBloom: skipBloom,
- SkipFeed: skipFeed,
- Queue: queue,
- }
- if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
- WriteResponse(res, http.StatusInternalServerError, err)
- return
- }
- n++
- }
- if err := scanner.Err(); err != nil {
- WriteResponse(res, http.StatusBadRequest, err)
- return
- }
- WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
- return
- }
-
- func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
- if that.Populated.IsNotSet() {
- WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
- return
- }
- if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
- client.ClientGetName(ctx)
- return client.Ping(ctx).Err()
- }); err != nil {
- WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
- return
- }
- WriteResponse(res, http.StatusOK, "ok")
- }
-
// HandlePing is a trivial liveness endpoint; it always responds 200 "pong".
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
-
- func (that *GlobalBackfeedManager) HandleUnlink(res http.ResponseWriter, req *http.Request) {
- vars := mux.Vars(req)
- key := vars["key"]
- if strings.Count(key, ":") < 2 {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
- return
- }
- lock := sync.Mutex{}
- keys := []string{}
- if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
- cursor := uint64(0)
- var shardKeys []string
- for {
- var err error
- var keysBatch []string
- keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
- if err != nil && err != redis.Nil {
- return err
- }
- shardKeys = append(shardKeys, keysBatch...)
- if cursor == 0 {
- break
- }
- }
- lock.Lock()
- defer lock.Unlock()
- keys = append(keys, shardKeys...)
- return nil
- }); err != nil && err != redis.Nil {
- WriteResponse(res, http.StatusInternalServerError, err)
- return
- }
- pipe := that.BackfeedRedis.Pipeline()
- pipe.HDel(req.Context(), ":last_ts", keys...)
- for _, key := range keys {
- pipe.Unlink(req.Context(), key)
- }
- if _, err := pipe.Exec(req.Context()); err != nil && err != redis.Nil {
- WriteResponse(res, http.StatusInternalServerError, err)
- return
- }
- WriteResponse(res, http.StatusOK, keys)
- }
-
- func (that *GlobalBackfeedManager) HandleRedisInfo(res http.ResponseWriter, req *http.Request) {
- infos := map[string]string{}
- lock := sync.Mutex{}
- if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
- if info, err := client.Info(ctx, "all").Result(); err != nil && err != redis.Nil {
- return err
- } else {
- lock.Lock()
- defer lock.Unlock()
- infos[client.String()] = info
- }
- return nil
- }); err != nil {
- WriteResponse(res, http.StatusInternalServerError, err)
- return
- }
- WriteResponse(res, http.StatusOK, infos)
- }
-
- func (that *GlobalBackfeedManager) CancelAllFeeds() {
- that.Populated.UnSet()
- that.Cancel()
- for project, projectBackfeedManager := range that.ActiveFeeds {
- log.Printf("waiting for %s channel to shut down...", project)
- <-projectBackfeedManager.Done
- delete(that.ActiveFeeds, project)
- }
- }
|