diff --git a/globalbackfeedmanager.go b/globalbackfeedmanager.go new file mode 100644 index 0000000..4d4b344 --- /dev/null +++ b/globalbackfeedmanager.go @@ -0,0 +1,259 @@ +package main + +import ( + "bufio" + "compress/flate" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/gorilla/mux" + "github.com/tevino/abool/v2" +) + +type GlobalBackfeedManager struct { + Context context.Context + Cancel context.CancelFunc + ActiveFeeds map[string]*ProjectBackfeedManager + ActiveSlugs map[string]string + TrackerRedis *redis.Client + BackfeedRedis *redis.ClusterClient + Lock sync.RWMutex + Populated *abool.AtomicBool +} + +func (that *GlobalBackfeedManager) RefreshFeeds() error { + slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result() + if err != nil { + return err + } + var projects []string + projectSlugMap := map[string][]string{} + for slug, project := range slugProjectMap { + projectSlugMap[project] = append(projectSlugMap[project], slug) + } + for project := range projectSlugMap { + projects = append(projects, project) + } + projectConfigs := map[string]ProjectConfig{} + if len(projects) != 0 { + cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result() + if err != nil { + return err + } + if len(projects) != len(cfgi) { + return fmt.Errorf("hmget result had unexpected length") + } + for i, project := range projects { + configString, ok := cfgi[i].(string) + if !ok { + continue + } + config := ProjectConfig{} + if err := json.Unmarshal([]byte(configString), &config); err != nil { + continue + } + projectConfigs[project] = config + } + } + projects = nil + for project := range projectSlugMap { + if _, has := projectConfigs[project]; !has { + delete(projectSlugMap, project) + continue + } + projects = append(projects, project) + } + for slug, project := range slugProjectMap { + if _, has := projectConfigs[project]; !has { + delete(slugProjectMap, slug) + } + } + // add feeds for new projects + for _, project := range projects { + projectConfig := projectConfigs[project] + var outdatedProjectBackfeedManager *ProjectBackfeedManager + if projectBackfeedManager, has := that.ActiveFeeds[project]; has { + if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) { + outdatedProjectBackfeedManager = projectBackfeedManager + } else { + continue + } + } + ctx, cancel := context.WithCancel(that.Context) + projectBackfeedManager := &ProjectBackfeedManager{ + Context: ctx, + Cancel: cancel, + Done: make(chan bool), + C: make(chan *BackfeedItem, ItemChannelBuffer), + BackfeedRedis: that.BackfeedRedis, + Name: project, + ProjectConfig: projectConfig, + } + if projectConfig.RedisConfig != nil { + projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port), + Username: "default", + Password: projectConfig.RedisConfig.Pass, + ReadTimeout: 15 * time.Minute, + }) + } else { + projectBackfeedManager.ProjectRedis = that.TrackerRedis + } + go projectBackfeedManager.Do() + that.Lock.Lock() + that.ActiveFeeds[project] = projectBackfeedManager + that.Lock.Unlock() + if outdatedProjectBackfeedManager != nil { + outdatedProjectBackfeedManager.Cancel() + <-outdatedProjectBackfeedManager.Done + log.Printf("updated project: %s", project) + } else { + log.Printf("added project: %s", project) + } + } + that.Lock.Lock() + that.ActiveSlugs = 
slugProjectMap
+	that.Lock.Unlock()
+	// remove feeds for old projects
+	for project, projectBackfeedManager := range that.ActiveFeeds {
+		if _, has := projectSlugMap[project]; has {
+			continue
+		}
+		log.Printf("removing project: %s", project)
+		that.Lock.Lock()
+		delete(that.ActiveFeeds, project)
+		that.Lock.Unlock()
+		projectBackfeedManager.Cancel()
+		<-projectBackfeedManager.Done
+		log.Printf("removed project: %s", project)
+	}
+	if !that.Populated.IsSet() {
+		that.Populated.Set()
+	}
+	return nil
+}
+
+func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
+	that.Lock.RLock()
+	defer that.Lock.RUnlock()
+	project, has := that.ActiveSlugs[slug]
+	if !has {
+		return nil
+	}
+	projectBackfeedManager, has := that.ActiveFeeds[project]
+	if !has {
+		return nil
+	}
+	return projectBackfeedManager
+}
+
+func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) {
+	defer req.Body.Close()
+
+	vars := mux.Vars(req)
+	slug := vars["slug"]
+	secondaryShard := req.URL.Query().Get("shard")
+
+	if strings.ContainsAny(secondaryShard, ":/") {
+		WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid shard name"))
+		return
+	}
+
+	projectBackfeedManager := that.GetFeed(slug)
+	if projectBackfeedManager == nil {
+		WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
+		return
+	}
+
+	splitter := &Splitter{
+		Delimiter: []byte(req.URL.Query().Get("delimiter")),
+		IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
+	}
+	if len(splitter.Delimiter) == 0 {
+		splitter.Delimiter = []byte{0x00}
+	}
+	var body io.ReadCloser
+	switch req.Header.Get("Content-Encoding") {
+	case "":
+		body = req.Body
+	case "gzip":
+		var err error
+		body, err = gzip.NewReader(req.Body)
+		if err != nil {
+			WriteResponse(res, http.StatusBadRequest, err)
+			return
+		}
+		defer body.Close()
+	case "deflate":
+		body = flate.NewReader(req.Body)
+		defer body.Close()
+	default:
+		WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding")))
+		return
+	}
+	scanner := bufio.NewScanner(body)
+	scanner.Split(splitter.Split)
+
+	n := 0
+	for scanner.Scan() {
+		b := scanner.Bytes()
+		if len(b) == 0 {
+			continue
+		}
+		bcopy := make([]byte, len(b))
+		copy(bcopy, b)
+		item := &BackfeedItem{
+			PrimaryShard:   GenShardHash(bcopy),
+			SecondaryShard: secondaryShard,
+			Item:           bcopy,
+		}
+		if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil {
+			WriteResponse(res, http.StatusInternalServerError, err)
+			return
+		}
+		n++
+	}
+	if err := scanner.Err(); err != nil {
+		WriteResponse(res, http.StatusBadRequest, err)
+		return
+	}
+	WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n))
+}
+
+func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
+	if that.Populated.IsNotSet() {
+		WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
+		return
+	}
+	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
+		client.ClientGetName(ctx)
+		return client.Ping(ctx).Err()
+	}); err != nil {
+		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
+		return
+	}
+	WriteResponse(res, http.StatusOK, "ok")
+}
+
+func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
+	WriteResponse(res, http.StatusOK, "pong")
+}
+
+func (that *GlobalBackfeedManager) 
CancelAllFeeds() { + that.Populated.UnSet() + that.Cancel() + for project, projectBackfeedManager := range that.ActiveFeeds { + log.Printf("waiting for %s channel to shut down...", project) + <-projectBackfeedManager.Done + delete(that.ActiveFeeds, project) + } +} diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..8e65052 --- /dev/null +++ b/helpers.go @@ -0,0 +1,34 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func GenShardHash(b []byte) (final byte) { + for i, b := range b { + final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i) + } + return final +} + +func WriteResponse(res http.ResponseWriter, statusCode int, v any) { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(statusCode) + if statusCode == http.StatusNoContent { + return + } + if err, isError := v.(error); isError { + v = map[string]any{ + "error": fmt.Sprintf("%v", err), + "status_code": statusCode, + } + } else { + v = map[string]any{ + "data": v, + "status_code": statusCode, + } + } + json.NewEncoder(res).Encode(v) +} diff --git a/lastaccessstats.go b/lastaccessstats.go new file mode 100644 index 0000000..aa568f0 --- /dev/null +++ b/lastaccessstats.go @@ -0,0 +1,115 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/go-redis/redis/v8" +) + +type LastAccessStatsKey struct { + Project string + Shard string + SubShard string +} + +type LastAccessStatsEntry struct { + First time.Time `json:"first"` + Last time.Time `json:"last"` + Size int64 `json:"size"` +} + +type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry + +func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) { + mapped := map[string]map[string]any{} + for key, value := range that { + mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{ + "first": value.First.Format(time.RFC3339), + "last": value.Last.Format(time.RFC3339), + "size": value.Size, + } + } + return json.Marshal(mapped) +} + +func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) { + parts := strings.SplitN(s, ":", 3) + if len(parts) != 3 { + return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s) + } + return LastAccessStatsKey{ + Project: parts[0], + Shard: parts[1], + SubShard: parts[2], + }, nil +} + +func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + merge := map[string]bool{} + if vv, ok := req.URL.Query()["merge"]; ok { + for _, v := range vv { + merge[v] = true + } + } + lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result() + if err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + memoryUsages := map[string]*redis.IntCmd{} + pipe := that.BackfeedRedis.Pipeline() + for key := range lastTs { + memoryUsages[key] = pipe.MemoryUsage(req.Context(), key) + } + _, err = pipe.Exec(req.Context()) + if err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + lastAccessStats := LastAccessStatsMap{} + for key, value := range lastTs { + // value is in unix timestamp format + ts, err := strconv.ParseInt(value, 10, 64) + if err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + lastAccessStatsKey, err := LastAccessStatsKeyFromString(key) + if err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + if merge["project"] { + 
lastAccessStatsKey.Project = "*" + } + if merge["shard"] { + lastAccessStatsKey.Shard = "*" + } + if merge["sub_shard"] { + lastAccessStatsKey.SubShard = "*" + } + parsedTs := time.Unix(ts, 0) + if v, has := lastAccessStats[lastAccessStatsKey]; !has { + lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{ + First: parsedTs, + Last: parsedTs, + Size: memoryUsages[key].Val(), + } + } else { + if v.First.After(parsedTs) { + v.First = parsedTs + } + if v.Last.Before(parsedTs) { + v.Last = parsedTs + } + v.Size += memoryUsages[key].Val() + } + } + WriteResponse(res, http.StatusOK, lastAccessStats) +} diff --git a/main.go b/main.go index 27aa14b..3e30cc6 100644 --- a/main.go +++ b/main.go @@ -1,25 +1,13 @@ package main import ( - "archive/tar" - "bufio" - "bytes" - "compress/flate" - "compress/gzip" "context" - "encoding/json" - "fmt" - "hash/crc64" - "io" "log" "net/http" _ "net/http/pprof" "os" "os/signal" - "sort" - "strconv" "strings" - "sync" "syscall" "time" @@ -35,710 +23,6 @@ const ( ItemWrapSize = 100000 ) -type ProjectRedisConfig struct { - Host string `json:"host"` - Pass string `json:"pass"` - Port int `json:"port"` -} - -type ProjectConfig struct { - RedisConfig *ProjectRedisConfig `json:"redis,omitempty"` -} - -type BackfeedItem struct { - PrimaryShard byte - SecondaryShard string - Item []byte -} - -type ProjectBackfeedManager struct { - Context context.Context - Cancel context.CancelFunc - Done chan bool - C chan *BackfeedItem - Name string - BackfeedRedis *redis.ClusterClient - ProjectRedis *redis.Client - //Lock sync.RWMutex - ProjectConfig ProjectConfig -} - -func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool { - if that.ProjectConfig.RedisConfig == nil && new == nil { - return false - } - return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new -} - -func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error { - //that.Lock.RLock() - //defer that.Lock.RUnlock() - //if that.C == nil { - // return false - //} - select { - case <-ctx.Done(): - return ctx.Err() - case <-that.Context.Done(): - return fmt.Errorf("backfeed channel closed") - case that.C <- item: - return nil - //default: - // return fmt.Errorf("backfeed channel full") - } -} - -func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) { - if blocking { - select { - case <-that.Context.Done(): - return nil, false - case item, ok := <-that.C: - return item, ok - } - } else { - select { - case <-that.Context.Done(): - return nil, false - case item, ok := <-that.C: - return item, ok - default: - return nil, false - } - } -} - -//func (that *ProjectBackfeedManager) CloseItemChannel() { -// log.Printf("closing item channel for %s", that.Name) -// that.Lock.Lock() -// defer that.Lock.Unlock() -// if that.C == nil { -// return -// } -// close(that.C) -// that.C = nil -//} - -func (that *ProjectBackfeedManager) Do() { - defer close(that.Done) - //defer that.CloseItemChannel() - defer that.Cancel() - - pipe := that.BackfeedRedis.Pipeline() - for { - select { - case <-that.Context.Done(): - break - case <-that.Done: - break - default: - } - item, ok := that.PopItem(true) - if !ok { - break - } - keyMap := map[string][][]byte{} - key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) - keyMap[key] = append(keyMap[key], item.Item) - wrapped := 1 - for wrapped < ItemWrapSize { - item, ok := that.PopItem(false) - if !ok { - break - } - key := 
fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) - keyMap[key] = append(keyMap[key], item.Item) - wrapped++ - } - select { - case <-that.Context.Done(): - break - case <-that.Done: - break - default: - } - now := time.Now() - resultMap := map[string]*redis.Cmd{} - lastTS := make([]any, 0, len(keyMap)*2) - for key := range keyMap { - lastTS = append(lastTS, key) - lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) - } - pipe.HSet(context.Background(), ":last_ts", lastTS...) - for key, items := range keyMap { - args := []any{ - "bf.madd", - key, - } - for _, item := range items { - args = append(args, item) - } - resultMap[key] = pipe.Do(context.Background(), args...) - } - if _, err := pipe.Exec(context.Background()); err != nil { - log.Printf("%s", err) - } - var sAddItems []any - for key, items := range keyMap { - res, err := resultMap[key].BoolSlice() - if err != nil { - log.Printf("%s", err) - continue - } - if len(res) != len(keyMap[key]) { - continue - } - for i, v := range res { - if v { - sAddItems = append(sAddItems, items[i]) - } - } - } - dupes := wrapped - len(sAddItems) - if len(sAddItems) != 0 { - if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { - log.Printf("failed to sadd items for %s: %s", that.Name, err) - } - } - if dupes > 0 { - that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes)) - } - } -} - -type GlobalBackfeedManager struct { - Context context.Context - Cancel context.CancelFunc - ActiveFeeds map[string]*ProjectBackfeedManager - ActiveSlugs map[string]string - TrackerRedis *redis.Client - BackfeedRedis *redis.ClusterClient - Lock sync.RWMutex - Populated *abool.AtomicBool -} - -func (that *GlobalBackfeedManager) RefreshFeeds() error { - slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result() - if err != nil { - return err - } - var projects []string - projectSlugMap := map[string][]string{} - for slug, project := range slugProjectMap { - projectSlugMap[project] = append(projectSlugMap[project], slug) - } - for project := range projectSlugMap { - projects = append(projects, project) - } - projectConfigs := map[string]ProjectConfig{} - if len(projects) != 0 { - cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result() - if err != nil { - return err - } - if len(projects) != len(cfgi) { - return fmt.Errorf("hmget result had unexpected length") - } - for i, project := range projects { - configString, ok := cfgi[i].(string) - if !ok { - continue - } - config := ProjectConfig{} - if err := json.Unmarshal([]byte(configString), &config); err != nil { - continue - } - projectConfigs[project] = config - } - } - projects = nil - for project := range projectSlugMap { - if _, has := projectConfigs[project]; !has { - delete(projectSlugMap, project) - continue - } - projects = append(projects, project) - } - for slug, project := range slugProjectMap { - if _, has := projectConfigs[project]; !has { - delete(slugProjectMap, slug) - } - } - // add feeds for new projects - for _, project := range projects { - projectConfig := projectConfigs[project] - var outdatedProjectBackfeedManager *ProjectBackfeedManager - if projectBackfeedManager, has := that.ActiveFeeds[project]; has { - if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) { - outdatedProjectBackfeedManager = projectBackfeedManager - } else { - continue - } - } - ctx, cancel := context.WithCancel(that.Context) - 
projectBackfeedManager := &ProjectBackfeedManager{ - Context: ctx, - Cancel: cancel, - Done: make(chan bool), - C: make(chan *BackfeedItem, ItemChannelBuffer), - BackfeedRedis: that.BackfeedRedis, - Name: project, - ProjectConfig: projectConfig, - } - if projectConfig.RedisConfig != nil { - projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port), - Username: "default", - Password: projectConfig.RedisConfig.Pass, - ReadTimeout: 15 * time.Minute, - }) - } else { - projectBackfeedManager.ProjectRedis = that.TrackerRedis - } - go projectBackfeedManager.Do() - that.Lock.Lock() - that.ActiveFeeds[project] = projectBackfeedManager - that.Lock.Unlock() - if outdatedProjectBackfeedManager != nil { - outdatedProjectBackfeedManager.Cancel() - <-outdatedProjectBackfeedManager.Done - log.Printf("updated project: %s", project) - } else { - log.Printf("added project: %s", project) - } - } - that.Lock.Lock() - that.ActiveSlugs = slugProjectMap - that.Lock.Unlock() - // remove feeds for old projects - for project, projectBackfeedManager := range that.ActiveFeeds { - if _, has := projectSlugMap[project]; has { - continue - } - log.Printf("removing project: %s", project) - that.Lock.Lock() - delete(that.ActiveFeeds, project) - that.Lock.Unlock() - projectBackfeedManager.Cancel() - <-projectBackfeedManager.Done - log.Printf("removed project: %s", project) - } - if !that.Populated.IsSet() { - that.Populated.Set() - } - return nil -} - -type Splitter struct { - Delimiter []byte - IgnoreEOF bool -} - -func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) { - for i := 0; i < len(data); i++ { - if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) { - return i + len(that.Delimiter), data[:i], nil - } - } - if len(data) == 0 || !atEOF { - return 0, nil, nil - } - if atEOF && that.IgnoreEOF { - return len(data), data, nil - } - return 0, data, io.ErrUnexpectedEOF -} - -func GenShardHash(b []byte) (final byte) { - for i, b := range b { - final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i) - } - return final -} - -func WriteResponse(res http.ResponseWriter, statusCode int, v any) { - res.Header().Set("Content-Type", "application/json") - res.WriteHeader(statusCode) - if statusCode == http.StatusNoContent { - return - } - if err, isError := v.(error); isError { - v = map[string]any{ - "error": fmt.Sprintf("%v", err), - "status_code": statusCode, - } - } else { - v = map[string]any{ - "data": v, - "status_code": statusCode, - } - } - json.NewEncoder(res).Encode(v) -} - -func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager { - that.Lock.RLock() - defer that.Lock.RUnlock() - project, has := that.ActiveSlugs[slug] - if !has { - return nil - } - projectBackfeedManager, has := that.ActiveFeeds[project] - if !has { - return nil - } - return projectBackfeedManager -} - -type LastAccessStatsKey struct { - Project string - Shard string - SubShard string -} - -type LastAccessStatsEntry struct { - First time.Time `json:"first"` - Last time.Time `json:"last"` - Size int64 `json:"size"` -} - -type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry - -func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) { - mapped := map[string]map[string]any{} - for key, value := range that { - mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{ - "first": value.First.Format(time.RFC3339), - "last": 
value.Last.Format(time.RFC3339), - "size": value.Size, - } - } - return json.Marshal(mapped) -} - -func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) { - parts := strings.SplitN(s, ":", 3) - if len(parts) != 3 { - return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s) - } - return LastAccessStatsKey{ - Project: parts[0], - Shard: parts[1], - SubShard: parts[2], - }, nil -} - -func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) { - defer req.Body.Close() - merge := map[string]bool{} - if vv, ok := req.URL.Query()["merge"]; ok { - for _, v := range vv { - merge[v] = true - } - } - lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result() - if err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - memoryUsages := map[string]*redis.IntCmd{} - pipe := that.BackfeedRedis.Pipeline() - for key := range lastTs { - memoryUsages[key] = pipe.MemoryUsage(req.Context(), key) - } - _, err = pipe.Exec(req.Context()) - if err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - lastAccessStats := LastAccessStatsMap{} - for key, value := range lastTs { - // value is in unix timestamp format - ts, err := strconv.ParseInt(value, 10, 64) - if err != nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - lastAccessStatsKey, err := LastAccessStatsKeyFromString(key) - if err != nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - if merge["project"] { - lastAccessStatsKey.Project = "*" - } - if merge["shard"] { - lastAccessStatsKey.Shard = "*" - } - if merge["sub_shard"] { - lastAccessStatsKey.SubShard = "*" - } - parsedTs := time.Unix(ts, 0) - if v, has := lastAccessStats[lastAccessStatsKey]; !has { - lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{ - First: parsedTs, - Last: parsedTs, - Size: memoryUsages[key].Val(), - } - } else { - if v.First.After(parsedTs) { - v.First = parsedTs - } - if v.Last.Before(parsedTs) { - v.Last = parsedTs - } - v.Size += memoryUsages[key].Val() - } - } - WriteResponse(res, http.StatusOK, lastAccessStats) -} - -func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) { - defer req.Body.Close() - - vars := mux.Vars(req) - slug := vars["slug"] - secondaryShard := req.URL.Query().Get("shard") - - projectBackfeedManager := that.GetFeed(slug) - if projectBackfeedManager == nil { - WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel")) - return - } - - splitter := &Splitter{ - Delimiter: []byte(req.URL.Query().Get("delimiter")), - IgnoreEOF: req.URL.Query().Get("ignoreeof") != "", - } - if len(splitter.Delimiter) == 0 { - splitter.Delimiter = []byte{0x00} - } - var body io.ReadCloser - switch req.Header.Get("Content-Encoding") { - case "": - body = req.Body - case "gzip": - var err error - body, err = gzip.NewReader(req.Body) - if err != nil { - WriteResponse(res, http.StatusBadRequest, err) - return - } - defer body.Close() - case "deflate": - body = flate.NewReader(req.Body) - defer body.Close() - default: - WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding"))) - } - scanner := bufio.NewScanner(body) - scanner.Split(splitter.Split) - - n := 0 - for scanner.Scan() { - b := scanner.Bytes() - if len(b) == 0 { - continue - } - bcopy := make([]byte, len(b)) - copy(bcopy, b) - item := &BackfeedItem{ - 
PrimaryShard: GenShardHash(bcopy), - SecondaryShard: secondaryShard, - Item: bcopy, - } - if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - n++ - } - if err := scanner.Err(); err != nil { - WriteResponse(res, http.StatusBadRequest, err) - return - } - WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n)) - return -} - -func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) { - if that.Populated.IsNotSet() { - WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated")) - return - } - if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { - client.ClientGetName(ctx) - return client.Ping(ctx).Err() - }); err != nil { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err)) - return - } - WriteResponse(res, http.StatusOK, "ok") -} - -func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) { - WriteResponse(res, http.StatusOK, "pong") -} - -type DumpChunkName struct { - Key string `json:"key"` - Distance int `json:"distance"` - Cursor int64 `json:"cursor"` - Checksum uint64 `json:"checksum"` -} - -func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - key := vars["key"] - if key == "" { - key = "*:*:*" - } - if strings.Count(key, ":") < 2 { - WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format")) - return - } - lock := sync.Mutex{} - keys := []string{} - if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { - cursor := uint64(0) - var shardKeys []string - for { - var err error - var keysBatch []string - keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result() - if err != nil && err != redis.Nil { - return err - } - shardKeys = append(shardKeys, keysBatch...) - if cursor == 0 { - break - } - } - lock.Lock() - defer lock.Unlock() - keys = append(keys, shardKeys...) 
- return nil - }); err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - sort.Strings(keys) - hasJsonAcceptHeader := false - for _, accept := range strings.Split(req.Header.Get("Accept"), ",") { - accept = strings.TrimSpace(accept) - if accept == "application/json" || strings.HasPrefix(accept, "application/json;") { - hasJsonAcceptHeader = true - break - } - } - if hasJsonAcceptHeader { - WriteResponse(res, http.StatusOK, keys) - return - } - if len(keys) == 0 { - WriteResponse(res, http.StatusNoContent, nil) - return - } - tarWriter := tar.NewWriter(res) - defer tarWriter.Close() - pipe := that.BackfeedRedis.Pipeline() - for _, key := range keys { - cursor := int64(0) - for i := 0; ; i++ { - rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor) - tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) - _, err := pipe.Exec(req.Context()) - if err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - rawRes, err := rawResResult.Result() - if err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - if rawRes == nil { - break - } - resSlice, ok := rawRes.([]any) - if !ok { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes)) - return - } - if len(resSlice) != 2 { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice))) - return - } - cursor, ok = resSlice[0].(int64) - if !ok { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0])) - return - } - chunkString, ok := resSlice[1].(string) - if !ok { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1])) - return - } - chunk := []byte(chunkString) - - lastAccess := time.Time{} - tsString, err := tsStringResult.Result() - if err == nil && tsString != "" { - ts, err := strconv.ParseInt(tsString, 10, 64) - if err == nil { - lastAccess = time.Unix(ts, 0) - } - } - - nameStruct := DumpChunkName{ - Key: key, - Cursor: cursor, - Distance: i, - Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)), - } - name, err := json.Marshal(nameStruct) - if err != nil { - WriteResponse(res, http.StatusInternalServerError, err) - return - } - if err := tarWriter.WriteHeader(&tar.Header{ - Typeflag: tar.TypeReg, - Name: string(name), - Size: int64(len(chunk)), - Mode: 0600, - ModTime: lastAccess, - AccessTime: lastAccess, - ChangeTime: lastAccess, - PAXRecords: map[string]string{ - "ARCHIVETEAM.bffchunk.key": key, - "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor), - "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i), - "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum), - }, - Format: tar.FormatPAX, - }); err != nil { - WriteResponse(res, http.StatusInternalServerError, err) - } - if _, err := tarWriter.Write(chunk); err != nil { - WriteResponse(res, http.StatusInternalServerError, err) - } - if cursor == 0 && len(chunk) == 0 { - break - } - } - } -} - -func (that *GlobalBackfeedManager) CancelAllFeeds() { - that.Populated.UnSet() - that.Cancel() - for project, projectBackfeedManager := range that.ActiveFeeds { - log.Printf("waiting for %s channel to shut down...", project) - <-projectBackfeedManager.Done - delete(that.ActiveFeeds, project) - } -} - func main() { log.SetFlags(log.Flags() | log.Lshortfile) 
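A quick way to sanity-check the extracted HandleLegacy path end to end is a small client. The sketch below is illustrative only: it assumes the handler is mounted at /legacy/{slug} on localhost:8080 (route registration happens in main.go outside this hunk), and the slug, shard name, and item payloads are invented. Items are NUL-delimited unless ?delimiter= overrides it, and gzip or deflate request bodies are accepted; include a trailing delimiter (or pass ?ignoreeof=1) so the final item does not trip io.ErrUnexpectedEOF.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Three NUL-delimited items; the trailing 0x00 terminates the last one.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("https://example.com/1\x00https://example.com/2\x00https://example.com/3\x00")); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	// Hypothetical route and slug; the "shard" value must not contain ':' or '/'.
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/legacy/example-project?shard=urls", &buf)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Encoding", "gzip") // HandleLegacy also accepts deflate or no encoding
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect 200 OK with "3 items queued for deduplication"
}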
diff --git a/persistence.go b/persistence.go new file mode 100644 index 0000000..4cefce3 --- /dev/null +++ b/persistence.go @@ -0,0 +1,167 @@ +package main + +import ( + "archive/tar" + "context" + "encoding/json" + "fmt" + "hash/crc64" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/gorilla/mux" +) + +type DumpChunkName struct { + Key string `json:"key"` + Distance int `json:"distance"` + Cursor int64 `json:"cursor"` + Checksum uint64 `json:"checksum"` +} + +func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + key := vars["key"] + if key == "" { + key = "*:*:*" + } + if strings.Count(key, ":") < 2 { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format")) + return + } + lock := sync.Mutex{} + keys := []string{} + if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { + cursor := uint64(0) + var shardKeys []string + for { + var err error + var keysBatch []string + keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result() + if err != nil && err != redis.Nil { + return err + } + shardKeys = append(shardKeys, keysBatch...) + if cursor == 0 { + break + } + } + lock.Lock() + defer lock.Unlock() + keys = append(keys, shardKeys...) + return nil + }); err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + sort.Strings(keys) + hasJsonAcceptHeader := false + for _, accept := range strings.Split(req.Header.Get("Accept"), ",") { + accept = strings.TrimSpace(accept) + if accept == "application/json" || strings.HasPrefix(accept, "application/json;") { + hasJsonAcceptHeader = true + break + } + } + if hasJsonAcceptHeader { + WriteResponse(res, http.StatusOK, keys) + return + } + if len(keys) == 0 { + WriteResponse(res, http.StatusNoContent, nil) + return + } + tarWriter := tar.NewWriter(res) + defer tarWriter.Close() + pipe := that.BackfeedRedis.Pipeline() + for _, key := range keys { + cursor := int64(0) + for i := 0; ; i++ { + rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor) + tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) + _, err := pipe.Exec(req.Context()) + if err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + rawRes, err := rawResResult.Result() + if err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + if rawRes == nil { + break + } + resSlice, ok := rawRes.([]any) + if !ok { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes)) + return + } + if len(resSlice) != 2 { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice))) + return + } + cursor, ok = resSlice[0].(int64) + if !ok { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0])) + return + } + chunkString, ok := resSlice[1].(string) + if !ok { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1])) + return + } + chunk := []byte(chunkString) + + lastAccess := time.Time{} + tsString, err := tsStringResult.Result() + if err == nil && tsString != "" { + ts, err := strconv.ParseInt(tsString, 10, 64) + if err == nil { + lastAccess = time.Unix(ts, 0) + } + } + + nameStruct := 
DumpChunkName{ + Key: key, + Cursor: cursor, + Distance: i, + Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)), + } + name, err := json.Marshal(nameStruct) + if err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + if err := tarWriter.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: string(name), + Size: int64(len(chunk)), + Mode: 0600, + ModTime: lastAccess, + AccessTime: lastAccess, + ChangeTime: lastAccess, + PAXRecords: map[string]string{ + "ARCHIVETEAM.bffchunk.key": key, + "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor), + "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i), + "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum), + }, + Format: tar.FormatPAX, + }); err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + } + if _, err := tarWriter.Write(chunk); err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + } + if cursor == 0 && len(chunk) == 0 { + break + } + } + } +} diff --git a/projectbackfeedmanager.go b/projectbackfeedmanager.go new file mode 100644 index 0000000..5bd5f4a --- /dev/null +++ b/projectbackfeedmanager.go @@ -0,0 +1,160 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/go-redis/redis/v8" +) + +type ProjectBackfeedManager struct { + Context context.Context + Cancel context.CancelFunc + Done chan bool + C chan *BackfeedItem + Name string + BackfeedRedis *redis.ClusterClient + ProjectRedis *redis.Client + //Lock sync.RWMutex + ProjectConfig ProjectConfig +} + +type ProjectRedisConfig struct { + Host string `json:"host"` + Pass string `json:"pass"` + Port int `json:"port"` +} + +func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool { + if that.ProjectConfig.RedisConfig == nil && new == nil { + return false + } + return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new +} + +func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error { + //that.Lock.RLock() + //defer that.Lock.RUnlock() + //if that.C == nil { + // return false + //} + select { + case <-ctx.Done(): + return ctx.Err() + case <-that.Context.Done(): + return fmt.Errorf("backfeed channel closed") + case that.C <- item: + return nil + //default: + // return fmt.Errorf("backfeed channel full") + } +} + +func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) { + if blocking { + select { + case <-that.Context.Done(): + return nil, false + case item, ok := <-that.C: + return item, ok + } + } else { + select { + case <-that.Context.Done(): + return nil, false + case item, ok := <-that.C: + return item, ok + default: + return nil, false + } + } +} + +func (that *ProjectBackfeedManager) Do() { + defer close(that.Done) + //defer that.CloseItemChannel() + defer that.Cancel() + + pipe := that.BackfeedRedis.Pipeline() + for { + select { + case <-that.Context.Done(): + break + case <-that.Done: + break + default: + } + item, ok := that.PopItem(true) + if !ok { + break + } + keyMap := map[string][][]byte{} + key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) + keyMap[key] = append(keyMap[key], item.Item) + wrapped := 1 + for wrapped < ItemWrapSize { + item, ok := that.PopItem(false) + if !ok { + break + } + key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) + keyMap[key] = append(keyMap[key], item.Item) + wrapped++ + } + select { + case <-that.Context.Done(): + break + 
case <-that.Done:
+			break
+		default:
+		}
+		now := time.Now()
+		resultMap := map[string]*redis.Cmd{}
+		lastTS := make([]any, 0, len(keyMap)*2)
+		for key := range keyMap {
+			lastTS = append(lastTS, key)
+			lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
+		}
+		pipe.HSet(context.Background(), ":last_ts", lastTS...)
+		for key, items := range keyMap {
+			args := []any{
+				"bf.madd",
+				key,
+			}
+			for _, item := range items {
+				args = append(args, item)
+			}
+			resultMap[key] = pipe.Do(context.Background(), args...)
+		}
+		if _, err := pipe.Exec(context.Background()); err != nil {
+			log.Printf("%s", err)
+		}
+		var sAddItems []any
+		for key, items := range keyMap {
+			res, err := resultMap[key].BoolSlice()
+			if err != nil {
+				log.Printf("%s", err)
+				continue
+			}
+			if len(res) != len(keyMap[key]) {
+				continue
+			}
+			for i, v := range res {
+				if v {
+					sAddItems = append(sAddItems, items[i])
+				}
+			}
+		}
+		dupes := wrapped - len(sAddItems)
+		if len(sAddItems) != 0 {
+			if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
+				log.Printf("failed to sadd items for %s: %s", that.Name, err)
+			}
+		}
+		if dupes > 0 {
+			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
+		}
+	}
+}
diff --git a/splitter.go b/splitter.go
new file mode 100644
index 0000000..96190cb
--- /dev/null
+++ b/splitter.go
@@ -0,0 +1,26 @@
+package main
+
+import (
+	"bytes"
+	"io"
+)
+
+type Splitter struct {
+	Delimiter []byte
+	IgnoreEOF bool
+}
+
+func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
+	for i := 0; i+len(that.Delimiter) <= len(data); i++ {
+		if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) {
+			return i + len(that.Delimiter), data[:i], nil
+		}
+	}
+	if len(data) == 0 || !atEOF {
+		return 0, nil, nil
+	}
+	if atEOF && that.IgnoreEOF {
+		return len(data), data, nil
+	}
+	return 0, data, io.ErrUnexpectedEOF
+}
diff --git a/structs.go b/structs.go
new file mode 100644
index 0000000..160f056
--- /dev/null
+++ b/structs.go
@@ -0,0 +1,11 @@
+package main
+
+type ProjectConfig struct {
+	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
+}
+
+type BackfeedItem struct {
+	PrimaryShard   byte
+	SecondaryShard string
+	Item           []byte
+}
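Splitter.Split satisfies bufio.SplitFunc, which is how HandleLegacy tokenizes request bodies on an arbitrary, possibly multi-byte delimiter. A standalone usage sketch follows; the input strings are invented for illustration, and Splitter is the type from splitter.go above.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	splitter := &Splitter{
		Delimiter: []byte("\r\n"),
		IgnoreEOF: true, // like ?ignoreeof=1: accept a final item with no trailing delimiter
	}
	scanner := bufio.NewScanner(strings.NewReader("item1\r\nitem2\r\nitem3"))
	scanner.Split(splitter.Split)
	for scanner.Scan() {
		fmt.Printf("%q\n", scanner.Text()) // "item1", "item2", "item3"
	}
	if err := scanner.Err(); err != nil {
		// with IgnoreEOF false, the undelimited tail yields io.ErrUnexpectedEOF
		fmt.Println("scan error:", err)
	}
}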