|
|
@@ -163,21 +163,18 @@ func (that *ProjectBackfeedManager) Do() { |
|
|
|
} |
|
|
|
var sAddItems []interface{} |
|
|
|
for key, items := range keyMap { |
|
|
|
rawRes, err := resultMap[key].Result() |
|
|
|
res, err := resultMap[key].BoolSlice() |
|
|
|
if err != nil { |
|
|
|
log.Printf("%s", err) |
|
|
|
continue |
|
|
|
} |
|
|
|
rawResArray, ok := rawRes.([]interface{}) |
|
|
|
if !ok || len(keyMap[key]) != len(rawResArray) { |
|
|
|
if len(res) != len(keyMap[key]) { |
|
|
|
continue |
|
|
|
} |
|
|
|
for i, vi := range rawResArray { |
|
|
|
v, ok := vi.(int64) |
|
|
|
if !ok || v != 1 { |
|
|
|
continue |
|
|
|
for i, v := range res { |
|
|
|
if v { |
|
|
|
sAddItems = append(sAddItems, items[i]) |
|
|
|
} |
|
|
|
sAddItems = append(sAddItems, items[i]) |
|
|
|
} |
|
|
|
} |
|
|
|
dupes := wrapped - len(sAddItems) |
|
|
@@ -188,22 +185,17 @@ func (that *ProjectBackfeedManager) Do() { |
|
|
|
} |
|
|
|
args = append(args, sAddItems...) |
|
|
|
|
|
|
|
rawRes, err := that.LegacyRedis.Do(context.Background(), args...).Result() |
|
|
|
res, err := that.LegacyRedis.Do(context.Background(), args...).BoolSlice() |
|
|
|
if err != nil { |
|
|
|
log.Printf("unable to dedupe against %s legacy backfeed: %s", that.Name, err) |
|
|
|
} else { |
|
|
|
rawResArray, ok := rawRes.([]interface{}) |
|
|
|
if ok && len(sAddItems) == len(rawResArray) { |
|
|
|
var filteredSAddItems []interface{} |
|
|
|
for i, vi := range rawResArray { |
|
|
|
v, ok := vi.(int64) |
|
|
|
if !ok || v != 0 { |
|
|
|
continue |
|
|
|
} |
|
|
|
} else if len(res) == len(sAddItems) { |
|
|
|
var filteredSAddItems []interface{} |
|
|
|
for i, v := range res { |
|
|
|
if !v { |
|
|
|
filteredSAddItems = append(filteredSAddItems, sAddItems[i]) |
|
|
|
} |
|
|
|
sAddItems = filteredSAddItems |
|
|
|
} |
|
|
|
sAddItems = filteredSAddItems |
|
|
|
} |
|
|
|
} |
|
|
|
if len(sAddItems) != 0 { |
|
|
@@ -433,10 +425,12 @@ func (that *GlobalBackfeedManager) Handle(res http.ResponseWriter, req *http.Req |
|
|
|
if len(b) == 0 { |
|
|
|
continue |
|
|
|
} |
|
|
|
bcopy := make([]byte, 0, len(b)) |
|
|
|
copy(bcopy, b) |
|
|
|
item := &BackfeedItem{ |
|
|
|
PrimaryShard: GenShardHash(b), |
|
|
|
PrimaryShard: GenShardHash(bcopy), |
|
|
|
SecondaryShard: secondaryShard, |
|
|
|
Item: b, |
|
|
|
Item: bcopy, |
|
|
|
} |
|
|
|
ok := projectBackfeedManager.PushItem(req.Context(), item) |
|
|
|
if !ok { |
|
|
@@ -455,7 +449,7 @@ func (that *GlobalBackfeedManager) Handle(res http.ResponseWriter, req *http.Req |
|
|
|
if err != nil { |
|
|
|
WriteResponse(res, statusCode, err) |
|
|
|
} else { |
|
|
|
WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued", n)) |
|
|
|
WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n)) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|