@@ -0,0 +1,259 @@ | |||
package main | |||
import ( | |||
"bufio" | |||
"compress/flate" | |||
"compress/gzip" | |||
"context" | |||
"encoding/json" | |||
"fmt" | |||
"io" | |||
"log" | |||
"net/http" | |||
"strings" | |||
"sync" | |||
"time" | |||
"github.com/go-redis/redis/v8" | |||
"github.com/gorilla/mux" | |||
"github.com/tevino/abool/v2" | |||
) | |||
// GlobalBackfeedManager owns the set of per-project backfeed pipelines and
// the slug -> project routing table used by the HTTP handlers.
type GlobalBackfeedManager struct {
	Context       context.Context                    // root context; cancelled via Cancel on shutdown
	Cancel        context.CancelFunc                 // cancels Context and thereby every project feed
	ActiveFeeds   map[string]*ProjectBackfeedManager // live pipelines, keyed by project name
	ActiveSlugs   map[string]string                  // slug -> project name, from the tracker's "backfeed" hash
	TrackerRedis  *redis.Client                      // tracker redis ("backfeed" and "trackers" hashes)
	BackfeedRedis *redis.ClusterClient               // dedup cluster holding bloom filters and ":last_ts"
	Lock          sync.RWMutex                       // guards ActiveFeeds/ActiveSlugs against HTTP readers
	Populated     *abool.AtomicBool                  // set once RefreshFeeds has completed at least once
}
// RefreshFeeds synchronizes the running project pipelines with the tracker's
// "backfeed" (slug -> project) and "trackers" (project -> config JSON)
// hashes: it starts feeds for new projects, restarts feeds whose redis
// config changed, and tears down feeds for projects that disappeared.
// It marks the manager Populated after the first successful pass.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	// Invert slug -> project into project -> slugs.
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		// Projects whose config is missing or unparseable are skipped here
		// and dropped from the maps below.
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Rebuild the project list keeping only projects with a valid config.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				// Config changed: start a replacement feed, then retire this one.
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
		}
		if projectConfig.RedisConfig != nil {
			// Project-specific redis instance.
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			// Fall back to the shared tracker redis.
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			// The replacement is already registered, so no items are lost
			// while the old feed drains.
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	// NOTE(review): ActiveFeeds is iterated without holding Lock; this is
	// only safe if RefreshFeeds is never called concurrently with itself.
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager { | |||
that.Lock.RLock() | |||
defer that.Lock.RUnlock() | |||
project, has := that.ActiveSlugs[slug] | |||
if !has { | |||
return nil | |||
} | |||
projectBackfeedManager, has := that.ActiveFeeds[project] | |||
if !has { | |||
return nil | |||
} | |||
return projectBackfeedManager | |||
} | |||
func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) { | |||
defer req.Body.Close() | |||
vars := mux.Vars(req) | |||
slug := vars["slug"] | |||
secondaryShard := req.URL.Query().Get("shard") | |||
if strings.ContainsAny(secondaryShard, ":/") { | |||
WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid shard name")) | |||
return | |||
} | |||
projectBackfeedManager := that.GetFeed(slug) | |||
if projectBackfeedManager == nil { | |||
WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel")) | |||
return | |||
} | |||
splitter := &Splitter{ | |||
Delimiter: []byte(req.URL.Query().Get("delimiter")), | |||
IgnoreEOF: req.URL.Query().Get("ignoreeof") != "", | |||
} | |||
if len(splitter.Delimiter) == 0 { | |||
splitter.Delimiter = []byte{0x00} | |||
} | |||
var body io.ReadCloser | |||
switch req.Header.Get("Content-Encoding") { | |||
case "": | |||
body = req.Body | |||
case "gzip": | |||
var err error | |||
body, err = gzip.NewReader(req.Body) | |||
if err != nil { | |||
WriteResponse(res, http.StatusBadRequest, err) | |||
return | |||
} | |||
defer body.Close() | |||
case "deflate": | |||
body = flate.NewReader(req.Body) | |||
defer body.Close() | |||
default: | |||
WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding"))) | |||
} | |||
scanner := bufio.NewScanner(body) | |||
scanner.Split(splitter.Split) | |||
n := 0 | |||
for scanner.Scan() { | |||
b := scanner.Bytes() | |||
if len(b) == 0 { | |||
continue | |||
} | |||
bcopy := make([]byte, len(b)) | |||
copy(bcopy, b) | |||
item := &BackfeedItem{ | |||
PrimaryShard: GenShardHash(bcopy), | |||
SecondaryShard: secondaryShard, | |||
Item: bcopy, | |||
} | |||
if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
n++ | |||
} | |||
if err := scanner.Err(); err != nil { | |||
WriteResponse(res, http.StatusBadRequest, err) | |||
return | |||
} | |||
WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n)) | |||
return | |||
} | |||
func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) { | |||
if that.Populated.IsNotSet() { | |||
WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated")) | |||
return | |||
} | |||
if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { | |||
client.ClientGetName(ctx) | |||
return client.Ping(ctx).Err() | |||
}); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err)) | |||
return | |||
} | |||
WriteResponse(res, http.StatusOK, "ok") | |||
} | |||
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) { | |||
WriteResponse(res, http.StatusOK, "pong") | |||
} | |||
// CancelAllFeeds shuts down every project pipeline and waits for each one to
// drain. The manager is unusable afterwards.
func (that *GlobalBackfeedManager) CancelAllFeeds() {
	that.Populated.UnSet()
	that.Cancel() // cancels the shared parent context of all project feeds
	// NOTE(review): ActiveFeeds is iterated without holding Lock; this is
	// only safe if no concurrent RefreshFeeds can run during shutdown.
	for project, projectBackfeedManager := range that.ActiveFeeds {
		log.Printf("waiting for %s channel to shut down...", project)
		<-projectBackfeedManager.Done // Do() closes Done once fully drained
		delete(that.ActiveFeeds, project)
	}
}
@@ -0,0 +1,34 @@ | |||
package main | |||
import ( | |||
"encoding/json" | |||
"fmt" | |||
"net/http" | |||
) | |||
// GenShardHash derives a one-byte shard identifier from an item payload.
// The mixing function is arbitrary but deterministic; it must stay stable
// because shard values are baked into persisted redis key names.
func GenShardHash(data []byte) byte {
	var acc byte
	for idx := 0; idx < len(data); idx++ {
		pos := byte(idx)
		acc = (data[idx] ^ acc ^ pos) + acc + pos + acc*pos
	}
	return acc
}
func WriteResponse(res http.ResponseWriter, statusCode int, v any) { | |||
res.Header().Set("Content-Type", "application/json") | |||
res.WriteHeader(statusCode) | |||
if statusCode == http.StatusNoContent { | |||
return | |||
} | |||
if err, isError := v.(error); isError { | |||
v = map[string]any{ | |||
"error": fmt.Sprintf("%v", err), | |||
"status_code": statusCode, | |||
} | |||
} else { | |||
v = map[string]any{ | |||
"data": v, | |||
"status_code": statusCode, | |||
} | |||
} | |||
json.NewEncoder(res).Encode(v) | |||
} |
@@ -0,0 +1,115 @@ | |||
package main | |||
import ( | |||
"encoding/json" | |||
"fmt" | |||
"net/http" | |||
"strconv" | |||
"strings" | |||
"time" | |||
"github.com/go-redis/redis/v8" | |||
) | |||
// LastAccessStatsKey identifies one bloom-filter key in the backfeed redis,
// decomposed into its project:shard:subshard components. A component may be
// "*" after merging (see HandleLastAccessStats).
type LastAccessStatsKey struct {
	Project  string
	Shard    string
	SubShard string
}

