You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

502 lines
14 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/rand"
  8. "net/http"
  9. _ "net/http/pprof"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. dq "github.com/nsqio/go-diskqueue"
  17. )
const (
	// DefaultWatermarkHigh is the redis set size above which entries are
	// offloaded to disk, used when a project's config leaves "high" at 0.
	DefaultWatermarkHigh int64 = 100000
	// DefaultWatermarkLow is the redis set size below which entries are
	// reloaded from disk, used when a project's config leaves "low" at 0.
	// NOTE(review): equal to DefaultWatermarkHigh — confirm this is
	// intentional and not meant to be a strictly lower threshold.
	DefaultWatermarkLow int64 = 100000
	// DefaultBatchSize caps how many entries are moved per SPopN/SAdd
	// round trip, used when a project's config leaves "batchsize" at 0.
	DefaultBatchSize int64 = 10000
)
  23. func l(_ dq.LogLevel, f string, args ...interface{}) {
  24. log.Printf(f, args...)
  25. }
// Core holds the process-wide state: the on-disk queue directory, the
// main redis connection, and one running Offloader per tracked project.
type Core struct {
	DataDir    string        // root directory for all disk queues
	MainClient *redis.Client // connection to the main tracker redis
	Offloaders map[string]*Offloader // running offloaders, keyed by project name
	// MigrationMode disables offloading and drains disk queues back
	// into redis (up to twice the high watermark); set via the
	// MIGRATION_MODE environment variable.
	MigrationMode bool
}
// ProjectRedisConfig describes an optional project-private redis
// endpoint; when present the project gets its own client instead of
// sharing the main one.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
// ProjectOffloadConfig carries the per-project offloading thresholds;
// zero values fall back to the Default* constants (see Offloader.Do).
type ProjectOffloadConfig struct {
	WatermarkHigh int64 `json:"high"`      // offload above this set size
	WatermarkLow  int64 `json:"low"`       // reload from disk below this set size
	BatchSize     int64 `json:"batchsize"` // max entries moved per round trip
}
// ProjectConfig is one project's entry as decoded from the "trackers"
// hash in the main redis.
type ProjectConfig struct {
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"` // nil means use the shared main client
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}
// Offloader moves entries of a single project between redis sets and
// on-disk queues, keeping each set's size between the configured
// watermarks. One Offloader runs per project in its own goroutine (Do).
type Offloader struct {
	RedisClient   *redis.Client // project-private or the shared main client
	ProjectConfig ProjectConfig
	Context       context.Context // cancelled to request shutdown
	Cancel        context.CancelFunc
	Done          chan bool               // closed when Do returns
	Queues        map[string]dq.Interface // open disk queues, keyed by queue name
	Sets          map[string]string       // redis set name -> disk queue name
	Name          string                  // project name, used as redis key prefix
	Core          *Core
}
  57. func (that *Offloader) CleanName(s string) string {
  58. return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
  59. }
  60. func (that *Offloader) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  61. if that.ProjectConfig.RedisConfig == nil && new == nil {
  62. return false
  63. }
  64. if that.ProjectConfig.RedisConfig == nil || new == nil || that.ProjectConfig.RedisConfig.Host != new.Host || that.ProjectConfig.RedisConfig.Port != new.Port || that.ProjectConfig.RedisConfig.Pass != new.Pass {
  65. return true
  66. }
  67. return false
  68. }
  69. func (that *Offloader) OffloadConfigDiffers(new ProjectOffloadConfig) bool {
  70. return that.ProjectConfig.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.ProjectConfig.OffloadConfig.WatermarkLow != new.WatermarkLow || that.ProjectConfig.OffloadConfig.BatchSize != new.BatchSize
  71. }
  72. func (that *Offloader) RefreshQueues() error {
  73. pipe := that.RedisClient.Pipeline()
  74. prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
  75. filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
  76. _, err := pipe.Exec(that.Context)
  77. if err != nil {
  78. return err
  79. }
  80. priorities, err := prioritiesCmdRes.Result()
  81. if err != nil {
  82. return err
  83. }
  84. filters, err := filtersCmdRes.Result()
  85. if err != nil {
  86. return err
  87. }
  88. setQueueMap := map[string]string{
  89. "todo": "todo",
  90. "todo:secondary": "todo:secondary",
  91. "todo:redo": "todo:redo",
  92. "todo:backfeed": "todo:backfeed",
  93. "done": "done",
  94. "unretrievable": "unretrievable",
  95. }
  96. for _, filter := range filters {
  97. setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
  98. }
  99. for _, priority := range priorities {
  100. setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
  101. }
  102. needQueueMap := map[string]bool{}
  103. for setName, queueName := range setQueueMap {
  104. needQueueMap[queueName] = true
  105. if _, has := that.Queues[queueName]; !has {
  106. log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
  107. queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
  108. if queue == nil {
  109. return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
  110. }
  111. that.Queues[queueName] = queue
  112. }
  113. that.Sets[setName] = queueName
  114. }
  115. for k, v := range that.Queues {
  116. if _, has := needQueueMap[k]; !has {
  117. v.Close()
  118. delete(that.Queues, k)
  119. }
  120. }
  121. return nil
  122. }
  123. func (that *Offloader) CloseQueues() {
  124. for k, q := range that.Queues {
  125. log.Printf("closing queue %s for %s", k, that.Name)
  126. q.Close()
  127. }
  128. }
  129. func (that *Offloader) UpdateStats() {
  130. hset := map[string]interface{}{}
  131. for k, q := range that.Sets {
  132. if k != q {
  133. continue
  134. }
  135. queue := that.Queues[q]
  136. if queue != nil {
  137. hset[k] = fmt.Sprintf("%d", queue.Depth())
  138. }
  139. }
  140. _, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
  141. if err != nil {
  142. log.Printf("unable to hmset %s:offloaded: %s", that.Name, err)
  143. }
  144. }
