Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

511 рядків
14 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/rand"
  8. "net/http"
  9. _ "net/http/pprof"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. dq "github.com/nsqio/go-diskqueue"
  17. )
// Defaults applied in Offloader.Do when the corresponding per-project
// offload config value is zero/unset. Note that the default low and high
// watermarks are intentionally equal.
const (
	DefaultWatermarkHigh int64 = 100000 // offload to disk when a redis set grows above this
	DefaultWatermarkLow  int64 = 100000 // refill from disk when a redis set shrinks below this
	DefaultBatchSize     int64 = 10000  // maximum entries moved per SPopN/SAdd batch
)
// l adapts the standard library logger to the go-diskqueue logging
// callback signature; the diskqueue log level is ignored and everything
// is forwarded to log.Printf.
func l(_ dq.LogLevel, f string, args ...interface{}) {
	log.Printf(f, args...)
}
// Core holds the process-wide state: the on-disk queue directory, the
// shared tracker redis connection and one Offloader per active project.
type Core struct {
	DataDir       string                // root directory for the disk queues
	MainClient    *redis.Client         // shared redis client (used when a project has no dedicated redis)
	Offloaders    map[string]*Offloader // running offloaders keyed by project name
	MigrationMode bool                  // when set, Do only refills sets from disk and never offloads to disk
}
// ProjectRedisConfig describes an optional per-project redis instance.
// When present, the offloader connects to this endpoint instead of the
// shared main client.
type ProjectRedisConfig struct {
	Host string `json:"host"` // redis host name or address
	Pass string `json:"pass"` // password for the "default" user
	Port int    `json:"port"` // redis TCP port
}
// ProjectOffloadConfig carries the per-project watermark/batch tuning.
// Zero values mean "use the Default* constants" (see Offloader.Do).
type ProjectOffloadConfig struct {
	WatermarkHigh int64 `json:"high"`      // offload above this set cardinality
	WatermarkLow  int64 `json:"low"`       // refill below this set cardinality
	BatchSize     int64 `json:"batchsize"` // maximum entries per batch move
}
// ProjectConfig is the JSON document stored per project in the redis
// "trackers" hash.
type ProjectConfig struct {
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"` // nil = use the shared main redis
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}
// Offloader moves entries of a single project between its redis sets and
// local disk queues, keeping each set between the configured watermarks.
type Offloader struct {
	RedisClient   *redis.Client           // project redis, or the shared main client
	ProjectConfig ProjectConfig           // config snapshot this offloader was started with
	Context       context.Context         // cancelled to request shutdown
	Cancel        context.CancelFunc      // cancels Context
	Done          chan bool               // closed when Do has fully stopped
	Queues        map[string]dq.Interface // disk queues keyed by queue name
	Sets          map[string]string       // redis set name -> backing disk queue name
	Name          string                  // project name; used as redis key prefix
	Core          *Core                   // back-reference to process-wide state
}
  57. func (that *Offloader) CleanName(s string) string {
  58. return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
  59. }
  60. func (that *Offloader) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  61. if that.ProjectConfig.RedisConfig == nil && new == nil {
  62. return false
  63. }
  64. if that.ProjectConfig.RedisConfig == nil || new == nil || that.ProjectConfig.RedisConfig.Host != new.Host || that.ProjectConfig.RedisConfig.Port != new.Port || that.ProjectConfig.RedisConfig.Pass != new.Pass {
  65. return true
  66. }
  67. return false
  68. }
  69. func (that *Offloader) OffloadConfigDiffers(new ProjectOffloadConfig) bool {
  70. return that.ProjectConfig.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.ProjectConfig.OffloadConfig.WatermarkLow != new.WatermarkLow || that.ProjectConfig.OffloadConfig.BatchSize != new.BatchSize
  71. }
