Skip to content

Commit

Permalink
Implement db-uri-read-replicas
Browse files Browse the repository at this point in the history
- takes a list of URIs to use as read replicas
- queries are routed based on the transaction mode,
  with read-mode queries going to the read replicas
  if any are configured
- we configure one pool per database uri, all with
  the same configuration
- which read replica to use is chosen randomly
- if you want the master to also be used for read-only
  queries, put it in both db-uri and db-uri-read-replicas

Test coverage is spotty and work-in-progress. It requires a
replicating setup which is not automated. With a locally set
up read replica, the integration test suite should pass as
follows:

PGHOSTREP='/tmp' PGPORTREP=5433 PGHOST='/tmp' PGPORT=5432 \
  postgrest-test-io

You can pass '-k test_readonly' for replication-specific tests.
  • Loading branch information
robx committed Oct 20, 2023
1 parent 54786a6 commit aff5565
Show file tree
Hide file tree
Showing 21 changed files with 195 additions and 32 deletions.
3 changes: 3 additions & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ library
, optparse-applicative >= 0.13 && < 0.18
, parsec >= 3.1.11 && < 3.2
, protolude >= 0.3.1 && < 0.4
, random >= 1.2.1 && < 1.3
, regex-tdfa >= 1.2.2 && < 1.4
, retry >= 0.7.4 && < 0.10
, scientific >= 0.3.4 && < 0.4
Expand Down Expand Up @@ -229,6 +230,8 @@ test-suite spec
Feature.Query.UnicodeSpec
Feature.Query.UpdateSpec
Feature.Query.UpsertSpec
Feature.ReadOnlySpec
Feature.ReadReplicaSpec
Feature.RollbackSpec
Feature.RpcPreRequestGucsSpec
SpecHelper
Expand Down
15 changes: 8 additions & 7 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ module PostgREST.Admin
( runAdmin
) where

import qualified Data.Text as T
import qualified Hasql.Session as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import qualified Data.Text as T
import qualified Hasql.Session as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp

import Control.Monad.Extra (whenJust)

Expand All @@ -36,10 +37,10 @@ admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
isMainAppReachable <- any isRight <$> reachMainApp appConfig
isSchemaCacheLoaded <- isJust <$> AppState.getSchemaCache appState
isConnectionUp <-
isConnectionUp <- -- FIXME primary / read-replicas
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState (SQL.sql "SELECT 1")
else isRight <$> AppState.usePool appState SQL.Read (SQL.sql "SELECT 1")

case Wai.pathInfo req of
["ready"] ->
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ runDbHandler :: AppState.AppState -> SQL.IsolationLevel -> SQL.Mode -> Bool -> B
runDbHandler appState isoLvl mode authenticated prepared handler = do
dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
AppState.usePool appState . transaction isoLvl mode $ runExceptT handler
AppState.usePool appState mode . transaction isoLvl mode $ runExceptT handler

resp <-
liftEither . mapLeft Error.PgErr $
Expand Down
79 changes: 59 additions & 20 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module PostgREST.AppState
, putSchemaCache
, putPgVersion
, usePool
, usePoolReadWrite
, loadSchemaCache
, reReadConfig
, connectionWorker
Expand All @@ -40,6 +41,7 @@ import qualified Hasql.Session as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified PostgREST.Error as Error
import PostgREST.Version (prettyVersion)
import qualified Prelude ((!!))

import Control.AutoUpdate (defaultUpdateSettings, mkAutoUpdate,
updateAction)
Expand All @@ -51,6 +53,7 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef,
import Data.Time (ZonedTime, defaultTimeLocale, formatTime,
getZonedTime)
import Data.Time.Clock (UTCTime, getCurrentTime)
import System.Random (randomRIO)

