diff --git a/globalbackfeedmanager.go b/globalbackfeedmanager.go index 4d4b344..d7d39f5 100644 --- a/globalbackfeedmanager.go +++ b/globalbackfeedmanager.go @@ -248,6 +248,68 @@ func (that *GlobalBackfeedManager) HandlePing(res http.ResponseWriter, _ *http.R WriteResponse(res, http.StatusOK, "pong") } +func (that *GlobalBackfeedManager) HandleUnlink(res http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + key := vars["key"] + if strings.Count(key, ":") < 2 { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format")) + return + } + lock := sync.Mutex{} + keys := []string{} + if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { + cursor := uint64(0) + var shardKeys []string + for { + var err error + var keysBatch []string + keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result() + if err != nil && err != redis.Nil { + return err + } + shardKeys = append(shardKeys, keysBatch...) + if cursor == 0 { + break + } + } + lock.Lock() + defer lock.Unlock() + keys = append(keys, shardKeys...) + return nil + }); err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + pipe := that.BackfeedRedis.Pipeline() + for _, key := range keys { + pipe.Unlink(req.Context(), key) + } + if _, err := pipe.Exec(req.Context()); err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + WriteResponse(res, http.StatusOK, keys) +} + +func (that *GlobalBackfeedManager) HandleRedisInfo(res http.ResponseWriter, req *http.Request) { + infos := map[string]string{} + lock := sync.Mutex{} + if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { + if info, err := client.Info(ctx, "all").Result(); err != nil && err != redis.Nil { + return err + } else { + lock.Lock() + defer lock.Unlock() + infos[client.String()] = info + } + return nil + }); err != nil { + WriteResponse(res, http.StatusInternalServerError, err) + return + } + WriteResponse(res, http.StatusOK, infos) +} + func (that *GlobalBackfeedManager) CancelAllFeeds() { that.Populated.UnSet() that.Cancel() diff --git a/main.go b/main.go index b4d4874..582007b 100644 --- a/main.go +++ b/main.go @@ -79,9 +79,11 @@ func main() { r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.HandleLegacy) r.Methods(http.MethodGet).Path("/ping").HandlerFunc(globalBackfeedManager.HandlePing) r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth) - r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats) - r.Methods(http.MethodGet).Path("/dump/{key}").HandlerFunc(globalBackfeedManager.HandleDump) - r.Methods(http.MethodPut).Path("/load").HandlerFunc(globalBackfeedManager.HandleLoad) + r.Methods(http.MethodGet).Path("/manage/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats) + r.Methods(http.MethodGet).Path("/manage/dump/{key}").HandlerFunc(globalBackfeedManager.HandleDump) + r.Methods(http.MethodPut).Path("/manage/load").HandlerFunc(globalBackfeedManager.HandleLoad) + r.Methods(http.MethodDelete).Path("/manage/unlink/{key}").HandlerFunc(globalBackfeedManager.HandleUnlink) + r.Methods(http.MethodGet).Path("/manage/redisinfo").HandlerFunc(globalBackfeedManager.HandleRedisInfo) rMetrics := mux.NewRouter() rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) rMetrics.Path("/metrics").Handler(promhttp.Handler()) diff --git a/persistence.go b/persistence.go index 75e319c..1eaa94b 100644 --- a/persistence.go +++ b/persistence.go @@ -29,6 +29,7 @@ func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http defer req.Body.Close() tarReader := tar.NewReader(req.Body) existed := []string{} + imported := []string{} recreate := req.URL.Query().Get("recreate") != "" skipKeys := map[string]struct{}{} for { @@ -90,7 +91,14 @@ func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err)) return } + if name.Distance == 0 { + imported = append(imported, name.Key) + } } + WriteResponse(res, http.StatusOK, map[string]any{ + "existed": existed, + "imported": imported, + }) } func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {