diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java b/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java index e801873e2cd5..6a25937592b4 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java @@ -88,7 +88,11 @@ protected HttpRequest(CurrentContainer container, URI uri, Method method, Versio this.version = version; this.remoteAddress = remoteAddress; this.parameters.putAll(getUriQueryParameters(uri)); - this.connectedAt = (connectedAtMillis != null) ? connectedAtMillis : creationTime(TimeUnit.MILLISECONDS); + if (connectedAtMillis != null) { + this.connectedAt = connectedAtMillis; + } else { + this.connectedAt = creationTime(TimeUnit.MILLISECONDS); + } } catch (Throwable e) { release(); throw e; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 594c5c8f3985..b483d6977d6e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -187,12 +187,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Metric metric; private final DocumentApiMetrics metrics; private final DocumentOperationParser parser; + private final long maxThrottled; + private final long maxThrottledAgeNS; private final DocumentAccess access; private final AsyncSession asyncSession; private final Map clusters; + private final Deque operations; private final Deque visitOperations = new ConcurrentLinkedDeque<>(); + private final AtomicLong enqueued = new AtomicLong(); private final AtomicLong outstanding = new AtomicLong(); private final Map visits = new ConcurrentHashMap<>(); + 
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); private final Map> handlers = defineApi(); @@ -216,12 +221,16 @@ public DocumentV1ApiHandler(Metric metric, this.parser = new DocumentOperationParser(documentmanagerConfig); this.metric = metric; this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); + this.maxThrottled = executorConfig.maxThrottled(); + this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0); this.access = access; this.asyncSession = access.createAsyncSession(new AsyncParameters()); this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig); + this.operations = new ConcurrentLinkedDeque<>(); long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis(); // TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty. + this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); } @@ -279,19 +288,27 @@ public void destroy() { visits.values().forEach(VisitorSession::abort); visits.values().forEach(VisitorSession::destroy); - // Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty. + // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. + dispatcher.shutdown(); visitDispatcher.shutdown(); - while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { + while ( ! 
(operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { + dispatchEnqueued(); dispatchVisitEnqueued(); } + if ( ! operations.isEmpty()) + log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left"); + if ( ! visitOperations.isEmpty()) log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left"); try { while (outstanding.get() > 0 && clock.instant().isBefore(doom)) Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) + dispatcher.shutdownNow(); + if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) visitDispatcher.shutdownNow(); } @@ -534,6 +551,30 @@ private DocumentOperationParameters parametersFromRequest(HttpRequest request, S return parameters; } + /** Dispatches enqueued requests until one is blocked. */ + void dispatchEnqueued() { + try { + while (dispatchFirst()); + } + catch (Exception e) { + log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); + } + } + + /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */ + private boolean dispatchFirst() { + Operation operation = operations.poll(); + if (operation == null) + return false; + + if (operation.dispatch()) { + enqueued.decrementAndGet(); + return true; + } + operations.push(operation); + return false; + } + /** Dispatches enqueued requests until one is blocked.
*/ void dispatchVisitEnqueued() { try { @@ -557,16 +598,36 @@ private boolean dispatchFirstVisit() { return false; } + private long qAgeNS(HttpRequest request) { + Operation oldest = operations.peek(); + return (oldest != null) + ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime()) + : 0; + } + /** * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. */ private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier operationParser) { - Operation operation = new Operation(request, handler, operationParser); - if ( ! operation.dispatch()) { + long numQueued = enqueued.incrementAndGet(); + if (numQueued > maxThrottled) { + enqueued.decrementAndGet(); overload(request, "Rejecting execution due to overload: " - + (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler); + + maxThrottled + " requests already enqueued", handler); + return; + } + if (numQueued > 1) { + long ageNS = qAgeNS(request); + if (ageNS > maxThrottledAgeNS) { + enqueued.decrementAndGet(); + overload(request, "Rejecting execution due to overload: " + + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler); + return; + } } + operations.offer(new Operation(request, handler, operationParser)); + dispatchFirst(); } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 2d0b2de100e7..58cf34712aaa 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -71,7 +71,13 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; 
+import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -184,31 +190,55 @@ public void testResolveCluster() { } @Test - public void testOverLoad() { + public void testOverLoadBySize() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); // OVERLOAD is a 429 access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" + + "}", response3.readAll()); + assertEquals(429, response3.getStatus()); + + access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); + handler.dispatchEnqueued(); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + "}", response1.readAll()); - assertEquals(429, response1.getStatus()); + assertEquals(500, response1.getStatus()); + assertSameJson("{" + + " \"pathId\": 
\"/document/v1/space/music/number/1/two\"," + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + + "}", response2.readAll()); + assertEquals(500, response2.getStatus()); + driver.close(); + } + @Test + public void testOverLoadByAge() { + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + // OVERLOAD is a 429 + access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); + var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + try { Thread.sleep(3_000); } catch (InterruptedException e) {} + var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" + + " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" + "}", response2.readAll()); - assertEquals(429, response1.getStatus()); + assertEquals(429, response2.getStatus()); access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); - var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + handler.dispatchEnqueued(); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + - "}", response3.readAll()); - assertEquals(500, response3.getStatus()); + "}", response1.readAll()); + assertEquals(500, response1.getStatus()); driver.close(); } @@ -1006,6 +1036,78 @@ public void visit_timestamp_ranges_can_be_open_in_both_ends() { }); } + @Test + public void testThroughput() throws InterruptedException { + DocumentOperationExecutorConfig executorConfig = new 
DocumentOperationExecutorConfig.Builder().build(); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, + executorConfig, clusterConfig, bucketConfig); + + int writers = 4; + int queueFill = executorConfig.maxThrottled() - writers; + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers); + ScheduledExecutorService reader = Executors.newScheduledThreadPool(1); + ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers); + BlockingQueue responses = new LinkedBlockingQueue<>(); + + Response success = new Response(0, null, Response.Outcome.SUCCESS); + int docs = 1 << 14; + assertTrue(docs >= writers); + AtomicReference failed = new AtomicReference<>(); + + CountDownLatch latch = new CountDownLatch(docs); + reader.execute(() -> { + while ( ! reader.isShutdown()) { + try { + var response = responses.take(); + response.awaitResponse().readAll(); + if (response.getStatus() != 200) + failed.set(response.getResponse()); + latch.countDown(); + } + catch (InterruptedException e) { break; } + } + }); + + // Fill the handler resend queue. + long startNanos = System.nanoTime(); + CountDownLatch setup = new CountDownLatch(queueFill); + access.session.expect((id, parameters) -> { + setup.countDown(); + return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)); + }); + for (int i = 0; i < queueFill; i++) { + int j = i; + writer.execute(() -> { + responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j, + POST, + "{ \"fields\": { \"artist\": \"Sigrid\" } }")); + }); + } + setup.await(); + + // Let "messagebus" start accepting messages. 
+ access.session.expect((id, parameters) -> { + replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS); + return new Result(0); + }); + // Send the rest of the documents. Rely on resender to empty queue of throttled operations. + for (int i = queueFill; i < docs; i++) { + int j = i; + writer.execute(() -> { + responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j, + POST, + "{ \"fields\": { \"artist\": \"Sigrid\" } }")); + }); + } + latch.await(); + System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + + assertNull(failed.get()); + driver.close(); + } + + static class MockDocumentAccess extends DocumentAccess { private final AtomicReference> expectations = new AtomicReference<>(); @@ -1121,7 +1223,7 @@ public Result update(DocumentUpdate update, DocumentOperationParameters paramete @Override public double getCurrentWindowSize() { - return 20; + throw new AssertionError("Not used"); } public void expect(BiFunction expectations) {