From 43b96cb0295d306e536da4bb13b05e04209d349b Mon Sep 17 00:00:00 2001 From: Commelina Date: Thu, 18 Jan 2024 17:55:28 +0800 Subject: [PATCH] hstream: fix incorrect metadata of querys with namespace (#1744) --- hstream/src/HStream/Server/Core/Query.hs | 30 ++++++++++----------- hstream/src/HStream/Server/Core/QueryNew.hs | 21 ++++++++------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/hstream/src/HStream/Server/Core/Query.hs b/hstream/src/HStream/Server/Core/Query.hs index 280d02ade..fce5c803c 100644 --- a/hstream/src/HStream/Server/Core/Query.hs +++ b/hstream/src/HStream/Server/Core/Query.hs @@ -175,10 +175,11 @@ createQueryWithNamespace' _ -> throw $ HE.WrongExecutionPlan "Create query only support select / create stream as select statements" #else parseAndRefine createQueryRequestSql >>= \rSQL -> case rSQL of - RQCreate (RCreateAs stream select rOptions) -> - hstreamCodegen (RQCreate (RCreateAs (namespace <> stream) - (modifySelect namespace select) - rOptions)) >>= \case + RQCreate (RCreateAs stream select rOptions) -> do + let rSQLWithNamespace = RQCreate (RCreateAs (namespace <> stream) + (modifySelect namespace select) + rOptions) + hstreamCodegen rSQLWithNamespace >>= \case CreateBySelectPlan sources sink builder factor persist -> do -- validate names mapM_ (validateNameAndThrow ResStream) sources @@ -206,7 +207,7 @@ createQueryWithNamespace' } -- update metadata let relatedStreams = (sources, sink) - qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams rSQL serverID metaHandle + qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams rSQLWithNamespace serverID metaHandle -- run core task consumerClosed <- newTVarIO False createQueryAndRun sc QueryRunner { @@ -221,10 +222,9 @@ createQueryWithNamespace' _ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view as select statements" RQInsert (RInsertSel streamName rSel) -> do - (hstreamCodegen $ RQInsert (RInsertSel - (namespace <> streamName) - (modifySelect namespace rSel)) - ) >>= \case + let rSQLWithNamespace = RQInsert (RInsertSel (namespace <> streamName) + (modifySelect namespace rSel)) + hstreamCodegen rSQLWithNamespace >>= \case InsertBySelectPlan srcs sink builder persist -> do -- validate names traverse_ (validateNameAndThrow ResStream) (sink : srcs) @@ -250,7 +250,7 @@ createQueryWithNamespace' -- update metadata qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql (srcs, sink) - rSQL serverID metaHandle + rSQLWithNamespace serverID metaHandle -- run core task consumerClosed <- newTVarIO False createQueryAndRun sc QueryRunner @@ -264,13 +264,13 @@ createQueryWithNamespace' hstreamQueryToQuery metaHandle qInfo _ -> throw $ HE.WrongExecutionPlan "Insert by Select query only supports `INSERT INTO SELECT FROM STREAM`" - RQCreate (RCreateView view select) -> - hstreamCodegen (RQCreate (RCreateView (namespace <> view) - (modifySelect namespace select) - )) >>= \case + RQCreate (RCreateView view select) -> do + let rSQLWithNamespace = RQCreate (RCreateView (namespace <> view) + (modifySelect namespace select)) + hstreamCodegen rSQLWithNamespace >>= \case CreateViewPlan sources sink view builder persist -> do validateNameAndThrow ResView view - Core.createView' sc view sources sink builder persist createQueryRequestSql rSQL createQueryRequestQueryName + Core.createView' sc view sources sink builder persist createQueryRequestSql rSQLWithNamespace createQueryRequestQueryName >>= hstreamViewToQuery metaHandle _ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view as select statements" _ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view as select statements" diff --git a/hstream/src/HStream/Server/Core/QueryNew.hs b/hstream/src/HStream/Server/Core/QueryNew.hs index 1d4c6a342..4467deee1 100644 --- a/hstream/src/HStream/Server/Core/QueryNew.hs +++ b/hstream/src/HStream/Server/Core/QueryNew.hs @@ -174,10 +174,11 @@ createQueryWithNamespace' _ -> throw $ HE.WrongExecutionPlan "Create query only support select / create stream as select statements" #else parseAndBind createQueryRequestSql (P.getSchema metaHandle) >>= \bSQL -> case bSQL of - BoundQCreate (BoundCreateAs stream select rOptions) -> - hstreamCodegen (BoundQCreate (BoundCreateAs (namespace <> stream) - (modifySelect namespace select) - rOptions)) (P.getSchema metaHandle) >>= \case + BoundQCreate (BoundCreateAs stream select rOptions) -> do + let bSQLWithNamespace = BoundQCreate (BoundCreateAs (namespace <> stream) + (modifySelect namespace select) + rOptions) + hstreamCodegen bSQLWithNamespace (P.getSchema metaHandle) >>= \case CreateBySelectPlan sources sink schema builder factor persist -> do -- validate names mapM_ (validateNameAndThrow ResStream) sources @@ -206,7 +207,7 @@ createQueryWithNamespace' P.registerSchema metaHandle schema -- update metadata let relatedStreams = (sources, sink) - qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams bSQL serverID metaHandle + qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams bSQLWithNamespace serverID metaHandle -- run core task consumerClosed <- newTVarIO False createQueryAndRun sc QueryRunner { @@ -264,13 +265,13 @@ createQueryWithNamespace' hstreamQueryToQuery metaHandle qInfo _ -> throw $ HE.WrongExecutionPlan "Insert by Select query only supports `INSERT INTO SELECT FROM STREAM`" -} - BoundQCreate (BoundCreateView view select) -> - hstreamCodegen (BoundQCreate (BoundCreateView (namespace <> view) - (modifySelect namespace select) - )) (P.getSchema metaHandle) >>= \case + BoundQCreate (BoundCreateView view select) -> do + let bSQLWithNamespace = BoundQCreate (BoundCreateView (namespace <> view) + (modifySelect namespace select)) + hstreamCodegen bSQLWithNamespace (P.getSchema metaHandle) >>= \case CreateViewPlan sources sink view schema builder persist -> do validateNameAndThrow ResView view - Core.createView' sc view sources sink schema builder persist createQueryRequestSql bSQL createQueryRequestQueryName + Core.createView' sc view sources sink schema builder persist createQueryRequestSql bSQLWithNamespace createQueryRequestQueryName >>= hstreamViewToQuery metaHandle _ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view as select statements" _ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view as select statements"