// RefreshQueues reloads the project's priority/filter/stash name lists
// from redis and reconciles the local disk queues with them: it opens
// queues that are newly needed, closes queues no longer referenced, and
// rebuilds that.Sets (redis set name -> disk queue name).
func (that *Offloader) RefreshQueues() error {
	// Fetch the three dynamic name lists in a single round trip.
	pipe := that.RedisClient.Pipeline()
	prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
	filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
	stashesCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:stashes", that.Name))
	_, err := pipe.Exec(that.Context)
	if err != nil {
		return err
	}
	priorities, err := prioritiesCmdRes.Result()
	if err != nil {
		return err
	}
	filters, err := filtersCmdRes.Result()
	if err != nil {
		return err
	}
	stashes, err := stashesCmdRes.Result()
	if err != nil {
		return err
	}
	// Well-known sets that always exist; each backed by a queue of the
	// same name.
	setQueueMap := map[string]string{
		"todo":           "todo",
		"todo:secondary": "todo:secondary",
		"todo:redo":      "todo:redo",
		"todo:backfeed":  "todo:backfeed",
		"done":           "done",
		"unretrievable":  "unretrievable",
		"backfeed_retry": "backfeed_retry",
	}
	// All filtered:* sets funnel into one shared "filtered" queue;
	// priority and stash sets each get a dedicated queue.
	for _, filter := range filters {
		setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
	}
	for _, priority := range priorities {
		setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
	}
	for _, stash := range stashes {
		setQueueMap[fmt.Sprintf("stash:%s", stash)] = fmt.Sprintf("stash:%s", stash)
	}
	// Open any queues that are needed but not yet present.
	needQueueMap := map[string]bool{}
	for setName, queueName := range setQueueMap {
		needQueueMap[queueName] = true
		if _, has := that.Queues[queueName]; !has {
			log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
			// 128 MiB per file / max message size, sync every 1M writes or 5s.
			queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
			if queue == nil {
				return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
			}
			that.Queues[queueName] = queue
		}
		// NOTE(review): entries are only ever added to that.Sets, never
		// removed — a set deleted from redis keeps its stale mapping here;
		// confirm whether that is intended.
		that.Sets[setName] = queueName
	}
	// Close and drop queues that no set references anymore.
	for k, v := range that.Queues {
		if _, has := needQueueMap[k]; !has {
			v.Close()
			delete(that.Queues, k)
		}
	}
	return nil
}
  132. func (that *Offloader) CloseQueues() {
  133. for k, q := range that.Queues {
  134. log.Printf("closing queue %s for %s", k, that.Name)
  135. q.Close()
  136. }
  137. }
  138. func (that *Offloader) UpdateStats() {
  139. hset := map[string]interface{}{}
  140. for k, q := range that.Sets {
  141. if k != q {
  142. continue
  143. }
  144. queue := that.Queues[q]
  145. if queue != nil {
  146. hset[k] = fmt.Sprintf("%d", queue.Depth())
  147. }
  148. }
  149. _, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
  150. if err != nil {
  151. log.Printf("unable to hmset %s:offloaded: %s", that.Name, err)
  152. }
  153. }
