From a5a45c419ee2cf1530b27613847be4e4d8358558 Mon Sep 17 00:00:00 2001 From: YangKian <45479280+YangKian@users.noreply.github.com> Date: Fri, 29 Mar 2024 09:18:36 +0800 Subject: [PATCH] chore(connector): update log (#1785) --- hstream-io/HStream/IO/IOTask.hs | 18 +++++++------- hstream-io/HStream/IO/Types.hs | 8 +++++++ hstream-io/HStream/IO/Worker.hs | 24 ++++++++++++------- .../src/HStream/Server/Handler/Connector.hs | 22 ++++++++--------- 4 files changed, 43 insertions(+), 29 deletions(-) diff --git a/hstream-io/HStream/IO/IOTask.hs b/hstream-io/HStream/IO/IOTask.hs index 186bc4e4f..beb061fde 100644 --- a/hstream-io/HStream/IO/IOTask.hs +++ b/hstream-io/HStream/IO/IOTask.hs @@ -57,18 +57,18 @@ getDockerName :: T.Text -> T.Text getDockerName = ("IOTASK_" <>) runIOTask :: IOTask -> IO () -runIOTask ioTask@IOTask{..} = do - Log.info $ "taskCmd: " <> Log.buildString taskCmd +runIOTask ioTask@IOTask{taskInfo=TaskInfo{taskConfig=TaskConfig{..}}, ..} = do tp <- TP.startProcess taskProcessConfig writeIORef process' (Just tp) - _ <- C.forkIO $ + _ <- C.forkIO $ do + Log.info $ "run IOTask : " <> Log.buildString taskCmd E.catch (handleStdout ioTask (TP.getStdout tp) (TP.getStdin tp)) - (\(e :: E.SomeException) -> Log.info $ "handleStdout exited:" <> Log.buildString (show e)) + (\(e :: E.SomeException) -> + Log.info $ "IOTask " <> Log.build (show ioTask) <> " exited:" <> Log.buildString (show e) + ) return () where - TaskInfo {..} = taskInfo - TaskConfig {..} = taskConfig taskCmd = concat [ "docker run --rm -i", " --network=", T.unpack tcNetwork, @@ -111,11 +111,13 @@ handleStdout ioTask hStdout hStdin = forever $ do IO.hFlush hStdin handleConnectorRequest :: IOTask -> MSG.ConnectorRequest -> IO MSG.ConnectorResponse -handleConnectorRequest ioTask MSG.ConnectorRequest{..} = do +handleConnectorRequest ioTask req@MSG.ConnectorRequest{..} = do MSG.ConnectorResponse crId <$> E.catch (handleConnectorMessage ioTask crMessage) (\(e :: E.ZooException) -> do - Log.warning $ "handleConnectorRequest failed:" <> Log.buildString (show e) <> "ignored" + Log.fatal $ "handleConnectorRequest failed, " + <> "\n\tRequest: " <> Log.build (show req) + <> "\n\tError: " <> Log.buildString (show e) pure J.Null) handleConnectorMessage :: IOTask -> MSG.ConnectorMessage -> IO J.Value diff --git a/hstream-io/HStream/IO/Types.hs b/hstream-io/HStream/IO/Types.hs index 17e5a9332..3f46a55eb 100644 --- a/hstream-io/HStream/IO/Types.hs +++ b/hstream-io/HStream/IO/Types.hs @@ -83,6 +83,14 @@ data IOTask = IOTask , ioOptions :: IOOptions } +instance Show IOTask where + show IOTask {taskInfo=TaskInfo{..}, ..} + = "{id=" <> show taskId + <> ", name=" <> show taskName + <> ", type=" <> show taskType + <> ", target=" <> show taskTarget + <> " }" + type ZkUrl = T.Text type Path = T.Text data ConnectorMetaConfig diff --git a/hstream-io/HStream/IO/Worker.hs b/hstream-io/HStream/IO/Worker.hs index 2f5b6371f..dee23dc41 100644 --- a/hstream-io/HStream/IO/Worker.hs +++ b/hstream-io/HStream/IO/Worker.hs @@ -86,8 +86,11 @@ createIOTask worker@Worker{..} name typ target cfg = do createIOTaskFromTaskInfo worker taskId taskInfo options False True True showIOTask_ worker name -createIOTaskFromTaskInfo :: HasCallStack => Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO () -createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} ioOptions cleanIfExists createMetaData enableCheck = do +createIOTaskFromTaskInfo + :: HasCallStack + => Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO () +createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} + ioOptions cleanIfExists createMetaData enableCheck = do getIOTask worker taskName >>= \case Nothing -> pure () Just _ -> do @@ -103,6 +106,7 @@ createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} ioOptio when createMetaData $ M.createIOTaskMeta workerHandle taskName taskId taskInfo C.modifyMVar_ ioTasksM $ \ioTasks -> do + -- FIXME: already check ioTask exist in `getIOTask worker` step, no need check again case HM.lookup taskName ioTasks of Just _ -> throwIO $ HE.ConnectorExists taskName Nothing -> do @@ -124,6 +128,7 @@ showIOTask_ worker@Worker{..} name = do task@IOTask{taskInfo=TaskInfo{..}, ..} <- getIOTask_ worker name taskOffsets <- C.readMVar taskOffsetsM M.getIOTaskMeta workerHandle taskId >>= \case + -- FIXME: find another way to handle this inconsistency with memory and meta Nothing -> throwIO $ HE.ConnectorNotFound name Just c -> do dockerStatus <- getDockerStatus task @@ -171,9 +176,10 @@ listRecoverableResources worker@Worker{..} = do recoverTask :: Worker -> T.Text -> IO () recoverTask worker@Worker{..} name = do - Log.info $ "recovering task:" <> Log.buildString' name M.getIOTaskFromName workerHandle name >>= \case - Nothing -> throwIO $ HE.ConnectorNotFound name + Nothing -> do + Log.info $ "can't found task " <> Log.build name <> " in meta, recover task failed" + throwIO $ HE.ConnectorNotFound name Just (taskId, TaskMeta{taskInfoMeta=taskInfo@TaskInfo{..}}) -> do let newConnCfg = J.insert "hstream" (J.toJSON hsConfig) connectorConfig newImage = if options.optFixedConnectorImage @@ -185,19 +191,19 @@ recoverTask worker@Worker{..} name = do <> Log.build taskConfig.tcImage <> " -> " <> Log.build newImage M.updateTaskConfig workerHandle taskId newTaskConfig createIOTaskFromTaskInfo worker taskId taskInfo{connectorConfig=newConnCfg, taskConfig=newTaskConfig} options True False False + Log.info $ "recovering task " <> Log.buildString' name <> " success" -- update config and restart alterConnectorConfig :: Worker -> T.Text -> T.Text -> IO () alterConnectorConfig worker name config = do updated <- updateConnectorConfig worker name config when updated $ do - Log.info $ "updated connector config, connector:" <> Log.build name E.catch (stopIOTask worker name True) (\(e :: E.SomeException) -> Log.warning $ "failed to stop io task:" <> Log.buildString (show e)) - Log.info $ "paused connector:" <> Log.build name + Log.info $ "pause connector " <> Log.build name <> " for update" recoverTask worker name - Log.info $ "resumed connector:" <> Log.build name + Log.info $ "resume updated connector " <> Log.build name updateConnectorConfig :: Worker -> T.Text -> T.Text -> IO Bool updateConnectorConfig worker name config = do @@ -210,7 +216,7 @@ updateConnectorConfig worker name config = do let mergeCfg (J.Object x) (J.Object y) = J.Object (J.union x y) let newConnCfg = J.insertWith mergeCfg "connector" (J.toJSON overrided) connectorConfig M.updateConfig worker.workerHandle taskId newConnCfg - Log.info $ "updated connector config, connector:" <> Log.build name + Log.info $ "update connector config, connector:" <> Log.build name <> ", new config:" <> Log.buildString' newConnCfg return True @@ -228,7 +234,7 @@ deleteIOTask :: Worker -> T.Text -> IO () deleteIOTask worker@Worker{..} taskName = do E.catch (stopIOTask worker taskName True) - (\(e :: E.SomeException) -> Log.info $ "try to stop io task:" <> Log.buildString (show e)) + (\(e :: E.SomeException) -> Log.fatal $ "stop IOTask " <> Log.build taskName <> " error: " <> Log.buildString (show e)) M.deleteIOTaskMeta workerHandle taskName C.modifyMVar_ ioTasksM $ return . HM.delete taskName diff --git a/hstream/src/HStream/Server/Handler/Connector.hs b/hstream/src/HStream/Server/Handler/Connector.hs index 092d69148..ec52d55aa 100644 --- a/hstream/src/HStream/Server/Handler/Connector.hs +++ b/hstream/src/HStream/Server/Handler/Connector.hs @@ -58,12 +58,13 @@ createConnectorHandler -> ServerRequest 'Normal CreateConnectorRequest Connector -> IO (ServerResponse 'Normal Connector) createConnectorHandler sc (ServerNormalRequest _ req) = defaultExceptionHandle $ do - Log.debug "Receive Create Connector Request" - validateCreateConnector req - createIOTaskFromRequest sc req >>= returnResp + Log.info $ "Receive Create Connector Request: " <> Log.build (show req) + validateCreateConnector req + createIOTaskFromRequest sc req >>= returnResp handleCreateConnector :: ServerContext -> G.UnaryHandler CreateConnectorRequest Connector handleCreateConnector sc _ req = catchDefaultEx $ do + Log.info $ "Receive Create Connector Request: " <> Log.build (show req) validateCreateConnector req createIOTaskFromRequest sc req @@ -151,7 +152,7 @@ resumeConnectorHandler -> IO (ServerResponse 'Normal Empty) resumeConnectorHandler sc@ServerContext{..} (ServerNormalRequest _metadata ResumeConnectorRequest{..}) = defaultExceptionHandle $ do - Log.debug $ "Receive ResumeConnectorRequest. " + Log.info $ "Receive ResumeConnectorRequest. " <> "Connector Name: " <> Log.build resumeConnectorRequestName ServerNode{..} <- lookupResource sc ResConnector resumeConnectorRequestName unless (serverNodeId == serverID) $ @@ -161,6 +162,8 @@ resumeConnectorHandler sc@ServerContext{..} handleResumeConnector :: ServerContext -> G.UnaryHandler ResumeConnectorRequest Empty handleResumeConnector sc@ServerContext{..} _ ResumeConnectorRequest{..} = catchDefaultEx $ do + Log.info $ "Receive ResumeConnectorRequest. " + <> "Connector Name: " <> Log.build resumeConnectorRequestName ServerNode{..} <- lookupResource sc ResConnector resumeConnectorRequestName unless (serverNodeId == serverID) $ throwIO $ HE.WrongServer "Connector is bound to a different node" @@ -172,7 +175,7 @@ pauseConnectorHandler -> IO (ServerResponse 'Normal Empty) pauseConnectorHandler sc@ServerContext{..} (ServerNormalRequest _metadata PauseConnectorRequest{..}) = defaultExceptionHandle $ do - Log.debug $ "Receive Terminate Connector Request. " + Log.info $ "Receive Terminate Connector Request. " <> "Connector ID: " <> Log.build pauseConnectorRequestName ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName unless (serverNodeId == serverID) $ @@ -182,6 +185,8 @@ pauseConnectorHandler sc@ServerContext{..} handlePauseConnector :: ServerContext -> G.UnaryHandler PauseConnectorRequest Empty handlePauseConnector sc@ServerContext{..} _ PauseConnectorRequest{..} = catchDefaultEx $ do + Log.info $ "Receive Terminate Connector Request. " + <> "Connector ID: " <> Log.build pauseConnectorRequestName ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName unless (serverNodeId == serverID) $ throwIO $ HE.WrongServer "Connector is bound to a different node" @@ -219,16 +224,9 @@ createIOTaskFromRequest sc CreateConnectorRequest{..} = do createIOTask :: ServerContext -> T.Text -> T.Text -> T.Text -> T.Text -> IO Connector createIOTask sc@ServerContext{..} name typ target cfg = do - -- FIXME: Can we remove this validation ? - validateNameAndThrow ResConnector name ServerNode{..} <- lookupResource sc ResConnector name unless (serverNodeId == serverID) $ throwIO $ HE.WrongServer "Connector is bound to a different node" - Log.info $ "CreateConnector CodeGen" - <> ", connector type: " <> Log.build typ - <> ", connector name: " <> Log.build name - <> ", connector targe: " <> Log.build target - <> ", config: " <> Log.build cfg IO.createIOTask scIOWorker name typ target cfg -------------------------------------------------------------------------------