Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

257 righe
7.1 KiB

  1. package main
  2. import (
  3. "archive/tar"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "hash/crc64"
  8. "io"
  9. "net/http"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. "github.com/gorilla/mux"
  17. )
  18. type DumpChunkName struct {
  19. Key string `json:"key"`
  20. Distance int `json:"distance"`
  21. Cursor int64 `json:"cursor"`
  22. Checksum uint64 `json:"checksum"`
  23. }
  24. func (that *GlobalBackfeedManager) HandleLoad(res http.ResponseWriter, req *http.Request) {
  25. defer req.Body.Close()
  26. tarReader := tar.NewReader(req.Body)
  27. existed := []string{}
  28. recreate := req.URL.Query().Get("recreate") != ""
  29. skipKeys := map[string]struct{}{}
  30. for {
  31. header, err := tarReader.Next()
  32. if err != nil {
  33. if err == io.EOF {
  34. break
  35. }
  36. WriteResponse(res, http.StatusInternalServerError, err)
  37. return
  38. }
  39. if header.Typeflag != tar.TypeReg {
  40. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] unexpected tar entry type: %d", header.Name, header.Typeflag))
  41. return
  42. }
  43. var name DumpChunkName
  44. if err := json.Unmarshal([]byte(header.Name), &name); err != nil {
  45. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk name: %s", header.Name, err))
  46. return
  47. }
  48. if strings.Count(name.Key, ":") < 2 {
  49. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk key: %s", header.Name, name.Key))
  50. return
  51. }
  52. chunk := make([]byte, header.Size)
  53. if _, err := io.ReadFull(tarReader, chunk); err != nil {
  54. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] failed to read chunk: %s", header.Name, err))
  55. return
  56. }
  57. checksum := crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA))
  58. if name.Checksum != checksum {
  59. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("[chunk %s] invalid chunk checksum: %d (expected) != %d (actual)", header.Name, name.Checksum, checksum))
  60. return
  61. }
  62. if len(chunk) == 0 && name.Cursor == 0 {
  63. continue
  64. }
  65. if name.Distance == 0 {
  66. if exists, err := that.BackfeedRedis.Exists(req.Context(), name.Key).Result(); err != nil && err != redis.Nil {
  67. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to check key existence: %s", header.Name, err))
  68. return
  69. } else if exists == 1 {
  70. existed = append(existed, name.Key)
  71. if recreate {
  72. if _, err := that.BackfeedRedis.Unlink(req.Context(), name.Key).Result(); err != nil {
  73. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to unlink key: %s", header.Name, err))
  74. return
  75. }
  76. } else {
  77. skipKeys[name.Key] = struct{}{}
  78. continue
  79. }
  80. }
  81. }
  82. if _, has := skipKeys[name.Key]; has {
  83. continue
  84. }
  85. if _, err := that.BackfeedRedis.Do(req.Context(), "bf.loadchunk", name.Key, name.Cursor, chunk).Result(); err != nil {
  86. WriteResponse(res, http.StatusInternalServerError, fmt.Errorf("[chunk %s] failed to import chunk: %s", header.Name, err))
  87. return
  88. }
  89. }
  90. }
  91. func (that *GlobalBackfeedManager) HandleDump(res http.ResponseWriter, req *http.Request) {
  92. vars := mux.Vars(req)
  93. key := vars["key"]
  94. if key == "" {
  95. key = "*:*:*"
  96. }
  97. if strings.Count(key, ":") < 2 {
  98. WriteResponse(res, http.StatusBadRequest, fmt.Errorf("invalid key format"))
  99. return
  100. }
  101. lock := sync.Mutex{}
  102. keys := []string{}
  103. if err := that.BackfeedRedis.ForEachShard(req.Context(), func(ctx context.Context, client *redis.Client) error {
  104. cursor := uint64(0)
  105. var shardKeys []string
  106. for {
  107. var err error
  108. var keysBatch []string
  109. keysBatch, cursor, err = client.Scan(ctx, cursor, key, 1000).Result()
  110. if err != nil && err != redis.Nil {
  111. return err
  112. }
  113. shardKeys = append(shardKeys, keysBatch...)
  114. if cursor == 0 {
  115. break
  116. }
  117. }
  118. lock.Lock()
  119. defer lock.Unlock()
  120. keys = append(keys, shardKeys...)
  121. return nil
  122. }); err != nil && err != redis.Nil {
  123. WriteResponse(res, http.StatusInternalServerError, err)
  124. return
  125. }
  126. sort.Strings(keys)
  127. hasJsonAcceptHeader := false
  128. for _, accept := range strings.Split(req.Header.Get("Accept"), ",") {
  129. accept = strings.TrimSpace(accept)
  130. if accept == "application/json" || strings.HasPrefix(accept, "application/json;") {
  131. hasJsonAcceptHeader = true
  132. break
  133. }
  134. }
  135. if hasJsonAcceptHeader {
  136. WriteResponse(res, http.StatusOK, keys)
  137. return
  138. }
  139. if len(keys) == 0 {
  140. WriteResponse(res, http.StatusNoContent, nil)
  141. return
  142. }
  143. tarWriter := tar.NewWriter(res)
  144. defer tarWriter.Close()
  145. pipe := that.BackfeedRedis.Pipeline()
  146. writeError := func(err error) {
  147. errChunk := []byte(err.Error())
  148. if err := tarWriter.WriteHeader(&tar.Header{
  149. Typeflag: tar.TypeReg,
  150. Name: "error",
  151. Size: int64(len(errChunk)),
  152. Mode: 0600,
  153. PAXRecords: map[string]string{
  154. "ARCHIVETEAM.bffchunk.error": fmt.Sprintf("%s", err),
  155. },
  156. Format: tar.FormatPAX,
  157. }); err != nil {
  158. return
  159. }
  160. if _, err := tarWriter.Write(errChunk); err != nil {
  161. return
  162. }
  163. }
  164. for _, key := range keys {
  165. cursor := int64(0)
  166. for i := 0; ; i++ {
  167. rawResResult := pipe.Do(req.Context(), "bf.scandump", key, cursor)
  168. tsStringResult := that.BackfeedRedis.HGet(req.Context(), ":last_ts", key)
  169. _, err := pipe.Exec(req.Context())
  170. if err != nil && err != redis.Nil {
  171. writeError(err)
  172. return
  173. }
  174. rawRes, err := rawResResult.Result()
  175. if err != nil && err != redis.Nil {
  176. writeError(err)
  177. return
  178. }
  179. if rawRes == nil {
  180. break
  181. }
  182. resSlice, ok := rawRes.([]any)
  183. if !ok {
  184. writeError(fmt.Errorf("unexpected response type: %T", rawRes))
  185. return
  186. }
  187. if len(resSlice) != 2 {
  188. writeError(fmt.Errorf("unexpected response length: %d", len(resSlice)))
  189. return
  190. }
  191. cursor, ok = resSlice[0].(int64)
  192. if !ok {
  193. writeError(fmt.Errorf("unexpected response first element type: %T", resSlice[0]))
  194. return
  195. }
  196. chunkString, ok := resSlice[1].(string)
  197. if !ok {
  198. writeError(fmt.Errorf("unexpected response second element type: %T", resSlice[1]))
  199. return
  200. }
  201. chunk := []byte(chunkString)
  202. lastAccess := time.Time{}
  203. tsString, err := tsStringResult.Result()
  204. if err == nil && tsString != "" {
  205. ts, err := strconv.ParseInt(tsString, 10, 64)
  206. if err == nil {
  207. lastAccess = time.Unix(ts, 0)
  208. }
  209. }
  210. nameStruct := DumpChunkName{
  211. Key: key,
  212. Cursor: cursor,
  213. Distance: i,
  214. Checksum: crc64.Checksum(chunk, crc64.MakeTable(crc64.ECMA)),
  215. }
  216. name, err := json.Marshal(nameStruct)
  217. if err != nil {
  218. writeError(err)
  219. return
  220. }
  221. if err := tarWriter.WriteHeader(&tar.Header{
  222. Typeflag: tar.TypeReg,
  223. Name: string(name),
  224. Size: int64(len(chunk)),
  225. Mode: 0600,
  226. ModTime: lastAccess,
  227. AccessTime: lastAccess,
  228. ChangeTime: lastAccess,
  229. PAXRecords: map[string]string{
  230. "ARCHIVETEAM.bffchunk.key": key,
  231. "ARCHIVETEAM.bffchunk.cursor": fmt.Sprintf("%d", cursor),
  232. "ARCHIVETEAM.bffchunk.distance": fmt.Sprintf("%d", i),
  233. "ARCHIVETEAM.bffchunk.checksum": fmt.Sprintf("%d", nameStruct.Checksum),
  234. },
  235. Format: tar.FormatPAX,
  236. }); err != nil {
  237. writeError(err)
  238. return
  239. }
  240. if _, err := tarWriter.Write(chunk); err != nil {
  241. writeError(err)
  242. return
  243. }
  244. if cursor == 0 && len(chunk) == 0 {
  245. break
  246. }
  247. }
  248. }
  249. }