You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

265 lines
7.3 KiB

  1. package main
  2. import (
  3. "archive/tar"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "hash/crc64"
  8. "io"
  9. "net/http"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. "github.com/gorilla/mux"
  17. )
  18. type DumpChunkName struct {
  19. Key string `json:"key"`
  20. Distance int `json:"distance"`
  21. Cursor int64 `json:"cursor"`
  22. Checksum uint64 `json:"checksum"`
  23. }
  24. func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http.Request) {
  25. defer req.Body.Close()
  26. tarReader := tar.NewReader(req.Body)
  27. existed := []string{}
  28. imported := []string{}
  29. recreate := req.URL.Query().Get("recreate") != ""
  30. skipKeys := map[string]struct{}{}
  31. for {
  32. header, err := tarReader.Next()
  33. if err != nil {
  34. if err == io.EOF {
  35. break
  36. }
  37. WriteResponse(res, http.StatusInternalServerError, err)
  38. return
  39. }
  40. if header.Typeflag != tar.TypeReg {
  41. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] unexpected tar entry type: %d", header.Name, header.Typeflag))
  42. return
  43. }
  44. var name DumpChunkName
  45. if err := json.Unmarshal([]byte(header.Name), &name); err != nil {
  46. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk name: %s", header.Name, err))
  47. return
  48. }
  49. if strings.Count(name.Key, ":") < 2 {
  50. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk key: %s", header.Name, name.Key))
  51. return
  52. }
  53. chunk := make([]byte, header.Size)
  54. if _, err := io.ReadFull(tarReader, chunk); err != nil {
  55. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] failed to read chunk: %s", header.Name, err))
  56. return
  57. }
  58. checksum := crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA))
  59. if name.Checksum != checksum {
  60. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk checksum: %d (expected) != %d (actual)", header.Name, name.Checksum, checksum))
  61. return
  62. }
  63. if len(chunk) == 0 && name.Cursor == 0 {
  64. continue
  65. }
  66. if name.Distance == 0 {
  67. if exists, err := that.BackfeedRedis.Exists(req.Context(), name.Key).Result(); err != nil && err != redis.Nil {
  68. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to check key existence: %s", header.Name, err))
  69. return
  70. } else if exists == 1 {
  71. existed = append(existed, name.Key)
  72. if recreate {
  73. if _, err := that.BackfeedRedis.Unlink(req.Context(), name.Key).Result(); err != nil {
  74. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to unlink key: %s", header.Name, err))
  75. return
  76. }
  77. } else {
  78. skipKeys[name.Key] = struct{}{}
  79. continue
  80. }
  81. }
  82. }
  83. if _, has := skipKeys[name.Key]; has {
  84. continue
  85. }
  86. if _, err := that.BackfeedRedis.Do(req.Context(), "bf.loadchunk", name.Key, name.Cursor, chunk).Result(); err != nil {
  87. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err))
  88. return
  89. }
  90. if name.Distance == 0 {
  91. imported = append(imported, name.Key)
  92. }
  93. }
  94. WriteResponse(res, http.StatusOK, map[string]any{
  95. "existed": existed,
  96. "imported": imported,
  97. })
  98. }
  99. func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {
  100. vars := mux.Vars(req)
  101. key := vars["key"]
  102. if key == "" {
  103. key = "*:*:*"
  104. }
  105. if strings.Count(key, ":") < 2 {
  106. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
  107. return
  108. }
  109. lock := sync.Mutex{}
  110. keys := []string{}
  111. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  112. cursor := uint64(0)
  113. var shardKeys []string
  114. for {
  115. var err error
  116. var keysBatch []string
  117. keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
  118. if err != nil && err != redis.Nil {
  119. return err
  120. }
  121. shardKeys = append(shardKeys, keysBatch...)
  122. if cursor == 0 {
  123. break
  124. }
  125. }
  126. lock.Lock()
  127. defer lock.Unlock()
  128. keys = append(keys, shardKeys...)
  129. return nil
  130. }); err != nil && err != redis.Nil {
  131. WriteResponse(res, http.StatusInternalServerError, err)
  132. return
  133. }
  134. sort.Strings(keys)
  135. hasJsonAcceptHeader := false
  136. for _, accept := range strings.Split(req.Header.Get("Accept"), ",") {
  137. accept = strings.TrimSpace(accept)
  138. if accept == "application/json" || strings.HasPrefix(accept, "application/json;") {
  139. hasJsonAcceptHeader = true
  140. break
  141. }
  142. }
  143. if hasJsonAcceptHeader {
  144. WriteResponse(res, http.StatusOK, keys)
  145. return
  146. }
  147. if len(keys) == 0 {
  148. WriteResponse(res, http.StatusNoContent, nil)
  149. return
  150. }
  151. tarWriter := tar.NewWriter(res)
  152. defer tarWriter.Close()
  153. pipe := that.BackfeedRedis.Pipeline()
  154. writeError := func(err error) {
  155. errChunk := []byte(err.Error())
  156. if err := tarWriter.WriteHeader(&tar.Header{
  157. Typeflag: tar.TypeReg,
  158. Name: "error",
  159. Size: int64(len(errChunk)),
  160. Mode: 0600,
  161. PAXRecords: map[string]string{
  162. "ARCHIVETEAM.bffchunk.error": fmt.Sprintf("%s", err),
  163. },
  164. Format: tar.FormatPAX,
  165. }); err != nil {
  166. return
  167. }
  168. if _, err := tarWriter.Write(errChunk); err != nil {
  169. return
  170. }
  171. }
  172. for _, key := range keys {
  173. cursor := int64(0)
  174. for i := 0; ; i++ {
  175. rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor)
  176. tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key)
  177. _, err := pipe.Exec(req.Context())
  178. if err != nil && err != redis.Nil {
  179. writeError(err)
  180. return
  181. }
  182. rawRes, err := rawResResult.Result()
  183. if err != nil && err != redis.Nil {
  184. writeError(err)
  185. return
  186. }
  187. if rawRes == nil {
  188. break
  189. }
  190. resSlice, ok := rawRes.([]any)
  191. if !ok {
  192. writeError(fmt.Errorf("unexpected response type: %T", rawRes))
  193. return
  194. }
  195. if len(resSlice) != 2 {
  196. writeError(fmt.Errorf("unexpected response length: %d", len(resSlice)))
  197. return
  198. }
  199. cursor, ok = resSlice[0].(int64)
  200. if !ok {
  201. writeError(fmt.Errorf("unexpected response first element type: %T", resSlice[0]))
  202. return
  203. }
  204. chunkString, ok := resSlice[1].(string)
  205. if !ok {
  206. writeError(fmt.Errorf("unexpected response second element type: %T", resSlice[1]))
  207. return
  208. }
  209. chunk := []byte(chunkString)
  210. lastAccess := time.Time{}
  211. tsString, err := tsStringResult.Result()
  212. if err == nil && tsString != "" {
  213. ts, err := strconv.ParseInt(tsString, 10, 64)
  214. if err == nil {
  215. lastAccess = time.Unix(ts, 0)
  216. }
  217. }
  218. nameStruct := DumpChunkName{
  219. Key: key,
  220. Cursor: cursor,
  221. Distance: i,
  222. Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)),
  223. }
  224. name, err := json.Marshal(nameStruct)
  225. if err != nil {
  226. writeError(err)
  227. return
  228. }
  229. if err := tarWriter.WriteHeader(&tar.Header{
  230. Typeflag: tar.TypeReg,
  231. Name: string(name),
  232. Size: int64(len(chunk)),
  233. Mode: 0600,
  234. ModTime: lastAccess,
  235. AccessTime: lastAccess,
  236. ChangeTime: lastAccess,
  237. PAXRecords: map[string]string{
  238. "ARCHIVETEAM.bffchunk.key": key,
  239. "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor),
  240. "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i),
  241. "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum),
  242. },
  243. Format: tar.FormatPAX,
  244. }); err != nil {
  245. writeError(err)
  246. return
  247. }
  248. if _, err := tarWriter.Write(chunk); err != nil {
  249. writeError(err)
  250. return
  251. }
  252. if cursor == 0 && len(chunk) == 0 {
  253. break
  254. }
  255. }
  256. }
  257. }