diff --git a/main.go b/main.go index f97b00c..f84bf47 100644 --- a/main.go +++ b/main.go @@ -65,7 +65,7 @@ func (that *ProjectBackfeedManager) RedisConfigDiffers(new *ProjectRedisConfig) return that.ProjectConfig.RedisConfig == nil || new == nil || *that.ProjectConfig.RedisConfig != *new } -func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) bool { +func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *BackfeedItem) error { //that.Lock.RLock() //defer that.Lock.RUnlock() //if that.C == nil { @@ -73,11 +73,13 @@ func (that *ProjectBackfeedManager) PushItem(ctx context.Context, item *Backfeed //} select { case <-ctx.Done(): - return false + return ctx.Err() case <-that.Context.Done(): - return false + return fmt.Errorf("backfeed channel closed") case that.C <- item: - return true + return nil + default: + return fmt.Errorf("backfeed channel full") } } @@ -424,7 +426,6 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht scanner := bufio.NewScanner(req.Body) scanner.Split(splitter.Split) - var err error statusCode := http.StatusNoContent n := 0 for scanner.Scan() { @@ -439,25 +440,19 @@ func (that *GlobalBackfeedManager) HandleLegacy(res http.ResponseWriter, req *ht SecondaryShard: secondaryShard, Item: bcopy, } - ok := projectBackfeedManager.PushItem(req.Context(), item) - if !ok { - err = fmt.Errorf("channel closed") - statusCode = http.StatusServiceUnavailable - break - } - n++ - } - if err == nil { - err = scanner.Err() + err := projectBackfeedManager.PushItem(req.Context(), item) if err != nil { - statusCode = http.StatusBadRequest + WriteResponse(res, http.StatusServiceUnavailable, err) + return } + n++ } + err := scanner.Err() if err != nil { WriteResponse(res, statusCode, err) - } else { - WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n)) + return } + WriteResponse(res, http.StatusOK, fmt.Sprintf("%d items queued for deduplication", n)) return }