// Command backfeed accepts delimited items over HTTP (POST /legacy/{slug}),
// deduplicates them with bf.madd against sharded filters on a Redis cluster
// (and bf.mexists against a legacy filter), and SADDs the new ones to each
// project's "<project>:todo:backfeed" set.
package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
)

const (
	// ItemChannelBuffer is the capacity of each project's pending-item channel.
	ItemChannelBuffer = 100000
	// ItemWrapSize is the maximum number of items Do batches per round of Redis writes.
	ItemWrapSize = 100000
)

// ProjectRedisConfig is the optional per-project Redis endpoint taken from the
// project's JSON entry in the tracker's "trackers" hash.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

// ProjectConfig is the subset of a project's tracker config this service reads.
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}

// BackfeedItem is one submitted item plus the shard components of its filter key.
type BackfeedItem struct {
	PrimaryShard   byte
	SecondaryShard string
	Item           []byte
}

// ProjectBackfeedManager owns the item channel and worker loop (Do) for one project.
type ProjectBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool // closed when Do has returned
	C             chan *BackfeedItem
	Name          string
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex // guards C against concurrent close
	ProjectConfig ProjectConfig
}

// RedisConfigDiffers reports whether other names a different Redis endpoint
// (host, port or password) than the manager's current config. Two nil configs
// are equal; nil versus non-nil differs.
func (that *ProjectBackfeedManager) RedisConfigDiffers(other *ProjectRedisConfig) bool {
	current := that.ProjectConfig.RedisConfig
	if current == nil || other == nil {
		return current != other
	}
	return current.Host != other.Host || current.Port != other.Port || current.Pass != other.Pass
}

// PushItem queues item for Do. It blocks until the item is queued, ctx is done,
// or the manager's Context is cancelled, and returns true only if the item was
// queued. It returns false immediately once the item channel has been closed.
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	if that.C == nil {
		return false
	}
	select {
	case <-ctx.Done():
		return false
	case <-that.Context.Done():
		return false
	case that.C <- item:
		return true
	}
}

// PopItem takes the next queued item. When blocking is false it returns
// (nil, false) immediately if nothing is queued; in either mode it returns
// (nil, false) if it observes the manager's Context as cancelled.
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		}
	}
	select {
	case <-that.Context.Done():
		return nil, false
	case item := <-that.C:
		return item, true
	default:
		return nil, false
	}
}

// CloseItemChannel closes and clears C under the write lock, so no PushItem can
// be mid-send on it. Calling it more than once is harmless.
func (that *ProjectBackfeedManager) CloseItemChannel() {
	that.Lock.Lock()
	defer that.Lock.Unlock()
	if that.C == nil {
		return
	}
	close(that.C)
	that.C = nil
}

// itemKey returns the filter key "<name>:<primary shard as 2 hex digits>:<secondary shard>".
func (that *ProjectBackfeedManager) itemKey(item *BackfeedItem) string {
	return fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
}

// collectBatch blocks for one item, then drains further items without
// blocking until the channel is empty or ItemWrapSize items are collected.
// It returns the items grouped by filter key and the total count; ok is false
// when no item could be taken (the Context was cancelled).
func (that *ProjectBackfeedManager) collectBatch() (keyMap map[string][][]byte, count int, ok bool) {
	item, ok := that.PopItem(true)
	if !ok {
		return nil, 0, false
	}
	keyMap = map[string][][]byte{}
	for {
		key := that.itemKey(item)
		keyMap[key] = append(keyMap[key], item.Item)
		count++
		if count >= ItemWrapSize {
			break
		}
		if item, ok = that.PopItem(false); !ok {
			break
		}
	}
	return keyMap, count, true
}

// addToFilters runs one pipelined bf.madd per key on BackfeedRedis and returns
// the items whose reply was 1 (newly added). Keys whose command failed or whose
// reply has an unexpected shape contribute nothing. A non-nil error means the
// pipeline as a whole reported a failure and the batch should be skipped.
func (that *ProjectBackfeedManager) addToFilters(keyMap map[string][][]byte) ([]interface{}, error) {
	ctx := context.Background()
	pipe := that.BackfeedRedis.Pipeline()
	resultMap := make(map[string]*redis.Cmd, len(keyMap))
	for key, items := range keyMap {
		args := make([]interface{}, 0, len(items)+2)
		args = append(args, "bf.madd", key)
		for _, item := range items {
			args = append(args, item)
		}
		resultMap[key] = pipe.Do(ctx, args...)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return nil, err
	}

	var added []interface{}
	for key, items := range keyMap {
		rawRes, err := resultMap[key].Result()
		if err != nil {
			log.Printf("%s", err)
			continue
		}
		rawResArray, ok := rawRes.([]interface{})
		if !ok || len(items) != len(rawResArray) {
			continue
		}
		for i, vi := range rawResArray {
			if v, ok := vi.(int64); ok && v == 1 {
				added = append(added, items[i])
			}
		}
	}
	return added, nil
}

