|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- package main
-
- import (
- "archive/tar"
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "hash/crc64"
- "io"
- "net/http"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/gorilla/mux"
- )
-
// DumpChunkName is the metadata for one bloom-filter chunk in a dump tar
// stream. It is JSON-encoded and used as the tar entry name by HandleDump,
// and parsed back by HandleLoad.
type DumpChunkName struct {
	Key      string `json:"key"`      // redis key the chunk belongs to
	Distance int    `json:"distance"` // chunk index within the key's dump (0 = first chunk)
	Cursor   int64  `json:"cursor"`   // bf.scandump cursor to pass to bf.loadchunk
	Checksum uint64 `json:"checksum"` // CRC64 (ECMA) of the chunk payload
}
-
- func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http.Request) {
- defer req.Body.Close()
- reqBuffer := bufio.NewReader(req.Body)
- tarReader := tar.NewReader(reqBuffer)
- existed := []string{}
- imported := []string{}
- recreate := req.URL.Query().Get("recreate") != ""
- skipKeys := map[string]struct{}{}
- for {
- header, err := tarReader.Next()
- if err != nil {
- if err == io.EOF {
- break
- }
- WriteResponse(res, http.StatusInternalServerError, err)
- return
- }
- if header.Typeflag != tar.TypeReg {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] unexpected tar entry type: %d", header.Name, header.Typeflag))
- return
- }
- var name DumpChunkName
- if err := json.Unmarshal([]byte(header.Name), &name); err != nil {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk name: %s", header.Name, err))
- return
- }
- if strings.Count(name.Key, ":") < 2 {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk key: %s", header.Name, name.Key))
- return
- }
- chunk := make([]byte, header.Size)
- if _, err := io.ReadFull(tarReader, chunk); err != nil {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] failed to read chunk: %s", header.Name, err))
- return
- }
- checksum := crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA))
- if name.Checksum != checksum {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk checksum: %d (expected) != %d (actual)", header.Name, name.Checksum, checksum))
- return
- }
- if len(chunk) == 0 && name.Cursor == 0 {
- continue
- }
- if name.Distance == 0 {
- if exists, err := that.BackfeedRedis.Exists(req.Context(), name.Key).Result(); err != nil && err != redis.Nil {
- WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to check key existence: %s", header.Name, err))
- return
- } else if exists == 1 {
- existed = append(existed, name.Key)
- if recreate {
- if _, err := that.BackfeedRedis.Unlink(req.Context(), name.Key).Result(); err != nil {
- WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to unlink key: %s", header.Name, err))
- return
- }
- } else {
- skipKeys[name.Key] = struct{}{}
- continue
- }
- }
- }
- if _, has := skipKeys[name.Key]; has {
- continue
- }
- if _, err := that.BackfeedRedis.Do(req.Context(), "bf.loadchunk", name.Key, name.Cursor, chunk).Result(); err != nil {
- WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err))
- return
- }
- if name.Distance == 0 {
- imported = append(imported, name.Key)
- }
- }
- WriteResponse(res, http.StatusOK, map[string]any{
- "existed": existed,
- "imported": imported,
- })
- }
-
- func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {
- vars := mux.Vars(req)
- key := vars["key"]
- if key == "" {
- key = "*:*:*"
- }
- if strings.Count(key, ":") < 2 {
- WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
- return
- }
- lock := sync.Mutex{}
- keys := []string{}
- if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
- cursor := uint64(0)
- var shardKeys []string
- for {
- var err error
- var keysBatch []string
- keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
- if err != nil && err != redis.Nil {
- return err
- }
- shardKeys = append(shardKeys, keysBatch...)
- if cursor == 0 {
- break
- }
- }
- lock.Lock()
- defer lock.Unlock()
- keys = append(keys, shardKeys...)
- return nil
- }); err != nil && err != redis.Nil {
- WriteResponse(res, http.StatusInternalServerError, err)
- return
- }
- sort.Strings(keys)
- hasJsonAcceptHeader := false
- for _, accept := range strings.Split(req.Header.Get("Accept"), ",") {
- accept = strings.TrimSpace(accept)
- if accept == "application/json" || strings.HasPrefix(accept, "application/json;") {
- hasJsonAcceptHeader = true
- break
- }
- }
- if hasJsonAcceptHeader {
- WriteResponse(res, http.StatusOK, keys)
- return
- }
- if len(keys) == 0 {
- WriteResponse(res, http.StatusNoContent, nil)
- return
- }
- resBuf := bufio.NewWriterSize(res, 16*1024*1024)
- defer resBuf.Flush()
- tarWriter := tar.NewWriter(resBuf)
- defer tarWriter.Close()
- pipe := that.BackfeedRedis.Pipeline()
- writeError := func(err error) {
- errChunk := []byte(err.Error())
- if err := tarWriter.WriteHeader(&tar.Header{
- Typeflag: tar.TypeReg,
- Name: "error",
- Size: int64(len(errChunk)),
- Mode: 0600,
- PAXRecords: map[string]string{
- "ARCHIVETEAM.bffchunk.error": fmt.Sprintf("%s", err),
- },
- Format: tar.FormatPAX,
- }); err != nil {
- return
- }
- if _, err := tarWriter.Write(errChunk); err != nil {
- return
- }
- }
- for _, key := range keys {
- cursor := int64(0)
- for i := 0; ; i++ {
- rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor)
- tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key)
- _, err := pipe.Exec(req.Context())
- if err != nil && err != redis.Nil {
- writeError(err)
- return
- }
- rawRes, err := rawResResult.Result()
- if err != nil && err != redis.Nil {
- writeError(err)
- return
- }
- if rawRes == nil {
- break
- }
- resSlice, ok := rawRes.([]any)
- if !ok {
- writeError(fmt.Errorf("unexpected response type: %T", rawRes))
- return
- }
- if len(resSlice) != 2 {
- writeError(fmt.Errorf("unexpected response length: %d", len(resSlice)))
- return
- }
- cursor, ok = resSlice[0].(int64)
- if !ok {
- writeError(fmt.Errorf("unexpected response first element type: %T", resSlice[0]))
- return
- }
- chunkString, ok := resSlice[1].(string)
- if !ok {
- writeError(fmt.Errorf("unexpected response second element type: %T", resSlice[1]))
- return
- }
- chunk := []byte(chunkString)
-
- lastAccess := time.Time{}
- tsString, err := tsStringResult.Result()
- if err == nil && tsString != "" {
- ts, err := strconv.ParseInt(tsString, 10, 64)
- if err == nil {
- lastAccess = time.Unix(ts, 0)
- }
- }
-
- nameStruct := DumpChunkName{
- Key: key,
- Cursor: cursor,
- Distance: i,
- Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)),
- }
- name, err := json.Marshal(nameStruct)
- if err != nil {
- writeError(err)
- return
- }
- if err := tarWriter.WriteHeader(&tar.Header{
- Typeflag: tar.TypeReg,
- Name: string(name),
- Size: int64(len(chunk)),
- Mode: 0600,
- ModTime: lastAccess,
- AccessTime: lastAccess,
- ChangeTime: lastAccess,
- PAXRecords: map[string]string{
- "ARCHIVETEAM.bffchunk.key": key,
- "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor),
- "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i),
- "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum),
- },
- Format: tar.FormatPAX,
- }); err != nil {
- writeError(err)
- return
- }
- if _, err := tarWriter.Write(chunk); err != nil {
- writeError(err)
- return
- }
- if cursor == 0 && len(chunk) == 0 {
- break
- }
- }
- }
- }
|