// LastAccessStatsEntry aggregates access times and memory usage for one
// (possibly merged) key.
type LastAccessStatsEntry struct {
	First time.Time `json:"first"` // earliest last-access timestamp seen
	Last  time.Time `json:"last"`  // latest last-access timestamp seen
	Size  int64     `json:"size"`  // summed redis MEMORY USAGE, in bytes
}

// LastAccessStatsMap maps keys to aggregated stats. It carries a custom
// MarshalJSON because struct values cannot be JSON object keys directly.
type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry
func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) { | |||
mapped := map[string]map[string]any{} | |||
for key, value := range that { | |||
mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{ | |||
"first": value.First.Format(time.RFC3339), | |||
"last": value.Last.Format(time.RFC3339), | |||
"size": value.Size, | |||
} | |||
} | |||
return json.Marshal(mapped) | |||
} | |||
func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) { | |||
parts := strings.SplitN(s, ":", 3) | |||
if len(parts) != 3 { | |||
return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s) | |||
} | |||
return LastAccessStatsKey{ | |||
Project: parts[0], | |||
Shard: parts[1], | |||
SubShard: parts[2], | |||
}, nil | |||
} | |||
// HandleLastAccessStats reports, for every bloom-filter key recorded in the
// ":last_ts" hash, its first/last access time and redis memory usage.
// The repeatable "merge" query parameter (values "project", "shard",
// "sub_shard") collapses the corresponding key component to "*", so matching
// entries aggregate together.
func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	merge := map[string]bool{}
	if vv, ok := req.URL.Query()["merge"]; ok {
		for _, v := range vv {
			merge[v] = true
		}
	}
	lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
	if err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	// Fetch MEMORY USAGE for every key in one pipelined round trip.
	memoryUsages := map[string]*redis.IntCmd{}
	pipe := that.BackfeedRedis.Pipeline()
	for key := range lastTs {
		memoryUsages[key] = pipe.MemoryUsage(req.Context(), key)
	}
	_, err = pipe.Exec(req.Context())
	if err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	lastAccessStats := LastAccessStatsMap{}
	for key, value := range lastTs {
		// value is in unix timestamp format
		ts, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		lastAccessStatsKey, err := LastAccessStatsKeyFromString(key)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		// Collapse merged components so entries aggregate under "*".
		if merge["project"] {
			lastAccessStatsKey.Project = "*"
		}
		if merge["shard"] {
			lastAccessStatsKey.Shard = "*"
		}
		if merge["sub_shard"] {
			lastAccessStatsKey.SubShard = "*"
		}
		parsedTs := time.Unix(ts, 0)
		if v, has := lastAccessStats[lastAccessStatsKey]; !has {
			lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{
				First: parsedTs,
				Last:  parsedTs,
				Size:  memoryUsages[key].Val(),
			}
		} else {
			// Widen the [First, Last] window and sum sizes.
			if v.First.After(parsedTs) {
				v.First = parsedTs
			}
			if v.Last.Before(parsedTs) {
				v.Last = parsedTs
			}
			v.Size += memoryUsages[key].Val()
		}
	}
	WriteResponse(res, http.StatusOK, lastAccessStats)
}
@@ -1,25 +1,13 @@ | |||
package main | |||
import ( | |||
"archive/tar" | |||
"bufio" | |||
"bytes" | |||
"compress/flate" | |||
"compress/gzip" | |||
"context" | |||
"encoding/json" | |||
"fmt" | |||
"hash/crc64" | |||
"io" | |||
"log" | |||
"net/http" | |||
_ "net/http/pprof" | |||
"os" | |||
"os/signal" | |||
"sort" | |||
"strconv" | |||
"strings" | |||
"sync" | |||
"syscall" | |||
"time" | |||
@@ -35,710 +23,6 @@ const ( | |||
ItemWrapSize = 100000 | |||
) | |||
// ProjectRedisConfig is the optional per-project redis endpoint carried in
// the tracker's project config JSON.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}

// ProjectConfig is the subset of the tracker's project config this service
// consumes. A nil RedisConfig means "use the shared tracker redis".
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}

