You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

510 lines
14 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/rand"
  8. "net/http"
  9. _ "net/http/pprof"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. dq "github.com/nsqio/go-diskqueue"
  17. )
// Fallback offload thresholds, used when a project's offload config leaves
// the corresponding field at zero (see Offloader.Do).
const (
	// DefaultWatermarkHigh: members are spilled from a redis set to disk
	// once the set grows above this size.
	DefaultWatermarkHigh int64 = 100000
	// DefaultWatermarkLow: members are loaded back from disk into redis
	// once the set shrinks below this size.
	// NOTE(review): equal to DefaultWatermarkHigh — a low watermark is
	// usually strictly lower; confirm this is intentional.
	DefaultWatermarkLow int64 = 100000
	// DefaultBatchSize bounds how many entries a single SPOPN/SADD moves.
	DefaultBatchSize int64 = 10000
)
  23. func l(_ dq.LogLevel, f string, args ...interface{}) {
  24. log.Printf(f, args...)
  25. }
// Core holds process-wide state shared by all project offloaders.
type Core struct {
	DataDir    string                // directory where disk queues store their files
	MainClient *redis.Client         // default redis client, built from REDIS_URL
	Offloaders map[string]*Offloader // running offloaders, keyed by project name
	// MigrationMode disables offloading to disk and drains disk queues back
	// into redis (see the conditions in Offloader.Do).
	MigrationMode bool
}
// ProjectRedisConfig is an optional per-project redis endpoint override
// stored inside the project's JSON config; when present, the project gets
// its own redis client instead of the shared main client.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
// ProjectOffloadConfig carries per-project offload tuning; zero values fall
// back to the Default* constants in Offloader.Do.
type ProjectOffloadConfig struct {
	WatermarkHigh int64 `json:"high"`      // offload to disk above this set size
	WatermarkLow  int64 `json:"low"`       // reload from disk below this set size
	BatchSize     int64 `json:"batchsize"` // max entries moved per SPOPN/SADD
}
// ProjectConfig is the JSON value stored per project in the "trackers" hash
// on the main redis (decoded in Core.RefreshProjects).
type ProjectConfig struct {
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"` // nil means use the shared main client
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}
// Offloader moves members of one project's redis sets to and from on-disk
// queues, keeping each set's size between the configured watermarks.
// One Offloader runs per project; its main loop is Do.
type Offloader struct {
	RedisClient   *redis.Client   // project client (may be the shared main client)
	ProjectConfig ProjectConfig   // config snapshot this offloader was started with
	Context       context.Context // cancelled to request shutdown
	Cancel        context.CancelFunc
	Done          chan bool               // closed when Do returns
	Queues        map[string]dq.Interface // open disk queues, keyed by queue name
	Sets          map[string]string       // redis set name -> disk queue name
	Name          string                  // project name, used as redis key prefix
	Core          *Core
}
  57. func (that *Offloader) CleanName(s string) string {
  58. return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
  59. }
  60. func (that *Offloader) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  61. if that.ProjectConfig.RedisConfig == nil && new == nil {
  62. return false
  63. }
  64. if that.ProjectConfig.RedisConfig == nil || new == nil || that.ProjectConfig.RedisConfig.Host != new.Host || that.ProjectConfig.RedisConfig.Port != new.Port || that.ProjectConfig.RedisConfig.Pass != new.Pass {
  65. return true
  66. }
  67. return false
  68. }
  69. func (that *Offloader) OffloadConfigDiffers(new ProjectOffloadConfig) bool {
  70. return that.ProjectConfig.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.ProjectConfig.OffloadConfig.WatermarkLow != new.WatermarkLow || that.ProjectConfig.OffloadConfig.BatchSize != new.BatchSize
  71. }
  72. func (that *Offloader) RefreshQueues() error {
  73. pipe := that.RedisClient.Pipeline()
  74. prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
  75. filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
  76. stashesCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:stashes", that.Name))
  77. _, err := pipe.Exec(that.Context)
  78. if err != nil {
  79. return err
  80. }
  81. priorities, err := prioritiesCmdRes.Result()
  82. if err != nil {
  83. return err
  84. }
  85. filters, err := filtersCmdRes.Result()
  86. if err != nil {
  87. return err
  88. }
  89. stashes, err := stashesCmdRes.Result()
  90. if err != nil {
  91. return err
  92. }
  93. setQueueMap := map[string]string{
  94. "todo": "todo",
  95. "todo:secondary": "todo:secondary",
  96. "todo:redo": "todo:redo",
  97. "todo:backfeed": "todo:backfeed",
  98. "done": "done",
  99. "unretrievable": "unretrievable",
  100. }
  101. for _, filter := range filters {
  102. setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
  103. }
  104. for _, priority := range priorities {
  105. setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
  106. }
  107. for _, stash := range stashes {
  108. setQueueMap[fmt.Sprintf("stash:%s", stash)] = fmt.Sprintf("stash:%s", stash)
  109. }
  110. needQueueMap := map[string]bool{}
  111. for setName, queueName := range setQueueMap {
  112. needQueueMap[queueName] = true
  113. if _, has := that.Queues[queueName]; !has {
  114. log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
  115. queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
  116. if queue == nil {
  117. return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
  118. }
  119. that.Queues[queueName] = queue
  120. }
  121. that.Sets[setName] = queueName
  122. }
  123. for k, v := range that.Queues {
  124. if _, has := needQueueMap[k]; !has {
  125. v.Close()
  126. delete(that.Queues, k)
  127. }
  128. }
  129. return nil
  130. }
  131. func (that *Offloader) CloseQueues() {
  132. for k, q := range that.Queues {
  133. log.Printf("closing queue %s for %s", k, that.Name)
  134. q.Close()
  135. }
  136. }
  137. func (that *Offloader) UpdateStats() {
  138. hset := map[string]interface{}{}
  139. for k, q := range that.Sets {
  140. if k != q {
  141. continue
  142. }
  143. queue := that.Queues[q]
  144. if queue != nil {
  145. hset[k] = fmt.Sprintf("%d", queue.Depth())
  146. }
  147. }
  148. _, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
  149. if err != nil {
  150. log.Printf("unable to hmset %s:offloaded: %s", that.Name, err)
  151. }
  152. }