// dropLegacyDupes removes items that bf.mexists reports as present (non-zero)
// in the legacy "<name>" filter. On a Redis error or an unexpected reply shape
// it keeps all items (best effort).
func (that *ProjectBackfeedManager) dropLegacyDupes(items []interface{}) []interface{} {
	if len(items) == 0 || that.LegacyRedis == nil {
		return items
	}
	args := make([]interface{}, 0, len(items)+2)
	args = append(args, "bf.mexists", that.Name)
	args = append(args, items...)
	rawRes, err := that.LegacyRedis.Do(context.Background(), args...).Result()
	if err != nil {
		log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
		return items
	}
	rawResArray, ok := rawRes.([]interface{})
	// Only filter when the reply pairs 1:1 with the items; this was previously
	// inverted (!=), so the filter never ran on a well-formed reply.
	if !ok || len(rawResArray) != len(items) {
		return items
	}
	var kept []interface{}
	for i, vi := range rawResArray {
		if v, ok := vi.(int64); ok && v == 0 {
			kept = append(kept, items[i])
		}
	}
	return kept
}

// Do is the manager's worker loop. Each round it collects a batch (collectBatch),
// adds it to the sharded filters (addToFilters), drops items already in the
// legacy filter (dropLegacyDupes), SADDs the survivors to
// "<name>:todo:backfeed" on ProjectRedis, and adds the number of items bf.madd
// did not report as newly added to field <name> of hash ":" on BackfeedRedis.
// A batch already collected is still written even if the Context is cancelled
// meanwhile. On return it cancels the Context, closes the item channel, and
// finally closes Done.
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	defer that.CloseItemChannel()
	defer that.Cancel()

	for {
		keyMap, wrapped, ok := that.collectBatch()
		if !ok {
			return
		}

		sAddItems, err := that.addToFilters(keyMap)
		if err != nil {
			log.Printf("%s", err)
			continue
		}
		// Counted before legacy filtering, as before: only bf.madd rejections.
		dupes := wrapped - len(sAddItems)
		sAddItems = that.dropLegacyDupes(sAddItems)

		if len(sAddItems) != 0 {
			err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
			if err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		if dupes > 0 {
			if err := that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes)).Err(); err != nil {
				log.Printf("failed to record %d dupes for %s: %s", dupes, that.Name, err)
			}
		}
	}
}

// GlobalBackfeedManager tracks the active per-project feeds and the slug ->
// project routing table used by Handle.
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager
	ActiveSlugs   map[string]string
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex // guards ActiveFeeds and ActiveSlugs for readers
}

// stopFeed cancels m, waits for its Do loop to finish, and closes its project
// Redis client if m owns one (i.e. it is not the shared tracker client).
func (that *GlobalBackfeedManager) stopFeed(m *ProjectBackfeedManager) {
	m.Cancel()
	<-m.Done
	if m.ProjectRedis != nil && m.ProjectRedis != that.TrackerRedis {
		if err := m.ProjectRedis.Close(); err != nil {
			log.Printf("unable to close %s project redis: %s", m.Name, err)
		}
	}
}

// RefreshFeeds reloads the slug -> project map (tracker hash "backfeed") and
// each project's config (tracker hash "trackers"). Projects without a parseable
// config are ignored. It starts feeds for new projects, replaces feeds whose
// Redis config changed, and stops feeds for projects no longer listed.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	projectSlugMap := map[string][]string{}
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	projects := make([]string, 0, len(projectSlugMap))
	for project := range projectSlugMap {
		projects = append(projects, project)
	}

	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}

	// Keep only projects (and their slugs) that have a usable config.
	projects = projects[:0]
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}

	// add feeds for new projects (or replace ones whose redis config changed)
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		outdated, has := that.ActiveFeeds[project]
		if has && !outdated.RedisConfigDiffers(projectConfig.RedisConfig) {
			continue
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
			LegacyRedis:   that.LegacyRedis,
		}
		if projectConfig.RedisConfig != nil {
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if has {
			that.stopFeed(outdated)
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}

	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()

	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		that.stopFeed(projectBackfeedManager)
		log.Printf("removed project: %s", project)
	}
	return nil
}

