Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryStream only returns 1 row and end the stream with more remaining #371

Open
brsnik opened this issue Dec 14, 2024 · 0 comments
Open

Comments

@brsnik
Copy link

brsnik commented Dec 14, 2024

I am unable to get more than 1 row. In the current instance I need to receive 2 rows.
I've tried matching QueryItem::Row(row) and stream.into_row_stream.
But I simply cannot get it to return more than 1 row.

As you can see in the trace Done with status BitFlags<DoneStatus>(0b10001, More | Count) (1 row left) there is 1 more row left.

What am I doing wrong here?

The docs are seriously lacking in examples.

 let supplier_codes_refs: Vec<&dyn ToSql> =
        supplier_codes.iter().map(|s| s as &dyn ToSql).collect();

    let mut client = ctx.legacy_db.get().await.map_err(|e| {
        ApiError::InternalServerError(format!("Failed to get DB connection: {}", e))
    })?;

    let mut stream = client.query(
        r#"
            WITH _ AS (
                SELECT
                    ....
                WHERE
                    a.whseID IN (1, 2, 3, 4)
                AND p.supplierID IN (@P1)
            )
            SELECT
                legacy_id, supplier_code, price,
                IIF(stock_raw < 0, 0, stock_raw) AS stock
            FROM _;
        "#,
        &supplier_codes_refs,
    ).await.map_err(|e| {
        ApiError::InternalServerError(format!("Query execution error: {}", e))
    })?;

    let mut row_stream = stream.into_row_stream();

    while let Some(row) = row_stream.try_next().await.map_err(|e| {
        ApiError::InternalServerError(format!("Failed to fetch row: {}", e))
    })? {
        println!("----- Processing next ITEM");

        let legacy_id: Option<&str> = row.try_get("legacy_id").map_err(|e| {
            ApiError::RowExtractionError(format!("Failed to get legacy_id: {}", e))
        })?;

        let supplier_code: Option<&str> = row.try_get("supplier_code").map_err(|e| {
            ApiError::RowExtractionError(format!("Failed to get supplier_code: {}", e))
        })?;

        let price: Option<i32> = row.try_get("price").map_err(|e| {
            ApiError::RowExtractionError(format!("Failed to get price: {}", e))
        })?;

        let stock: Option<i32> = row.try_get("stock").map_err(|e| {
            ApiError::RowExtractionError(format!("Failed to get stock: {}", e))
        })?;

        println!(
            "____ --- ____ Processed Row - Legacy ID: {:?}, Supplier Code: {:?}, Price: {:?}, Stock: {:?}",
            legacy_id,
            supplier_code,
            price,
            stock
        );
    }

Trace:

2024-12-14T20:53:03.345544Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::client::connection: Sending a packet (41 bytes)
2024-12-14T20:53:03.353781Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::codec::decode: Reading a TabularResult (37 bytes)
2024-12-14T20:53:03.353924Z  WARN request{method=POST uri=/stock version=HTTP/1.1}: tiberius::client::connection: TLS encryption is not enabled. All traffic including the login credentials are not encrypted.
2024-12-14T20:53:03.354128Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::client::connection: Sending a packet (236 bytes)
2024-12-14T20:53:03.365512Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::codec::decode: Reading a TabularResult (292 bytes)
2024-12-14T20:53:03.365710Z  INFO request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Database change from 'db2024' to 'master'
2024-12-14T20:53:03.365785Z  INFO request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Changed database context to 'db2024'.
2024-12-14T20:53:03.365821Z  INFO request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: SQL collation changed to windows-1251
2024-12-14T20:53:03.365865Z  INFO request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Microsoft SQL Server version 2299330574
2024-12-14T20:53:03.365931Z  INFO request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Packet size change from '4096' to '4096'
2024-12-14T20:53:03.365964Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Done with status BitFlags<DoneStatus>(0b0)
2024-12-14T20:53:03.366479Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::client::connection: Sending a packet (3054 bytes)
2024-12-14T20:53:03.443470Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::codec::decode: Reading a TabularResult (193 bytes)
2024-12-14T20:53:03.443554Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: meta=TokenColMetaData { columns: [MetaDataColumn { base: BaseMetaDataColumn { flags: BitFlags<ColumnFlag>(0b1000, Updateable), ty: VarLenSized(VarLenContext { type: NVarchar, len: 100, collation: Some(Collation { info: 13632521, sort_id: 52 }) }) }, col_name: "legacy_id" }, MetaDataColumn { base: BaseMetaDataColumn { flags: BitFlags<ColumnFlag>(0b1001, Nullable | Updateable), ty: VarLenSized(VarLenContext { type: NVarchar, len: 100, collation: Some(Collation { info: 13632559, sort_id: 0 }) }) }, col_name: "supplier_code" }, MetaDataColumn { base: BaseMetaDataColumn { flags: BitFlags<ColumnFlag>(0b1, Nullable), ty: VarLenSized(VarLenContext { type: Intn, len: 4, collation: None }) }, col_name: "price" }, MetaDataColumn { base: BaseMetaDataColumn { flags: BitFlags<ColumnFlag>(0b100001, Nullable | Identity), ty: VarLenSized(VarLenContext { type: Intn, len: 4, collation: None }) }, col_name: "stock" }] }
2024-12-14T20:53:03.443678Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: TokenRow { data: [String(Some("016857")), String(Some("539580")), I32(Some(3930)), I32(Some(50))] }
----- Processing next ITEM
____ --- ____ Processed Row - Legacy ID: Some("016857"), Supplier Code: Some("539580"), Price: Some(3930), Stock: Some(50)
2024-12-14T20:53:03.443720Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Done with status BitFlags<DoneStatus>(0b10001, More | Count) (1 row left)
2024-12-14T20:53:03.443733Z TRACE request{method=POST uri=/stock version=HTTP/1.1}: tiberius::tds::stream::token: Done with status BitFlags<DoneStatus>(0b0)
2024-12-14T20:53:03.443847Z DEBUG request{method=POST uri=/stock version=HTTP/1.1}: tower_http::trace::on_response: finished processing request latency=236 ms status=200
2024-12-14T20:53:03.444493Z TRACE axum::serve: connection [::1]:52430 closed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant