You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

505 lines
14 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/rand"
  8. "net/http"
  9. _ "net/http/pprof"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "github.com/go-redis/redis/v8"
  16. dq "github.com/nsqio/go-diskqueue"
  17. )
// Defaults applied by Offloader.Do when a project's offload config leaves
// the corresponding field at zero.
const (
	// DefaultWatermarkHigh: start offloading a set to disk once it holds
	// more members than this.
	DefaultWatermarkHigh int64 = 100000
	// DefaultWatermarkLow: start refilling a set from disk once it holds
	// fewer members than this.
	// NOTE(review): equal to the high default — confirm this is intended
	// and not a copy-paste of the value above.
	DefaultWatermarkLow int64 = 100000
	// DefaultBatchSize: maximum number of entries moved per SPOPN/SADD round.
	DefaultBatchSize int64 = 10000
)
  23. func l(_ dq.LogLevel, f string, args ...interface{}) {
  24. log.Printf(f, args...)
  25. }
// Core holds process-wide state: the on-disk data directory, the shared
// redis client, and one running Offloader per tracked project.
type Core struct {
	// DataDir is the root directory for the per-queue disk files.
	DataDir string
	// MainClient is the shared redis client, used by projects that do not
	// configure their own redis endpoint.
	MainClient *redis.Client
	// Offloaders maps project name to its running offloader.
	Offloaders map[string]*Offloader
	// MigrationMode, when true, disables offloading to disk and lets Do
	// refill redis sets up to twice the high watermark (see Offloader.Do).
	MigrationMode bool
}
// ProjectRedisConfig is an optional per-project redis endpoint, decoded
// from the "redis" key of a project's entry in the "trackers" hash.
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
// ProjectOffloadConfig tunes when and how much a project offloads.
// Zero values fall back to the Default* constants (see Offloader.Do).
type ProjectOffloadConfig struct {
	// WatermarkHigh: offload to disk when a set grows beyond this.
	WatermarkHigh int64 `json:"high"`
	// WatermarkLow: refill from disk when a set shrinks below this.
	WatermarkLow int64 `json:"low"`
	// BatchSize: maximum entries moved per SPOPN/SADD round.
	BatchSize int64 `json:"batchsize"`
}
// ProjectConfig is one project's JSON entry in the "trackers" redis hash.
type ProjectConfig struct {
	// RedisConfig, when nil, means the project uses the shared main client.
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"`
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}
// Offloader moves entries between one project's redis sets and local disk
// queues, keeping the sets between the configured watermarks. All fields
// are owned by the single goroutine running Do.
type Offloader struct {
	// RedisClient is either a project-specific client or Core.MainClient.
	RedisClient   *redis.Client
	ProjectConfig ProjectConfig
	// Context is cancelled (via Cancel) to request shutdown.
	Context context.Context
	Cancel  context.CancelFunc
	// Done is closed when Do returns.
	Done chan bool
	// Queues holds the open disk queues, keyed by queue name.
	Queues map[string]dq.Interface
	// Sets maps a redis set name to the disk queue name it offloads into.
	Sets map[string]string
	// Name is the project name; it prefixes redis keys and queue file names.
	Name string
	Core *Core
}
  57. func (that *Offloader) CleanName(s string) string {
  58. return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
  59. }
  60. func (that *Offloader) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  61. if that.ProjectConfig.RedisConfig == nil && new == nil {
  62. return false
  63. }
  64. if that.ProjectConfig.RedisConfig == nil || new == nil || that.ProjectConfig.RedisConfig.Host != new.Host || that.ProjectConfig.RedisConfig.Port != new.Port || that.ProjectConfig.RedisConfig.Pass != new.Pass {
  65. return true
  66. }
  67. return false
  68. }
  69. func (that *Offloader) OffloadConfigDiffers(new ProjectOffloadConfig) bool {
  70. return that.ProjectConfig.OffloadConfig.WatermarkHigh != new.WatermarkHigh || that.ProjectConfig.OffloadConfig.WatermarkLow != new.WatermarkLow || that.ProjectConfig.OffloadConfig.BatchSize != new.BatchSize
  71. }
  72. func (that *Offloader) RefreshQueues() error {
  73. pipe := that.RedisClient.Pipeline()
  74. prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
  75. filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
  76. _, err := pipe.Exec(that.Context)
  77. if err != nil {
  78. return err
  79. }
  80. priorities, err := prioritiesCmdRes.Result()
  81. if err != nil {
  82. return err
  83. }
  84. filters, err := filtersCmdRes.Result()
  85. if err != nil {
  86. return err
  87. }
  88. setQueueMap := map[string]string{
  89. "todo": "todo",
  90. "todo:secondary": "todo:secondary",
  91. "todo:redo": "todo:redo",
  92. "todo:backfeed": "todo:backfeed",
  93. "done": "done",
  94. "unretrievable": "unretrievable",
  95. }
  96. for _, filter := range filters {
  97. setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
  98. }
  99. for _, priority := range priorities {
  100. setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
  101. }
  102. needQueueMap := map[string]bool{}
  103. for setName, queueName := range setQueueMap {
  104. needQueueMap[queueName] = true
  105. if _, has := that.Queues[queueName]; !has {
  106. log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
  107. queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
  108. if queue == nil {
  109. return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
  110. }
  111. that.Queues[queueName] = queue
  112. }
  113. that.Sets[setName] = queueName
  114. }
  115. for k, v := range that.Queues {
  116. if _, has := needQueueMap[k]; !has {
  117. v.Close()
  118. delete(that.Queues, k)
  119. }
  120. }
  121. return nil
  122. }
  123. func (that *Offloader) CloseQueues() {
  124. for k, q := range that.Queues {
  125. log.Printf("closing queue %s for %s", k, that.Name)
  126. q.Close()
  127. }
  128. }
  129. func (that *Offloader) UpdateStats() {
  130. hset := map[string]interface{}{}
  131. for k, q := range that.Sets {
  132. if k != q {
  133. continue
  134. }
  135. queue := that.Queues[q]
  136. if queue != nil {
  137. hset[k] = fmt.Sprintf("%d", queue.Depth())
  138. }
  139. }
  140. _, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
  141. if err != nil {
  142. log.Printf("unable to hmset %s:offloaded: %s", that.Name, err)
  143. }
  144. }