// BackfeedItem is one queued payload together with its dedup shard routing.
type BackfeedItem struct {
	PrimaryShard   byte   // derived from the payload via GenShardHash
	SecondaryShard string // optional caller-supplied shard name
	Item           []byte // raw item payload
}

// ProjectBackfeedManager runs the dedup pipeline for a single project.
type ProjectBackfeedManager struct {
	Context       context.Context    // per-feed context; cancelled to stop the pipeline
	Cancel        context.CancelFunc // cancels Context
	Done          chan bool          // closed by Do() once the pipeline has fully drained
	C             chan *BackfeedItem // buffered item queue fed by HandleLegacy
	Name          string             // project name; used in redis key prefixes
	BackfeedRedis *redis.ClusterClient
	ProjectRedis  *redis.Client
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig
}
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool { | |||
if that.ProjectConfig.RedisConfig == nil && new == nil { | |||
return false | |||
} | |||
return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new | |||
} | |||
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error { | |||
//that.Lock.RLock() | |||
//defer that.Lock.RUnlock() | |||
//if that.C == nil { | |||
// return false | |||
//} | |||
select { | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
case <-that.Context.Done(): | |||
return fmt.Errorf("backfeed channel closed") | |||
case that.C <- item: | |||
return nil | |||
//default: | |||
// return fmt.Errorf("backfeed channel full") | |||
} | |||
} | |||
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) { | |||
if blocking { | |||
select { | |||
case <-that.Context.Done(): | |||
return nil, false | |||
case item, ok := <-that.C: | |||
return item, ok | |||
} | |||
} else { | |||
select { | |||
case <-that.Context.Done(): | |||
return nil, false | |||
case item, ok := <-that.C: | |||
return item, ok | |||
default: | |||
return nil, false | |||
} | |||
} | |||
} | |||
//func (that *ProjectBackfeedManager) CloseItemChannel() { | |||
// log.Printf("closing item channel for %s", that.Name) | |||
// that.Lock.Lock() | |||
// defer that.Lock.Unlock() | |||
// if that.C == nil { | |||
// return | |||
// } | |||
// close(that.C) | |||
// that.C = nil | |||
//} | |||
// Do is the project pipeline goroutine: it batches queued items, issues a
// pipelined BF.MADD against the backfeed cluster to deduplicate them, SADDs
// the genuinely-new items into the project's "<name>:todo:backfeed" set, and
// maintains per-key ":last_ts" timestamps plus a per-project duplicate
// counter. It exits — closing Done — when the feed's context is cancelled.
func (that *ProjectBackfeedManager) Do() {
	defer close(that.Done)
	//defer that.CloseItemChannel()
	defer that.Cancel()
	pipe := that.BackfeedRedis.Pipeline()
	for {
		// NOTE(review): these `break` statements only exit the `select`,
		// not the for loop, so this select is effectively a no-op; loop
		// termination actually happens via PopItem returning ok == false.
		select {
		case <-that.Context.Done():
			break
		case <-that.Done:
			break
		default:
		}
		item, ok := that.PopItem(true)
		if !ok {
			break
		}
		// Batch up to ItemWrapSize items, grouped by their redis key
		// "project:primaryshard:secondaryshard".
		keyMap := map[string][][]byte{}
		key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
		keyMap[key] = append(keyMap[key], item.Item)
		wrapped := 1
		for wrapped < ItemWrapSize {
			item, ok := that.PopItem(false)
			if !ok {
				break
			}
			key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
			keyMap[key] = append(keyMap[key], item.Item)
			wrapped++
		}
		// NOTE(review): same no-op select as above.
		select {
		case <-that.Context.Done():
			break
		case <-that.Done:
			break
		default:
		}
		now := time.Now()
		resultMap := map[string]*redis.Cmd{}
		// Record a last-access timestamp for every key touched in this batch.
		lastTS := make([]any, 0, len(keyMap)*2)
		for key := range keyMap {
			lastTS = append(lastTS, key)
			lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix()))
		}
		pipe.HSet(context.Background(), ":last_ts", lastTS...)
		for key, items := range keyMap {
			args := []any{
				"bf.madd",
				key,
			}
			for _, item := range items {
				args = append(args, item)
			}
			resultMap[key] = pipe.Do(context.Background(), args...)
		}
		if _, err := pipe.Exec(context.Background()); err != nil {
			log.Printf("%s", err)
		}
		// BF.MADD returns true per item that was newly added; only those
		// are forwarded to the project's todo set.
		var sAddItems []any
		for key, items := range keyMap {
			res, err := resultMap[key].BoolSlice()
			if err != nil {
				log.Printf("%s", err)
				continue
			}
			if len(res) != len(keyMap[key]) {
				continue
			}
			for i, v := range res {
				if v {
					sAddItems = append(sAddItems, items[i])
				}
			}
		}
		dupes := wrapped - len(sAddItems)
		if len(sAddItems) != 0 {
			if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil {
				log.Printf("failed to sadd items for %s: %s", that.Name, err)
			}
		}
		// Per-project duplicate counter lives in the ":" hash.
		if dupes > 0 {
			that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
		}
	}
}
// GlobalBackfeedManager owns the per-project pipelines and the slug ->
// project routing table.
// NOTE(review): this is the pre-split copy of the same type/function that
// also appears earlier in this patch.
type GlobalBackfeedManager struct {
	Context       context.Context
	Cancel        context.CancelFunc
	ActiveFeeds   map[string]*ProjectBackfeedManager // guarded by Lock for HTTP readers
	ActiveSlugs   map[string]string                  // slug -> project, guarded by Lock
	TrackerRedis  *redis.Client
	BackfeedRedis *redis.ClusterClient
	Lock          sync.RWMutex
	Populated     *abool.AtomicBool // set after the first successful RefreshFeeds
}

