Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

499 рядків
12 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. "os"
  12. "os/signal"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. "github.com/go-redis/redis/v8"
  18. "github.com/gorilla/mux"
  19. )
  // ItemChannelBuffer is the capacity given to each project's buffered item
  // channel when a ProjectBackfeedManager is created.
  const ItemChannelBuffer = 100000

  // ItemWrapSize is the upper bound on how many queued items are gathered into
  // one batch before the batch is sent to redis.
  const ItemWrapSize = 100000
  type (
  	// ProjectRedisConfig carries the connection settings of a project's own
  	// redis instance as they appear in the project's JSON configuration.
  	ProjectRedisConfig struct {
  		Host string `json:"host"`
  		Pass string `json:"pass"`
  		Port int    `json:"port"`
  	}

  	// ProjectConfig is the decoded per-project configuration. RedisConfig is
  	// nil when the configuration has no "redis" section.
  	ProjectConfig struct {
  		RedisConfig *ProjectRedisConfig `json:"redis,omitempty"`
  	}
  )
  // BackfeedItem is one submitted item together with the shard information
  // that selects the redis key it is deduplicated under.
  type BackfeedItem struct {
  	// PrimaryShard is a one-byte hash of the item contents.
  	PrimaryShard byte
  	// SecondaryShard is a caller-chosen shard label.
  	SecondaryShard string
  	// Item holds the raw item bytes.
  	Item []byte
  }
  37. type ProjectBackfeedManager struct {
  38. Context context.Context
  39. Cancel context.CancelFunc
  40. Done chan bool
  41. C chan *BackfeedItem
  42. Name string
  43. BackfeedRedis *redis.ClusterClient
  44. ProjectRedis *redis.Client
  45. Lock sync.RWMutex
  46. ProjectConfig ProjectConfig
  47. }
  48. func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) bool {
  49. if that.ProjectConfig.RedisConfig == nil && new == nil {
  50. return false
  51. }
  52. if that.ProjectConfig.RedisConfig == nil || new == nil || that.ProjectConfig.RedisConfig.Host != new.Host || that.ProjectConfig.RedisConfig.Port != new.Port || that.ProjectConfig.RedisConfig.Pass != new.Pass {
  53. return true
  54. }
  55. return false
  56. }
  57. func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool {
  58. that.Lock.RLock()
  59. defer that.Lock.RUnlock()
  60. if that.C == nil {
  61. return false
  62. }
  63. select {
  64. case <-ctx.Done():
  65. return false
  66. case <-that.Context.Done():
  67. return false
  68. case that.C <- item:
  69. return true
  70. }
  71. }
  72. func (that *ProjectBackfeedManager) PopItem(blocking bool) (*BackfeedItem, bool) {
  73. if blocking {
  74. select {
  75. case <-that.Context.Done():
  76. return nil, false
  77. case item := <-that.C:
  78. return item, true
  79. }
  80. } else {
  81. select {
  82. case <-that.Context.Done():
  83. return nil, false
  84. case item := <-that.C:
  85. return item, true
  86. default:
  87. return nil, false
  88. }
  89. }
  90. }
  91. func (that *ProjectBackfeedManager) CloseItemChannel() {
  92. that.Lock.Lock()
  93. defer that.Lock.Unlock()
  94. if that.C == nil {
  95. return
  96. }
  97. close(that.C)
  98. that.C = nil
  99. }
  100. func (that *ProjectBackfeedManager) Do() {
  101. defer close(that.Done)
  102. defer that.CloseItemChannel()
  103. defer that.Cancel()
  104. for {
  105. select {
  106. case <-that.Context.Done():
  107. break
  108. case <-that.Done:
  109. break
  110. default:
  111. }
  112. item, ok := that.PopItem(true)
  113. if !ok {
  114. break
  115. }
  116. keyMap := map[string][][]byte{}
  117. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  118. keyMap[key] = append(keyMap[key], item.Item)
  119. wrapped := 1
  120. for wrapped < ItemWrapSize {
  121. item, ok := that.PopItem(false)
  122. if !ok {
  123. break
  124. }
  125. key := fmt.Sprintf("%s:%02x:%s", that.Name, item.PrimaryShard, item.SecondaryShard)
  126. keyMap[key] = append(keyMap[key], item.Item)
  127. wrapped++
  128. }
  129. select {
  130. case <-that.Context.Done():
  131. break
  132. case <-that.Done:
  133. break
  134. default:
  135. }
  136. resultMap := map[string]*redis.Cmd{}
  137. pipe := that.BackfeedRedis.Pipeline()
  138. for key, items := range keyMap {
  139. args := []interface{}{
  140. "bf.madd",
  141. key,
  142. }
  143. for _, item := range items {
  144. args = append(args, item)
  145. }
  146. resultMap[key] = pipe.Do(context.Background(), args...)
  147. }
  148. _, err := pipe.Exec(context.Background())
  149. if err != nil {
  150. log.Printf("%s", err)
  151. continue
  152. }
  153. var sAddItems []interface{}
  154. for key, items := range keyMap {
  155. rawRes, err := resultMap[key].Result()
  156. if err != nil {
  157. log.Printf("%s", err)
  158. continue
  159. }
  160. rawResArray, ok := rawRes.([]interface{})
  161. if !ok || len(keyMap[key]) != len(rawResArray) {
  162. continue
  163. }
  164. for i, vi := range rawResArray {
  165. v, ok := vi.(int64)
  166. if !ok || v != 1 {
  167. continue
  168. }
  169. sAddItems = append(sAddItems, items[i])
  170. }
  171. }
  172. dupes := wrapped - len(sAddItems)
  173. if len(sAddItems) != 0 {
  174. err := that.ProjectRedis.SAdd(context.Background(), fmt.Sprintf("%s:todo:backfeed", that.Name), sAddItems...).Err()
  175. if err != nil {
  176. log.Printf("failed to sadd items for %s: %s", that.Name, err)
  177. }
  178. }
  179. if dupes > 0 {
  180. that.BackfeedRedis.HIncrBy(context.Background(), ":", that.Name, int64(dupes))
  181. }
  182. }
  183. }
  184. type GlobalBackfeedManager struct {
  185. Context context.Context
  186. Cancel context.CancelFunc
  187. ActiveFeeds map[string]*ProjectBackfeedManager
  188. ActiveSlugs map[string]string
  189. TrackerRedis *redis.Client
  190. BackfeedRedis *redis.ClusterClient
  191. Lock sync.RWMutex
  192. }
  193. func (that *GlobalBackfeedManager) RefreshFeeds() error {
  194. slugProjectMap, err := that.TrackerRedis.HGetAll(that.Context, "backfeed").Result()
  195. if err != nil {
  196. return err
  197. }
  198. var projects []string
  199. projectSlugMap := map[string][]string{}
  200. for slug, project := range slugProjectMap {
  201. projectSlugMap[project] = append(projectSlugMap[project], slug)
  202. }
  203. for project := range projectSlugMap {
  204. projects = append(projects, project)
  205. }
  206. projectConfigs := map[string]ProjectConfig{}
  207. if len(projects) != 0 {
  208. cfgi, err := that.TrackerRedis.HMGet(that.Context, "trackers", projects...).Result()
  209. if err != nil {
  210. return err
  211. }
  212. if len(projects) != len(cfgi) {
  213. return fmt.Errorf("hmget result had unexpected length")
  214. }
  215. for i, project := range projects {
  216. configString, ok := cfgi[i].(string)
  217. if !ok {
  218. continue
  219. }
  220. config := ProjectConfig{}
  221. err := json.Unmarshal([]byte(configString), &config)
  222. if err != nil {
  223. continue
  224. }
  225. projectConfigs[project] = config
  226. }
  227. }
  228. projects = nil
  229. for project := range projectSlugMap {
  230. if _, has := projectConfigs[project]; !has {
  231. delete(projectSlugMap, project)
  232. continue
  233. }
  234. projects = append(projects, project)
  235. }
  236. for slug, project := range slugProjectMap {
  237. if _, has := projectConfigs[project]; !has {
  238. delete(slugProjectMap, slug)
  239. }
  240. }
  241. // add feeds for new projects
  242. for _, project := range projects {
  243. projectConfig := projectConfigs[project]
  244. var outdatedProjectBackfeedManager *ProjectBackfeedManager
  245. if projectBackfeedManager, has := that.ActiveFeeds[project]; has {
  246. if that.ActiveFeeds[project].RedisConfigDiffers(projectConfig.RedisConfig) {
  247. outdatedProjectBackfeedManager = projectBackfeedManager
  248. } else {
  249. continue
  250. }
  251. }
  252. ctx, cancel := context.WithCancel(that.Context)
  253. projectBackfeedManager := &ProjectBackfeedManager{
  254. Context: ctx,
  255. Cancel: cancel,
  256. Done: make(chan bool),
  257. C: make(chan *BackfeedItem, ItemChannelBuffer),
  258. BackfeedRedis: that.BackfeedRedis,
  259. Name: project,
  260. ProjectConfig: projectConfig,
  261. }
  262. if projectConfig.RedisConfig != nil {
  263. projectBackfeedManager.ProjectRedis = redis.NewClient(&redis.Options{
  264. Addr: fmt.Sprintf("%s:%d", projectConfig.RedisConfig.Host, projectConfig.RedisConfig.Port),
  265. Username: "default",
  266. Password: projectConfig.RedisConfig.Pass,
  267. ReadTimeout: 15 * time.Minute,
  268. })
  269. } else {
  270. projectBackfeedManager.ProjectRedis = that.TrackerRedis
  271. }
  272. go projectBackfeedManager.Do()
  273. that.Lock.Lock()
  274. that.ActiveFeeds[project] = projectBackfeedManager
  275. that.Lock.Unlock()
  276. if outdatedProjectBackfeedManager != nil {
  277. outdatedProjectBackfeedManager.Cancel()
  278. <-outdatedProjectBackfeedManager.Done
  279. log.Printf("updated project: %s", project)
  280. } else {
  281. log.Printf("added project: %s", project)
  282. }
  283. }
  284. that.Lock.Lock()
  285. that.ActiveSlugs = slugProjectMap
  286. that.Lock.Unlock()
  287. // remove feeds for old projects
  288. for project, projectBackfeedManager := range that.ActiveFeeds {
  289. if _, has := projectSlugMap[project]; has {
  290. continue
  291. }
  292. log.Printf("removing project: %s", project)
  293. that.Lock.Lock()
  294. delete(that.ActiveFeeds, project)
  295. that.Lock.Unlock()
  296. projectBackfeedManager.Cancel()
  297. <-projectBackfeedManager.Done
  298. log.Printf("removed project: %s", project)
  299. }
  300. return nil
  301. }
  // Splitter provides a bufio.SplitFunc that cuts a stream into tokens
  // separated by an arbitrary byte sequence.
  type Splitter struct {
  	// Delimiter is the byte sequence that ends a token.
  	Delimiter []byte
  	// IgnoreEOF makes trailing data without a final delimiter a valid last
  	// token instead of an error.
  	IgnoreEOF bool
  }

  // Split implements bufio.SplitFunc. It returns the data before the first
  // delimiter as the token and advances past the delimiter. When no delimiter
  // is present it asks for more data, or at end of input either returns the
  // remainder as the last token (IgnoreEOF) or fails with
  // io.ErrUnexpectedEOF.
  func (that *Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
  	// bytes.Index only looks inside data; slicing data[i:i+len(Delimiter)] by
  	// hand could reach past len(data) and either panic or match stale bytes
  	// left in the buffer. An empty delimiter never matches, so it cannot
  	// produce an endless stream of empty tokens.
  	if len(that.Delimiter) > 0 {
  		if at := bytes.Index(data, that.Delimiter); at >= 0 {
  			return at + len(that.Delimiter), data[:at], nil
  		}
  	}
  	if len(data) == 0 || !atEOF {
  		// Need more input (or there is nothing left at all).
  		return 0, nil, nil
  	}
  	if that.IgnoreEOF {
  		return len(data), data, nil
  	}
  	return 0, data, io.ErrUnexpectedEOF
  }
  // GenShardHash folds the bytes of b into a single byte. Each step mixes the
  // current byte, its position (truncated to a byte) and the running value
  // using wrapping 8-bit arithmetic. An empty input hashes to 0.
  func GenShardHash(b []byte) (final byte) {
  	for idx := 0; idx < len(b); idx++ {
  		pos := byte(idx)
  		mixed := b[idx] ^ final ^ pos
  		final = mixed + final + pos + final*pos
  	}
  	return final
  }
  326. func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) {
  327. res.Header().Set("Content-Type", "application/json")
  328. res.WriteHeader(statusCode)
  329. if statusCode == http.StatusNoContent {
  330. return
  331. }
  332. if err, isError := v.(error); isError {
  333. v = map[string]interface{}{
  334. "error": fmt.Sprintf("%v", err),
  335. "status_code": statusCode,
  336. }
  337. } else {
  338. log.Printf("%#v", v)
  339. v = map[string]interface{}{
  340. "data": v,
  341. "status_code": statusCode,
  342. }
  343. }
  344. json.NewEncoder(res).Encode(v)
  345. }
  346. func (that *GlobalBackfeedManager) GetFeed(slug string) *ProjectBackfeedManager {
  347. that.Lock.RLock()
  348. defer that.Lock.RUnlock()
  349. project, has := that.ActiveSlugs[slug]
  350. if !has {
  351. return nil
  352. }
  353. projectBackfeedManager, has := that.ActiveFeeds[project]
  354. if !has {
  355. return nil
  356. }
  357. return projectBackfeedManager
  358. }
  359. func (that *GlobalBackfeedManager) Handle(res http.ResponseWriter, req *http.Request) {
  360. defer req.Body.Close()
  361. vars := mux.Vars(req)
  362. slug := vars["slug"]
  363. secondaryShard := req.URL.Query().Get("shard")
  364. projectBackfeedManager := that.GetFeed(slug)
  365. if projectBackfeedManager == nil {
  366. WriteResponse(res, http.StatusNotFound, fmt.Errorf("%s", "no such backfeed channel"))
  367. return
  368. }
  369. splitter := &Splitter{
  370. Delimiter: []byte(req.URL.Query().Get("delimiter")),
  371. IgnoreEOF: req.URL.Query().Get("ignoreeof") != "",
  372. }
  373. if len(splitter.Delimiter) == 0 {
  374. splitter.Delimiter = []byte{0x00}
  375. }
  376. scanner := bufio.NewScanner(req.Body)
  377. scanner.Split(splitter.Split)
  378. var err error
  379. statusCode := http.StatusNoContent
  380. for scanner.Scan() {
  381. b := scanner.Bytes()
  382. if len(b) == 0 {
  383. continue
  384. }
  385. item := &BackfeedItem{
  386. PrimaryShard: GenShardHash(b),
  387. SecondaryShard: secondaryShard,
  388. Item: b,
  389. }
  390. ok := projectBackfeedManager.PushItem(req.Context(), item)
  391. if !ok {
  392. err = fmt.Errorf("channel closed")
  393. statusCode = http.StatusServiceUnavailable
  394. break
  395. }
  396. }
  397. if err == nil {
  398. err = scanner.Err()
  399. if err != nil {
  400. statusCode = http.StatusBadRequest
  401. }
  402. }
  403. WriteResponse(res, statusCode, err)
  404. return
  405. }
  406. func (that *GlobalBackfeedManager) CancelAllFeeds() {
  407. that.Cancel()
  408. for project, projectBackfeedManager := range that.ActiveFeeds {
  409. log.Printf("waiting for %s channel to shut down...", project)
  410. <-projectBackfeedManager.Done
  411. delete(that.ActiveFeeds, project)
  412. }
  413. }
  414. func main() {
  415. log.SetFlags(log.Flags() | log.Lshortfile)
  416. trackerRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_TRACKER"))
  417. if err != nil {
  418. log.Panicf("%s", err)
  419. }
  420. trackerRedisOptions.ReadTimeout = 15 * time.Minute
  421. trackerRedisClient := redis.NewClient(trackerRedisOptions)
  422. backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{
  423. Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","),
  424. Username: os.Getenv("REDIS_BACKFEED_USERNAME"),
  425. Password: os.Getenv("REDIS_BACKFEED_PASSWORD"),
  426. ReadTimeout: 15 * time.Minute,
  427. })
  428. globalBackfeedManager := &GlobalBackfeedManager{
  429. ActiveFeeds: map[string]*ProjectBackfeedManager{},
  430. ActiveSlugs: map[string]string{},
  431. TrackerRedis: trackerRedisClient,
  432. BackfeedRedis: backfeedRedisClient,
  433. }
  434. globalBackfeedManager.Context, globalBackfeedManager.Cancel = context.WithCancel(context.Background())
  435. defer globalBackfeedManager.CancelAllFeeds()
  436. err = globalBackfeedManager.RefreshFeeds()
  437. if err != nil {
  438. log.Panicf("unable to set up backfeed projects: %s", err)
  439. }
  440. r := mux.NewRouter()
  441. r.Methods(http.MethodPost).Path("/legacy/{slug}").HandlerFunc(globalBackfeedManager.Handle)
  442. serveErrChan := make(chan error)
  443. go func() {
  444. s := &http.Server{
  445. Addr: ":21581",
  446. IdleTimeout: 1 * time.Hour,
  447. MaxHeaderBytes: 1 * 1024 * 1024,
  448. Handler: r,
  449. }
  450. serveErrChan <- s.ListenAndServe()
  451. }()
  452. sc := make(chan os.Signal, 1)
  453. signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
  454. ticker := time.NewTicker(1 * time.Second)
  455. for {
  456. select {
  457. case <-sc:
  458. return
  459. case <-ticker.C:
  460. }
  461. err = globalBackfeedManager.RefreshFeeds()
  462. if err != nil {
  463. log.Printf("unable to refresh backfeed projects: %s", err)
  464. }
  465. }
  466. }