// Do runs the offload loop for one project until that.Context is
// cancelled. Each pass it measures every tracked redis set and either
// spills excess entries to the local disk queue (set above the high
// watermark, or a non-primary alias set with any content) or refills the
// set from disk (set below the low watermark). In migration mode it only
// refills, never spills. Closes that.Done on exit.
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()
	// Only close the redis client if it is project-specific; the shared
	// main client is owned by the caller.
	if that.ProjectConfig.RedisConfig != nil {
		defer that.RedisClient.Close()
	}
	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()
	// Stats are refreshed every second; queue topology every 5 minutes.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()
	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()
	// Buffered so a pass that moved data can request an immediate re-run
	// without blocking.
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)
	// Fall back to the package defaults for any unset (zero) values.
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}
	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}
	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}
	for {
		// Previous per-queue implementation, kept for reference:
		//for k, q := range that.Queues {
		//	key := fmt.Sprintf("%s:%s", that.Name, k)
		//	scard, err := that.RedisClient.SCard(that.Context, key).Result()
		//	if err != nil {
		//		log.Printf("unable to scard %s: %s", key, err)
		//		continue
		//	}
		//	for scard > watermarkHigh || scard < watermarkLow {
		//		select {
		//		case <-that.Context.Done():
		//			return
		//		case <-refreshTicker.C:
		//			that.RefreshQueues()
		//			that.UpdateStats()
		//		default:
		//		}
		//		if scard > watermarkHigh {
		//			spopLimit := scard - watermarkHigh
		//			if spopLimit > batchSize {
		//				spopLimit = batchSize
		//			}
		//			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
		//			entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
		//			cancel()
		//			if err != nil {
		//				log.Printf("unable to spop %s: %s", key, err)
		//			}
		//			scard = scard - int64(len(entries))
		//			for _, entry := range entries {
		//				err := q.Put([]byte(entry))
		//				if err != nil {
		//					log.Printf("unable to q.Put %s: %s", key, err)
		//					return
		//				}
		//			}
		//		} else if scard < watermarkLow {
		//			spopLimit := watermarkLow - scard
		//			if spopLimit > batchSize {
		//				spopLimit = batchSize
		//			}
		//			var entries []interface{}
		//			for q.Depth() > 0 && int64(len(entries)) < spopLimit {
		//				entry := <-q.ReadChan()
		//				entries = append(entries, string(entry))
		//			}
		//			if len(entries) == 0 {
		//				break
		//			}
		//			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
		//			_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
		//			cancel()
		//			if err != nil {
		//				log.Printf("unable to sadd %s %#v: %s", key, entries, err)
		//				for _, entry := range entries {
		//					err := q.Put([]byte(entry.(string)))
		//					if err != nil {
		//						log.Printf("unable to q.Put %s: %s", key, err)
		//					}
		//				}
		//				return
		//			}
		//			scard = scard + int64(len(entries))
		//		}
		//	}
		//}
		// Measure all set cardinalities in one pipelined round trip.
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			// rerun is set when any batch moved data, so the next pass
			// starts immediately instead of sleeping on the tickers.
			rerun := false
			for k, q := range that.Sets {
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) {
					// Spill to disk: the set is over the high watermark, or
					// it is an alias set (k != q) which is always drained.
					spopLimit := scard - watermarkHigh
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					for _, entry := range entries {
						// A disk write failure is fatal for this offloader;
						// the popped entries were already written or remain
						// queued on disk, nothing is re-added to redis.
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 {
					// Refill from disk: primary sets only. Migration mode
					// refills up to twice the high watermark, one batch at
					// a time.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize || that.Core.MigrationMode {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						// SAdd failed: push the batch back onto the disk
						// queue so nothing is lost, then stop.
						log.Printf("unable to sadd %s: %s", key, err)
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			if rerun {
				// Non-blocking send; the channel is buffered with size 1.
				select {
				case skipSleepChan <- true:
				default:
				}
			}
			that.UpdateStats()
		}
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
			// Fall through immediately for another pass.
		}
	}
}
  344. func (that *Core) StopProjects() {
  345. var doneChans []chan bool
  346. for project, offloader := range that.Offloaders {
  347. log.Printf("stopping offloader %s", project)
  348. offloader.Cancel()
  349. doneChans = append(doneChans, offloader.Done)
  350. }
  351. for _, c := range doneChans {
  352. <-c
  353. }
  354. }
