From 91b8a1a98e605fdb6e25d9d62103fc9e3cd30fd1 Mon Sep 17 00:00:00 2001 From: Rafael Dantas Justo Date: Thu, 2 Feb 2023 14:48:46 -0300 Subject: [PATCH 1/2] Fix: Bulk processor retries indefinitely on failure When all retries are exhausted, the worker's internal request buffer needs to be cleared in failure scenarios. This is required because the commitFunc (and consequently the underlying BulkService.Do call) doesn't reset it when an error occurs. Without clearing the internal buffer, the worker will continue sending the same requests on the following rounds of execution. Kudos for this solution goes to @rwynn and @raiRaiyan. Resolves #1278 --- bulk_processor.go | 8 ++++++ bulk_processor_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/bulk_processor.go b/bulk_processor.go index f2711f829..8d1e5e3a3 100644 --- a/bulk_processor.go +++ b/bulk_processor.go @@ -580,6 +580,14 @@ func (w *bulkWorker) commit(ctx context.Context) error { err := RetryNotify(commitFunc, w.p.backoff, notifyFunc) w.updateStats(res) if err != nil { + // After all retry attempts clear the requests for the next round. This is + // important when backoff is disabled or limited to a number of rounds, and + // the aftermath is a failure., because the commitFunc (and consequently the + // underlying BulkService.Do call) will not clear the requests from the + // worker. Without this the same requests will be used on the next round of + // execution of the worker. 
+ w.service.Reset() + w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err) } diff --git a/bulk_processor_test.go b/bulk_processor_test.go index 19ce060bd..66316c91e 100644 --- a/bulk_processor_test.go +++ b/bulk_processor_test.go @@ -8,6 +8,8 @@ import ( "context" "fmt" "math/rand" + "net/http" + "net/http/httptest" "reflect" "sync/atomic" "testing" @@ -335,6 +337,60 @@ func TestBulkProcessorFlush(t *testing.T) { } } +func TestBulkWorker_commit_clearFailedRequests(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer server.Close() + + client, err := NewClient( + SetURL(server.URL), + SetSniff(false), + SetHealthcheck(false), + SetHttpClient(tooManyHTTPClient{}), + ) + if err != nil { + t.Fatal(err) + } + + var calls int + + bulkProcessor, err := NewBulkProcessorService(client). + BulkActions(-1). + BulkSize(-1). + FlushInterval(0). + RetryItemStatusCodes(). + Backoff(StopBackoff{}). + After(BulkAfterFunc(func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { + calls++ + if calls == 1 { + if len(requests) != 10 { + t.Errorf("expected 10 requests; got: %d", len(requests)) + } + } else if len(requests) > 0 { + t.Errorf("expected 0 requests; got: %d", len(requests)) + } + })).Do(context.Background()) + + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + bulkProcessor.Add(NewBulkIndexRequest()) + } + + // first flush should process 10 items + if err := bulkProcessor.Flush(); err != nil { + t.Fatal(err) + } + // second flush should process none (even if the first flush failed) + if err := bulkProcessor.Flush(); err != nil { + t.Fatal(err) + } + if err := bulkProcessor.Close(); err != nil { + t.Fatal(err) + } +} + // -- Helper -- func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) { @@ -427,3 +483,12 @@ func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) { t.Fatalf("expected %d documents; 
got: %d", numDocs, count) } } + +type tooManyHTTPClient struct { +} + +func (t tooManyHTTPClient) Do(r *http.Request) (*http.Response, error) { + recorder := httptest.NewRecorder() + recorder.WriteHeader(http.StatusTooManyRequests) + return recorder.Result(), nil +} From a2de1fec1dac1a29b0b28270824d8deb7e52ad7a Mon Sep 17 00:00:00 2001 From: Rafael Dantas Justo Date: Thu, 2 Feb 2023 15:12:33 -0300 Subject: [PATCH 2/2] Fix comment typos --- bulk_processor.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bulk_processor.go b/bulk_processor.go index 8d1e5e3a3..798b23119 100644 --- a/bulk_processor.go +++ b/bulk_processor.go @@ -580,12 +580,12 @@ func (w *bulkWorker) commit(ctx context.Context) error { err := RetryNotify(commitFunc, w.p.backoff, notifyFunc) w.updateStats(res) if err != nil { - // After all retry attempts clear the requests for the next round. This is - // important when backoff is disabled or limited to a number of rounds, and - // the aftermath is a failure., because the commitFunc (and consequently the - // underlying BulkService.Do call) will not clear the requests from the - // worker. Without this the same requests will be used on the next round of - // execution of the worker. + // After exhausting all retry attempts, the worker must clear the requests + // for the next round. This is important when backoff is disabled or limited + // to a number of rounds, and the aftermath is a failure because the + // commitFunc (and consequently the underlying BulkService.Do call) will not + // clear the requests from the worker. Without this, the same requests will + // be used on the next round of execution of the worker. w.service.Reset() w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)