Client's executeQuery("<sql>") may or may not complete, depending #10632

Open
credmond opened this issue Jan 5, 2025 · 0 comments

There's a bug in the client API: if you have a stream or table and run a simple SELECT via executeQuery without a LIMIT clause, or with a LIMIT greater than the maxRows default of 10k (i.e., ExecuteQueryMaxResultRows), the returned future may or may not complete.

It's random, and completion becomes less likely the further your LIMIT exceeds maxRows.

Expected: the future should always complete, with success or failure, and with an exception at the very least. As it stands, the behaviour makes no sense, and it's impossible to handle the issue or understand the cause programmatically. All we get is a log entry. Your own exceptions are swallowed.
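
The expected contract can be sketched with a small, self-contained collector. `BoundedRowCollector`, `handleRow`, and `handleEnd` are hypothetical names used for illustration only, and this is not the ksqlDB client's actual implementation; the point is simply that exceeding the cap should fail the future rather than only produce a log entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical collector, for illustration only: not the ksqlDB client's code.
class BoundedRowCollector {
    private final int maxRows;
    private final List<String> rows = new ArrayList<>();
    private final CompletableFuture<List<String>> future = new CompletableFuture<>();

    BoundedRowCollector(int maxRows) {
        this.maxRows = maxRows;
    }

    CompletableFuture<List<String>> future() {
        return future;
    }

    // Called once per received row.
    void handleRow(String row) {
        if (future.isDone()) {
            return; // already completed (successfully or not); ignore late rows
        }
        if (rows.size() >= maxRows) {
            // Surface the problem to the caller instead of only logging it.
            future.completeExceptionally(new IllegalStateException(
                    "Reached max number of rows: " + maxRows));
            return;
        }
        rows.add(row);
    }

    // Called when the response ends.
    void handleEnd() {
        // No-op if the future was already failed in handleRow.
        future.complete(new ArrayList<>(rows));
    }

    public static void main(String[] args) {
        BoundedRowCollector collector = new BoundedRowCollector(2);
        collector.handleRow("a");
        collector.handleRow("b");
        collector.handleRow("c"); // exceeds the cap, so the future fails here
        collector.handleEnd();
        collector.future()
                .thenAccept(r -> System.out.println("Result size: " + r.size()))
                .exceptionally(e -> {
                    System.out.println("Request failed: " + e);
                    return null;
                });
    }
}
```

With a cap of 2 and three rows, the exceptionally branch runs; with rows at or under the cap, thenAccept runs. Either way the caller gets a completed future.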

The error below is logged for most rows consumed AFTER the maximum (10k by default).

io.confluent.ksql.api.client.exception.KsqlClientException: Reached max number of rows that may be returned by executeQuery(). Increase the limit via ClientOptions#setExecuteQueryMaxResultRows(). Current limit: 10000
	at io.confluent.ksql.api.client.impl.ExecuteQueryResponseHandler.handleRow(ExecuteQueryResponseHandler.java:81)
	at io.confluent.ksql.api.client.impl.QueryResponseHandler.doHandleBodyBuffer(QueryResponseHandler.java:45)
	at io.confluent.ksql.api.client.impl.ResponseHandler.handleBodyBuffer(ResponseHandler.java:39)
	at io.confluent.ksql.api.client.impl.ExecuteQueryResponseHandler.handleBodyBuffer(ExecuteQueryResponseHandler.java:37)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:285)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:27)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:277)
	at io.vertx.core.http.impl.HttpEventHandler.handleChunk(HttpEventHandler.java:51)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:239)
	at io.vertx.core.http.impl.Http2ClientConnection$StreamImpl.handleData(Http2ClientConnection.java:500)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:74)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:76)
	at io.vertx.core.impl.ContextBase.execute(ContextBase.java:290)
	at io.vertx.core.http.impl.VertxHttp2Stream.onData(VertxHttp2Stream.java:120)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:315)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:34)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:320)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:409)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:244)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:164)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:186)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:61)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:391)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:451)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:415)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1503)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1366)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1415)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1575)
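
The cap named in the message is a client-side option. A minimal configuration sketch of raising it, assuming the host and port used in the reproduction and an arbitrary value of 50000 (the class name `RaisedLimitClient` is hypothetical). This only avoids reaching the cap; it does not change what happens when the cap is exceeded.

```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;

// Hypothetical helper class, for illustration only.
class RaisedLimitClient {
    static Client create() {
        ClientOptions options = ClientOptions.create()
                .setHost("ksqldb0")
                .setPort(8089)
                // 50000 is an arbitrary illustrative value; the default is 10000.
                .setExecuteQueryMaxResultRows(50000);
        return Client.create(options);
    }
}
```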

How to reproduce?

Run this a few times. Decrease the LIMIT closer to 10k and observe that sometimes the future does complete (i.e., thenAccept is called as if the query succeeded), but other times it does not. E.g., on my machine, a LIMIT of 12k shows this behaviour.

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.vertx.core.Vertx;

import java.util.concurrent.ExecutionException;

public class But {

    private static String KSQLDB_SERVER_HOST = "ksqldb0";
    private static int KSQLDB_SERVER_HOST_PORT = 8089;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Build the client with the helper defined below.
        final Client client = client();
        client.executeQuery("SELECT * FROM RIDERLOCATIONS LIMIT 20000;")
                .thenAccept(result -> {
                    // This may or may not be called if LIMIT is not set or is > 10000 (the default max rows).
                    // The further the LIMIT is above 10000, the less likely this is to be called.
                    System.out.println("Result size: " + result.size());
                }).exceptionally(e -> {
                    // This will NEVER be executed
                    System.out.println("Request failed: " + e);
                    e.printStackTrace();
                    return null;
                });

        // No, waiting longer won't help.
        Thread.sleep(15000);

        // Terminate any open connections and close the client
        client.close();
    }

    public static Client client() {
        final Vertx vertx = Vertx.vertx();
        final ClientOptions options = ClientOptions.create()
                .setUseTls(false)
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT);
        final Client client = Client.create(options, vertx);
        return client;
    }
}
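
Until the future reliably completes, a caller can at least bound the wait. The sketch below is a generic, standard-library-only workaround (Java 9+), not a fix for the client; `TimeoutGuard` and `guard` are hypothetical names, and a never-completed CompletableFuture stands in for the future returned by executeQuery.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
class TimeoutGuard {

    // Returns a future that yields the original result, or fails with a
    // TimeoutException if the original has not completed within the given time.
    static <T> CompletableFuture<T> guard(CompletableFuture<T> original, long timeoutMillis) {
        // copy() keeps the timeout from completing the caller's original future.
        return original.copy().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Stand-in for a query future that is never completed.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();

        String outcome = guard(neverCompleted, 200)
                .thenApply(result -> "Result: " + result)
                .exceptionally(e -> "Request failed: " + e)
                .join();

        System.out.println(outcome);
    }
}
```

In the reproduction, the same idea would mean wrapping the future returned by executeQuery before attaching thenAccept and exceptionally, so that the exceptionally branch runs after the timeout instead of never.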