// RefreshFeeds synchronizes the running project pipelines with the tracker's
// "backfeed" (slug -> project) and "trackers" (project -> config) hashes:
// starting new feeds, restarting feeds whose redis config changed, and
// tearing down feeds for removed projects.
func (that *GlobalBackfeedManager) RefreshFeeds() error {
	slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
	if err != nil {
		return err
	}
	var projects []string
	projectSlugMap := map[string][]string{}
	// Invert slug -> project into project -> slugs.
	for slug, project := range slugProjectMap {
		projectSlugMap[project] = append(projectSlugMap[project], slug)
	}
	for project := range projectSlugMap {
		projects = append(projects, project)
	}
	projectConfigs := map[string]ProjectConfig{}
	if len(projects) != 0 {
		cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
		if err != nil {
			return err
		}
		if len(projects) != len(cfgi) {
			return fmt.Errorf("hmget result had unexpected length")
		}
		// Projects with missing or unparseable configs are skipped.
		for i, project := range projects {
			configString, ok := cfgi[i].(string)
			if !ok {
				continue
			}
			config := ProjectConfig{}
			if err := json.Unmarshal([]byte(configString), &config); err != nil {
				continue
			}
			projectConfigs[project] = config
		}
	}
	// Keep only projects that have a valid config.
	projects = nil
	for project := range projectSlugMap {
		if _, has := projectConfigs[project]; !has {
			delete(projectSlugMap, project)
			continue
		}
		projects = append(projects, project)
	}
	for slug, project := range slugProjectMap {
		if _, has := projectConfigs[project]; !has {
			delete(slugProjectMap, slug)
		}
	}
	// add feeds for new projects
	for _, project := range projects {
		projectConfig := projectConfigs[project]
		var outdatedProjectBackfeedManager *ProjectBackfeedManager
		if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
			if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
				// Config changed: start a replacement, then retire this one.
				outdatedProjectBackfeedManager = projectBackfeedManager
			} else {
				continue
			}
		}
		ctx, cancel := context.WithCancel(that.Context)
		projectBackfeedManager := &ProjectBackfeedManager{
			Context:       ctx,
			Cancel:        cancel,
			Done:          make(chan bool),
			C:             make(chan *BackfeedItem, ItemChannelBuffer),
			BackfeedRedis: that.BackfeedRedis,
			Name:          project,
			ProjectConfig: projectConfig,
		}
		if projectConfig.RedisConfig != nil {
			// Project-specific redis instance.
			projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
				Addr:        fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
				Username:    "default",
				Password:    projectConfig.RedisConfig.Pass,
				ReadTimeout: 15 * time.Minute,
			})
		} else {
			// Fall back to the shared tracker redis.
			projectBackfeedManager.ProjectRedis = that.TrackerRedis
		}
		go projectBackfeedManager.Do()
		that.Lock.Lock()
		that.ActiveFeeds[project] = projectBackfeedManager
		that.Lock.Unlock()
		if outdatedProjectBackfeedManager != nil {
			// The replacement is already registered before the old feed drains.
			outdatedProjectBackfeedManager.Cancel()
			<-outdatedProjectBackfeedManager.Done
			log.Printf("updated project: %s", project)
		} else {
			log.Printf("added project: %s", project)
		}
	}
	that.Lock.Lock()
	that.ActiveSlugs = slugProjectMap
	that.Lock.Unlock()
	// remove feeds for old projects
	for project, projectBackfeedManager := range that.ActiveFeeds {
		if _, has := projectSlugMap[project]; has {
			continue
		}
		log.Printf("removing project: %s", project)
		that.Lock.Lock()
		delete(that.ActiveFeeds, project)
		that.Lock.Unlock()
		projectBackfeedManager.Cancel()
		<-projectBackfeedManager.Done
		log.Printf("removed project: %s", project)
	}
	if !that.Populated.IsSet() {
		that.Populated.Set()
	}
	return nil
}
type Splitter struct { | |||
Delimiter []byte | |||
IgnoreEOF bool | |||
} | |||
func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) { | |||
for i := 0; i < len(data); i++ { | |||
if bytes.Equal(data[i:i+len(that.Delimiter)], that.Delimiter) { | |||
return i + len(that.Delimiter), data[:i], nil | |||
} | |||
} | |||
if len(data) == 0 || !atEOF { | |||
return 0, nil, nil | |||
} | |||
if atEOF && that.IgnoreEOF { | |||
return len(data), data, nil | |||
} | |||
return 0, data, io.ErrUnexpectedEOF | |||
} | |||
// GenShardHash derives the one-byte primary shard for an item payload.
// The mixing function is arbitrary but must stay stable: shard values are
// baked into persisted redis key names.
func GenShardHash(b []byte) (final byte) {
	for i, b := range b {
		final = (b ^ final ^ byte(i)) + final + byte(i) + final*byte(i)
	}
	return final
}

// WriteResponse emits a JSON envelope {"data": ..., "status_code": ...} or,
// when v is an error, {"error": "...", "status_code": ...}. A 204 response
// gets headers only.
func WriteResponse(res http.ResponseWriter, statusCode int, v any) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(statusCode)
	if statusCode == http.StatusNoContent {
		return
	}
	if err, isError := v.(error); isError {
		v = map[string]any{
			"error":       fmt.Sprintf("%v", err),
			"status_code": statusCode,
		}
	} else {
		v = map[string]any{
			"data":        v,
			"status_code": statusCode,
		}
	}
	json.NewEncoder(res).Encode(v)
}

