diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs index d370e6703985..bd9909a84d5f 100644 --- a/src/query/service/src/servers/http/v1/query/page_manager.rs +++ b/src/query/service/src/servers/http/v1/query/page_manager.rs @@ -163,6 +163,7 @@ impl PageManager { remain_size -= size; remain_rows -= 1; } else { + self.row_buffer.push_front(row); remain_size = 0; } } else { diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index c4db4d5bc05c..10030cbe7d64 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -202,7 +202,7 @@ impl TestHttpQueryRequest { #[derive(Debug, Clone)] struct TestHttpQueryFetchReply { - resps: Vec<(StatusCode, QueryResponse)>, + pub resps: Vec<(StatusCode, QueryResponse)>, } impl TestHttpQueryFetchReply { @@ -1715,6 +1715,20 @@ async fn test_max_size_per_page() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "current_thread")] +async fn test_max_size_per_page_total_rows() -> Result<()> { + let _fixture = TestFixture::setup().await?; + // bytes_limit / rows_limit = 1024 * 1024 * 10 / 10000 = 1048.567 + let sql = "select repeat('1', 1050) as a from numbers(20000)"; + let wait_time_secs = 5; + let json = serde_json::json!({"sql": sql.to_string(), "pagination": {"wait_time_secs": wait_time_secs}}); + let reply = TestHttpQueryRequest::new(json).fetch_total().await?; + assert!(reply.error().is_none(), "{:?}", reply.error()); + assert_eq!(reply.resps.len(), 3); + assert_eq!(reply.data().len(), 20000, "{:?}", reply.error()); + Ok(()) +} + #[tokio::test(flavor = "current_thread")] async fn test_null_response() -> Result<()> { let _fixture = TestFixture::setup().await?;