// Do is the offloader's main loop; it runs in its own goroutine until
// its context is cancelled or an unrecoverable queue error occurs, and
// closes that.Done on exit. Each iteration compares every redis set's
// cardinality against the watermarks: sets above the high watermark
// (and auxiliary sets, which map onto a differently-named queue) are
// drained to disk, while primary sets below the low watermark are
// refilled from disk. In migration mode nothing is offloaded and redis
// is refilled up to twice the high watermark.
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()
	// Only close the client when it is project-private; the shared main
	// client is owned by main().
	if that.ProjectConfig.RedisConfig != nil {
		defer that.RedisClient.Close()
	}
	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()
	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()
	// Buffered(1) so the loop can flag "work remains" for itself without
	// blocking; receiving from it skips the sleep between busy rounds.
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)
	// Zero-valued thresholds fall back to the package defaults.
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}
	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}
	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}
	for {
		// Historical per-queue implementation kept for reference:
		//for k, q := range that.Queues {
		// key := fmt.Sprintf("%s:%s", that.Name, k)
		// scard, err := that.RedisClient.SCard(that.Context, key).Result()
		// if err != nil {
		// log.Printf("unable to scard %s: %s", key, err)
		// continue
		// }
		// for scard > watermarkHigh || scard < watermarkLow {
		// select {
		// case <-that.Context.Done():
		// return
		// case <-refreshTicker.C:
		// that.RefreshQueues()
		// that.UpdateStats()
		// default:
		// }
		// if scard > watermarkHigh {
		// spopLimit := scard - watermarkHigh
		// if spopLimit > batchSize {
		// spopLimit = batchSize
		// }
		// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
		// entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
		// cancel()
		// if err != nil {
		// log.Printf("unable to spop %s: %s", key, err)
		// }
		// scard = scard - int64(len(entries))
		// for _, entry := range entries {
		// err := q.Put([]byte(entry))
		// if err != nil {
		// log.Printf("unable to q.Put %s: %s", key, err)
		// return
		// }
		// }
		// } else if scard < watermarkLow {
		// spopLimit := watermarkLow - scard
		// if spopLimit > batchSize {
		// spopLimit = batchSize
		// }
		// var entries []interface{}
		// for q.Depth() > 0 && int64(len(entries)) < spopLimit {
		// entry := <-q.ReadChan()
		// entries = append(entries, string(entry))
		// }
		// if len(entries) == 0 {
		// break
		// }
		// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
		// _, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
		// cancel()
		// if err != nil {
		// log.Printf("unable to sadd %s %#v: %s", key, entries, err)
		// for _, entry := range entries {
		// err := q.Put([]byte(entry.(string)))
		// if err != nil {
		// log.Printf("unable to q.Put %s: %s", key, err)
		// }
		// }
		// return
		// }
		// scard = scard + int64(len(entries))
		// }
		// }
		//}
		// Fetch all set cardinalities in one pipelined round trip.
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			rerun := false
			for k, q := range that.Sets {
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				// Offload branch: set is over the high watermark, or it
				// is an auxiliary set (k != q, e.g. "filtered:*") that is
				// always drained completely. Disabled in migration mode.
				if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) {
					spopLimit := scard - watermarkHigh
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					for _, entry := range entries {
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							// Entries already popped from redis cannot be
							// returned; stop the offloader rather than
							// silently dropping more.
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 {
					// Reload branch: primary set is under the low watermark
					// (or, in migration mode, under twice the high
					// watermark) and disk data is available to push back.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize || that.Core.MigrationMode {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						// SAdd failed: push the entries back onto the disk
						// queue so they are not lost, then stop.
						log.Printf("unable to sadd %s: %s", key, err)
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			// If anything moved, request an immediate next iteration.
			if rerun {
				select {
				case skipSleepChan <- true:
				default:
				}
			}
			that.UpdateStats()
		}
		// Sleep until the next tick, a refresh, cancellation, or a
		// self-issued "more work pending" signal.
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
		}
	}
}
  335. func (that *Core) StopProjects() {
  336. var doneChans []chan bool
  337. for project, offloader := range that.Offloaders {
  338. log.Printf("stopping offloader %s", project)
  339. offloader.Cancel()
  340. doneChans = append(doneChans, offloader.Done)
  341. }
  342. for _, c := range doneChans {
  343. <-c
  344. }
  345. }
  346. func (that *Core) RefreshProjects(redisClient *redis.Client) {
  347. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
  348. res, err := redisClient.HGetAll(ctx, "trackers").Result()
  349. cancel()
  350. if err != nil {
  351. log.Printf("unable to refresh trackers table: %s", err)
  352. return
  353. }
  354. updatedProjects := map[string]ProjectConfig{}
  355. for project, configString := range res {
  356. //if project != "ua" && project != "ukr-net" && project != "ua-urls" {
  357. // continue
  358. //}
  359. config := ProjectConfig{}
  360. err := json.Unmarshal([]byte(configString), &config)
  361. if err != nil {
  362. log.Printf("unable to decode project %s config: %s", project, err)
  363. continue
  364. }
  365. updatedProjects[project] = config
  366. }
  367. for project, offloader := range that.Offloaders {
  368. _, stopRequired := updatedProjects[project]
  369. stopRequired = !stopRequired
  370. if !stopRequired {
  371. stopRequired = offloader.OffloadConfigDiffers(updatedProjects[project].OffloadConfig)
  372. if !stopRequired {
  373. stopRequired = offloader.RedisConfigDiffers(updatedProjects[project].RedisConfig)
  374. if !stopRequired {
  375. select {
  376. case <-offloader.Context.Done():
  377. stopRequired = true
  378. case <-offloader.Done:
  379. stopRequired = true
  380. default:
  381. }
  382. }
  383. }
  384. }
  385. if stopRequired {
  386. log.Printf("stopping offloader %s", project)
  387. offloader.Cancel()
  388. <-offloader.Done
  389. delete(that.Offloaders, project)
  390. }
  391. }
  392. for project, config := range updatedProjects {
  393. if _, has := that.Offloaders[project]; !has {
  394. log.Printf("starting offloader %s", project)
  395. offloader := &Offloader{}
  396. offloader.Name = project
  397. offloader.ProjectConfig = config
  398. offloader.Core = that
  399. if config.RedisConfig != nil {
  400. offloader.RedisClient = redis.NewClient(&redis.Options{
  401. Addr: fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
  402. Username: "default",
  403. Password: config.RedisConfig.Pass,
  404. ReadTimeout: 15 * time.Minute,
  405. })
  406. } else {
  407. offloader.RedisClient = redisClient
  408. }
  409. offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
  410. offloader.Done = make(chan bool)
  411. that.Offloaders[project] = offloader
  412. go offloader.Do()
  413. }
  414. }
  415. }
  416. func main() {
  417. rand.Seed(time.Now().UnixNano())
  418. log.SetFlags(log.Flags() | log.Lshortfile)
  419. go func() {
  420. var err error
  421. for i := 0; i < 10; i++ {
  422. port := rand.Intn(65535-1024) + 1024
  423. err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
  424. if err != nil && err != http.ErrServerClosed {
  425. log.Printf("unable to listen on port %d: %s", port, err)
  426. continue
  427. }
  428. break
  429. }
  430. }()
  431. dataDir := os.Getenv("DATA_DIR")
  432. if dataDir == "" {
  433. log.Panicf("no DATA_DIR specified")
  434. }
  435. // check if dataDir exists and is a directory
  436. if stat, err := os.Stat(dataDir); os.IsNotExist(err) {
  437. log.Panicf("DATA_DIR %s does not exist", dataDir)
  438. } else if !stat.IsDir() {
  439. log.Panicf("DATA_DIR %s is not a directory", dataDir)
  440. }
  441. mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
  442. if err != nil {
  443. log.Panicf("%s", err)
  444. }
  445. mainOptions.ReadTimeout = 15 * time.Minute
  446. mainClient := redis.NewClient(mainOptions)
  447. sc := make(chan os.Signal, 1)
  448. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  449. ticker := time.NewTicker(1 * time.Minute)
  450. core := &Core{
  451. DataDir: dataDir,
  452. MainClient: mainClient,
  453. Offloaders: map[string]*Offloader{},
  454. MigrationMode: os.Getenv("MIGRATION_MODE") != "",
  455. }
  456. defer core.StopProjects()
  457. for {
  458. core.RefreshProjects(mainClient)
  459. select {
  460. case <-sc:
  461. core.StopProjects()
  462. return
  463. case <-ticker.C:
  464. }
  465. }
  466. }