Skip to content

Commit

Permalink
Merge pull request #29 from narma/add_session_batch_execute_zio-1.x
Browse files Browse the repository at this point in the history
Add session batch execute zio 1.x
  • Loading branch information
myazinn authored Dec 8, 2022
2 parents 766cff6 + 43bde9c commit 49054cf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
19 changes: 17 additions & 2 deletions src/it/scala/zio/cassandra/session/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.datastax.oss.driver.api.core.cql.SimpleStatement
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException
import com.dimafeng.testcontainers.CassandraContainer
import zio.cassandra.session.cql.CqlStringContext
import zio.cassandra.session.cql.query.Batch
import zio.cassandra.session.cql.unsafe.lift
import zio.test.Assertion._
import zio.test._
Expand Down Expand Up @@ -104,7 +105,7 @@ object SessionSpec extends CassandraSpecUtils {
testM("selectFirst should return Some(null) for null") {
for {
session <- ZIO.service[Session]
result <- session.selectFirst(s"select data FROM $keyspace.test_data WHERE id = 0").map(_.map(_.getString(0)))
result <- session.selectFirst(s"select data FROM $keyspace.test_data WHERE id = 0").map(_.map(_.getString(0)))
} yield assertTrue(result.contains(null))
},
testM("selectFirst interpolated query (cqlConst) should return Some") {
Expand All @@ -131,9 +132,23 @@ object SessionSpec extends CassandraSpecUtils {
testM("execute will create a table") {
for {
session <- ZIO.service[Session]
table = "table_" +UUID.randomUUID().toString.replaceAll("-", "_")
table = "table_" + UUID.randomUUID().toString.replaceAll("-", "_")
created <- session.execute(cqlConst"create table $keyspace.$table(id text primary key)")
} yield assertTrue(created)
},
testM("execute will insert batched data") {
for {
session <- ZIO.service[Session]
tbl = "table_" + UUID.randomUUID().toString.replaceAll("-", "_")
table = s"$keyspace.$tbl"
_ <- session.execute(cqlConst"create table $table(id text primary key)")
insert1 <- session.prepare(cqlConst"insert into $table(id) values ('primary key 1')")
insert2 <- session.prepare(cqlConst"insert into $table(id) values ('primary key 2')")
insert3 <- session.prepare(cqlConst"insert into $table(id) values ('primary key 3')")
batch = Batch.unlogged.add(Seq(insert1, insert2, insert3))
inserted <- session.execute(batch)
result <- session.selectFirst(cqlConst"select count(*) from $table".as[Long])
} yield assertTrue(inserted, result.contains(3))
}
)
}
5 changes: 4 additions & 1 deletion src/main/scala/zio/cassandra/session/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.datastax.oss.driver.api.core.metadata.Metadata
import com.datastax.oss.driver.api.core.metrics.Metrics
import com.datastax.oss.driver.api.core.{ CqlIdentifier, CqlSession, CqlSessionBuilder }
import zio._
import zio.cassandra.session.cql.query.{ PreparedQuery, QueryTemplate }
import zio.cassandra.session.cql.query.{ Batch, PreparedQuery, QueryTemplate }
import zio.stream.Stream
import zio.stream.ZStream.Pull

Expand Down Expand Up @@ -34,6 +34,9 @@ trait Session {
final def execute(template: QueryTemplate[_]): Task[Boolean] =
prepare(template).flatMap(_.execute)

final def execute(batch: Batch): Task[Boolean] =
execute(batch.build).map(_.wasApplied)

final def select[R](template: QueryTemplate[R]): Stream[Throwable, R] =
Stream.fromEffect(prepare(template)).flatMap(_.select)

Expand Down
12 changes: 9 additions & 3 deletions src/main/scala/zio/cassandra/session/cql/query/Batch.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package zio.cassandra.session.cql.query

import com.datastax.oss.driver.api.core.cql.{ BatchStatementBuilder, BatchType }
import com.datastax.oss.driver.api.core.cql.{ BatchStatement, BatchStatementBuilder, BatchType }
import zio.cassandra.session.Session
import zio.{ Has, RIO, ZIO }

class Batch(batchStatementBuilder: BatchStatementBuilder) {
def add(queries: Seq[PreparedQuery[_]]) = new Batch(batchStatementBuilder.addStatements(queries.map(_.statement): _*))

def add(queries: Seq[PreparedQuery[_]]): this.type = {
batchStatementBuilder.addStatements(queries.map(_.statement): _*)
this
}

def build: BatchStatement = batchStatementBuilder.build()

def execute: RIO[Has[Session], Boolean] =
ZIO.accessM[Has[Session]] { session =>
session.get.execute(batchStatementBuilder.build()).map(_.wasApplied)
session.get.execute(this)
}

def config(config: BatchStatementBuilder => BatchStatementBuilder): Batch =
Expand Down

0 comments on commit 49054cf

Please sign in to comment.