diff --git a/build.sbt b/build.sbt
index c17f637..0af8bc9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -56,7 +56,8 @@ lazy val core = project
       "com.precog" %% "quasar-foundation" % quasarPluginQuasarVersion.value % "test->test",
       "org.http4s" %% "jawn-fs2" % "1.0.0-RC2" % Test,
       "io.argonaut" %% "argonaut-jawn" % "6.3.0-M2" % Test
-    ))
+    ),
+    testFrameworks := Seq(TestFrameworks.Specs2))
   .enablePlugins(QuasarPlugin)
   .evictToLocal("QUASAR_PATH", "connector", true)
   .evictToLocal("QUASAR_PATH", "api", true)
diff --git a/core/src/main/scala/quasar/plugin/postgres/datasource/PostgresDatasource.scala b/core/src/main/scala/quasar/plugin/postgres/datasource/PostgresDatasource.scala
index 60636f4..1432c25 100644
--- a/core/src/main/scala/quasar/plugin/postgres/datasource/PostgresDatasource.scala
+++ b/core/src/main/scala/quasar/plugin/postgres/datasource/PostgresDatasource.scala
@@ -18,8 +18,10 @@ package quasar.plugin.postgres.datasource
 
 import slamdata.Predef._
 
+import java.time.format.DateTimeFormatter
+
 import cats.~>
-import cats.data.NonEmptyList
+import cats.data.{EitherT, NonEmptyList}
 import cats.effect._
 import cats.implicits._
 
@@ -31,14 +33,17 @@
 import fs2.{text, Pull, Stream}
 
 import org.slf4s.Logging
 
-import quasar.{ScalarStage, ScalarStages}
 import quasar.api.ColumnType
+import quasar.api.DataPathSegment
 import quasar.api.datasource.DatasourceType
+import quasar.api.push.InternalKey
 import quasar.api.resource.{ResourcePathType => RPT, _}
 import quasar.common.CPathField
-import quasar.connector.{ResourceError => RE, _}
+import quasar.connector.datasource.Loader
 import quasar.connector.datasource.{BatchLoader, DatasourceModule, Loader}
+import quasar.connector.{ResourceError => RE, Offset, _}
 import quasar.qscript.InterpretedRead
+import quasar.{ScalarStage, ScalarStages}
 
 import shims._
@@ -51,40 +56,12 @@
 
   val kind: DatasourceType = PostgresDatasourceModule.kind
 
-  val loaders = NonEmptyList.of(Loader.Batch(BatchLoader.Full { (ir: InterpretedRead[ResourcePath]) =>
-    pathToLoc(ir.path) match {
-      case Some(Right((schema, table))) =>
-        val back = tableExists(schema, table) map { exists =>
-          if (exists)
-            Right(maskedColumns(ir.stages) match {
-              case Some((columns, nextStages)) =>
-                (tableAsJsonBytes(schema, ColumnProjections.Explicit(columns), table), nextStages)
-
-              case None =>
-                (tableAsJsonBytes(schema, ColumnProjections.All, table), ir.stages)
-            })
-          else
-            Left(RE.pathNotFound[RE](ir.path))
-        }
-
-        xa.connect(xa.kernel)
-          .evalMap(c => runCIO(c)(back.map(_.map(_.leftMap(_.translate(runCIO(c)))))))
-          .evalMap {
-            case Right((s, stages)) =>
-              QueryResult.typed(DataFormat.ldjson, ResultData.Continuous(s), stages).pure[F]
-
-            case Left(re) =>
-              MonadResourceErr[F].raiseError[QueryResult[F]](re)
-          }
-
-      case _ =>
-        Resource.eval(MonadResourceErr[F].raiseError(RE.notAResource(ir.path)))
-    }
-  }))
+  val loaders =
+    NonEmptyList.one(Loader.Batch(BatchLoader.Seek(load(_, _))))
 
   def pathIsResource(path: ResourcePath): Resource[F, Boolean] =
-    Resource.eval(pathToLoc(path) match {
-      case Some(Right((schema, table))) =>
+    Resource.eval(pathToLoc(path).toOption match {
+      case Some(Loc.Leaf(schema, table)) =>
         tableExists(schema, table).transact(xa)
       case _ => false.pure[F]
     })
@@ -98,15 +75,15 @@
         .some
         .pure[Resource[F, ?]]
     else
-      pathToLoc(prefixPath) match {
-        case Some(Right((schema, table))) =>
+      pathToLoc(prefixPath).toOption match {
+        case Some(Loc.Leaf(schema, table)) =>
           Resource.eval(
             tableExists(schema, table)
              .map(p => if (p) Some(Stream.empty.covaryAll[F, (ResourceName, RPT.Physical)]) else None)
              .transact(xa))
 
-        case Some(Left(schema)) =>
-          val l = Some(schema).filterNot(containsWildcard).map(Left(_))
+        case Some(Loc.Prefix(schema)) =>
+          val l = Some(schema).filterNot(containsWildcard).map(Loc.Prefix(_))
 
           val paths =
             tables(l)
@@ -126,7 +103,114 @@
 
   ////
 
-  private type Loc = Option[Either[Schema, (Schema, Table)]]
+  private def load(read: InterpretedRead[ResourcePath], offset: Option[Offset]): Resource[F, QueryResult[F]] = {
+    val back = (for {
+      loc <- EitherT.fromEither[ConnectionIO](pathToLoc(read.path))
+
+      predicate <- EitherT.fromEither[ConnectionIO](
+        offset.fold[Either[RE, Option[OffsetCondition]]](Right(None))(o =>
+          offsetProjection(o, read.path).map(Some(_))))
+
+      (schema, table) <- EitherT.fromEither[ConnectionIO](loc match {
+        case Loc.Leaf(schema, table) =>
+          Right((schema, table))
+        case _ =>
+          Left(RE.notAResource(read.path))
+      })
+
+      exists <- EitherT.right[RE](tableExists(schema, table))
+
+      _ <- EitherT.pure[ConnectionIO, RE](()).ensure(RE.pathNotFound(read.path))(_ => exists)
+
+    } yield maskedColumns(read.stages) match {
+      case Some((columns, nextStages)) =>
+        (tableAsJsonBytes(schema, ColumnProjections.Explicit(columns), table, predicate), nextStages)
+
+      case None =>
+        (tableAsJsonBytes(schema, ColumnProjections.All, table, predicate), read.stages)
+    }).value
+
+    xa.connect(xa.kernel)
+      .evalMap(c => runCIO(c)(back.map(_.map(_.leftMap(_.translate(runCIO(c)))))))
+      .evalMap {
+        case Right((s, stages)) =>
+          QueryResult.typed(DataFormat.ldjson, ResultData.Continuous(s), stages).pure[F]
+        case Left(re) =>
+          MonadResourceErr[F].raiseError[QueryResult[F]](re)
+      }
+  }
+
+  private sealed trait PgVal extends Product with Serializable
+
+  private object PgVal {
+    case class Whole(value: Long) extends PgVal
+    case class Floating(value: Double) extends PgVal
+    case class Str(value: String) extends PgVal
+  }
+
+  private case class OffsetCondition(
+      column: String,
+      value: PgVal,
+      tpe: Option[String])
+
+  private def offsetProjection(offset: Offset, path: ResourcePath): Either[RE, OffsetCondition] =
+    offset match {
+      case Offset.Internal(NonEmptyList(DataPathSegment.Field(proj), Nil), value) => {
+        val k: InternalKey.Actual[A] forSome { type A } = value.value
+
+        val offsetValue: Option[OffsetCondition] = k match {
+          case InternalKey.RealKey(num) => {
+            val pgVal = if (num.isWhole()) {
+              PgVal.Whole(num.toLong)
+            } else {
+              PgVal.Floating(num.toDouble)
+            }
+
+            Some(OffsetCondition(proj, pgVal, None))
+          }
+
+          case InternalKey.StringKey(_) =>
+            None
+
+          case InternalKey.DateTimeKey(dt) =>
+            Some(
+              OffsetCondition(
+                proj,
+                PgVal.Str(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dt)),
+                Some("timestamp with time zone")))
+
+          case InternalKey.DateKey(_) =>
+            None
+
+          case InternalKey.LocalDateKey(ld) =>
+            Some(
+              OffsetCondition(
+                proj,
+                PgVal.Str(DateTimeFormatter.ISO_LOCAL_DATE.format(ld)),
+                Some("date")))
+
+          case InternalKey.LocalDateTimeKey(ldt) =>
+            Some(
+              OffsetCondition(
+                proj,
+                PgVal.Str(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(ldt)),
+                Some("timestamp")))
+        }
+
+        offsetValue.toRight(
+          RE.seekFailed(
+            path,
+            "Only numbers, offsetdatetime, localdate and localdatetime columns are supported for delta loads"))
+      }
+
+      case _ =>
+        Left(RE.seekFailed(path, "Only projection of flat columns is supported for delta loads"))
+    }
+
+  private sealed trait Loc extends Product with Serializable
+
+  private object Loc {
+    case class Prefix(schema: Schema) extends Loc
+    case class Leaf(schema: Schema, table: Table) extends Loc
+  }
 
   // Characters considered as pattern placeholders in
   // `DatabaseMetaData#getTables`
@@ -163,11 +247,11 @@
    *
    * To be a tables selector, an `Ident` must not contain any of the wildcard characters.
    */
-  private def locSelector(schema: Schema, table: Table): Loc =
+  private def locSelector(schema: Schema, table: Table): Option[Loc] =
     (containsWildcard(schema), containsWildcard(table)) match {
       case (true, _) => None
-      case (false, true) => Some(Left(schema))
-      case (false, false) => Some(Right((schema, table)))
+      case (false, true) => Some(Loc.Prefix(schema))
+      case (false, false) => Some(Loc.Leaf(schema, table))
     }
 
   private def containsWildcard(s: String): Boolean =
@@ -193,29 +277,55 @@
       case _ => None
     }
 
-  private def pathToLoc(rp: ResourcePath): Loc =
-    Some(rp) collect {
-      case schema /: ResourcePath.Root => Left(schema)
-      case schema /: table /: ResourcePath.Root => Right((schema, table))
+  private def pathToLoc(rp: ResourcePath): Either[RE, Loc] =
+    rp match {
+      case schema /: ResourcePath.Root => Right(Loc.Prefix(schema))
+      case schema /: table /: ResourcePath.Root => Right(Loc.Leaf(schema, table))
+      case _ => Left(RE.notAResource(rp))
     }
 
   private def runCIO(c: java.sql.Connection): ConnectionIO ~> F =
     λ[ConnectionIO ~> F](_.foldMap(xa.interpret).run(c))
 
-  private def tableAsJsonBytes(schema: Schema, columns: ColumnProjections, table: Table)
+  private def tableAsJsonBytes(
+      schema: Schema,
+      columns: ColumnProjections,
+      table: Table,
+      predicate: Option[OffsetCondition])
       : Stream[ConnectionIO, Byte] = {
     val schemaFr = Fragment.const0(hygienicIdent(schema))
     val tableFr = Fragment.const(hygienicIdent(table))
     val locFr = schemaFr ++ fr0"." ++ tableFr
 
+    val predicateFr = Fragments.whereOrOpt(predicate map {
+      case OffsetCondition(column, value, tpe) => {
+        val col = hygienicIdent(column)
+
+        val v = value match {
+          case PgVal.Str(str) =>
+            fr"$str"
+          case PgVal.Floating(d) =>
+            fr"$d"
+          case PgVal.Whole(l) =>
+            fr"$l"
+        }
+
+        tpe.fold(
+          Fragment.const(col) ++ fr">=" ++ v)(t => {
+          val tpeFragment = Fragment.const0(s"::$t")
+          Fragment.const(col) ++ fr">=" ++ v ++ tpeFragment
+        })
+      }
+    })
+
     val fromFr = columns match {
       case ColumnProjections.Explicit(cs) =>
         val colsFr = cs.map(n => Fragment.const(hygienicIdent(n))).intercalate(fr",")
-        Fragments.parentheses(fr"SELECT" ++ colsFr ++ fr"FROM" ++ locFr)
+        Fragments.parentheses(fr"SELECT" ++ colsFr ++ fr"FROM" ++ locFr ++ predicateFr)
 
       case ColumnProjections.All =>
-        locFr
+        Fragments.parentheses(fr"SELECT * FROM" ++ locFr ++ predicateFr)
     }
 
     val sql =
@@ -233,10 +343,10 @@
       .compile
       .lastOrError
 
-  private def tables(selector: Loc): Stream[ConnectionIO, TableMeta] = {
+  private def tables(selector: Option[Loc]): Stream[ConnectionIO, TableMeta] = {
     val (selectSchema, selectTable) = selector match {
-      case Some(Left(s)) => (s, "%")
-      case Some(Right((s, t))) => (s, t)
+      case Some(Loc.Prefix(s)) => (s, "%")
+      case Some(Loc.Leaf(s, t)) => (s, t)
       case None => (null, "%")
     }
 
diff --git a/core/src/test/scala/quasar/plugin/postgres/datasource/PostgresDatasourceSpec.scala b/core/src/test/scala/quasar/plugin/postgres/datasource/PostgresDatasourceSpec.scala
index 907d672..e99d109 100644
--- a/core/src/test/scala/quasar/plugin/postgres/datasource/PostgresDatasourceSpec.scala
+++ b/core/src/test/scala/quasar/plugin/postgres/datasource/PostgresDatasourceSpec.scala
@@ -23,6 +23,7 @@ import argonaut._, Argonaut._, JawnParser._
 import cats.~>
 import cats.effect._
 import cats.implicits._
+import cats.data.NonEmptyList
 
 import doobie._
 import doobie.implicits._
@@ -35,10 +36,12 @@ import org.specs2.specification.BeforeAfterAll
 
 import quasar.{IdStatus, ScalarStage, ScalarStages}
 import quasar.api.ColumnType
+import quasar.api.DataPathSegment
 import quasar.api.resource.{ResourcePathType => RPT, _}
 import quasar.common.CPath
 import quasar.concurrent.unsafe._
 import quasar.connector.{ResourceError => RE, _}
+import quasar.connector.Offset
 import quasar.connector.datasource.{DatasourceSpec, DatasourceModule}
 import quasar.contrib.scalaz.MonadError_
 import quasar.qscript.InterpretedRead
@@ -46,6 +49,10 @@ import quasar.qscript.InterpretedRead
 import scala.concurrent.ExecutionContext.Implicits.global
 
 import shims.applicativeToScalaz
+import skolems.∃
+import quasar.api.push.InternalKey
+import java.time.OffsetDateTime
+import spire.math.Real
 
 object PostgresDatasourceSpec
     extends DatasourceSpec[IO, Stream[IO, ?], RPT.Physical]
@@ -368,6 +375,104 @@
     }
   }
 
+  "seek" >> {
+    "filters from specified offset" >> {
+      "no pushdown" >>* {
+        val setup =
+          xa.trans.apply(for {
+            _ <- sql"""DROP TABLE IF EXISTS "pgsrcSchemaA"."seek"""".update.run
+            _ <- sql"""CREATE TABLE "pgsrcSchemaA"."seek" (foo VARCHAR, jayson jsonb, off integer)""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seek" (foo, jayson, off) VALUES ('A', '{ "x": [1, 2, 3], "y": ["one", "two"] }', 1)""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seek" (foo, jayson, off) VALUES ('B', '{ "x": [3, 4, 5], "y": ["three", "four"] }', 2)""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seek" (foo, jayson, off) VALUES ('C', '{ "x": [6, 7, 8], "y": ["five", "six"] }', 3)""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seek" (foo, jayson, off) VALUES ('D', '{ "x": [9, 10, 11], "y": ["seven", "eight"] }', 4)""".update.run
+          } yield ())
+
+        val expected = List(
+          Json(
+            "foo" := "B",
+            "jayson" :=
+              Json(
+                "x" := Json.array(jNumber(3), jNumber(4), jNumber(5)),
+                "y" := Json.array(jString("three"), jString("four"))),
+            "off" := jNumber(2)),
+          Json(
+            "foo" := "C",
+            "jayson" :=
+              Json(
+                "x" := Json.array(jNumber(6), jNumber(7), jNumber(8)),
+                "y" := Json.array(jString("five"), jString("six"))),
+            "off" := jNumber(3)),
+          Json(
+            "foo" := "D",
+            "jayson" :=
+              Json(
+                "x" := Json.array(jNumber(9), jNumber(10), jNumber(11)),
+                "y" := Json.array(jString("seven"), jString("eight"))),
+            "off" := jNumber(4)))
+
+        val read = InterpretedRead(
+          ResourcePath.root() / ResourceName("pgsrcSchemaA") / ResourceName("seek"),
+          ScalarStages.Id)
+
+        val offset = Offset.Internal(
+          NonEmptyList.of(DataPathSegment.Field("off")),
+          ∃(InternalKey.Actual.real(Real(2))))
+
+        (setup >> resultsFrom[Json](read, offset))
+          .map(_ must (ScalarStages.Id, expected).zip(be_===, containTheSameElementsAs(_)))
+      }
+
+      "mask pushdown" >>* {
+        val setup =
+          xa.trans.apply(for {
+            _ <- sql"""DROP TABLE IF EXISTS "pgsrcSchemaA"."seekMask"""".update.run
+            _ <- sql"""CREATE TABLE "pgsrcSchemaA"."seekMask" (foo VARCHAR, jayson jsonb, off TIMESTAMP WITH TIME ZONE)""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seekMask" (foo, jayson, off) VALUES ('A', '{ "x": [1, 2, 3], "y": ["one", "two"] }', '2022-07-10T00:18:33Z')""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seekMask" (foo, jayson, off) VALUES ('B', '{ "x": [3, 4, 5], "y": ["three", "four"] }', '2022-07-11T00:18:33Z')""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seekMask" (foo, jayson, off) VALUES ('C', '{ "x": [6, 7, 8], "y": ["five", "six"] }', '2022-07-12T00:18:33Z')""".update.run
+            _ <- sql"""INSERT INTO "pgsrcSchemaA"."seekMask" (foo, jayson, off) VALUES ('D', '{ "x": [9, 10, 11], "y": ["seven", "eight"] }', '2022-07-13T00:18:33Z')""".update.run
+          } yield ())
+
+        val expected = List(
+          Json(
+            "foo" := "B",
+            "jayson" :=
+              Json(
+                "x" := Json.array(jNumber(3), jNumber(4), jNumber(5)),
+                "y" := Json.array(jString("three"), jString("four")))),
+
+          Json(
+            "foo" := "C",
+            "jayson" :=
+              Json(
+                "x" := Json.array(jNumber(6), jNumber(7), jNumber(8)),
+                "y" := Json.array(jString("five"), jString("six")))),
+
+          Json(
+            "foo" := "D",
+            "jayson" :=
+              Json(
+                "x" := Json.array(jNumber(9), jNumber(10), jNumber(11)),
+                "y" := Json.array(jString("seven"), jString("eight")))))
+
+        val mask: ScalarStage = ScalarStage.Mask(Map(
+          CPath.parse(".foo") -> ColumnType.Top,
+          CPath.parse(".jayson") -> ColumnType.Top))
+
+        val read = InterpretedRead(
+          ResourcePath.root() / ResourceName("pgsrcSchemaA") / ResourceName("seekMask"),
+          ScalarStages(IdStatus.ExcludeId, List(mask)))
+
+        val offset = Offset.Internal(
+          NonEmptyList.of(DataPathSegment.Field("off")),
+          ∃(InternalKey.Actual.dateTime(OffsetDateTime.parse("2022-07-11T00:18:33Z"))))
+
+        (setup >> resultsFrom[Json](read, offset))
+          .map(_ must (ScalarStages.Id, expected).zip(be_===, containTheSameElementsAs(_)))
+      }
+    }
+  }
+
   ////
 
   private final case class Widget(serial: String, width: Double, height: Double)
@@ -391,6 +496,22 @@
       ResourcePath.root() / ResourceName("pgsrcSchemaA") / ResourceName("widgets"),
       stages)
 
+  private def resultsFrom[A: DecodeJson](ir: InterpretedRead[ResourcePath], offset: Offset)
+      : IO[(ScalarStages, List[A])] =
+    pgds
+      .loadFrom(ir, offset)
+      .getOrElseF(Resource.eval(IO.raiseError(new RuntimeException("Seek failed")))) use {
+        case QueryResult.Typed(_, s, stages) =>
+          s.data.chunks.parseJsonStream[Json]
+            .map(_.as[A].toOption)
+            .unNone
+            .compile.toList
+            .tupleLeft(stages)
+
+        case qr =>
+          IO.raiseError(new RuntimeException(s"Expected QueryResult.Typed, received: $qr"))
+      }
+
   private def resultsOf[A: DecodeJson](ir: InterpretedRead[ResourcePath])
       : IO[(ScalarStages, List[A])] =
     pgds.evaluate(ir) use {
diff --git a/version.sbt b/version.sbt
index 783af2b..194af05 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "24.0.11"
+version in ThisBuild := "24.1.0"
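
For orientation, a minimal sketch of how a caller drives the new Seek loader: `pgds` is assumed to be a constructed PostgresDatasource[IO], as in the spec above, and the SQL in the comments is an approximation inferred from `offsetProjection` and `tableAsJsonBytes`.

    import cats.data.NonEmptyList
    import java.time.OffsetDateTime

    import quasar.ScalarStages
    import quasar.api.DataPathSegment
    import quasar.api.push.InternalKey
    import quasar.api.resource.{ResourceName, ResourcePath}
    import quasar.connector.Offset
    import quasar.qscript.InterpretedRead

    import skolems.∃

    // Read /pgsrcSchemaA/seek with no further interpretation stages.
    val read =
      InterpretedRead(
        ResourcePath.root() / ResourceName("pgsrcSchemaA") / ResourceName("seek"),
        ScalarStages.Id)

    // Resume from rows whose "off" column is at or past the given timestamp.
    // offsetProjection turns this into
    //   OffsetCondition("off", PgVal.Str("2022-07-11T00:18:33Z"), Some("timestamp with time zone"))
    // which tableAsJsonBytes renders roughly as
    //   SELECT * FROM "pgsrcSchemaA"."seek" WHERE "off" >= ? ::timestamp with time zone
    val offset =
      Offset.Internal(
        NonEmptyList.of(DataPathSegment.Field("off")),
        ∃(InternalKey.Actual.dateTime(OffsetDateTime.parse("2022-07-11T00:18:33Z"))))

    // loadFrom appears OptionT-shaped (see getOrElseF in the spec): an empty
    // result means no loader handles this read, while an unsupported offset
    // column surfaces as ResourceError.seekFailed.
    val result = pgds.loadFrom(read, offset)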