|
- package main
-
- import (
- "bufio"
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- _ "net/http/pprof"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/gorilla/mux"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/tevino/abool/v2"
- )
-
const (
	// ItemChannelBuffer is the capacity of each project's BackfeedItem channel.
	ItemChannelBuffer = 100000
	// ItemWrapSize caps how many items Do() drains into one pipelined redis batch.
	ItemWrapSize = 100000
)
-
// ProjectRedisConfig is the optional per-project redis endpoint, as stored
// (JSON) in the tracker's "trackers" hash. RefreshFeeds dials it as
// "<Host>:<Port>" with username "default" and password Pass.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
-
// ProjectConfig is the subset of a tracker project config this service
// cares about. A nil RedisConfig means "use the tracker redis" for the
// project's todo:backfeed set.
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
-
// BackfeedItem is one raw item queued for deduplication.
type BackfeedItem struct {
	PrimaryShard   byte   // GenShardHash of the payload; picks the bloom-filter shard key
	SecondaryShard string // caller-supplied shard label (the ?shard= query parameter)
	Item           []byte // raw item bytes, copied out of the scanner buffer
}
-
// ProjectBackfeedManager owns the backfeed pipeline for one project:
// HTTP handlers push items onto C and the Do goroutine drains, dedupes,
// and stores them.
type ProjectBackfeedManager struct {
	Context       context.Context      // cancelled to shut this manager down
	Cancel        context.CancelFunc   // cancels Context
	Done          chan bool            // closed when Do has fully exited
	C             chan *BackfeedItem   // item queue; set to nil by CloseItemChannel
	Name          string               // project name; used as redis key prefix
	BackfeedRedis *redis.ClusterClient // sharded bloom filters (bf.madd) + dupe counters
	ProjectRedis  *redis.Client        // destination for SAdd of deduplicated items
	LegacyRedis   *redis.Client        // legacy bloom filter (bf.mexists) for extra dedupe
	Lock          sync.RWMutex         // guards C against close-during-send
	ProjectConfig ProjectConfig
}
-
- func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
- if that.ProjectConfig.RedisConfig == nil && new == nil {
- return false
- }
- return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new
- }
-
// PushItem tries to enqueue item onto this project's channel. It returns
// false if the channel has already been closed (C == nil) or if either
// the caller's ctx or the manager's own context is cancelled before the
// send completes; it returns true once the item is queued.
//
// The read lock pairs with CloseItemChannel's write lock so the channel
// cannot be closed while a send is in flight.
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	if that.C == nil {
		return false
	}
	select {
	case <-ctx.Done():
		return false
	case <-that.Context.Done():
		return false
	case that.C <- item:
		return true
	}
}
-
// PopItem dequeues one BackfeedItem. With blocking=true it waits until an
// item arrives or the manager's context is cancelled; with blocking=false
// it returns immediately (ok=false) when the channel is empty. ok=false
// always means "no item available / shutting down".
//
// NOTE(review): that.C is read here without taking Lock. If the channel
// were closed while Do is still popping, a receive could yield a nil item
// with ok=true, and a nil C would block until the context case fires.
// CloseItemChannel only runs from Do's own defer, which appears to make
// this safe — confirm no other caller closes the channel.
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
	if blocking {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		}
	} else {
		select {
		case <-that.Context.Done():
			return nil, false
		case item := <-that.C:
			return item, true
		default:
			return nil, false
		}
	}
}
-
// CloseItemChannel closes and nils the item channel exactly once. Taking
// the write lock excludes concurrent PushItem senders, so no send can
// race the close. Idempotent: later calls are no-ops.
func (that *ProjectBackfeedManager) CloseItemChannel() {
	that.Lock.Lock()
	defer that.Lock.Unlock()
	if that.C == nil {
		return
	}
	close(that.C)
	that.C = nil
}
-
- func (that *ProjectBackfeedManager) Do() {
- defer close(that.Done)
- defer that.CloseItemChannel()
- defer that.Cancel()
-
- for {
- select {
- case <-that.Context.Done():
- break
- case <-that.Done:
- break
- default:
- }
- item, ok := that.PopItem(true)
- if !ok {
- break
- }
- keyMap := map[string][][]byte{}
- key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
- keyMap[key] = append(keyMap[key], item.Item)
- wrapped := 1
- for wrapped < ItemWrapSize {
- item, ok := that.PopItem(false)
- if !ok {
- break
- }
- key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
- keyMap[key] = append(keyMap[key], item.Item)
- wrapped++
- }
- select {
- case <-that.Context.Done():
- break
- case <-that.Done:
- break
- default:
- }
- resultMap := map[string]*redis.Cmd{}
- pipe := that.BackfeedRedis.Pipeline()
- for key, items := range keyMap {
- args := []interface{}{
- "bf.madd",
- key,
- }
- for _, item := range items {
- args = append(args, item)
- }
- resultMap[key] = pipe.Do(context.Background(), args...)
- }
- _, err := pipe.Exec(context.Background())
- if err != nil {
- log.Printf("%s", err)
- }
- var sAddItems []interface{}
- for key, items := range keyMap {
- res, err := resultMap[key].BoolSlice()
- if err != nil {
- log.Printf("%s", err)
- continue
- }
- if len(res) != len(keyMap[key]) {
- continue
- }
- for i, v := range res {
- if v {
- sAddItems = append(sAddItems, items[i])
- }
- }
- }
- dupes := wrapped - len(sAddItems)
- if len(sAddItems) != 0 {
- args := []interface{}{
- "bf.mexists",
- that.Name,
- }
- args = append(args, sAddItems...)
-
- res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice()
- if err != nil {
- log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err)
- } else if len(res) == len(sAddItems) {
- var filteredSAddItems []interface{}
- for i, v := range res {
- if !v {
- filteredSAddItems = append(filteredSAddItems, sAddItems[i])
- }
- }
- sAddItems = filteredSAddItems
- }
- }
- if len(sAddItems) != 0 {
- err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
- if err != nil {
- log.Printf("failed to sadd items for %s: %s", that.Name, err)
- }
- }
- if dupes > 0 {
- that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
- }
- }
- }
-
// GlobalBackfeedManager tracks every running ProjectBackfeedManager and
// the slug->project routing table, refreshed from the tracker redis.
type GlobalBackfeedManager struct {
	Context       context.Context                   // root context; cancelling it stops every feed
	Cancel        context.CancelFunc                // cancels Context
	ActiveFeeds   map[string]*ProjectBackfeedManager // project name -> running manager
	ActiveSlugs   map[string]string                 // slug -> project name
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	LegacyRedis   *redis.Client
	Lock          sync.RWMutex      // guards ActiveFeeds/ActiveSlugs for HTTP readers
	Populated     *abool.AtomicBool // set after the first successful RefreshFeeds
}
-
// RefreshFeeds reconciles the running managers with the tracker's current
// state: the "backfeed" hash (slug -> project) and per-project JSON
// configs in the "trackers" hash. It starts managers for new projects,
// replaces managers whose redis config changed, and tears down managers
// for projects that disappeared. main calls it once at startup and then
// every second.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	// Invert slug->project into project->[]slug.
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		for i, project := range projects {
			// A missing or non-string entry means no usable config; the
			// project (and its slugs) is dropped below.
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			err := json.Unmarshal([]byte(configString), &config)
			if err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Keep only projects (and slugs) that have a parseable config.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				// Config changed: spin up a replacement first, stop the
				// old manager afterwards so items keep flowing.
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
			LegacyRedis:   that.LegacyRedis,
		}
		if projectConfig.RedisConfig != nil {
			// Project brings its own redis; otherwise fall back to the
			// tracker redis below.
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	// NOTE(review): ActiveFeeds is *read* above without holding Lock; this
	// is only safe if RefreshFeeds is the sole writer and is never called
	// concurrently with itself — confirm.
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
-
// Splitter provides a bufio.SplitFunc that tokenizes a stream on an
// arbitrary (possibly multi-byte) delimiter.
type Splitter struct {
	Delimiter []byte // token separator; callers default it to {0x00} when empty
	IgnoreEOF bool   // if set, trailing bytes without a final delimiter form a valid token
}

// Split implements bufio.SplitFunc: it returns the next token up to (but
// not including) the delimiter, requests more data when no full delimiter
// is present yet, and at EOF either emits the unterminated tail (IgnoreEOF)
// or reports io.ErrUnexpectedEOF.
//
// Fix: the previous hand-rolled scan compared data[i:i+len(Delimiter)] for
// every i < len(data), which slices past len(data) near the end of the
// buffer — panicking when it exceeds the backing array's capacity, or
// silently matching stale bytes beyond len(data) when it does not.
// bytes.Index only matches a complete delimiter within data.
func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	if i := bytes.Index(data, that.Delimiter); i >= 0 {
		return i + len(that.Delimiter), data[:i], nil
	}
	if len(data) == 0 || !atEOF {
		// No full delimiter yet; ask the scanner for more input.
		return 0, nil, nil
	}
	if that.IgnoreEOF {
		// Emit the unterminated trailing bytes as the final token.
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}
-
// GenShardHash folds every byte of b, together with its (wrapped) position,
// into a single accumulator byte used as the primary shard identifier.
func GenShardHash(b []byte) (final byte) {
	for i := 0; i < len(b); i++ {
		pos := byte(i)
		final = (b[i] ^ final ^ pos) + final + pos + final*pos
	}
	return final
}
-
// WriteResponse writes a JSON envelope {"status_code": ..., "data": ...}
// (or {"status_code": ..., "error": ...} when v is an error) with the given
// HTTP status. A 204 No Content response gets headers only, no body.
func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	payload := map[string]interface{}{
		"status_code": statusCode,
	}
	if err, isError := v.(error); isError {
		payload["error"] = fmt.Sprintf("%v", err)
	} else {
		payload["data"] = v
	}
	json.NewEncoder(res).Encode(payload)
}
-
- func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
- that.Lock.RLock()
- defer that.Lock.RUnlock()
- project, has := that.ActiveSlugs[slug]
- if !has {
- return nil
- }
- projectBackfeedManager, has := that.ActiveFeeds[project]
- if !has {
- return nil
- }
- return projectBackfeedManager
- }
-
- func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
- defer req.Body.Close()
-
- vars := mux.Vars(req)
- slug := vars["slug"]
- secondaryShard := req.URL.Query().Get("shard")
-
- projectBackfeedManager := that.GetFeed(slug)
- if projectBackfeedManager == nil {
- WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
- return
- }
-
- splitter := &Splitter{
- Delimiter: []byte(req.URL.Query().Get("delimiter")),
- IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
- }
- if len(splitter.Delimiter) == 0 {
- splitter.Delimiter = []byte{0x00}
- }
- scanner := bufio.NewScanner(req.Body)
- scanner.Split(splitter.Split)
-
- var err error
- statusCode := http.StatusNoContent
- n := 0
- for scanner.Scan() {
- b := scanner.Bytes()
- if len(b) == 0 {
- continue
- }
- bcopy := make([]byte, len(b))
- copy(bcopy, b)
- item := &BackfeedItem{
- PrimaryShard: GenShardHash(bcopy),
- SecondaryShard: secondaryShard,
- Item: bcopy,
- }
- ok := projectBackfeedManager.PushItem(req.Context(), item)
- if !ok {
- err = fmt.Errorf("channel closed")
- statusCode = http.StatusServiceUnavailable
- break
- }
- n++
- }
- if err == nil {
- err = scanner.Err()
- if err != nil {
- statusCode = http.StatusBadRequest
- }
- }
- if err != nil {
- WriteResponse(res, statusCode, err)
- } else {
- WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
- }
- return
- }
-
- func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
- if that.Populated.IsNotSet() {
- WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
- return
- }
- err := that.LegacyRedis.Ping(req.Context()).Err()
- if err != nil {
- WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping legacy redis: %s", err))
- return
- }
- err = that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
- return client.Ping(ctx).Err()
- })
- if err != nil {
- WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
- return
- }
- WriteResponse(res, http.StatusOK, "ok")
- }
-
// HandlePing is a trivial liveness endpoint: it always answers 200 "pong".
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
-
// CancelAllFeeds marks the service unpopulated, cancels the root context
// (which cascades to every project manager's context), and waits for each
// manager's Do loop to finish before dropping it. Called from main's
// deferred shutdown path.
//
// NOTE(review): ActiveFeeds is iterated and mutated here without holding
// Lock; this is safe only if no RefreshFeeds call can run concurrently at
// shutdown — confirm.
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Populated.UnSet()
	that.Cancel()
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done
		delete(that.ActiveFeeds, project)
	}
}
-
// main wires up the three redis connections (tracker, legacy, backfeed
// cluster) from environment variables, starts the HTTP API and optional
// metrics/pprof server, and refreshes the feed table once per second
// until SIGINT/SIGTERM.
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)

	// Tracker redis: source of the "backfeed"/"trackers" hashes and the
	// default per-project redis.
	trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
	if err != nil {
		log.Panicf("invalid REDIS_TRACKER url: %s", err)
	}
	trackerRedisOptions.ReadTimeout = 15 * time.Minute
	trackerRedisClient := redis.NewClient(trackerRedisOptions)

	// Legacy redis: old backfeed bloom filter used as a second dedupe pass.
	legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY"))
	if err != nil {
		log.Panicf("invalid REDIS_LEGACY url: %s", err)
	}
	legacyRedisOptions.ReadTimeout = 15 * time.Minute
	legacyRedisClient := redis.NewClient(legacyRedisOptions)

	// Backfeed cluster: sharded bloom filters + duplicate counters.
	backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:       strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
		Username:    os.Getenv("REDIS_BACKFEED_USERNAME"),
		Password:    os.Getenv("REDIS_BACKFEED_PASSWORD"),
		ReadTimeout: 15 * time.Minute,
	})

	// Fail fast if any backend is unreachable at startup.
	if err := trackerRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping tracker redis: %s", err)
	}
	if err := backfeedRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping backfeed redis: %s", err)
	}
	if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panicf("unable to ping legacy redis: %s", err)
	}

	globalBackfeedManager := &GlobalBackfeedManager{
		ActiveFeeds:   map[string]*ProjectBackfeedManager{},
		ActiveSlugs:   map[string]string{},
		TrackerRedis:  trackerRedisClient,
		BackfeedRedis: backfeedRedisClient,
		LegacyRedis:   legacyRedisClient,
		Populated:     abool.New(),
	}

	globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
	defer globalBackfeedManager.CancelAllFeeds()

	err = globalBackfeedManager.RefreshFeeds()
	if err != nil {
		log.Panicf("unable to set up backfeed projects: %s", err)
	}
	// Public API router.
	r := mux.NewRouter()
	r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy)
	r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing)
	r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth)
	// Separate router for metrics + pprof (pprof registers on DefaultServeMux
	// via the blank net/http/pprof import).
	rMetrics := mux.NewRouter()
	rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
	rMetrics.Path("/metrics").Handler(promhttp.Handler())
	// NOTE(review): doneChan is never closed, so when METRICS_ADDR is unset
	// the goroutine below blocks until process exit — harmless here.
	doneChan := make(chan bool)
	serveErrChan := make(chan error)
	go func() {
		s := &http.Server{
			Addr:           os.Getenv("HTTP_ADDR"),
			IdleTimeout:    1 * time.Hour,
			MaxHeaderBytes: 1 * 1024 * 1024,
			Handler:        r,
		}
		serveErrChan <- s.ListenAndServe()
	}()
	metricsErrChan := make(chan error)
	go func() {
		if os.Getenv("METRICS_ADDR") != "" {
			s := &http.Server{
				Addr:           os.Getenv("METRICS_ADDR"),
				IdleTimeout:    1 * time.Hour,
				MaxHeaderBytes: 1 * 1024 * 1024,
				Handler:        rMetrics,
			}
			metricsErrChan <- s.ListenAndServe()
		} else {
			<-doneChan
			metricsErrChan <- nil
		}
	}()
	log.Printf("backfeed listening on %s", os.Getenv("HTTP_ADDR"))
	if os.Getenv("METRICS_ADDR") != "" {
		log.Printf("metrics/debug listening on %s", os.Getenv("METRICS_ADDR"))
	}
	// NOTE(review): os.Kill (SIGKILL) cannot be caught, so listing it in
	// signal.Notify has no effect; the ticker is also never stopped — both
	// are harmless because returning from main ends the process.
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-sc:
			return
		case <-ticker.C:
		}
		err = globalBackfeedManager.RefreshFeeds()
		if err != nil {
			log.Printf("unable to refresh backfeed projects: %s", err)
		}
	}
}
|