Skip to content

Commit

Permalink
Merge pull request scylladb#1069 from muzarski/replace-query-unpaged-…
Browse files Browse the repository at this point in the history
…in-examples

examples: replace `query_unpaged` for SELECT queries
  • Loading branch information
wprzytula authored Aug 29, 2024
2 parents 5752af4 + 18bdbd1 commit 47e9864
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 87 deletions.
28 changes: 14 additions & 14 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::macros::FromRow;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
Expand Down Expand Up @@ -49,11 +50,11 @@ async fn main() -> Result<()> {
.await?;

// Rows can be parsed as tuples
let result = session
.query_unpaged("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
let mut iter = result.rows_typed::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<(i32, i32, String)>();
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}

Expand All @@ -65,20 +66,19 @@ async fn main() -> Result<()> {
_c: String,
}

let result = session
.query_unpaged("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
let mut iter = result.rows_typed::<RowData>()?;
while let Some(row_data) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<RowData>();
while let Some(row_data) = iter.try_next().await? {
println!("row_data: {:?}", row_data);
}

// Or simply as untyped rows
let result = session
.query_unpaged("SELECT a, b, c FROM examples_ks.basic", &[])
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
let rows = result.rows.unwrap();
for row in rows {
while let Some(row) = iter.try_next().await? {
let a = row.columns[0].as_ref().unwrap().as_int().unwrap();
let b = row.columns[1].as_ref().unwrap().as_int().unwrap();
let c = row.columns[2].as_ref().unwrap().as_text().unwrap();
Expand Down
97 changes: 50 additions & 47 deletions examples/cql-time-types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use anyhow::Result;
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use futures::{StreamExt, TryStreamExt};
use scylla::frame::response::result::CqlValue;
use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp};
use scylla::transport::session::Session;
Expand Down Expand Up @@ -40,11 +41,12 @@ async fn main() -> Result<()> {
)
.await?;

let result = session
.query_unpaged("SELECT d from examples_ks.dates", &[])
.await?;
for row in result.rows_typed::<(NaiveDate,)>()? {
let (read_date,): (NaiveDate,) = match row {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(NaiveDate,)>();
while let Some(row_result) = iter.next().await {
let (read_date,): (NaiveDate,) = match row_result {
Ok(read_date) => read_date,
Err(_) => continue, // We might read a date that does not fit in NaiveDate, skip it
};
Expand All @@ -61,11 +63,12 @@ async fn main() -> Result<()> {
.query_unpaged("INSERT INTO examples_ks.dates (d) VALUES (?)", (time_date,))
.await?;

let result = session
.query_unpaged("SELECT d from examples_ks.dates", &[])
.await?;
for row in result.rows_typed::<(time::Date,)>()? {
let (read_date,) = match row {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(time::Date,)>();
while let Some(row_result) = iter.next().await {
let (read_date,): (time::Date,) = match row_result {
Ok(read_date) => read_date,
Err(_) => continue, // We might read a date that does not fit in time::Date, skip it
};
Expand All @@ -82,13 +85,13 @@ async fn main() -> Result<()> {
)
.await?;

let result = session
.query_unpaged("SELECT d from examples_ks.dates", &[])
.await?;
let mut iter = result.rows_typed::<(CqlValue,)>()?;
while let Some((value,)) = iter.next().transpose()? {
let read_days: u32 = match value {
CqlValue::Date(CqlDate(days)) => days,
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(CqlValue,)>();
while let Some(row_result) = iter.next().await {
let read_days: u32 = match row_result {
Ok((CqlValue::Date(CqlDate(days)),)) => days,
_ => panic!("oh no"),
};

Expand Down Expand Up @@ -118,11 +121,11 @@ async fn main() -> Result<()> {
)
.await?;

let result = session
.query_unpaged("SELECT t from examples_ks.times", &[])
.await?;
let mut iter = result.rows_typed::<(NaiveTime,)>()?;
while let Some((read_time,)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(NaiveTime,)>();
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into chrono::NaiveTime: {:?}", read_time);
}

Expand All @@ -133,11 +136,11 @@ async fn main() -> Result<()> {
.query_unpaged("INSERT INTO examples_ks.times (t) VALUES (?)", (time_time,))
.await?;

let result = session
.query_unpaged("SELECT t from examples_ks.times", &[])
.await?;
let mut iter = result.rows_typed::<(time::Time,)>()?;
while let Some((read_time,)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(time::Time,)>();
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into time::Time: {:?}", read_time);
}

Expand All @@ -148,11 +151,11 @@ async fn main() -> Result<()> {
.query_unpaged("INSERT INTO examples_ks.times (t) VALUES (?)", (time_time,))
.await?;

let result = session
.query_unpaged("SELECT t from examples_ks.times", &[])
.await?;
let mut iter = result.rows_typed::<(CqlTime,)>()?;
while let Some((read_time,)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(CqlTime,)>();
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a time as raw nanos: {:?}", read_time);
}

Expand All @@ -179,11 +182,11 @@ async fn main() -> Result<()> {
)
.await?;

let result = session
.query_unpaged("SELECT t from examples_ks.timestamps", &[])
.await?;
let mut iter = result.rows_typed::<(DateTime<Utc>,)>()?;
while let Some((read_time,)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(DateTime<Utc>,)>();
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into chrono::DateTime<chrono::Utc>: {:?}",
read_time
Expand All @@ -200,11 +203,11 @@ async fn main() -> Result<()> {
)
.await?;

let result = session
.query_unpaged("SELECT t from examples_ks.timestamps", &[])
.await?;
let mut iter = result.rows_typed::<(time::OffsetDateTime,)>()?;
while let Some((read_time,)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(time::OffsetDateTime,)>();
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into time::OffsetDateTime: {:?}",
read_time
Expand All @@ -221,11 +224,11 @@ async fn main() -> Result<()> {
)
.await?;

let result = session
.query_unpaged("SELECT t from examples_ks.timestamps", &[])
.await?;
let mut iter = result.rows_typed::<(CqlTimestamp,)>()?;
while let Some((read_time,)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(CqlTimestamp,)>();
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a timestamp as raw millis: {:?}", read_time);
}

Expand Down
11 changes: 6 additions & 5 deletions examples/schema_agreement.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{bail, Result};
use futures::TryStreamExt;
use scylla::transport::errors::QueryError;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
Expand Down Expand Up @@ -66,11 +67,11 @@ async fn main() -> Result<()> {
.await?;

// Rows can be parsed as tuples
let result = session
.query_unpaged("SELECT a, b, c FROM examples_ks.schema_agreement", &[])
.await?;
let mut iter = result.rows_typed::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.schema_agreement", &[])
.await?
.into_typed::<(i32, i32, String)>();
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}

Expand Down
11 changes: 6 additions & 5 deletions examples/tls.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;
Expand Down Expand Up @@ -86,11 +87,11 @@ async fn main() -> Result<()> {
.await?;

// Rows can be parsed as tuples
let result = session
.query_unpaged("SELECT a, b, c FROM examples_ks.tls", &[])
.await?;
let mut iter = result.rows_typed::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.next().transpose()? {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.tls", &[])
.await?
.into_typed::<(i32, i32, String)>();
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}

Expand Down
14 changes: 9 additions & 5 deletions examples/user-defined-type.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::macros::FromUserType;
use scylla::{SerializeValue, Session, SessionBuilder};
use std::env;
Expand Down Expand Up @@ -49,11 +50,14 @@ async fn main() -> Result<()> {
.await?;

// And read like any normal value
let result = session
.query_unpaged("SELECT my FROM examples_ks.user_defined_type_table", &[])
.await?;
let mut iter = result.rows_typed::<(MyType,)>()?;
while let Some((my_val,)) = iter.next().transpose()? {
let mut iter = session
.query_iter(
"SELECT a, b, c FROM examples_ks.user_defined_type_table",
&[],
)
.await?
.into_typed::<(MyType,)>();
while let Some((my_val,)) = iter.try_next().await? {
println!("{:?}", my_val);
}

Expand Down
21 changes: 10 additions & 11 deletions examples/value_list.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use anyhow::Result;
use scylla::{Session, SessionBuilder};
use std::env;

#[tokio::main]
async fn main() {
async fn main() -> Result<()> {
let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());

println!("Connecting to {} ...", uri);

let session: Session = SessionBuilder::new().known_node(uri).build().await.unwrap();
let session: Session = SessionBuilder::new().known_node(uri).build().await?;

session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;

session
.query_unpaged(
"CREATE TABLE IF NOT EXISTS examples_ks.my_type (k int, my text, primary key (k))",
&[],
)
.await
.unwrap();
.await?;

#[derive(scylla::SerializeRow)]
struct MyType<'a> {
Expand All @@ -35,8 +35,7 @@ async fn main() {
"INSERT INTO examples_ks.my_type (k, my) VALUES (?, ?)",
to_insert,
)
.await
.unwrap();
.await?;

// You can also use type generics:
#[derive(scylla::SerializeRow)]
Expand All @@ -55,13 +54,13 @@ async fn main() {
"INSERT INTO examples_ks.my_type (k, my) VALUES (?, ?)",
to_insert_2,
)
.await
.unwrap();
.await?;

let q = session
.query_unpaged("SELECT * FROM examples_ks.my_type", &[])
.await
.unwrap();
.await?;

println!("Q: {:?}", q.rows);

Ok(())
}

0 comments on commit 47e9864

Please sign in to comment.