|
|
@@ -153,14 +153,14 @@ func (that *ProjectBackfeedManager) Do() { |
|
|
|
now := time.Now() |
|
|
|
resultMap := map[string]*redis.Cmd{} |
|
|
|
pipe := that.BackfeedRedis.Pipeline() |
|
|
|
lastTS := make([]interface{}, 0, len(keyMap)*2) |
|
|
|
lastTS := make([]any, 0, len(keyMap)*2) |
|
|
|
for key := range keyMap { |
|
|
|
lastTS = append(lastTS, key) |
|
|
|
lastTS = append(lastTS, fmt.Sprintf("%d", now.Unix())) |
|
|
|
} |
|
|
|
pipe.HSet(context.Background(), ":last_ts", lastTS...) |
|
|
|
for key, items := range keyMap { |
|
|
|
args := []interface{}{ |
|
|
|
args := []any{ |
|
|
|
"bf.madd", |
|
|
|
key, |
|
|
|
} |
|
|
@@ -172,7 +172,7 @@ func (that *ProjectBackfeedManager) Do() { |
|
|
|
if _, err := pipe.Exec(context.Background()); err != nil { |
|
|
|
log.Printf("%s", err) |
|
|
|
} |
|
|
|
var sAddItems []interface{} |
|
|
|
var sAddItems []any |
|
|
|
for key, items := range keyMap { |
|
|
|
res, err := resultMap[key].BoolSlice() |
|
|
|
if err != nil { |
|
|
@@ -350,19 +350,19 @@ func GenShardHash(b []byte) (final byte) { |
|
|
|
return final |
|
|
|
} |
|
|
|
|
|
|
|
func WriteResponse(res http.ResponseWriter, statusCode int, v interface{}) { |
|
|
|
func WriteResponse(res http.ResponseWriter, statusCode int, v any) { |
|
|
|
res.Header().Set("Content-Type", "application/json") |
|
|
|
res.WriteHeader(statusCode) |
|
|
|
if statusCode == http.StatusNoContent { |
|
|
|
return |
|
|
|
} |
|
|
|
if err, isError := v.(error); isError { |
|
|
|
v = map[string]interface{}{ |
|
|
|
v = map[string]any{ |
|
|
|
"error": fmt.Sprintf("%v", err), |
|
|
|
"status_code": statusCode, |
|
|
|
} |
|
|
|
} else { |
|
|
|
v = map[string]interface{}{ |
|
|
|
v = map[string]any{ |
|
|
|
"data": v, |
|
|
|
"status_code": statusCode, |
|
|
|
} |
|
|
@@ -474,14 +474,6 @@ func main() { |
|
|
|
trackerRedisOptions.ReadTimeout = 15 * time.Minute |
|
|
|
trackerRedisClient := redis.NewClient(trackerRedisOptions) |
|
|
|
|
|
|
|
legacyRedisOptions, err := redis.ParseURL(os.Getenv("REDIS_LEGACY")) |
|
|
|
if err != nil { |
|
|
|
log.Panicf("invalid REDIS_LEGACY url: %s", err) |
|
|
|
} |
|
|
|
legacyRedisOptions.ReadTimeout = 15 * time.Minute |
|
|
|
legacyRedisOptions.PoolSize = 128 |
|
|
|
legacyRedisClient := redis.NewClient(legacyRedisOptions) |
|
|
|
|
|
|
|
backfeedRedisClient := redis.NewClusterClient(&redis.ClusterOptions{ |
|
|
|
Addrs: strings.Split(os.Getenv("REDIS_BACKFEED_ADDRS"), ","), |
|
|
|
Username: os.Getenv("REDIS_BACKFEED_USERNAME"), |
|
|
@@ -490,10 +482,6 @@ func main() { |
|
|
|
PoolSize: 256, |
|
|
|
}) |
|
|
|
|
|
|
|
legacyRedisMetricsHook := redisprom.NewHook( |
|
|
|
redisprom.WithInstanceName("legacy"), |
|
|
|
) |
|
|
|
legacyRedisClient.AddHook(legacyRedisMetricsHook) |
|
|
|
backfeedRedisMetricsHook := redisprom.NewHook( |
|
|
|
redisprom.WithInstanceName("backfeed"), |
|
|
|
) |
|
|
@@ -513,9 +501,6 @@ func main() { |
|
|
|
client.ClientGetName(ctx) |
|
|
|
return client.Ping(ctx).Err() |
|
|
|
}) |
|
|
|
if err := legacyRedisClient.Ping(context.Background()).Err(); err != nil { |
|
|
|
log.Panicf("unable to ping legacy redis: %s", err) |
|
|
|
} |
|
|
|
|
|
|
|
globalBackfeedManager := &GlobalBackfeedManager{ |
|
|
|
ActiveFeeds: map[string]*ProjectBackfeedManager{}, |
|
|
|