package main import ( "encoding/json" "fmt" "net/http" "strconv" "strings" "time" "github.com/go-redis/redis/v8" ) type LastAccessStatsKey struct { Project string Shard string SubShard string } type LastAccessStatsEntry struct { First time.Time `json:"first"` Last time.Time `json:"last"` Size int64 `json:"size"` } type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) { mapped := map[string]map[string]any{} for key, value := range that { mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{ "first": value.First.Format(time.RFC3339), "last": value.Last.Format(time.RFC3339), "size": value.Size, } } return json.Marshal(mapped) } func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) { parts := strings.SplitN(s, ":", 3) if len(parts) != 3 { return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s) } return LastAccessStatsKey{ Project: parts[0], Shard: parts[1], SubShard: parts[2], }, nil } func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) { defer req.Body.Close() merge := map[string]bool{} if vv, ok := req.URL.Query()["merge"]; ok { for _, v := range vv { merge[v] = true } } lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result() if err != nil && err != redis.Nil { WriteResponse(res, http.StatusInternalServerError, err) return } memoryUsages := map[string]*redis.IntCmd{} pipe := that.BackfeedRedis.Pipeline() for key := range lastTs { memoryUsages[key] = pipe.MemoryUsage(req.Context(), key) } _, err = pipe.Exec(req.Context()) if err != nil && err != redis.Nil { WriteResponse(res, http.StatusInternalServerError, err) return } lastAccessStats := LastAccessStatsMap{} for key, value := range lastTs { // value is in unix timestamp format ts, err := strconv.ParseInt(value, 10, 64) if err != nil { WriteResponse(res, http.StatusInternalServerError, err) return } lastAccessStatsKey, err := LastAccessStatsKeyFromString(key) if err != nil { WriteResponse(res, http.StatusInternalServerError, err) return } if merge["project"] { lastAccessStatsKey.Project = "*" } if merge["shard"] { lastAccessStatsKey.Shard = "*" } if merge["sub_shard"] { lastAccessStatsKey.SubShard = "*" } parsedTs := time.Unix(ts, 0) if v, has := lastAccessStats[lastAccessStatsKey]; !has { lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{ First: parsedTs, Last: parsedTs, Size: memoryUsages[key].Val(), } } else { if v.First.After(parsedTs) { v.First = parsedTs } if v.Last.Before(parsedTs) { v.Last = parsedTs } v.Size += memoryUsages[key].Val() } } WriteResponse(res, http.StatusOK, lastAccessStats) }