// Splitter is a bufio.SplitFunc provider that splits on an arbitrary,
// non-empty byte Delimiter. With IgnoreEOF, trailing data without a final
// delimiter is returned as a last token instead of io.ErrUnexpectedEOF.
type Splitter struct {
	Delimiter []byte
	IgnoreEOF bool
}

// Split implements bufio.SplitFunc.
func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	// bytes.Index never looks past len(data); the previous manual scan sliced
	// up to i+len(Delimiter), reading stale buffer bytes (or panicking) near
	// the end of data for multi-byte delimiters.
	if i := bytes.Index(data, that.Delimiter); i >= 0 {
		return i + len(that.Delimiter), data[:i], nil
	}
	if len(data) == 0 || !atEOF {
		return 0, nil, nil
	}
	if that.IgnoreEOF {
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}

// GenShardHash folds b into a single-byte primary shard value.
func GenShardHash(b []byte) (final byte) {
	for i, c := range b {
		final = (c ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}

// WriteResponse writes statusCode and, unless it is 204, a JSON body: errors
// become {"error", "status_code"}, anything else {"data", "status_code"}.
func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]interface{}{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		log.Printf("%#v", v)
		v = map[string]interface{}{
			"data":        v,
			"status_code": statusCode,
		}
	}
	if err := json.NewEncoder(res).Encode(v); err != nil {
		log.Printf("unable to write response: %s", err)
	}
}

// GetFeed returns the active feed for slug, or nil if the slug or its project
// is not currently active.
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}

// Handle serves POST /legacy/{slug}. The body is split on the "delimiter"
// query parameter (default a single NUL byte); "shard" sets the secondary
// shard; a non-empty "ignoreeof" accepts a trailing undelimited item. Empty
// items are skipped. Responds 204 on success, 404 for an unknown slug, 503 if
// the feed stops accepting items, and 400 if the body cannot be scanned.
func (that *GlobalBackfeedManager) Handle(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	vars := mux.Vars(req)
	slug := vars["slug"]
	query := req.URL.Query()
	secondaryShard := query.Get("shard")
	projectBackfeedManager := that.GetFeed(slug)
	if projectBackfeedManager == nil {
		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
		return
	}
	splitter := &Splitter{
		Delimiter: []byte(query.Get("delimiter")),
		IgnoreEOF: query.Get("ignoreeof") != "",
	}
	if len(splitter.Delimiter) == 0 {
		splitter.Delimiter = []byte{0x00}
	}
	scanner := bufio.NewScanner(req.Body)
	scanner.Split(splitter.Split)
	var err error
	statusCode := http.StatusNoContent
	for scanner.Scan() {
		b := scanner.Bytes()
		if len(b) == 0 {
			continue
		}
		// scanner.Bytes aliases the Scanner's buffer, which the next Scan may
		// overwrite; the item lives on in the channel, so it needs its own copy.
		itemBytes := make([]byte, len(b))
		copy(itemBytes, b)
		item := &BackfeedItem{
			PrimaryShard:   GenShardHash(itemBytes),
			SecondaryShard: secondaryShard,
			Item:           itemBytes,
		}
		if !projectBackfeedManager.PushItem(req.Context(), item) {
			err = fmt.Errorf("channel closed")
			statusCode = http.StatusServiceUnavailable
			break
		}
	}
	if err == nil {
		err = scanner.Err()
		if err != nil {
			statusCode = http.StatusBadRequest
		}
	}
	WriteResponse(res, statusCode, err)
}

// CancelAllFeeds cancels the global context (and with it every feed), then
// waits for each feed to stop and removes it from ActiveFeeds.
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		that.stopFeed(projectBackfeedManager)
		// Handlers may be reading ActiveFeeds via GetFeed concurrently.
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
	}
}

func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)

	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)

	legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
	if err != nil {
		log.Panicf("invalid REDIS_LEGACY url: %s", err)
	}
	legacyRedisOptions.ReadTimeout = 15 * time.Minute
	legacyRedisClient := redis.NewClient(legacyRedisOptions)

	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
	})

	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping legacy redis: %s", err)
	}

	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		LegacyRedis:   legacyRedisClient,
	}
	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()

	err = globalBackfeedManager.RefreshFeeds()
	if err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}

	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.Handle)

	// Buffered so the server goroutine can always deliver its error and exit.
	serveErrChan := make(chan error, 1)
	go func() {
		s := &http.Server{
			Addr:           ":21581",
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()

	// os.Kill cannot be trapped, so it is not requested here.
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-sc:
			return
		case err := <-serveErrChan:
			log.Panicf("http server stopped: %s", err)
		case <-ticker.C:
		}
		err = globalBackfeedManager.RefreshFeeds()
		if err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}