// Do is the offloader's main loop; it runs until that.Context is cancelled
// or an unrecoverable queue error occurs, then closes that.Done.
//
// Each iteration it pipelines an SCARD over every tracked set, then for each
// set either pops surplus members to disk (above the high watermark, or any
// members of an alias set like "filtered:x") or refills the set from disk
// (below the low watermark). When any data moved, the next iteration starts
// immediately via skipSleepChan; otherwise it waits on the 1s stats ticker,
// the 5m queue-refresh ticker, or cancellation.
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()
	// Only close the client this offloader owns (project-specific redis);
	// otherwise RedisClient is the shared main client and must stay open.
	if that.ProjectConfig.RedisConfig != nil {
		defer that.RedisClient.Close()
	}
	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()
	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()
	// Buffered size-1 "work remains" signal; sends are non-blocking and only
	// happen from this goroutine, so closing it on exit is safe.
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)
	// Resolve per-project tuning, falling back to the package defaults for
	// any value left at zero.
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}
	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}
	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}
	for {
		// (An older, non-pipelined per-queue implementation that lived here
		// as a large commented-out block has been removed; see VCS history.)
		// Pipeline one SCARD per tracked set in a single round trip.
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			rerun := false
			for k, q := range that.Sets {
				if that.Queues[q] == nil {
					continue
				}
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				// Offload branch (disabled in migration mode): the set is over
				// the high watermark, or it is an alias set (k != q, e.g.
				// "filtered:x") which is drained whenever non-empty.
				if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) {
					spopLimit := scard - watermarkHigh
					// Alias sets are drained a full batch at a time; primary
					// sets pop at most the surplus, capped at one batch.
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					// Fresh timeout context: that.Context may outlive any
					// sensible bound for a single SPOPN.
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					for _, entry := range entries {
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							// A failed disk write is unrecoverable: the popped
							// entry would be lost, so stop the offloader.
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 {
					// Refill branch: primary set is under the low watermark
					// (or, in migration mode, under twice the high watermark)
					// and there is disk-queued backlog to push back.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize || that.Core.MigrationMode {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						log.Printf("unable to sadd %s: %s", key, err)
						// Best effort: push the already-read entries back onto
						// the disk queue so they are not lost, then stop.
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			if rerun {
				// Non-blocking send: a pending signal already guarantees the
				// next iteration runs without sleeping.
				select {
				case skipSleepChan <- true:
				default:
				}
			}
			that.UpdateStats()
		}
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
		}
	}
}
  338. func (that *Core) StopProjects() {
  339. var doneChans []chan bool
  340. for project, offloader := range that.Offloaders {
  341. log.Printf("stopping offloader %s", project)
  342. offloader.Cancel()
  343. doneChans = append(doneChans, offloader.Done)
  344. }
  345. for _, c := range doneChans {
  346. <-c
  347. }
  348. }
  349. func (that *Core) RefreshProjects(redisClient *redis.Client) {
  350. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
  351. res, err := redisClient.HGetAll(ctx, "trackers").Result()
  352. cancel()
  353. if err != nil {
  354. log.Printf("unable to refresh trackers table: %s", err)
  355. return
  356. }
  357. updatedProjects := map[string]ProjectConfig{}
  358. for project, configString := range res {
  359. //if project != "ua" && project != "ukr-net" && project != "ua-urls" {
  360. // continue
  361. //}
  362. config := ProjectConfig{}
  363. err := json.Unmarshal([]byte(configString), &config)
  364. if err != nil {
  365. log.Printf("unable to decode project %s config: %s", project, err)
  366. continue
  367. }
  368. updatedProjects[project] = config
  369. }
  370. for project, offloader := range that.Offloaders {
  371. _, stopRequired := updatedProjects[project]
  372. stopRequired = !stopRequired
  373. if !stopRequired {
  374. stopRequired = offloader.OffloadConfigDiffers(updatedProjects[project].OffloadConfig)
  375. if !stopRequired {
  376. stopRequired = offloader.RedisConfigDiffers(updatedProjects[project].RedisConfig)
  377. if !stopRequired {
  378. select {
  379. case <-offloader.Context.Done():
  380. stopRequired = true
  381. case <-offloader.Done:
  382. stopRequired = true
  383. default:
  384. }
  385. }
  386. }
  387. }
  388. if stopRequired {
  389. log.Printf("stopping offloader %s", project)
  390. offloader.Cancel()
  391. <-offloader.Done
  392. delete(that.Offloaders, project)
  393. }
  394. }
  395. for project, config := range updatedProjects {
  396. if _, has := that.Offloaders[project]; !has {
  397. log.Printf("starting offloader %s", project)
  398. offloader := &Offloader{}
  399. offloader.Name = project
  400. offloader.ProjectConfig = config
  401. offloader.Core = that
  402. if config.RedisConfig != nil {
  403. offloader.RedisClient = redis.NewClient(&redis.Options{
  404. Addr: fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
  405. Username: "default",
  406. Password: config.RedisConfig.Pass,
  407. ReadTimeout: 15 * time.Minute,
  408. })
  409. } else {
  410. offloader.RedisClient = redisClient
  411. }
  412. offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
  413. offloader.Done = make(chan bool)
  414. that.Offloaders[project] = offloader
  415. go offloader.Do()
  416. }
  417. }
  418. }
  419. func main() {
  420. rand.Seed(time.Now().UnixNano())
  421. log.SetFlags(log.Flags() | log.Lshortfile)
  422. go func() {
  423. var err error
  424. for i := 0; i < 10; i++ {
  425. port := rand.Intn(65535-1024) + 1024
  426. err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
  427. if err != nil && err != http.ErrServerClosed {
  428. log.Printf("unable to listen on port %d: %s", port, err)
  429. continue
  430. }
  431. break
  432. }
  433. }()
  434. dataDir := os.Getenv("DATA_DIR")
  435. if dataDir == "" {
  436. log.Panicf("no DATA_DIR specified")
  437. }
  438. // check if dataDir exists and is a directory
  439. if stat, err := os.Stat(dataDir); os.IsNotExist(err) {
  440. log.Panicf("DATA_DIR %s does not exist", dataDir)
  441. } else if !stat.IsDir() {
  442. log.Panicf("DATA_DIR %s is not a directory", dataDir)
  443. }
  444. mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
  445. if err != nil {
  446. log.Panicf("%s", err)
  447. }
  448. mainOptions.ReadTimeout = 15 * time.Minute
  449. mainClient := redis.NewClient(mainOptions)
  450. sc := make(chan os.Signal, 1)
  451. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  452. ticker := time.NewTicker(1 * time.Minute)
  453. core := &Core{
  454. DataDir: dataDir,
  455. MainClient: mainClient,
  456. Offloaders: map[string]*Offloader{},
  457. MigrationMode: os.Getenv("MIGRATION_MODE") != "",
  458. }
  459. defer core.StopProjects()
  460. for {
  461. core.RefreshProjects(mainClient)
  462. select {
  463. case <-sc:
  464. core.StopProjects()
  465. return
  466. case <-ticker.C:
  467. }
  468. }
  469. }