import PostgREST.Config (AppConfig (..),
addFallbackAppName,
Expand All @@ -74,7 +77,7 @@ data AuthResult = AuthResult

data AppState = AppState
-- | Database connection pool
{ statePool :: SQL.Pool
{ statePool :: Pool
-- | Database server version, will be updated by the connectionWorker
, statePgVersion :: IORef PgVersion
-- | No schema cache at the start. Will be filled in by the connectionWorker
Expand Down Expand Up @@ -104,10 +107,18 @@ data AppState = AppState
init :: AppConfig -> IO AppState
init conf = do
pool <- initPool conf
initWithPool pool conf
initWithCompletePool pool conf

data Pool = Pool
{ poolPrimary :: SQL.Pool
, poolReadReplicas :: [SQL.Pool]
}

initWithPool :: SQL.Pool -> AppConfig -> IO AppState
initWithPool pool conf = do
initWithPool pool = initWithCompletePool (Pool pool [])

initWithCompletePool :: Pool -> AppConfig -> IO AppState
initWithCompletePool pool conf = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
Expand Down Expand Up @@ -144,32 +155,57 @@ initWithPool pool conf = do
destroy :: AppState -> IO ()
destroy = destroyPool

initPool :: AppConfig -> IO SQL.Pool
initPool AppConfig{..} =
SQL.acquire
configDbPoolSize
(fromIntegral configDbPoolAcquisitionTimeout)
(fromIntegral configDbPoolMaxLifetime)
(fromIntegral configDbPoolMaxIdletime)
(toUtf8 $ addFallbackAppName prettyVersion configDbUri)
initPool :: AppConfig -> IO Pool
initPool AppConfig{..} = do
primary <- mkPool configDbUri
readReplicas <- traverse mkPool configDbUriReadReplicas
return $ Pool primary readReplicas
where
mkPool uri = SQL.acquire
configDbPoolSize
(fromIntegral configDbPoolAcquisitionTimeout)
(fromIntegral configDbPoolMaxLifetime)
(fromIntegral configDbPoolMaxIdletime)
(toUtf8 $ addFallbackAppName prettyVersion uri)

releasePool :: Pool -> IO ()
releasePool (Pool primary readReplicas) = do
SQL.release primary
mapM_ SQL.release readReplicas

-- | Run an action with a database connection.
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{..} x = do
res <- SQL.use statePool x
usePool :: AppState -> SQL.Mode -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{..} mode x = do
pool <- pickPool statePool mode
res <- SQL.use pool x
whenLeft res (\case
SQL.AcquisitionTimeoutUsageError -> debounceLogAcquisitionTimeout -- this can happen rapidly for many requests, so we debounce
_ -> pure ())
return res

-- | Run an action with a database connection.
usePoolReadWrite :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePoolReadWrite appState = usePool appState SQL.Write

pickPool :: Pool -> SQL.Mode -> IO SQL.Pool
pickPool pool mode =
case mode of
SQL.Write -> return $ poolPrimary pool
SQL.Read ->
case poolReadReplicas pool of
[] -> return $ poolPrimary pool
pools -> do
i <- randomRIO (0, length pools - 1)
return $ pools Prelude.!! i

-- | Flush the connection pool so that any future use of the pool will
-- use connections freshly established after this call.
flushPool :: AppState -> IO ()
flushPool AppState{..} = SQL.release statePool
flushPool AppState{..} = releasePool statePool

-- | Destroy the pool on shutdown.
destroyPool :: AppState -> IO ()
destroyPool AppState{..} = SQL.release statePool
destroyPool AppState{..} = releasePool statePool

getPgVersion :: AppState -> IO PgVersion
getPgVersion = readIORef . statePgVersion
Expand Down Expand Up @@ -245,7 +281,7 @@ loadSchemaCache appState = do
conf@AppConfig{..} <- getConfig appState
result <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
usePool appState . transaction SQL.ReadCommitted SQL.Read $
usePool appState SQL.Read . transaction SQL.ReadCommitted SQL.Read $
querySchemaCache conf
case result of
Left e -> do
Expand Down Expand Up @@ -342,7 +378,9 @@ establishConnection appState =

getConnectionStatus :: IO ConnectionStatus
getConnectionStatus = do
pgVersion <- usePool appState $ queryPgVersion False -- No need to prepare the query here, as the connection might not be established
-- FIXME usePoolReadWrite in order to use the master database connection
-- we should probably have ConnectionStatus take into account both master and read replicas
pgVersion <- usePoolReadWrite appState $ queryPgVersion False -- No need to prepare the query here, as the connection might not be established
case pgVersion of
Left e -> do
logPgrstError appState e
Expand Down Expand Up @@ -378,7 +416,8 @@ reReadConfig startingUp appState = do
AppConfig{..} <- getConfig appState
dbSettings <-
if configDbConfig then do
qDbSettings <- usePool appState $ queryDbSettings (dumpQi <$> configDbPreConfig) configDbPreparedStatements
-- FIXME usePoolReadWrite to query the master database, we could also use the replica if any
qDbSettings <- usePoolReadWrite appState $ queryDbSettings (dumpQi <$> configDbPreConfig) configDbPreparedStatements
case qDbSettings of
Left e -> do
logWithZTime appState
Expand All @@ -396,7 +435,7 @@ reReadConfig startingUp appState = do
pure mempty
(roleSettings, roleIsolationLvl) <-
if configDbConfig then do
rSettings <- usePool appState $ queryRoleSettings configDbPreparedStatements
rSettings <- usePoolReadWrite appState $ queryRoleSettings configDbPreparedStatements -- FIXME read-only?
case rSettings of
Left e -> do
logWithZTime appState "An error ocurred when trying to query the role settings"
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ dumpSchema appState = do
conf@AppConfig{..} <- AppState.getConfig appState
result <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
AppState.usePool appState $
AppState.usePool appState SQL.Read $
transaction SQL.ReadCommitted SQL.Read $
querySchemaCache conf
case result of
Expand Down
7 changes: 7 additions & 0 deletions src/PostgREST/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ data AppConfig = AppConfig
, configDbTxAllowOverride :: Bool
, configDbTxRollbackAll :: Bool
, configDbUri :: Text
, configDbUriReadReplicas :: [Text]
, configDbUseLegacyGucs :: Bool
, configFilePath :: Maybe FilePath
, configJWKS :: Maybe JWKSet
Expand Down Expand Up @@ -158,6 +159,7 @@ toText conf =
,("db-pre-config", q . maybe mempty dumpQi . configDbPreConfig)
,("db-tx-end", q . showTxEnd)
,("db-uri", q . configDbUri)
,("db-uri-read-replicas", q . T.intercalate " " . configDbUriReadReplicas)
,("db-use-legacy-gucs", T.toLower . show . configDbUseLegacyGucs)
,("jwt-aud", T.decodeUtf8 . LBS.toStrict . JSON.encode . maybe "" toJSON . configJwtAudience)
,("jwt-role-claim-key", q . T.intercalate mempty . fmap dumpJSPath . configJwtRoleClaimKey)
Expand Down Expand Up @@ -258,6 +260,7 @@ parser optPath env dbSettings roleSettings roleIsolationLvl =
<*> parseTxEnd "db-tx-end" snd
<*> parseTxEnd "db-tx-end" fst
<*> (fromMaybe "postgresql://" <$> optString "db-uri")
<*> (maybe [] split <$> optValue "db-uri-read-replicas")
<*> (fromMaybe True <$> optBool "db-use-legacy-gucs")
<*> pure optPath
<*> pure Nothing
Expand Down Expand Up @@ -404,6 +407,10 @@ parser optPath env dbSettings roleSettings roleIsolationLvl =
Nothing -> (> 0) <$> (readMaybe s :: Maybe Integer)
coerceBool _ = Nothing

split :: C.Value -> [Text]
split (C.String s) = T.words s
split _ = []

splitOnCommas :: C.Value -> [Text]
splitOnCommas (C.String s) = T.strip <$> T.splitOn "," s
splitOnCommas _ = []
Expand Down
14 changes: 13 additions & 1 deletion test/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,18 @@ def dburi():
dbname = os.environ["PGDATABASE"]
host = os.environ["PGHOST"]
user = os.environ["PGUSER"]
return f"postgresql://?dbname={dbname}&host={host}&user={user}".encode()
port = os.environ.get("PGPORT", "5432")
return f"postgresql://?dbname={dbname}&host={host}&port={port}&user={user}".encode()


@pytest.fixture
def dburi_replica():
"Postgres database connection URI."
dbname = os.environ["PGDATABASE"]
host = os.environ["PGHOSTREP"]
user = os.environ["PGUSER"]
port = os.environ.get("PGPORTREP", "5432")
return f"postgresql://?dbname={dbname}&host={host}&port={port}&user={user}".encode()


@pytest.fixture
Expand All @@ -29,6 +40,7 @@ def baseenv():
return {
"PGDATABASE": os.environ["PGDATABASE"],
"PGHOST": os.environ["PGHOST"],
"PGPORT": os.environ.get("PGPORT", "5432"),
"PGUSER": os.environ["PGUSER"],
}

Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/aliases.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"aliased\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/boolean-numeric.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/boolean-string.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/defaults.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = false
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = "postgrest.pre_config"
db-tx-end = "rollback-allow-override"
db-uri = "postgresql://"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://otherexample.org"
jwt-role-claim-key = ".\"other\".\"pre_config_role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/no-defaults-with-db.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = "postgrest.preconf"
db-tx-end = "commit-allow-override"
db-uri = "postgresql://"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://example.org"
jwt-role-claim-key = ".\"a\".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/no-defaults.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = false
db-pre-config = "postgrest.pre_config"
db-tx-end = "rollback-allow-override"
db-uri = "tmp_db"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://postgrest.org"
jwt-role-claim-key = ".\"user\"[0].\"real-role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/types.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/no-defaults-env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ PGRST_DB_CONFIG: false
PGRST_DB_PRE_CONFIG: "postgrest.pre_config"
PGRST_DB_TX_END: rollback-allow-override
PGRST_DB_URI: tmp_db
PGRST_DB_URI_READ_REPLICAS: rep_1 rep_2
PGRST_DB_USE_LEGACY_GUCS: false
PGRST_JWT_AUD: 'https://postgrest.org'
PGRST_JWT_ROLE_CLAIM_KEY: '.user[0]."real-role"'
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/no-defaults.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = false
db-pre-config = "postgrest.pre_config"
db-tx-end = "rollback-allow-override"
db-uri = "tmp_db"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://postgrest.org"
jwt-role-claim-key = ".user[0].\"real-role\""
Expand Down
9 changes: 7 additions & 2 deletions test/io/fixtures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ GRANT USAGE ON SCHEMA v1 TO postgrest_test_anonymous;
CREATE TABLE authors_only ();
GRANT SELECT ON authors_only TO postgrest_test_author;

CREATE TABLE projects AS SELECT FROM generate_series(1,5);
GRANT SELECT ON projects TO postgrest_test_anonymous, postgrest_test_w_superuser_settings;
CREATE TABLE projects AS SELECT * FROM generate_series(1,5) AS id;
GRANT SELECT, INSERT ON projects TO postgrest_test_anonymous, postgrest_test_w_superuser_settings;

create function get_guc_value(name text) returns text as $$
select nullif(current_setting(name), '')::text;
Expand Down Expand Up @@ -178,3 +178,8 @@ $$;
create function terminate_pgrst() returns setof record as $$
select pg_terminate_backend(pid) from pg_stat_activity where application_name iLIKE '%postgrest%';
$$ language sql security definer;

create or replace function read_only_session() returns void as $$
begin
set session characteristics as transaction read only;
end; $$ language plpgsql;
Loading

0 comments on commit aff5565

Please sign in to comment.