// RefreshProjects reloads the "trackers" hash from redis and reconciles
// the running offloaders with it: it stops offloaders whose project
// disappeared, whose config changed, or which already stopped on their
// own, and starts offloaders for projects that have none.
func (that *Core) RefreshProjects(redisClient *redis.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	res, err := redisClient.HGetAll(ctx, "trackers").Result()
	cancel()
	if err != nil {
		log.Printf("unable to refresh trackers table: %s", err)
		return
	}
	// Decode each project's JSON config; invalid entries are skipped
	// (their existing offloaders, if any, will be stopped below).
	updatedProjects := map[string]ProjectConfig{}
	for project, configString := range res {
		//if project != "ua" && project != "ukr-net" && project != "ua-urls" {
		//	continue
		//}
		config := ProjectConfig{}
		err := json.Unmarshal([]byte(configString), &config)
		if err != nil {
			log.Printf("unable to decode project %s config: %s", project, err)
			continue
		}
		updatedProjects[project] = config
	}
	// Stop stale offloaders: project removed, offload or redis config
	// changed, or the offloader's Context/Done shows it already exited.
	// (Deleting from a map while ranging over it is safe in Go.)
	for project, offloader := range that.Offloaders {
		_, stopRequired := updatedProjects[project]
		stopRequired = !stopRequired
		if !stopRequired {
			stopRequired = offloader.OffloadConfigDiffers(updatedProjects[project].OffloadConfig)
			if !stopRequired {
				stopRequired = offloader.RedisConfigDiffers(updatedProjects[project].RedisConfig)
				if !stopRequired {
					// Non-blocking probe: either channel being closed means
					// the offloader is dead and should be restarted.
					select {
					case <-offloader.Context.Done():
						stopRequired = true
					case <-offloader.Done:
						stopRequired = true
					default:
					}
				}
			}
		}
		if stopRequired {
			log.Printf("stopping offloader %s", project)
			offloader.Cancel()
			<-offloader.Done
			delete(that.Offloaders, project)
		}
	}
	// Start offloaders for projects that currently have none; one that
	// was stopped above is restarted here with its fresh config.
	for project, config := range updatedProjects {
		if _, has := that.Offloaders[project]; !has {
			log.Printf("starting offloader %s", project)
			offloader := &Offloader{}
			offloader.Name = project
			offloader.ProjectConfig = config
			offloader.Core = that
			if config.RedisConfig != nil {
				// Dedicated project redis; the offloader closes it itself.
				offloader.RedisClient = redis.NewClient(&redis.Options{
					Addr:        fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
					Username:    "default",
					Password:    config.RedisConfig.Pass,
					ReadTimeout: 15 * time.Minute,
				})
			} else {
				offloader.RedisClient = redisClient
			}
			offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
			offloader.Done = make(chan bool)
			that.Offloaders[project] = offloader
			go offloader.Do()
		}
	}
}
  425. func main() {
  426. rand.Seed(time.Now().UnixNano())
  427. log.SetFlags(log.Flags() | log.Lshortfile)
  428. go func() {
  429. var err error
  430. for i := 0; i < 10; i++ {
  431. port := rand.Intn(65535-1024) + 1024
  432. err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
  433. if err != nil && err != http.ErrServerClosed {
  434. log.Printf("unable to listen on port %d: %s", port, err)
  435. continue
  436. }
  437. break
  438. }
  439. }()
  440. dataDir := os.Getenv("DATA_DIR")
  441. if dataDir == "" {
  442. log.Panicf("no DATA_DIR specified")
  443. }
  444. // check if dataDir exists and is a directory
  445. if stat, err := os.Stat(dataDir); os.IsNotExist(err) {
  446. log.Panicf("DATA_DIR %s does not exist", dataDir)
  447. } else if !stat.IsDir() {
  448. log.Panicf("DATA_DIR %s is not a directory", dataDir)
  449. }
  450. mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
  451. if err != nil {
  452. log.Panicf("%s", err)
  453. }
  454. mainOptions.ReadTimeout = 15 * time.Minute
  455. mainClient := redis.NewClient(mainOptions)
  456. sc := make(chan os.Signal, 1)
  457. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  458. ticker := time.NewTicker(1 * time.Minute)
  459. core := &Core{
  460. DataDir: dataDir,
  461. MainClient: mainClient,
  462. Offloaders: map[string]*Offloader{},
  463. MigrationMode: os.Getenv("MIGRATION_MODE") != "",
  464. }
  465. defer core.StopProjects()
  466. for {
  467. core.RefreshProjects(mainClient)
  468. select {
  469. case <-sc:
  470. core.StopProjects()
  471. return
  472. case <-ticker.C:
  473. }
  474. }
  475. }