Skip to content

Commit

Permalink
Sketch Pipeline
Browse files Browse the repository at this point in the history
No testing has been done yet
  • Loading branch information
nikita-volkov committed Apr 20, 2024
1 parent ee73f4e commit f1fa1f8
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 10 deletions.
3 changes: 3 additions & 0 deletions hasql.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ common base
NoMonomorphismRestriction
Arrows
BangPatterns
BlockArguments
ConstraintKinds
DataKinds
DefaultSignatures
Expand Down Expand Up @@ -82,6 +83,7 @@ library
Hasql.Connection
Hasql.Decoders
Hasql.Encoders
Hasql.Pipeline
Hasql.Session
Hasql.Statement

Expand All @@ -101,6 +103,7 @@ library
Hasql.Encoders.Value
Hasql.Errors
Hasql.IO
Hasql.Pipeline.Core
Hasql.PostgresTypeInfo
Hasql.Prelude
Hasql.PreparedStatementRegistry
Expand Down
7 changes: 6 additions & 1 deletion library/Hasql/Decoders/Result.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ noResult =
checkExecStatus $ \case
LibPQ.CommandOk -> True
LibPQ.TuplesOk -> True
LibPQ.PipelineSync -> True
_ -> False

{-# INLINE rowsAffected #-}
Expand Down Expand Up @@ -66,7 +67,11 @@ checkExecStatus predicate =
LibPQ.NonfatalError -> serverError
LibPQ.FatalError -> serverError
LibPQ.EmptyQuery -> return ()
_ -> Result $ lift $ ExceptT $ pure $ Left $ UnexpectedResult $ "Unexpected result status: " <> (fromString $ show status)
_ -> unexpectedResult $ "Unexpected result status: " <> (fromString $ show status)

unexpectedResult :: Text -> Result a
unexpectedResult =
Result . lift . ExceptT . pure . Left . UnexpectedResult

{-# INLINE serverError #-}
serverError :: Result ()
Expand Down
7 changes: 7 additions & 0 deletions library/Hasql/Encoders/Params.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import Hasql.Prelude
import PostgreSQL.Binary.Encoding qualified as B
import Text.Builder qualified as E

renderReadable :: Params a -> a -> [Text]
renderReadable (Params (Op encoderOp)) params =
foldr step [] (encoderOp params)
where
step (_, _, _, rendering) acc =
rendering : acc

-- |
-- Encoder of some representation of a parameters product.
newtype Params a
Expand Down
7 changes: 7 additions & 0 deletions library/Hasql/Pipeline.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module Hasql.Pipeline
( Pipeline,
statement,
)
where

import Hasql.Pipeline.Core
66 changes: 66 additions & 0 deletions library/Hasql/Pipeline/Core.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module Hasql.Pipeline.Core where

import Database.PostgreSQL.LibPQ qualified as Pq
import Hasql.Connection.Core qualified as Connection
import Hasql.Decoders.All qualified as Decoders
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
import Hasql.IO qualified as IO
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
import Hasql.Statement qualified as Statement

run :: Pipeline a -> Connection.Connection -> IO (Either QueryError a)
run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) =
withMVar pqConnectionRef \pqConnection -> do
Pq.enterPipelineMode pqConnection
sendResult <- send pqConnection integerDatetimes registry
Pq.pipelineSync pqConnection
recvResult <- recv pqConnection integerDatetimes
Pq.exitPipelineMode pqConnection
pure (sendResult *> recvResult)

data Pipeline a
= Pipeline
-- | Send commands.
(Pq.Connection -> Bool -> PreparedStatementRegistry.PreparedStatementRegistry -> IO (Either QueryError ()))
-- | Receive results.
(Pq.Connection -> Bool -> IO (Either QueryError a))
deriving (Functor)

instance Applicative Pipeline where
pure a =
Pipeline send recv
where
send _ _ _ =
pure (Right ())
recv _ _ =
pure (Right a)

Pipeline lSend lRecv <*> Pipeline rSend rRecv =
Pipeline send recv
where
send pqConn idt pReg = do
lSendRes <- lSend pqConn idt pReg
rSendRes <- rSend pqConn idt pReg
pure (lSendRes *> rSendRes)
recv pqConn idt = do
lRecvRes <- lRecv pqConn idt
rRecvRes <- rRecv pqConn idt
pure (lRecvRes <*> rRecvRes)

statement :: params -> Statement.Statement params result -> Pipeline result
statement params (Statement.Statement template (Encoders.Params paramsEncoder) (Decoders.Result decoder) preparable) =
Pipeline send recv
where
send pqConnection integerDatetimes registry =
mapLeft commandToQueryError
<$> IO.sendParametricStatement pqConnection integerDatetimes registry template paramsEncoder preparable params

recv pqConnection integerDatetimes =
mapLeft commandToQueryError
<$> IO.getResults pqConnection integerDatetimes decoder

commandToQueryError =
QueryError template (Encoders.Params.renderReadable paramsEncoder params)
18 changes: 9 additions & 9 deletions library/Hasql/Session/Core.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
module Hasql.Session.Core where

import Hasql.Connection.Core qualified as Connection
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Result qualified as Decoders.Result
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
import Hasql.IO qualified as IO
import Hasql.Pipeline.Core qualified as Pipeline
import Hasql.Prelude
import Hasql.Statement qualified as Statement

Expand Down Expand Up @@ -46,20 +48,18 @@ sql sql =
-- |
-- Parameters and a specification of a parametric single-statement query to apply them to.
statement :: params -> Statement.Statement params result -> Session result
statement input (Statement.Statement template (Encoders.Params paramsEncoder) decoder preparable) =
statement input (Statement.Statement template (Encoders.Params paramsEncoder) (Decoders.Result decoder) preparable) =
Session
$ ReaderT
$ \(Connection.Connection pqConnectionRef integerDatetimes registry) ->
ExceptT
$ fmap (mapLeft (QueryError template inputReps))
$ fmap (mapLeft (QueryError template (Encoders.Params.renderReadable paramsEncoder input)))
$ withMVar pqConnectionRef
$ \pqConnection -> do
r1 <- IO.sendParametricStatement pqConnection integerDatetimes registry template paramsEncoder preparable input
r2 <- IO.getResults pqConnection integerDatetimes (unsafeCoerce decoder)
r2 <- IO.getResults pqConnection integerDatetimes decoder
return $ r1 *> r2
where
inputReps =
let Encoders.Params.Params (Op encoderOp) = paramsEncoder
step (_, _, _, rendering) acc =
rendering : acc
in foldr step [] (encoderOp input)

pipeline :: Pipeline.Pipeline result -> Session result
pipeline pipeline =
Session $ ReaderT $ ExceptT . Pipeline.run pipeline

0 comments on commit f1fa1f8

Please sign in to comment.