package main import ( "archive/tar" "bufio" "context" "encoding/json" "fmt" "hash/crc64" "io" "net/http" "sort" "strconv" "strings" "sync" "time" "github.com/go-redis/redis/v8" "github.com/gorilla/mux" ) type DumpChunkName struct { Key string `json:"key"` Distance int `json:"distance"` Cursor int64 `json:"cursor"` Checksum uint64 `json:"checksum"` } func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http.Request) { defer req.Body.Close() reqBuffer := bufio.NewReader(req.Body) tarReader := tar.NewReader(reqBuffer) existed := []string{} imported := []string{} recreate := req.URL.Query().Get("recreate") != "" skipKeys := map[string]struct{}{} for { header, err := tarReader.Next() if err != nil { if err == io.EOF { break } WriteResponse(res, http.StatusInternalServerError, err) return } if header.Typeflag != tar.TypeReg { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] unexpected tar entry type: %d", header.Name, header.Typeflag)) return } var name DumpChunkName if err := json.Unmarshal([]byte(header.Name), &name); err != nil { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk name: %s", header.Name, err)) return } if strings.Count(name.Key, ":") < 2 { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk key: %s", header.Name, name.Key)) return } chunk := make([]byte, header.Size) if _, err := io.ReadFull(tarReader, chunk); err != nil { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] failed to read chunk: %s", header.Name, err)) return } checksum := crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)) if name.Checksum != checksum { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk checksum: %d (expected) != %d (actual)", header.Name, name.Checksum, checksum)) return } if len(chunk) == 0 && name.Cursor == 0 { continue } if name.Distance == 0 { if exists, err := that.BackfeedRedis.Exists(req.Context(), name.Key).Result(); err != nil && err != redis.Nil { WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to check key existence: %s", header.Name, err)) return } else if exists == 1 { existed = append(existed, name.Key) if recreate { if _, err := that.BackfeedRedis.Unlink(req.Context(), name.Key).Result(); err != nil { WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to unlink key: %s", header.Name, err)) return } } else { skipKeys[name.Key] = struct{}{} continue } } } if _, has := skipKeys[name.Key]; has { continue } if _, err := that.BackfeedRedis.Do(req.Context(), "bf.loadchunk", name.Key, name.Cursor, chunk).Result(); err != nil { WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err)) return } if name.Distance == 0 { imported = append(imported, name.Key) } } WriteResponse(res, http.StatusOK, map[string]any{ "existed": existed, "imported": imported, }) } func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) key := vars["key"] if key == "" { key = "*:*:*" } if strings.Count(key, ":") < 2 { WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format")) return } lock := sync.Mutex{} keys := []string{} if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error { cursor := uint64(0) var shardKeys []string for { var err error var keysBatch []string keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result() if err != nil && err != redis.Nil { return err } shardKeys = append(shardKeys, keysBatch...) if cursor == 0 { break } } lock.Lock() defer lock.Unlock() keys = append(keys, shardKeys...) return nil }); err != nil && err != redis.Nil { WriteResponse(res, http.StatusInternalServerError, err) return } sort.Strings(keys) hasJsonAcceptHeader := false for _, accept := range strings.Split(req.Header.Get("Accept"), ",") { accept = strings.TrimSpace(accept) if accept == "application/json" || strings.HasPrefix(accept, "application/json;") { hasJsonAcceptHeader = true break } } if hasJsonAcceptHeader { WriteResponse(res, http.StatusOK, keys) return } if len(keys) == 0 { WriteResponse(res, http.StatusNoContent, nil) return } resBuf := bufio.NewWriterSize(res, 16*1024*1024) defer resBuf.Flush() tarWriter := tar.NewWriter(resBuf) defer tarWriter.Close() pipe := that.BackfeedRedis.Pipeline() writeError := func(err error) { errChunk := []byte(err.Error()) if err := tarWriter.WriteHeader(&tar.Header{ Typeflag: tar.TypeReg, Name: "error", Size: int64(len(errChunk)), Mode: 0600, PAXRecords: map[string]string{ "ARCHIVETEAM.bffchunk.error": fmt.Sprintf("%s", err), }, Format: tar.FormatPAX, }); err != nil { return } if _, err := tarWriter.Write(errChunk); err != nil { return } } for _, key := range keys { cursor := int64(0) for i := 0; ; i++ { rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor) tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key) _, err := pipe.Exec(req.Context()) if err != nil && err != redis.Nil { writeError(err) return } rawRes, err := rawResResult.Result() if err != nil && err != redis.Nil { writeError(err) return } if rawRes == nil { break } resSlice, ok := rawRes.([]any) if !ok { writeError(fmt.Errorf("unexpected response type: %T", rawRes)) return } if len(resSlice) != 2 { writeError(fmt.Errorf("unexpected response length: %d", len(resSlice))) return } cursor, ok = resSlice[0].(int64) if !ok { writeError(fmt.Errorf("unexpected response first element type: %T", resSlice[0])) return } chunkString, ok := resSlice[1].(string) if !ok { writeError(fmt.Errorf("unexpected response second element type: %T", resSlice[1])) return } chunk := []byte(chunkString) lastAccess := time.Time{} tsString, err := tsStringResult.Result() if err == nil && tsString != "" { ts, err := strconv.ParseInt(tsString, 10, 64) if err == nil { lastAccess = time.Unix(ts, 0) } } nameStruct := DumpChunkName{ Key: key, Cursor: cursor, Distance: i, Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)), } name, err := json.Marshal(nameStruct) if err != nil { writeError(err) return } if err := tarWriter.WriteHeader(&tar.Header{ Typeflag: tar.TypeReg, Name: string(name), Size: int64(len(chunk)), Mode: 0600, ModTime: lastAccess, AccessTime: lastAccess, ChangeTime: lastAccess, PAXRecords: map[string]string{ "ARCHIVETEAM.bffchunk.key": key, "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor), "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i), "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum), }, Format: tar.FormatPAX, }); err != nil { writeError(err) return } if _, err := tarWriter.Write(chunk); err != nil { writeError(err) return } if cursor == 0 && len(chunk) == 0 { break } } } }