You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

116 lines
2.8 KiB

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/go-redis/redis/v8"
  10. )
  11. type LastAccessStatsKey struct {
  12. Project string
  13. Shard string
  14. SubShard string
  15. }
  16. type LastAccessStatsEntry struct {
  17. First time.Time `json:"first"`
  18. Last time.Time `json:"last"`
  19. Size int64 `json:"size"`
  20. }
  21. type LastAccessStatsMap map[LastAccessStatsKey]*LastAccessStatsEntry
  22. func (that LastAccessStatsMap) MarshalJSON() ([]byte, error) {
  23. mapped := map[string]map[string]any{}
  24. for key, value := range that {
  25. mapped[fmt.Sprintf("%s:%s:%s", key.Project, key.Shard, key.SubShard)] = map[string]any{
  26. "first": value.First.Format(time.RFC3339),
  27. "last": value.Last.Format(time.RFC3339),
  28. "size": value.Size,
  29. }
  30. }
  31. return json.Marshal(mapped)
  32. }
  33. func LastAccessStatsKeyFromString(s string) (LastAccessStatsKey, error) {
  34. parts := strings.SplitN(s, ":", 3)
  35. if len(parts) != 3 {
  36. return LastAccessStatsKey{}, fmt.Errorf("invalid key: %s", s)
  37. }
  38. return LastAccessStatsKey{
  39. Project: parts[0],
  40. Shard: parts[1],
  41. SubShard: parts[2],
  42. }, nil
  43. }
  44. func (that *GlobalBackfeedManager) HandleLastAccessStats(res http.ResponseWriter, req *http.Request) {
  45. defer req.Body.Close()
  46. merge := map[string]bool{}
  47. if vv, ok := req.URL.Query()["merge"]; ok {
  48. for _, v := range vv {
  49. merge[v] = true
  50. }
  51. }
  52. lastTs, err := that.BackfeedRedis.HGetAll(req.Context(), ":last_ts").Result()
  53. if err != nil && err != redis.Nil {
  54. WriteResponse(res, http.StatusInternalServerError, err)
  55. return
  56. }
  57. memoryUsages := map[string]*redis.IntCmd{}
  58. pipe := that.BackfeedRedis.Pipeline()
  59. for key := range lastTs {
  60. memoryUsages[key] = pipe.MemoryUsage(req.Context(), key)
  61. }
  62. _, err = pipe.Exec(req.Context())
  63. if err != nil && err != redis.Nil {
  64. WriteResponse(res, http.StatusInternalServerError, err)
  65. return
  66. }
  67. lastAccessStats := LastAccessStatsMap{}
  68. for key, value := range lastTs {
  69. // value is in unix timestamp format
  70. ts, err := strconv.ParseInt(value, 10, 64)
  71. if err != nil {
  72. WriteResponse(res, http.StatusInternalServerError, err)
  73. return
  74. }
  75. lastAccessStatsKey, err := LastAccessStatsKeyFromString(key)
  76. if err != nil {
  77. WriteResponse(res, http.StatusInternalServerError, err)
  78. return
  79. }
  80. if merge["project"] {
  81. lastAccessStatsKey.Project = "*"
  82. }
  83. if merge["shard"] {
  84. lastAccessStatsKey.Shard = "*"
  85. }
  86. if merge["sub_shard"] {
  87. lastAccessStatsKey.SubShard = "*"
  88. }
  89. parsedTs := time.Unix(ts, 0)
  90. if v, has := lastAccessStats[lastAccessStatsKey]; !has {
  91. lastAccessStats[lastAccessStatsKey] = &LastAccessStatsEntry{
  92. First: parsedTs,
  93. Last: parsedTs,
  94. Size: memoryUsages[key].Val(),
  95. }
  96. } else {
  97. if v.First.After(parsedTs) {
  98. v.First = parsedTs
  99. }
  100. if v.Last.Before(parsedTs) {
  101. v.Last = parsedTs
  102. }
  103. v.Size += memoryUsages[key].Val()
  104. }
  105. }
  106. WriteResponse(res, http.StatusOK, lastAccessStats)
  107. }