Skip to content

Commit

Permalink
Revert "- Avoid a Q in the document v1 handler. Rely only on mbus Q."
Browse files Browse the repository at this point in the history
  • Loading branch information
baldersheim authored Apr 16, 2024
1 parent fca990d commit 3be23c8
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ protected HttpRequest(CurrentContainer container, URI uri, Method method, Versio
this.version = version;
this.remoteAddress = remoteAddress;
this.parameters.putAll(getUriQueryParameters(uri));
this.connectedAt = (connectedAtMillis != null) ? connectedAtMillis : creationTime(TimeUnit.MILLISECONDS);
if (connectedAtMillis != null) {
this.connectedAt = connectedAtMillis;
} else {
this.connectedAt = creationTime(TimeUnit.MILLISECONDS);
}
} catch (Throwable e) {
release();
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Metric metric;
private final DocumentApiMetrics metrics;
private final DocumentOperationParser parser;
private final long maxThrottled;
private final long maxThrottledAgeNS;
private final DocumentAccess access;
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
private final Deque<Operation> operations;
private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>();
private final AtomicLong enqueued = new AtomicLong();
private final AtomicLong outstanding = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();

Expand All @@ -216,12 +221,16 @@ public DocumentV1ApiHandler(Metric metric,
this.parser = new DocumentOperationParser(documentmanagerConfig);
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
this.maxThrottled = executorConfig.maxThrottled();
this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0);
this.access = access;
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
this.operations = new ConcurrentLinkedDeque<>();
long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis();

// TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty.
this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
}

Expand Down Expand Up @@ -279,19 +288,27 @@ public void destroy() {
visits.values().forEach(VisitorSession::abort);
visits.values().forEach(VisitorSession::destroy);

// Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty.
// Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
dispatcher.shutdown();
visitDispatcher.shutdown();
while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
dispatchEnqueued();
dispatchVisitEnqueued();
}

if ( ! operations.isEmpty())
log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");

if ( ! visitOperations.isEmpty())
log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left");
log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left");

try {
while (outstanding.get() > 0 && clock.instant().isBefore(doom))
Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));

if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
dispatcher.shutdownNow();

if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
visitDispatcher.shutdownNow();
}
Expand Down Expand Up @@ -534,6 +551,30 @@ private DocumentOperationParameters parametersFromRequest(HttpRequest request, S
return parameters;
}

/** Dispatches enqueued requests until one is blocked. */
void dispatchEnqueued() {
try {
while (dispatchFirst());
}
catch (Exception e) {
log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
}
}

/** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
private boolean dispatchFirst() {
Operation operation = operations.poll();
if (operation == null)
return false;

if (operation.dispatch()) {
enqueued.decrementAndGet();
return true;
}
operations.push(operation);
return false;
}

/** Dispatches enqueued requests until one is blocked. */
void dispatchVisitEnqueued() {
try {
Expand All @@ -557,16 +598,36 @@ private boolean dispatchFirstVisit() {
return false;
}

private long qAgeNS(HttpRequest request) {
Operation oldest = operations.peek();
return (oldest != null)
? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime())
: 0;
}

/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) {
Operation operation = new Operation(request, handler, operationParser);
if ( ! operation.dispatch()) {
long numQueued = enqueued.incrementAndGet();
if (numQueued > maxThrottled) {
enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: "
+ (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler);
+ maxThrottled + " requests already enqueued", handler);
return;
}
if (numQueued > 1) {
long ageNS = qAgeNS(request);
if (ageNS > maxThrottledAgeNS) {
enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: "
+ maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler);
return;
}
}
operations.offer(new Operation(request, handler, operationParser));
dispatchFirst();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -184,31 +190,55 @@ public void testResolveCluster() {
}

@Test
public void testOverLoad() {
public void testOverLoadBySize() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
// OVERLOAD is a 429
access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
"}", response3.readAll());
assertEquals(429, response3.getStatus());

access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
" \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
"}", response1.readAll());
assertEquals(429, response1.getStatus());
assertEquals(500, response1.getStatus());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
"}", response2.readAll());
assertEquals(500, response2.getStatus());
driver.close();
}

@Test
public void testOverLoadByAge() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
// OVERLOAD is a 429
access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
try { Thread.sleep(3_000); } catch (InterruptedException e) {}
var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
" \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" +
"}", response2.readAll());
assertEquals(429, response1.getStatus());
assertEquals(429, response2.getStatus());

access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
"}", response3.readAll());
assertEquals(500, response3.getStatus());
"}", response1.readAll());
assertEquals(500, response1.getStatus());
driver.close();
}

Expand Down Expand Up @@ -1006,6 +1036,78 @@ public void visit_timestamp_ranges_can_be_open_in_both_ends() {
});
}

@Test
public void testThroughput() throws InterruptedException {
DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
executorConfig, clusterConfig, bucketConfig);

int writers = 4;
int queueFill = executorConfig.maxThrottled() - writers;
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers);
ScheduledExecutorService reader = Executors.newScheduledThreadPool(1);
ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers);
BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>();

Response success = new Response(0, null, Response.Outcome.SUCCESS);
int docs = 1 << 14;
assertTrue(docs >= writers);
AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>();

CountDownLatch latch = new CountDownLatch(docs);
reader.execute(() -> {
while ( ! reader.isShutdown()) {
try {
var response = responses.take();
response.awaitResponse().readAll();
if (response.getStatus() != 200)
failed.set(response.getResponse());
latch.countDown();
}
catch (InterruptedException e) { break; }
}
});

// Fill the handler resend queue.
long startNanos = System.nanoTime();
CountDownLatch setup = new CountDownLatch(queueFill);
access.session.expect((id, parameters) -> {
setup.countDown();
return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR));
});
for (int i = 0; i < queueFill; i++) {
int j = i;
writer.execute(() -> {
responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
POST,
"{ \"fields\": { \"artist\": \"Sigrid\" } }"));
});
}
setup.await();

// Let "messagebus" start accepting messages.
access.session.expect((id, parameters) -> {
replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
return new Result(0);
});
// Send the rest of the documents. Rely on resender to empty queue of throttled operations.
for (int i = queueFill; i < docs; i++) {
int j = i;
writer.execute(() -> {
responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
POST,
"{ \"fields\": { \"artist\": \"Sigrid\" } }"));
});
}
latch.await();
System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");

assertNull(failed.get());
driver.close();
}


static class MockDocumentAccess extends DocumentAccess {

private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
Expand Down Expand Up @@ -1121,7 +1223,7 @@ public Result update(DocumentUpdate update, DocumentOperationParameters paramete

@Override
public double getCurrentWindowSize() {
return 20;
throw new AssertionError("Not used");
}

public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) {
Expand Down

0 comments on commit 3be23c8

Please sign in to comment.