Skip to content

Commit

Permalink
Merge pull request #20 from narma/add_more_powerful_versions_of_execu…
Browse files Browse the repository at this point in the history
…te_select_selectFirst

add more powerful versions of execute, select and selectFirst
  • Loading branch information
myazinn authored Sep 26, 2022
2 parents 42b790a + df86187 commit 4710b54
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 32 deletions.
5 changes: 5 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class ServiceImpl(session: Session) extends Service {

override def put(value: Model) = insertQuery(value).execute.unit.provide(ZLayer.succeed(session))
override def get(id: Int) = selectQuery(id).selectFirst.provideSome(ZLayer.succeed(session))

// alternatively, to avoid providing environment each time
def insert(value: Model) = session.execute(insertQuery(value)).unit
def select(id: Int) = session.selectFirst(selectQuery(id))

}
```

Expand Down
44 changes: 42 additions & 2 deletions src/it/scala/zio/cassandra/session/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.SimpleStatement
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException
import com.dimafeng.testcontainers.CassandraContainer
import zio.cassandra.session.cql.CqlStringContext
import zio.cassandra.session.cql.unsafe.lift
import zio.test.Assertion._
import zio.test._
import zio.{ Chunk, Scope, ZIO }

import java.net.InetSocketAddress
import java.util.UUID

object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {

Expand Down Expand Up @@ -54,6 +57,23 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("select interpolated query (cqlConst) should return prepared data") {
for {
session <- ZIO.service[Session]
results <- session
.select(cqlConst"select data FROM $keyspace.test_data WHERE id IN (1,2,3)".as[String])
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("select interpolated query (cql) should return prepared data") {
for {
session <- ZIO.service[Session]
ids = List(1L, 2L, 3L)
results <- session
.select(cql"select data FROM ${lift(keyspace)}.test_data WHERE id IN $ids".as[String])
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("select should be pure stream") {
for {
session <- ZIO.service[Session]
Expand All @@ -65,15 +85,15 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
results <- selectStream
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("selectOne should return None on empty result") {
test("selectFirst should return None on empty result") {
for {
session <- ZIO.service[Session]
result <- session
.selectFirst(s"select data FROM $keyspace.test_data WHERE id = 404")
.map(_.map(_.getString(0)))
} yield assertTrue(result.isEmpty)
},
test("selectOne should return Some for one") {
test("selectFirst should return Some for one") {
for {
session <- ZIO.service[Session]
result <- session
Expand All @@ -88,6 +108,19 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
.map(_.map(_.getString(0)))
} yield assertTrue(result.contains(null))
},
test("selectFirst interpolated query (cqlConst) should return Some") {
for {
session <- ZIO.service[Session]
result <- session.selectFirst(cqlConst"select data FROM $keyspace.test_data WHERE id = 1".as[String])
} yield assertTrue(result.contains("one"))
},
test("selectFirst interpolated query (cql) should return Some") {
for {
session <- ZIO.service[Session]
id = 1L
result <- session.selectFirst(cql"select data FROM ${lift(keyspace)}.test_data WHERE id = $id".as[String])
} yield assertTrue(result.contains("one"))
},
test("select will emit in chunks sized equal to statement pageSize") {
val st = SimpleStatement.newInstance(s"select data from $keyspace.test_data").setPageSize(2)
for {
Expand All @@ -102,6 +135,13 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
session <- ZIO.service[Session]
results <- session.select(st).map(_.getString(0)).runCollect
} yield assert(results)(hasSameElements(Chunk("one", "two", "three")))
},
test("execute will create a table") {
for {
session <- ZIO.service[Session]
table = UUID.randomUUID().toString.replaceAll("-", "_")
created <- session.execute(cqlConst"create table $keyspace.$table(id text primary key)")
} yield assertTrue(created)
}
)
}
23 changes: 22 additions & 1 deletion src/main/scala/zio/cassandra/session/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import com.datastax.oss.driver.api.core.cql._
import com.datastax.oss.driver.api.core.metadata.Metadata
import com.datastax.oss.driver.api.core.metrics.Metrics
import com.datastax.oss.driver.api.core.{ CqlIdentifier, CqlSession, CqlSessionBuilder }
import shapeless.HList
import zio._
import zio.cassandra.session.cql.query.{ ParameterizedQuery, PreparedQuery, Query, QueryTemplate }
import zio.macros.accessible
import zio.stream.{ Stream, ZStream }
import zio.stream.ZStream.Pull
import zio.stream.{ Stream, ZStream }

import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.jdk.OptionConverters.RichOptional

@accessible
trait Session {

def prepare(stmt: String): Task[PreparedStatement]

def execute(stmt: Statement[_]): Task[AsyncResultSet]
Expand All @@ -26,6 +29,23 @@ trait Session {
// short-cuts
def selectFirst(stmt: Statement[_]): Task[Option[Row]]

final def prepare[V <: HList, R](query: QueryTemplate[V, R]): Task[PreparedQuery[V, R]] = {
import query.{ binder, reads }
prepare(query.query).map(new PreparedQuery[V, R](this, _, query.config))
}

final def prepare[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Query[R]] =
prepare(query.template).map(_.applyProduct(query.values))

final def execute[V <: HList](query: ParameterizedQuery[V, _]): Task[Boolean] =
prepare(query).flatMap(_.execute)

final def select[V <: HList, R](query: ParameterizedQuery[V, R]): Stream[Throwable, R] =
ZStream.fromZIO(prepare(query)).flatMap(_.select)

final def selectFirst[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Option[R]] =
prepare(query).flatMap(_.selectFirst)

// other methods
def metrics: Option[Metrics]
def name: String
Expand All @@ -36,6 +56,7 @@ trait Session {

def context: DriverContext
def keyspace: Option[CqlIdentifier]

}

object Session {
Expand Down
10 changes: 0 additions & 10 deletions src/main/scala/zio/cassandra/session/cql/FieldName.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.cassandra.session.cql.query
import com.datastax.oss.driver.api.core.cql.BoundStatement
import shapeless.HList
import shapeless.ops.hlist.Prepend
import zio.RIO
import zio._
import zio.cassandra.session.Session
import zio.cassandra.session.cql.Binder
import zio.cassandra.session.cql.codec.Reads
Expand All @@ -13,14 +13,11 @@ case class ParameterizedQuery[V <: HList: Binder, R: Reads] private (template: Q
def +(that: String): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](this.template + that, this.values)
def as[R1: Reads]: ParameterizedQuery[V, R1] = ParameterizedQuery[V, R1](template.as[R1], values)

def select: ZStream[Session, Throwable, R] =
ZStream.unwrap(template.prepare.map(_.applyProduct(values).select))
def select: ZStream[Session, Throwable, R] = ZStream.serviceWithStream(_.select(this))

def selectFirst: RIO[Session, Option[R]] =
template.prepare.flatMap(_.applyProduct(values).selectFirst)
def selectFirst: RIO[Session, Option[R]] = ZIO.serviceWithZIO(_.selectFirst(this))

def execute: RIO[Session, Boolean] =
template.prepare.map(_.applyProduct(values)).flatMap(_.execute)
def execute: RIO[Session, Boolean] = ZIO.serviceWithZIO(_.execute(this))

def config(config: BoundStatement => BoundStatement): ParameterizedQuery[V, R] =
ParameterizedQuery[V, R](template.config(config), values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import zio.cassandra.session.cql.Binder
import zio.cassandra.session.Session
import zio.cassandra.session.cql.codec.Reads

class PreparedQuery[V <: HList: Binder, R: Reads] private[cql] (
class PreparedQuery[V <: HList: Binder, R: Reads] private[session] (
session: Session,
statement: PreparedStatement,
config: BoundStatement => BoundStatement
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/zio/cassandra/session/cql/query/Query.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package zio.cassandra.session.cql.query

import com.datastax.oss.driver.api.core.cql.BoundStatement
import zio.{ Task, ZIO }
import zio.stream.Stream
import zio.cassandra.session.Session
import zio.cassandra.session.cql.codec.Reads
import zio.stream.Stream
import zio.{ Task, ZIO }

class Query[R: Reads] private[cql] (
session: Session,
private[cql] val statement: BoundStatement
) {
def config(statement: BoundStatement => BoundStatement) = new Query[R](session, statement(this.statement))

def select: Stream[Throwable, R] = session.select(statement).mapChunksZIO { chunk =>
def select: Stream[Throwable, R] = session.select(statement).mapChunksZIO { chunk =>
chunk.mapZIO(row => ZIO.attempt(Reads[R].read(row)))
}

def selectFirst: Task[Option[R]] = session.selectFirst(statement).flatMap {
def selectFirst: Task[Option[R]] = session.selectFirst(statement).flatMap {
case None => ZIO.none
case Some(row) => ZIO.attempt(Reads[R].read(row)).map(Some(_))
}

def execute: Task[Boolean] = session.execute(statement).map(_.wasApplied)
def execute: Task[Boolean] = session.execute(statement).map(_.wasApplied)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import zio.cassandra.session.cql.Binder
import zio.cassandra.session.cql.codec.Reads
import zio.{ RIO, ZIO }

case class QueryTemplate[V <: HList: Binder, R: Reads] private[cql] (
case class QueryTemplate[V <: HList, R] private[cql] (
query: String,
config: BoundStatement => BoundStatement
) {
)(implicit val binder: Binder[V], val reads: Reads[R]) {
def +(that: String): QueryTemplate[V, R] = QueryTemplate[V, R](this.query + that, config)
def as[R1: Reads]: QueryTemplate[V, R1] = QueryTemplate[V, R1](query, config)

def prepare: RIO[Session, PreparedQuery[V, R]] =
ZIO.serviceWithZIO { session =>
session.prepare(query).map(new PreparedQuery(session, _, config))
}
def prepare: RIO[Session, PreparedQuery[V, R]] = ZIO.serviceWithZIO(_.prepare(this))

def config(config: BoundStatement => BoundStatement): QueryTemplate[V, R] =
QueryTemplate[V, R](this.query, this.config andThen config)
Expand Down

0 comments on commit 4710b54

Please sign in to comment.