// GetFeed resolves a slug to its live project backfeed manager, or nil when
// the slug or project is unknown.
func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
	that.Lock.RLock()
	defer that.Lock.RUnlock()
	project, has := that.ActiveSlugs[slug]
	if !has {
		return nil
	}
	projectBackfeedManager, has := that.ActiveFeeds[project]
	if !has {
		return nil
	}
	return projectBackfeedManager
}

// LastAccessStatsKey identifies one bloom-filter key, split into its
// project:shard:subshard components ("*" after merging).
type LastAccessStatsKey struct {
	Project  string
	Shard    string
	SubShard string
}

// LastAccessStatsEntry aggregates access times and memory usage for one key.
type LastAccessStatsEntry struct {
	First time.Time `json:"first"` // earliest last-access timestamp seen
	Last  time.Time `json:"last"`  // latest last-access timestamp seen
	Size  int64     `json:"size"`  // summed redis MEMORY USAGE, in bytes
}

// LastAccessStatsMap maps keys to aggregated stats; JSON-marshalled via the
// custom MarshalJSON below since struct keys cannot be JSON object keys.
type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry

// MarshalJSON renders the map keyed by the textual "project:shard:subshard"
// form, with RFC 3339 timestamps.
func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) {
	mapped := map[string]map[string]any{}
	for key, value := range that {
		mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{
			"first": value.First.Format(time.RFC3339),
			"last":  value.Last.Format(time.RFC3339),
			"size":  value.Size,
		}
	}
	return json.Marshal(mapped)
}

// LastAccessStatsKeyFromString parses a "project:shard:subshard" key; the
// subshard part may itself contain colons.
func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) {
	parts := strings.SplitN(s, ":", 3)
	if len(parts) != 3 {
		return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s)
	}
	return LastAccessStatsKey{
		Project:  parts[0],
		Shard:    parts[1],
		SubShard: parts[2],
	}, nil
}
// HandleLastAccessStats reports, for every key in the ":last_ts" hash, its
// first/last access time and redis memory usage. The repeatable "merge"
// query parameter ("project", "shard", "sub_shard") collapses that key
// component to "*", aggregating matching entries.
func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()
	merge := map[string]bool{}
	if vv, ok := req.URL.Query()["merge"]; ok {
		for _, v := range vv {
			merge[v] = true
		}
	}
	lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
	if err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	// Fetch MEMORY USAGE for all keys in one pipelined round trip.
	memoryUsages := map[string]*redis.IntCmd{}
	pipe := that.BackfeedRedis.Pipeline()
	for key := range lastTs {
		memoryUsages[key] = pipe.MemoryUsage(req.Context(), key)
	}
	_, err = pipe.Exec(req.Context())
	if err != nil && err != redis.Nil {
		WriteResponse(res, http.StatusInternalServerError, err)
		return
	}
	lastAccessStats := LastAccessStatsMap{}
	for key, value := range lastTs {
		// value is in unix timestamp format
		ts, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		lastAccessStatsKey, err := LastAccessStatsKeyFromString(key)
		if err != nil {
			WriteResponse(res, http.StatusInternalServerError, err)
			return
		}
		// Collapse merged components so entries aggregate under "*".
		if merge["project"] {
			lastAccessStatsKey.Project = "*"
		}
		if merge["shard"] {
			lastAccessStatsKey.Shard = "*"
		}
		if merge["sub_shard"] {
			lastAccessStatsKey.SubShard = "*"
		}
		parsedTs := time.Unix(ts, 0)
		if v, has := lastAccessStats[lastAccessStatsKey]; !has {
			lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{
				First: parsedTs,
				Last:  parsedTs,
				Size:  memoryUsages[key].Val(),
			}
		} else {
			// Widen the [First, Last] window and sum sizes.
			if v.First.After(parsedTs) {
				v.First = parsedTs
			}
			if v.Last.Before(parsedTs) {
				v.Last = parsedTs
			}
			v.Size += memoryUsages[key].Val()
		}
	}
	WriteResponse(res, http.StatusOK, lastAccessStats)
}
func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *http.Request) { | |||
defer req.Body.Close() | |||
vars := mux.Vars(req) | |||
slug := vars["slug"] | |||
secondaryShard := req.URL.Query().Get("shard") | |||
projectBackfeedManager := that.GetFeed(slug) | |||
if projectBackfeedManager == nil { | |||
WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel")) | |||
return | |||
} | |||
splitter := &Splitter{ | |||
Delimiter: []byte(req.URL.Query().Get("delimiter")), | |||
IgnoreEOF: req.URL.Query().Get("ignoreeof") != "", | |||
} | |||
if len(splitter.Delimiter) == 0 { | |||
splitter.Delimiter = []byte{0x00} | |||
} | |||
var body io.ReadCloser | |||
switch req.Header.Get("Content-Encoding") { | |||
case "": | |||
body = req.Body | |||
case "gzip": | |||
var err error | |||
body, err = gzip.NewReader(req.Body) | |||
if err != nil { | |||
WriteResponse(res, http.StatusBadRequest, err) | |||
return | |||
} | |||
defer body.Close() | |||
case "deflate": | |||
body = flate.NewReader(req.Body) | |||
defer body.Close() | |||
default: | |||
WriteResponse(res, http.StatusBadRequest, fmt.Errorf("unsupported Content-Encoding: %s", req.Header.Get("Content-Encoding"))) | |||
} | |||
scanner := bufio.NewScanner(body) | |||
scanner.Split(splitter.Split) | |||
n := 0 | |||
for scanner.Scan() { | |||
b := scanner.Bytes() | |||
if len(b) == 0 { | |||
continue | |||
} | |||
bcopy := make([]byte, len(b)) | |||
copy(bcopy, b) | |||
item := &BackfeedItem{ | |||
PrimaryShard: GenShardHash(bcopy), | |||
SecondaryShard: secondaryShard, | |||
Item: bcopy, | |||
} | |||
if err := projectBackfeedManager.PushItem(req.Context(), item); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
n++ | |||
} | |||
if err := scanner.Err(); err != nil { | |||
WriteResponse(res, http.StatusBadRequest, err) | |||
return | |||
} | |||
WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n)) | |||
return | |||
} | |||
// HandleHealth reports 200 once the feed table has been populated and every
// backfeed redis shard answers a PING; 503 or 500 otherwise.
func (that *GlobalBackfeedManager) HandleHealth(res http.ResponseWriter, req *http.Request) {
	if that.Populated.IsNotSet() {
		WriteResponse(res, http.StatusServiceUnavailable, fmt.Errorf("%s", "backfeed not populated"))
		return
	}
	if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
		client.ClientGetName(ctx)
		return client.Ping(ctx).Err()
	}); err != nil {
		WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("failed to ping backfeed redis: %s", err))
		return
	}
	WriteResponse(res, http.StatusOK, "ok")
}

