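// offloader keeps each project's Redis sets between a low and a high watermark:
// entries above the high watermark are spilled to local disk queues and are fed
// back into Redis once a set drains below the low watermark.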
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
	dq "github.com/nsqio/go-diskqueue"
)
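// Default offload thresholds, applied when a project's offload config leaves a value unset (0).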
const (
	DefaultWatermarkHigh int64 = 100000
	DefaultWatermarkLow  int64 = 100000
	DefaultBatchSize     int64 = 10000
)
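// l adapts log.Printf to the go-diskqueue logging callback signature.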
func l(_ dq.LogLevel, f string, args ...interface{}) {
	log.Printf(f, args...)
}
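// ProjectRedisConfig describes an optional project-specific Redis instance.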
type ProjectRedisConfig struct {
	Host string `json:"host"`
	Pass string `json:"pass"`
	Port int    `json:"port"`
}
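// ProjectOffloadConfig holds the per-project watermarks and batch size; zero values fall back to the defaults above.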
type ProjectOffloadConfig struct {
	WatermarkHigh int64 `json:"high"`
	WatermarkLow  int64 `json:"low"`
	BatchSize     int64 `json:"batchsize"`
}
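// ProjectConfig is a single project's entry in the "trackers" hash.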
type ProjectConfig struct {
	RedisConfig   *ProjectRedisConfig  `json:"redis,omitempty"`
	OffloadConfig ProjectOffloadConfig `json:"offload"`
}
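// Offloader moves entries of a single project between Redis sets and local disk
// queues. Sets maps Redis set names to the disk queue they offload into; Queues
// holds the open disk queues by name.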
type Offloader struct {
	RedisClient   *redis.Client
	ProjectConfig ProjectConfig
	Context       context.Context
	Cancel        context.CancelFunc
	Done          chan bool
	Queues        map[string]dq.Interface
	Sets          map[string]string
	Name          string
}
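// CleanName makes a set/queue name safe for use in disk queue file names.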
func (that *Offloader) CleanName(s string) string {
	return strings.ReplaceAll(strings.ReplaceAll(s, "/", "_"), "\x00", "_")
}
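// RedisConfigDiffers reports whether the project's Redis connection settings have changed.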
func (that *Offloader) RedisConfigDiffers(newConfig *ProjectRedisConfig) bool {
	if that.ProjectConfig.RedisConfig == nil && newConfig == nil {
		return false
	}
	if that.ProjectConfig.RedisConfig == nil || newConfig == nil {
		return true
	}
	return that.ProjectConfig.RedisConfig.Host != newConfig.Host ||
		that.ProjectConfig.RedisConfig.Port != newConfig.Port ||
		that.ProjectConfig.RedisConfig.Pass != newConfig.Pass
}
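// OffloadConfigDiffers reports whether the project's offload thresholds have changed.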
func (that *Offloader) OffloadConfigDiffers(newConfig ProjectOffloadConfig) bool {
	return that.ProjectConfig.OffloadConfig.WatermarkHigh != newConfig.WatermarkHigh ||
		that.ProjectConfig.OffloadConfig.WatermarkLow != newConfig.WatermarkLow ||
		that.ProjectConfig.OffloadConfig.BatchSize != newConfig.BatchSize
}
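// RefreshQueues rebuilds the set-to-queue mapping from the project's priorities
// and filters, opens disk queues that are newly needed, and closes queues that
// are no longer referenced.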
func (that *Offloader) RefreshQueues() error {
	pipe := that.RedisClient.Pipeline()
	prioritiesCmdRes := pipe.ZRange(that.Context, fmt.Sprintf("%s:priorities", that.Name), 0, -1)
	filtersCmdRes := pipe.SMembers(that.Context, fmt.Sprintf("%s:filters", that.Name))
	_, err := pipe.Exec(that.Context)
	if err != nil {
		return err
	}
	priorities, err := prioritiesCmdRes.Result()
	if err != nil {
		return err
	}
	filters, err := filtersCmdRes.Result()
	if err != nil {
		return err
	}
	// Static sets plus one entry per filter and per priority level.
	setQueueMap := map[string]string{
		"todo":           "todo",
		"todo:secondary": "todo:secondary",
		"todo:redo":      "todo:redo",
		"todo:backfeed":  "todo:backfeed",
		"done":           "done",
		"unretrievable":  "unretrievable",
	}
	for _, filter := range filters {
		setQueueMap[fmt.Sprintf("filtered:%s", filter)] = "filtered"
	}
	for _, priority := range priorities {
		setQueueMap[fmt.Sprintf("todo:prio:%s", priority)] = fmt.Sprintf("todo:prio:%s", priority)
	}
	needQueueMap := map[string]bool{}
	for setName, queueName := range setQueueMap {
		needQueueMap[queueName] = true
		if _, has := that.Queues[queueName]; !has {
			log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName)
			queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), dataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l)
			if queue == nil {
				return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName))
			}
			that.Queues[queueName] = queue
		}
		that.Sets[setName] = queueName
	}
	// Close and drop disk queues that are no longer needed.
	for k, v := range that.Queues {
		if _, has := needQueueMap[k]; !has {
			v.Close()
			delete(that.Queues, k)
		}
	}
	return nil
}
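// CloseQueues closes all open disk queues for this project.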
func (that *Offloader) CloseQueues() {
	for k, q := range that.Queues {
		log.Printf("closing queue %s for %s", k, that.Name)
		q.Close()
	}
}
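// UpdateStats publishes the current disk queue depths to the <project>:offloaded hash.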
func (that *Offloader) UpdateStats() {
	hset := map[string]interface{}{}
	for k, q := range that.Sets {
		if k != q {
			continue
		}
		hset[k] = fmt.Sprintf("%d", that.Queues[q].Depth())
	}
	_, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result()
	if err != nil {
		log.Printf("unable to hmset %s:offloaded: %s", that.Name, err)
	}
}
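// Do runs the offload loop for a single project until its context is cancelled:
// it periodically moves entries from overfull Redis sets to disk and refills
// underfull sets from disk.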
func (that *Offloader) Do() {
	defer close(that.Done)
	defer that.Cancel()
	if that.ProjectConfig.RedisConfig != nil {
		// Only close the client if this offloader created its own; the shared
		// client passed in by RefreshProjects is reused across projects.
		defer that.RedisClient.Close()
	}
	that.Sets = map[string]string{}
	that.Queues = map[string]dq.Interface{}
	defer that.CloseQueues()
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	refreshTicker := time.NewTicker(5 * time.Minute)
	defer refreshTicker.Stop()
	if err := that.RefreshQueues(); err != nil {
		log.Printf("unable to refresh queues for %s: %s", that.Name, err)
		return
	}
	that.UpdateStats()
	skipSleepChan := make(chan bool, 1)
	defer close(skipSleepChan)
	watermarkHigh := that.ProjectConfig.OffloadConfig.WatermarkHigh
	if watermarkHigh == 0 {
		watermarkHigh = DefaultWatermarkHigh
	}
	watermarkLow := that.ProjectConfig.OffloadConfig.WatermarkLow
	if watermarkLow == 0 {
		watermarkLow = DefaultWatermarkLow
	}
	batchSize := that.ProjectConfig.OffloadConfig.BatchSize
	if batchSize == 0 {
		batchSize = DefaultBatchSize
	}
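	// Main loop: check every tracked set's size, offload above the high watermark,
	// reload below the low watermark, and repeat immediately while work remains.
	// An earlier, per-queue (non-pipelined) version of this loop is kept below, commented out.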
	for {
		//for k, q := range that.Queues {
		//	key := fmt.Sprintf("%s:%s", that.Name, k)
		//	scard, err := that.RedisClient.SCard(that.Context, key).Result()
		//	if err != nil {
		//		log.Printf("unable to scard %s: %s", key, err)
		//		continue
		//	}
		//	for scard > watermarkHigh || scard < watermarkLow {
		//		select {
		//		case <-that.Context.Done():
		//			return
		//		case <-refreshTicker.C:
		//			that.RefreshQueues()
		//			that.UpdateStats()
		//		default:
		//		}
		//		if scard > watermarkHigh {
		//			spopLimit := scard - watermarkHigh
		//			if spopLimit > batchSize {
		//				spopLimit = batchSize
		//			}
		//			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
		//			entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
		//			cancel()
		//			if err != nil {
		//				log.Printf("unable to spop %s: %s", key, err)
		//			}
		//			scard = scard - int64(len(entries))
		//			for _, entry := range entries {
		//				err := q.Put([]byte(entry))
		//				if err != nil {
		//					log.Printf("unable to q.Put %s: %s", key, err)
		//					return
		//				}
		//			}
		//		} else if scard < watermarkLow {
		//			spopLimit := watermarkLow - scard
		//			if spopLimit > batchSize {
		//				spopLimit = batchSize
		//			}
		//			var entries []interface{}
		//			for q.Depth() > 0 && int64(len(entries)) < spopLimit {
		//				entry := <-q.ReadChan()
		//				entries = append(entries, string(entry))
		//			}
		//			if len(entries) == 0 {
		//				break
		//			}
		//			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
		//			_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
		//			cancel()
		//			if err != nil {
		//				log.Printf("unable to sadd %s %#v: %s", key, entries, err)
		//				for _, entry := range entries {
		//					err := q.Put([]byte(entry.(string)))
		//					if err != nil {
		//						log.Printf("unable to q.Put %s: %s", key, err)
		//					}
		//				}
		//				return
		//			}
		//			scard = scard + int64(len(entries))
		//		}
		//	}
		//}
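		// Pipeline SCARD over all tracked sets so a single round trip covers the whole project.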
		scards := map[string]*redis.IntCmd{}
		pipe := that.RedisClient.Pipeline()
		for k := range that.Sets {
			key := fmt.Sprintf("%s:%s", that.Name, k)
			scards[k] = pipe.SCard(that.Context, key)
		}
		_, err := pipe.Exec(that.Context)
		if err != nil {
			log.Printf("unable to scard %s: %s", that.Name, err)
		} else {
			rerun := false
			for k, q := range that.Sets {
				key := fmt.Sprintf("%s:%s", that.Name, k)
				scard, err := scards[k].Result()
				if err != nil {
					log.Printf("unable to scard %s: %s", key, err)
					continue
				}
				if scard > watermarkHigh || (k != q && scard > 0) {
					// The set is above the high watermark (or drains into a different queue):
					// pop up to batchSize entries from Redis and persist them to the disk queue.
					spopLimit := scard - watermarkHigh
					if k != q || spopLimit > batchSize {
						spopLimit = batchSize
					}
					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
					entries, err := that.RedisClient.SPopN(ctx, key, spopLimit).Result()
					cancel()
					if err != nil {
						log.Printf("unable to spop %s: %s", key, err)
					}
					if len(entries) == 0 {
						continue
					}
					for _, entry := range entries {
						err := that.Queues[q].Put([]byte(entry))
						if err != nil {
							log.Printf("unable to q.Put %s: %s", key, err)
							return
						}
					}
					rerun = true
				} else if k == q && scard < watermarkLow && that.Queues[q].Depth() > 0 {
					// The set is below the low watermark and the disk queue has entries:
					// read up to batchSize entries from disk and push them back into Redis.
					spopLimit := watermarkLow - scard
					if spopLimit > batchSize {
						spopLimit = batchSize
					}
					var entries []interface{}
					for that.Queues[q].Depth() > 0 && int64(len(entries)) < spopLimit {
						entry := <-that.Queues[q].ReadChan()
						entries = append(entries, string(entry))
					}
					if len(entries) == 0 {
						continue
					}
					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
					_, err := that.RedisClient.SAdd(ctx, key, entries...).Result()
					cancel()
					if err != nil {
						// SADD failed; put the entries back on the disk queue so they are not lost.
						log.Printf("unable to sadd %s: %s", key, err)
						for _, entry := range entries {
							err := that.Queues[q].Put([]byte(entry.(string)))
							if err != nil {
								log.Printf("unable to q.Put %s: %s", key, err)
							}
						}
						return
					}
					rerun = true
				}
			}
			if rerun {
				// More work may be pending; skip the sleep on the next pass.
				select {
				case skipSleepChan <- true:
				default:
				}
			}
			that.UpdateStats()
		}
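		// Wait for the next stats tick, a periodic queue refresh, shutdown, or a
		// skip signal indicating that more offload work is already pending.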
		select {
		case <-that.Context.Done():
			return
		case <-refreshTicker.C:
			if err := that.RefreshQueues(); err != nil {
				log.Printf("unable to refresh queues for %s: %s", that.Name, err)
				return
			}
			that.UpdateStats()
		case <-ticker.C:
			that.UpdateStats()
		case <-skipSleepChan:
		}
	}
}
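// offloaders tracks the running offloader for each project, keyed by project name.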
var offloaders = map[string]*Offloader{}
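// StopProjects cancels all running offloaders and waits for them to finish.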
func StopProjects() {
	var doneChans []chan bool
	for project, offloader := range offloaders {
		log.Printf("stopping offloader %s", project)
		offloader.Cancel()
		doneChans = append(doneChans, offloader.Done)
	}
	for _, c := range doneChans {
		<-c
	}
}
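// RefreshProjects reads the "trackers" hash, stops offloaders whose project was
// removed, changed config, or already exited, and starts offloaders for new projects.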
func RefreshProjects(redisClient *redis.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	res, err := redisClient.HGetAll(ctx, "trackers").Result()
	cancel()
	if err != nil {
		log.Printf("unable to refresh trackers table: %s", err)
		return
	}
	updatedProjects := map[string]ProjectConfig{}
	for project, configString := range res {
		//if project != "ua" && project != "ukr-net" && project != "ua-urls" {
		//	continue
		//}
		config := ProjectConfig{}
		err := json.Unmarshal([]byte(configString), &config)
		if err != nil {
			log.Printf("unable to decode project %s config: %s", project, err)
			continue
		}
		updatedProjects[project] = config
	}
	for project, offloader := range offloaders {
		// Stop the offloader if its project disappeared, its config changed, or it has already exited.
		_, stopRequired := updatedProjects[project]
		stopRequired = !stopRequired
		if !stopRequired {
			stopRequired = offloader.OffloadConfigDiffers(updatedProjects[project].OffloadConfig)
			if !stopRequired {
				stopRequired = offloader.RedisConfigDiffers(updatedProjects[project].RedisConfig)
				if !stopRequired {
					select {
					case <-offloader.Context.Done():
						stopRequired = true
					case <-offloader.Done:
						stopRequired = true
					default:
					}
				}
			}
		}
		if stopRequired {
			log.Printf("stopping offloader %s", project)
			offloader.Cancel()
			<-offloader.Done
			delete(offloaders, project)
		}
	}
	for project, config := range updatedProjects {
		if _, has := offloaders[project]; !has {
			log.Printf("starting offloader %s", project)
			offloader := &Offloader{}
			offloader.Name = project
			offloader.ProjectConfig = config
			if config.RedisConfig != nil {
				// The project has its own Redis instance; otherwise the shared client is reused.
				offloader.RedisClient = redis.NewClient(&redis.Options{
					Addr:        fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port),
					Username:    "default",
					Password:    config.RedisConfig.Pass,
					ReadTimeout: 15 * time.Minute,
				})
			} else {
				offloader.RedisClient = redisClient
			}
			offloader.Context, offloader.Cancel = context.WithCancel(context.Background())
			offloader.Done = make(chan bool)
			offloaders[project] = offloader
			go offloader.Do()
		}
	}
}
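// dataDir is the directory disk queues are written to, taken from DATA_DIR.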
var dataDir string
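// main serves debug endpoints on 127.0.0.1:16992, connects to REDIS_URL, and
// refreshes the set of offloaders once a minute until SIGINT/SIGTERM.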
func main() {
	log.SetFlags(log.Flags() | log.Lshortfile)
	go func() {
		// Debug/pprof endpoint (handlers registered by the blank net/http/pprof import).
		if err := http.ListenAndServe("127.0.0.1:16992", nil); err != nil {
			log.Printf("webserver error: %s", err)
		}
	}()
	dataDir = os.Getenv("DATA_DIR")
	if dataDir == "" {
		log.Panicf("no DATA_DIR specified")
	}
	mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL"))
	if err != nil {
		log.Panicf("%s", err)
	}
	mainOptions.ReadTimeout = 15 * time.Minute
	mainClient := redis.NewClient(mainOptions)
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
	ticker := time.NewTicker(1 * time.Minute)
	defer StopProjects()
	for {
		RefreshProjects(mainClient)
		select {
		case <-sc:
			StopProjects()
			return
		case <-ticker.C:
		}
	}
}