You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

269 lines
7.4 KiB

  1. package main
  2. import (
  3. "archive/tar"
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "hash/crc64"
  9. "io"
  10. "net/http"
  11. "sort"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/go-redis/redis/v8"
  17. "github.com/gorilla/mux"
  18. )
  19. type DumpChunkName struct {
  20. Key string `json:"key"`
  21. Distance int `json:"distance"`
  22. Cursor int64 `json:"cursor"`
  23. Checksum uint64 `json:"checksum"`
  24. }
  25. func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http.Request) {
  26. defer req.Body.Close()
  27. reqBuffer := bufio.NewReader(req.Body)
  28. tarReader := tar.NewReader(reqBuffer)
  29. existed := []string{}
  30. imported := []string{}
  31. recreate := req.URL.Query().Get("recreate") != ""
  32. skipKeys := map[string]struct{}{}
  33. for {
  34. header, err := tarReader.Next()
  35. if err != nil {
  36. if err == io.EOF {
  37. break
  38. }
  39. WriteResponse(res, http.StatusInternalServerError, err)
  40. return
  41. }
  42. if header.Typeflag != tar.TypeReg {
  43. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] unexpected tar entry type: %d", header.Name, header.Typeflag))
  44. return
  45. }
  46. var name DumpChunkName
  47. if err := json.Unmarshal([]byte(header.Name), &name); err != nil {
  48. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk name: %s", header.Name, err))
  49. return
  50. }
  51. if strings.Count(name.Key, ":") < 2 {
  52. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk key: %s", header.Name, name.Key))
  53. return
  54. }
  55. chunk := make([]byte, header.Size)
  56. if _, err := io.ReadFull(tarReader, chunk); err != nil {
  57. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] failed to read chunk: %s", header.Name, err))
  58. return
  59. }
  60. checksum := crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA))
  61. if name.Checksum != checksum {
  62. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk checksum: %d (expected) != %d (actual)", header.Name, name.Checksum, checksum))
  63. return
  64. }
  65. if len(chunk) == 0 && name.Cursor == 0 {
  66. continue
  67. }
  68. if name.Distance == 0 {
  69. if exists, err := that.BackfeedRedis.Exists(req.Context(), name.Key).Result(); err != nil && err != redis.Nil {
  70. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to check key existence: %s", header.Name, err))
  71. return
  72. } else if exists == 1 {
  73. existed = append(existed, name.Key)
  74. if recreate {
  75. if _, err := that.BackfeedRedis.Unlink(req.Context(), name.Key).Result(); err != nil {
  76. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to unlink key: %s", header.Name, err))
  77. return
  78. }
  79. } else {
  80. skipKeys[name.Key] = struct{}{}
  81. continue
  82. }
  83. }
  84. }
  85. if _, has := skipKeys[name.Key]; has {
  86. continue
  87. }
  88. if _, err := that.BackfeedRedis.Do(req.Context(), "bf.loadchunk", name.Key, name.Cursor, chunk).Result(); err != nil {
  89. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err))
  90. return
  91. }
  92. if name.Distance == 0 {
  93. imported = append(imported, name.Key)
  94. }
  95. }
  96. WriteResponse(res, http.StatusOK, map[string]any{
  97. "existed": existed,
  98. "imported": imported,
  99. })
  100. }
  101. func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {
  102. vars := mux.Vars(req)
  103. key := vars["key"]
  104. if key == "" {
  105. key = "*:*:*"
  106. }
  107. if strings.Count(key, ":") < 2 {
  108. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
  109. return
  110. }
  111. lock := sync.Mutex{}
  112. keys := []string{}
  113. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  114. cursor := uint64(0)
  115. var shardKeys []string
  116. for {
  117. var err error
  118. var keysBatch []string
  119. keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
  120. if err != nil && err != redis.Nil {
  121. return err
  122. }
  123. shardKeys = append(shardKeys, keysBatch...)
  124. if cursor == 0 {
  125. break
  126. }
  127. }
  128. lock.Lock()
  129. defer lock.Unlock()
  130. keys = append(keys, shardKeys...)
  131. return nil
  132. }); err != nil && err != redis.Nil {
  133. WriteResponse(res, http.StatusInternalServerError, err)
  134. return
  135. }
  136. sort.Strings(keys)
  137. hasJsonAcceptHeader := false
  138. for _, accept := range strings.Split(req.Header.Get("Accept"), ",") {
  139. accept = strings.TrimSpace(accept)
  140. if accept == "application/json" || strings.HasPrefix(accept, "application/json;") {
  141. hasJsonAcceptHeader = true
  142. break
  143. }
  144. }
  145. if hasJsonAcceptHeader {
  146. WriteResponse(res, http.StatusOK, keys)
  147. return
  148. }
  149. if len(keys) == 0 {
  150. WriteResponse(res, http.StatusNoContent, nil)
  151. return
  152. }
  153. resBuf := bufio.NewWriterSize(res, 16*1024*1024)
  154. defer resBuf.Flush()
  155. tarWriter := tar.NewWriter(resBuf)
  156. defer tarWriter.Close()
  157. pipe := that.BackfeedRedis.Pipeline()
  158. writeError := func(err error) {
  159. errChunk := []byte(err.Error())
  160. if err := tarWriter.WriteHeader(&tar.Header{
  161. Typeflag: tar.TypeReg,
  162. Name: "error",
  163. Size: int64(len(errChunk)),
  164. Mode: 0600,
  165. PAXRecords: map[string]string{
  166. "ARCHIVETEAM.bffchunk.error": fmt.Sprintf("%s", err),
  167. },
  168. Format: tar.FormatPAX,
  169. }); err != nil {
  170. return
  171. }
  172. if _, err := tarWriter.Write(errChunk); err != nil {
  173. return
  174. }
  175. }
  176. for _, key := range keys {
  177. cursor := int64(0)
  178. for i := 0; ; i++ {
  179. rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor)
  180. tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key)
  181. _, err := pipe.Exec(req.Context())
  182. if err != nil && err != redis.Nil {
  183. writeError(err)
  184. return
  185. }
  186. rawRes, err := rawResResult.Result()
  187. if err != nil && err != redis.Nil {
  188. writeError(err)
  189. return
  190. }
  191. if rawRes == nil {
  192. break
  193. }
  194. resSlice, ok := rawRes.([]any)
  195. if !ok {
  196. writeError(fmt.Errorf("unexpected response type: %T", rawRes))
  197. return
  198. }
  199. if len(resSlice) != 2 {
  200. writeError(fmt.Errorf("unexpected response length: %d", len(resSlice)))
  201. return
  202. }
  203. cursor, ok = resSlice[0].(int64)
  204. if !ok {
  205. writeError(fmt.Errorf("unexpected response first element type: %T", resSlice[0]))
  206. return
  207. }
  208. chunkString, ok := resSlice[1].(string)
  209. if !ok {
  210. writeError(fmt.Errorf("unexpected response second element type: %T", resSlice[1]))
  211. return
  212. }
  213. chunk := []byte(chunkString)
  214. lastAccess := time.Time{}
  215. tsString, err := tsStringResult.Result()
  216. if err == nil && tsString != "" {
  217. ts, err := strconv.ParseInt(tsString, 10, 64)
  218. if err == nil {
  219. lastAccess = time.Unix(ts, 0)
  220. }
  221. }
  222. nameStruct := DumpChunkName{
  223. Key: key,
  224. Cursor: cursor,
  225. Distance: i,
  226. Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)),
  227. }
  228. name, err := json.Marshal(nameStruct)
  229. if err != nil {
  230. writeError(err)
  231. return
  232. }
  233. if err := tarWriter.WriteHeader(&tar.Header{
  234. Typeflag: tar.TypeReg,
  235. Name: string(name),
  236. Size: int64(len(chunk)),
  237. Mode: 0600,
  238. ModTime: lastAccess,
  239. AccessTime: lastAccess,
  240. ChangeTime: lastAccess,
  241. PAXRecords: map[string]string{
  242. "ARCHIVETEAM.bffchunk.key": key,
  243. "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor),
  244. "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i),
  245. "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum),
  246. },
  247. Format: tar.FormatPAX,
  248. }); err != nil {
  249. writeError(err)
  250. return
  251. }
  252. if _, err := tarWriter.Write(chunk); err != nil {
  253. writeError(err)
  254. return
  255. }
  256. if cursor == 0 && len(chunk) == 0 {
  257. break
  258. }
  259. }
  260. }
  261. }