// Do is the offloader main loop for one project. Each pass it measures the
// size (SCARD) of every tracked set, spills members to the set's disk queue
// when the set exceeds the high watermark, and feeds members back into
// redis when the set falls below the low watermark — until that.Context is
// cancelled. It closes that.Done on exit so Core can wait for shutdown.
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()
	if that.ProjectConfig.RedisConfig != nil {
		// Only close a client owned by this offloader; the shared main
		// client is managed by main().
		defer that.RedisClient.Close()
	}
	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()
	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()
	// Buffered so a pending "more work remains" signal never blocks the
	// sender (see the non-blocking send below).
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)
	// Zero-valued config fields fall back to the package defaults.
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}
	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}
	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}
	for {
		// Historical per-queue implementation, kept for reference:
		//for k, q := range that.Queues {
		// key := fmt.Sprintf("%s:%s", that.Name, k)
		// scard, err := that.RedisClient.SCard(that.Context, key).Result()
		// if err != nil {
		// log.Printf("unable to scard %s: %s", key, err)
		// continue
		// }
		// for scard > watermarkHigh || scard < watermarkLow {
		// select {
		// case <-that.Context.Done():
		// return
		// case <-refreshTicker.C:
		// that.RefreshQueues()
		// that.UpdateStats()
		// default:
		// }
		// if scard > watermarkHigh {
		// spopLimit := scard - watermarkHigh
		// if spopLimit > batchSize {
		// spopLimit = batchSize
		// }
		// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
		// entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
		// cancel()
		// if err != nil {
		// log.Printf("unable to spop %s: %s", key, err)
		// }
		// scard = scard - int64(len(entries))
		// for _, entry := range entries {
		// err := q.Put([]byte(entry))
		// if err != nil {
		// log.Printf("unable to q.Put %s: %s", key, err)
		// return
		// }
		// }
		// } else if scard < watermarkLow {
		// spopLimit := watermarkLow - scard
		// if spopLimit > batchSize {
		// spopLimit = batchSize
		// }
		// var entries []interface{}
		// for q.Depth() > 0 && int64(len(entries)) < spopLimit {
		// entry := <-q.ReadChan()
		// entries = append(entries, string(entry))
		// }
		// if len(entries) == 0 {
		// break
		// }
		// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
		// _, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
		// cancel()
		// if err != nil {
		// log.Printf("unable to sadd %s %#v: %s", key, entries, err)
		// for _, entry := range entries {
		// err := q.Put([]byte(entry.(string)))
		// if err != nil {
		// log.Printf("unable to q.Put %s: %s", key, err)
		// }
		// }
		// return
		// }
		// scard = scard + int64(len(entries))
		// }
		//}
		// Measure every set's size in a single pipelined round trip.
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			rerun := false
			for k, q := range that.Sets {
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				// Offload branch: set is over the high watermark, or it is
				// an alias set (k != q, e.g. "filtered:*") which is always
				// drained fully to its shared queue. Disabled entirely in
				// migration mode.
				if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) {
					spopLimit := scard - watermarkHigh
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					// A failed disk write aborts the whole offloader; the
					// popped entries already written are preserved on disk.
					for _, entry := range entries {
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 {
					// Reload branch: primary set is under the low watermark
					// (or, in migration mode, under twice the high
					// watermark) and the disk queue has entries to give back.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize || that.Core.MigrationMode {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						// SADD failed: push the drained entries back onto
						// the disk queue so nothing is lost, then abort.
						log.Printf("unable to sadd %s: %s", key, err)
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			if rerun {
				// Work was done this pass; request an immediate next pass
				// (non-blocking — a signal may already be pending).
				select {
				case skipSleepChan <- true:
				default:
				}
			}
			that.UpdateStats()
		}
		// Sleep until the next tick, a refresh, a skip signal, or shutdown.
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
		}
	}
}
  343. func (that *Core) StopProjects() {
  344. var doneChans []chan bool
  345. for project, offloader := range that.Offloaders {
  346. log.Printf("stopping offloader %s", project)
  347. offloader.Cancel()
  348. doneChans = append(doneChans, offloader.Done)
  349. }
  350. for _, c := range doneChans {
  351. <-c
  352. }
  353. }
  354. func (that *Core) RefreshProjects(redisClient *redis.Client) {
  355. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
  356. res, err := redisClient.HGetAll(ctx, "trackers").Result()
  357. cancel()
  358. if err != nil {
  359. log.Printf("unable to refresh trackers table: %s", err)
  360. return
  361. }
  362. updatedProjects := map[string]ProjectConfig{}
  363. for project, configString := range res {
  364. //if project != "ua" && project != "ukr-net" && project != "ua-urls" {
  365. // continue
  366. //}
  367. config := ProjectConfig{}
  368. err := json.Unmarshal([]byte(configString), &config)
  369. if err != nil {
  370. log.Printf("unable to decode project %s config: %s", project, err)
  371. continue
  372. }
  373. updatedProjects[project] = config
  374. }
  375. for project, offloader := range that.Offloaders {
  376. _, stopRequired := updatedProjects[project]
  377. stopRequired = !stopRequired
  378. if !stopRequired {
  379. stopRequired = offloader.OffloadConfigDiffers(updatedProjects[project].OffloadConfig)
  380. if !stopRequired {
  381. stopRequired = offloader.RedisConfigDiffers(updatedProjects[project].RedisConfig)
  382. if !stopRequired {
  383. select {
  384. case <-offloader.Context.Done():
  385. stopRequired = true
  386. case <-offloader.Done:
  387. stopRequired = true
  388. default:
  389. }
  390. }
  391. }
  392. }
  393. if stopRequired {
  394. log.Printf("stopping offloader %s", project)
  395. offloader.Cancel()
  396. <-offloader.Done
  397. delete(that.Offloaders, project)
  398. }
  399. }
  400. for project, config := range updatedProjects {
  401. if _, has := that.Offloaders[project]; !has {
  402. log.Printf("starting offloader %s", project)
  403. offloader := &Offloader{}
  404. offloader.Name = project
  405. offloader.ProjectConfig = config
  406. offloader.Core = that
  407. if config.RedisConfig != nil {
  408. offloader.RedisClient = redis.NewClient(&redis.Options{
  409. Addr: fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
  410. Username: "default",
  411. Password: config.RedisConfig.Pass,
  412. ReadTimeout: 15 * time.Minute,
  413. })
  414. } else {
  415. offloader.RedisClient = redisClient
  416. }
  417. offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
  418. offloader.Done = make(chan bool)
  419. that.Offloaders[project] = offloader
  420. go offloader.Do()
  421. }
  422. }
  423. }
  424. func main() {
  425. rand.Seed(time.Now().UnixNano())
  426. log.SetFlags(log.Flags() | log.Lshortfile)
  427. go func() {
  428. var err error
  429. for i := 0; i < 10; i++ {
  430. port := rand.Intn(65535-1024) + 1024
  431. err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
  432. if err != nil && err != http.ErrServerClosed {
  433. log.Printf("unable to listen on port %d: %s", port, err)
  434. continue
  435. }
  436. break
  437. }
  438. }()
  439. dataDir := os.Getenv("DATA_DIR")
  440. if dataDir == "" {
  441. log.Panicf("no DATA_DIR specified")
  442. }
  443. // check if dataDir exists and is a directory
  444. if stat, err := os.Stat(dataDir); os.IsNotExist(err) {
  445. log.Panicf("DATA_DIR %s does not exist", dataDir)
  446. } else if !stat.IsDir() {
  447. log.Panicf("DATA_DIR %s is not a directory", dataDir)
  448. }
  449. mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
  450. if err != nil {
  451. log.Panicf("%s", err)
  452. }
  453. mainOptions.ReadTimeout = 15 * time.Minute
  454. mainClient := redis.NewClient(mainOptions)
  455. sc := make(chan os.Signal, 1)
  456. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  457. ticker := time.NewTicker(1 * time.Minute)
  458. core := &Core{
  459. DataDir: dataDir,
  460. MainClient: mainClient,
  461. Offloaders: map[string]*Offloader{},
  462. MigrationMode: os.Getenv("MIGRATION_MODE") != "",
  463. }
  464. defer core.StopProjects()
  465. for {
  466. core.RefreshProjects(mainClient)
  467. select {
  468. case <-sc:
  469. core.StopProjects()
  470. return
  471. case <-ticker.C:
  472. }
  473. }
  474. }