diff --git a/main.go b/main.go index 3e30cc6..b4d4874 100644 --- a/main.go +++ b/main.go @@ -81,6 +81,7 @@ func main() { r.Methods(http.MethodGet).Path("/health").HandlerFunc(globalBackfeedManager.HandleHealth) r.Methods(http.MethodGet).Path("/lastaccessstats").HandlerFunc(globalBackfeedManager.HandleLastAccessStats) r.Methods(http.MethodGet).Path("/dump/{key}").HandlerFunc(globalBackfeedManager.HandleDump) + r.Methods(http.MethodPut).Path("/load").HandlerFunc(globalBackfeedManager.HandleLoad) rMetrics := mux.NewRouter() rMetrics.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) rMetrics.Path("/metrics").Handler(promhttp.Handler()) diff --git a/persistence.go b/persistence.go index 4cefce3..75e319c 100644 --- a/persistence.go +++ b/persistence.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "hash/crc64" + "io" "net/http" "sort" "strconv" @@ -24,6 +25,74 @@ type DumpChunkName struct { Checksum uint64 `json:"checksum"` } +func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + tarReader := tar.NewReader(req.Body) + existed := []string{} + recreate := req.URL.Query().Get("recreate") != "" + skipKeys := map[string]struct{}{} + for { + header, err := tarReader.Next() + if err != nil { + if err == io.EOF { + break + } + WriteResponse(res, http.StatusInternalServerError, err) + return + } + if header.Typeflag != tar.TypeReg { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] unexpected tar entry type: %d", header.Name, header.Typeflag)) + return + } + var name DumpChunkName + if err := json.Unmarshal([]byte(header.Name), &name); err != nil { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk name: %s", header.Name, err)) + return + } + if strings.Count(name.Key, ":") < 2 { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk key: %s", header.Name, name.Key)) + return + } + chunk := make([]byte, header.Size) + if _, err := io.ReadFull(tarReader, chunk); err != nil { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] failed to read chunk: %s", header.Name, err)) + return + } + checksum := crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)) + if name.Checksum != checksum { + WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk checksum: %d (expected) != %d (actual)", header.Name, name.Checksum, checksum)) + return + } + if len(chunk) == 0 && name.Cursor == 0 { + continue + } + if name.Distance == 0 { + if exists, err := that.BackfeedRedis.Exists(req.Context(), name.Key).Result(); err != nil && err != redis.Nil { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to check key existence: %s", header.Name, err)) + return + } else if exists == 1 { + existed = append(existed, name.Key) + if recreate { + if _, err := that.BackfeedRedis.Unlink(req.Context(), name.Key).Result(); err != nil { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to unlink key: %s", header.Name, err)) + return + } + } else { + skipKeys[name.Key] = struct{}{} + continue + } + } + } + if _, has := skipKeys[name.Key]; has { + continue + } + if _, err := that.BackfeedRedis.Do(req.Context(), "bf.loadchunk", name.Key, name.Cursor, chunk).Result(); err != nil { + WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err)) + return + } + } +} + func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) key := vars["key"] @@ -79,6 +148,24 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http tarWriter := tar.NewWriter(res) defer tarWriter.Close() pipe := that.BackfeedRedis.Pipeline() + writeError := func(err error) { + errChunk := []byte(err.Error()) + if err := tarWriter.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: "error", + Size: int64(len(errChunk)), + Mode: 0600, + PAXRecords: map[string]string{ + "ARCHIVETEAM.bffchunk.error": fmt.Sprintf("%s", err), + }, + Format: tar.FormatPAX, + }); err != nil { + return + } + if _, err := tarWriter.Write(errChunk); err != nil { + return + } + } for _, key := range keys { cursor := int64(0) for i := 0; ; i++ { @@ -86,12 +173,12 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) _, err := pipe.Exec(req.Context()) if err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) + writeError(err) return } rawRes, err := rawResResult.Result() if err != nil && err != redis.Nil { - WriteResponse(res, http.StatusInternalServerError, err) + writeError(err) return } if rawRes == nil { @@ -99,21 +186,21 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http } resSlice, ok := rawRes.([]any) if !ok { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response type: %T", rawRes)) + writeError(fmt.Errorf("unexpected response type: %T", rawRes)) return } if len(resSlice) != 2 { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response length: %d", len(resSlice))) + writeError(fmt.Errorf("unexpected response length: %d", len(resSlice))) return } cursor, ok = resSlice[0].(int64) if !ok { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response first element type: %T", resSlice[0])) + writeError(fmt.Errorf("unexpected response first element type: %T", resSlice[0])) return } chunkString, ok := resSlice[1].(string) if !ok { - WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("unexpected response second element type: %T", resSlice[1])) + writeError(fmt.Errorf("unexpected response second element type: %T", resSlice[1])) return } chunk := []byte(chunkString) @@ -135,7 +222,7 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http } name, err := json.Marshal(nameStruct) if err != nil { - WriteResponse(res, http.StatusInternalServerError, err) + writeError(err) return } if err := tarWriter.WriteHeader(&tar.Header{ @@ -154,10 +241,12 @@ func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http }, Format: tar.FormatPAX, }); err != nil { - WriteResponse(res, http.StatusInternalServerError, err) + writeError(err) + return } if _, err := tarWriter.Write(chunk); err != nil { - WriteResponse(res, http.StatusInternalServerError, err) + writeError(err) + return } if cursor == 0 && len(chunk) == 0 { break