// HandlePing is a trivial liveness endpoint; it always answers "pong".
func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.Request) {
	WriteResponse(res, http.StatusOK, "pong")
}
// DumpChunkName is JSON-encoded into the tar entry name of each BF.SCANDUMP
// chunk emitted by HandleDump.
type DumpChunkName struct {
	Key      string `json:"key"`      // redis key the chunk belongs to
	Distance int    `json:"distance"` // 0-based chunk index within the key
	Cursor   int64  `json:"cursor"`   // BF.SCANDUMP iterator for the next call
	Checksum uint64 `json:"checksum"` // CRC-64/ECMA of the chunk payload
}
func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) { | |||
vars := mux.Vars(req) | |||
key := vars["key"] | |||
if key == "" { | |||
key = "*:*:*" | |||
} | |||
if strings.Count(key, ":") < 2 { | |||
WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format")) | |||
return | |||
} | |||
lock := sync.Mutex{} | |||
keys := []string{} | |||
if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { | |||
cursor := uint64(0) | |||
var shardKeys []string | |||
for { | |||
var err error | |||
var keysBatch []string | |||
keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result() | |||
if err != nil && err != redis.Nil { | |||
return err | |||
} | |||
shardKeys = append(shardKeys, keysBatch...) | |||
if cursor == 0 { | |||
break | |||
} | |||
} | |||
lock.Lock() | |||
defer lock.Unlock() | |||
keys = append(keys, shardKeys...) | |||
return nil | |||
}); err != nil && err != redis.Nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
sort.Strings(keys) | |||
hasJsonAcceptHeader := false | |||
for _, accept := range strings.Split(req.Header.Get("Accept"), ",") { | |||
accept = strings.TrimSpace(accept) | |||
if accept == "application/json" || strings.HasPrefix(accept, "application/json;") { | |||
hasJsonAcceptHeader = true | |||
break | |||
} | |||
} | |||
if hasJsonAcceptHeader { | |||
WriteResponse(res, http.StatusOK, keys) | |||
return | |||
} | |||
if len(keys) == 0 { | |||
WriteResponse(res, http.StatusNoContent, nil) | |||
return | |||
} | |||
tarWriter := tar.NewWriter(res) | |||
defer tarWriter.Close() | |||
pipe := that.BackfeedRedis.Pipeline() | |||
for _, key := range keys { | |||
cursor := int64(0) | |||
for i := 0; ; i++ { | |||
rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor) | |||
tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) | |||
_, err := pipe.Exec(req.Context()) | |||
if err != nil && err != redis.Nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
rawRes, err := rawResResult.Result() | |||
if err != nil && err != redis.Nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
if rawRes == nil { | |||
break | |||
} | |||
resSlice, ok := rawRes.([]any) | |||
if !ok { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes)) | |||
return | |||
} | |||
if len(resSlice) != 2 { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice))) | |||
return | |||
} | |||
cursor, ok = resSlice[0].(int64) | |||
if !ok { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0])) | |||
return | |||
} | |||
chunkString, ok := resSlice[1].(string) | |||
if !ok { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1])) | |||
return | |||
} | |||
chunk := []byte(chunkString) | |||
lastAccess := time.Time{} | |||
tsString, err := tsStringResult.Result() | |||
if err == nil && tsString != "" { | |||
ts, err := strconv.ParseInt(tsString, 10, 64) | |||
if err == nil { | |||
lastAccess = time.Unix(ts, 0) | |||
} | |||
} | |||
nameStruct := DumpChunkName{ | |||
Key: key, | |||
Cursor: cursor, | |||
Distance: i, | |||
Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)), | |||
} | |||
name, err := json.Marshal(nameStruct) | |||
if err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
if err := tarWriter.WriteHeader(&tar.Header{ | |||
Typeflag: tar.TypeReg, | |||
Name: string(name), | |||
Size: int64(len(chunk)), | |||
Mode: 0600, | |||
ModTime: lastAccess, | |||
AccessTime: lastAccess, | |||
ChangeTime: lastAccess, | |||
PAXRecords: map[string]string{ | |||
"ARCHIVETEAM.bffchunk.key": key, | |||
"ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor), | |||
"ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i), | |||
"ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum), | |||
}, | |||
Format: tar.FormatPAX, | |||
}); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
} | |||
if _, err := tarWriter.Write(chunk); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
} | |||
if cursor == 0 && len(chunk) == 0 { | |||
break | |||
} | |||
} | |||
} | |||
} | |||
func (that *GlobalBackfeedManager) CancelAllFeeds() { | |||
that.Populated.UnSet() | |||
that.Cancel() | |||
for project, projectBackfeedManager := range that.ActiveFeeds { | |||
log.Printf("waiting for %s channel to shut down...", project) | |||
<-projectBackfeedManager.Done | |||
delete(that.ActiveFeeds, project) | |||
} | |||
} | |||
func main() { | |||
log.SetFlags(log.Flags() | log.Lshortfile) | |||
@@ -0,0 +1,167 @@ | |||
package main | |||
import ( | |||
"archive/tar" | |||
"context" | |||
"encoding/json" | |||
"fmt" | |||
"hash/crc64" | |||
"net/http" | |||
"sort" | |||
"strconv" | |||
"strings" | |||
"sync" | |||
"time" | |||
"github.com/go-redis/redis/v8" | |||
"github.com/gorilla/mux" | |||
) | |||
// DumpChunkName describes one BF.SCANDUMP chunk; HandleDump JSON-encodes it
// and uses the result as the tar entry name for that chunk.
type DumpChunkName struct {
	Key string `json:"key"`           // redis key of the bloom filter the chunk belongs to
	Distance int `json:"distance"`    // 0-based index of this chunk within the key's dump
	Cursor int64 `json:"cursor"`      // BF.SCANDUMP iterator value used to fetch the next chunk
	Checksum uint64 `json:"checksum"` // CRC-64/ECMA of the chunk payload
}
func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) { | |||
vars := mux.Vars(req) | |||
key := vars["key"] | |||
if key == "" { | |||
key = "*:*:*" | |||
} | |||
if strings.Count(key, ":") < 2 { | |||
WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format")) | |||
return | |||
} | |||
lock := sync.Mutex{} | |||
keys := []string{} | |||
if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { | |||
cursor := uint64(0) | |||
var shardKeys []string | |||
for { | |||
var err error | |||
var keysBatch []string | |||
keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result() | |||
if err != nil && err != redis.Nil { | |||
return err | |||
} | |||
shardKeys = append(shardKeys, keysBatch...) | |||
if cursor == 0 { | |||
break | |||
} | |||
} | |||
lock.Lock() | |||
defer lock.Unlock() | |||
keys = append(keys, shardKeys...) | |||
return nil | |||
}); err != nil && err != redis.Nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
sort.Strings(keys) | |||
hasJsonAcceptHeader := false | |||
for _, accept := range strings.Split(req.Header.Get("Accept"), ",") { | |||
accept = strings.TrimSpace(accept) | |||
if accept == "application/json" || strings.HasPrefix(accept, "application/json;") { | |||
hasJsonAcceptHeader = true | |||
break | |||
} | |||
} | |||
if hasJsonAcceptHeader { | |||
WriteResponse(res, http.StatusOK, keys) | |||
return | |||
} | |||
if len(keys) == 0 { | |||
WriteResponse(res, http.StatusNoContent, nil) | |||
return | |||
} | |||
tarWriter := tar.NewWriter(res) | |||
defer tarWriter.Close() | |||
pipe := that.BackfeedRedis.Pipeline() | |||
for _, key := range keys { | |||
cursor := int64(0) | |||
for i := 0; ; i++ { | |||
rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor) | |||
tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) | |||
_, err := pipe.Exec(req.Context()) | |||
if err != nil && err != redis.Nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
rawRes, err := rawResResult.Result() | |||
if err != nil && err != redis.Nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
if rawRes == nil { | |||
break | |||
} | |||
resSlice, ok := rawRes.([]any) | |||
if !ok { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes)) | |||
return | |||
} | |||
if len(resSlice) != 2 { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice))) | |||
return | |||
} | |||
cursor, ok = resSlice[0].(int64) | |||
if !ok { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0])) | |||
return | |||
} | |||
chunkString, ok := resSlice[1].(string) | |||
if !ok { | |||
WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1])) | |||
return | |||
} | |||
chunk := []byte(chunkString) | |||
lastAccess := time.Time{} | |||
tsString, err := tsStringResult.Result() | |||
if err == nil && tsString != "" { | |||
ts, err := strconv.ParseInt(tsString, 10, 64) | |||
if err == nil { | |||
lastAccess = time.Unix(ts, 0) | |||
} | |||
} | |||
nameStruct := DumpChunkName{ | |||
Key: key, | |||
Cursor: cursor, | |||
Distance: i, | |||
Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)), | |||
} | |||
name, err := json.Marshal(nameStruct) | |||
if err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
return | |||
} | |||
if err := tarWriter.WriteHeader(&tar.Header{ | |||
Typeflag: tar.TypeReg, | |||
Name: string(name), | |||
Size: int64(len(chunk)), | |||
Mode: 0600, | |||
ModTime: lastAccess, | |||
AccessTime: lastAccess, | |||
ChangeTime: lastAccess, | |||
PAXRecords: map[string]string{ | |||
"ARCHIVETEAM.bffchunk.key": key, | |||
"ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor), | |||
"ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i), | |||
"ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum), | |||
}, | |||
Format: tar.FormatPAX, | |||
}); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
} | |||
if _, err := tarWriter.Write(chunk); err != nil { | |||
WriteResponse(res, http.StatusInternalServerError, err) | |||
} | |||
if cursor == 0 && len(chunk) == 0 { | |||
break | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,160 @@ | |||
package main | |||
import ( | |||
"context" | |||
"fmt" | |||
"log" | |||
"time" | |||
"github.com/go-redis/redis/v8" | |||
) | |||
// ProjectBackfeedManager owns the backfeed pipeline for a single project:
// items are pushed into C via PushItem and the Do loop drains them into
// redis bloom filters.
type ProjectBackfeedManager struct {
	Context context.Context // cancelled to shut the Do loop down
	Cancel context.CancelFunc // cancels Context
	Done chan bool // closed when the Do loop has fully exited
	C chan *BackfeedItem // item queue feeding the Do loop
	Name string // project name; used as the redis key prefix
	BackfeedRedis *redis.ClusterClient // cluster holding the bloom filters
	ProjectRedis *redis.Client // per-project redis receiving todo:backfeed items
	//Lock sync.RWMutex
	ProjectConfig ProjectConfig // config snapshot the manager was started with
}
// ProjectRedisConfig is the per-project redis connection info as stored in
// the tracker's project configuration JSON.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int `json:"port"`
}
func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool { | |||
if that.ProjectConfig.RedisConfig == nil && new == nil { | |||
return false | |||
} | |||
return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new | |||
} | |||
func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error { | |||
//that.Lock.RLock() | |||
//defer that.Lock.RUnlock() | |||
//if that.C == nil { | |||
// return false | |||
//} | |||
select { | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
case <-that.Context.Done(): | |||
return fmt.Errorf("backfeed channel closed") | |||
case that.C <- item: | |||
return nil | |||
//default: | |||
// return fmt.Errorf("backfeed channel full") | |||
} | |||
} | |||
func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) { | |||
if blocking { | |||
select { | |||
case <-that.Context.Done(): | |||
return nil, false | |||
case item, ok := <-that.C: | |||
return item, ok | |||
} | |||
} else { | |||
select { | |||
case <-that.Context.Done(): | |||
return nil, false | |||
case item, ok := <-that.C: | |||
return item, ok | |||
default: | |||
return nil, false | |||
} | |||
} | |||
} | |||
func (that *ProjectBackfeedManager) Do() { | |||
defer close(that.Done) | |||
//defer that.CloseItemChannel() | |||
defer that.Cancel() | |||
pipe := that.BackfeedRedis.Pipeline() | |||
for { | |||
select { | |||
case <-that.Context.Done(): | |||
break | |||
case <-that.Done: | |||
break | |||
default: | |||
} | |||
item, ok := that.PopItem(true) | |||
if !ok { | |||
break | |||
} | |||
keyMap := map[string][][]byte{} | |||
key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) | |||
keyMap[key] = append(keyMap[key], item.Item) | |||
wrapped := 1 | |||
for wrapped < ItemWrapSize { | |||
item, ok := that.PopItem(false) | |||
if !ok { | |||
break | |||
} | |||
key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard) | |||
keyMap[key] = append(keyMap[key], item.Item) | |||
wrapped++ | |||
} | |||
select { | |||
case <-that.Context.Done(): | |||
break | |||
case <-that.Done: | |||
break | |||
default: | |||
} | |||
now := time.Now() | |||
resultMap := map[string]*redis.Cmd{} | |||
lastTS := make([]any, 0, len(keyMap)*2) | |||
for key := range keyMap { | |||
lastTS = append(lastTS, key) | |||
lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) | |||
} | |||
pipe.HSet(context.Background(), ":last_ts", lastTS...) | |||
for key, items := range keyMap { | |||
args := []any{ | |||
"bf.madd", | |||
key, | |||
} | |||
for _, item := range items { | |||
args = append(args, item) | |||
} | |||
resultMap[key] = pipe.Do(context.Background(), args...) | |||
} | |||
if _, err := pipe.Exec(context.Background()); err != nil { | |||
log.Printf("%s", err) | |||
} | |||
var sAddItems []any | |||
for key, items := range keyMap { | |||
res, err := resultMap[key].BoolSlice() | |||
if err != nil { | |||
log.Printf("%s", err) | |||
continue | |||
} | |||
if len(res) != len(keyMap[key]) { | |||
continue | |||
} | |||
for i, v := range res { | |||
if v { | |||
sAddItems = append(sAddItems, items[i]) | |||
} | |||
} | |||
} | |||
dupes := wrapped - len(sAddItems) | |||
if len(sAddItems) != 0 { | |||
if err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err(); err != nil { | |||
log.Printf("failed to sadd items for %s: %s", that.Name, err) | |||
} | |||
} | |||
if dupes > 0 { | |||
that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes)) | |||
} | |||
} | |||
} |
@@ -0,0 +1,26 @@ | |||
package main | |||
import ( | |||
"bytes" | |||
"io" | |||
) | |||
// Splitter provides a bufio.SplitFunc that tokenizes a stream on an
// arbitrary multi-byte delimiter.
type Splitter struct {
	Delimiter []byte // token separator; not included in returned tokens
	IgnoreEOF bool   // if set, a trailing unterminated token is emitted instead of an error
}

// Split implements bufio.SplitFunc. It returns the data before the next
// occurrence of the delimiter (consuming the delimiter as well), or requests
// more input when no complete delimiter is present yet. At EOF, leftover
// data is either returned as a final token (IgnoreEOF) or reported with
// io.ErrUnexpectedEOF.
//
// Fix: the previous implementation compared data[i : i+len(Delimiter)] for
// every i < len(data), which slices past len(data) near the end of the
// buffer — a slice-out-of-range panic (or a read of stale bytes beyond len
// when the backing array is larger). bytes.Index only matches a complete
// delimiter within data, which is the correct SplitFunc behavior: a partial
// delimiter at the buffer's end simply requests more data.
func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	if i := bytes.Index(data, that.Delimiter); i >= 0 {
		return i + len(that.Delimiter), data[:i], nil
	}
	if len(data) == 0 || !atEOF {
		// No complete delimiter yet: ask the scanner for more input.
		return 0, nil, nil
	}
	if that.IgnoreEOF {
		// Emit whatever remains as the final token.
		return len(data), data, nil
	}
	return 0, data, io.ErrUnexpectedEOF
}
@@ -0,0 +1,11 @@ | |||
package main | |||
// ProjectConfig is the subset of a tracker project's configuration that the
// backfeed service uses; RedisConfig is nil when the project has no
// dedicated redis instance.
type ProjectConfig struct {
	RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
}
// BackfeedItem is one deduplication request: Item is the raw payload and the
// shard fields select which bloom filter key it is checked against.
type BackfeedItem struct {
	PrimaryShard byte // rendered as %02x in the filter key
	SecondaryShard string // secondary grouping component of the filter key
	Item []byte // raw item bytes passed to BF.MADD
}