Skip to content

Commit

Permalink
chore(connector): update log (#1785)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Mar 29, 2024
1 parent 39d9334 commit a5a45c4
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 29 deletions.
18 changes: 10 additions & 8 deletions hstream-io/HStream/IO/IOTask.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ getDockerName :: T.Text -> T.Text
getDockerName = ("IOTASK_" <>)

runIOTask :: IOTask -> IO ()
runIOTask ioTask@IOTask{..} = do
Log.info $ "taskCmd: " <> Log.buildString taskCmd
runIOTask ioTask@IOTask{taskInfo=TaskInfo{taskConfig=TaskConfig{..}}, ..} = do
tp <- TP.startProcess taskProcessConfig
writeIORef process' (Just tp)
_ <- C.forkIO $
_ <- C.forkIO $ do
Log.info $ "run IOTask : " <> Log.buildString taskCmd
E.catch
(handleStdout ioTask (TP.getStdout tp) (TP.getStdin tp))
(\(e :: E.SomeException) -> Log.info $ "handleStdout exited:" <> Log.buildString (show e))
(\(e :: E.SomeException) ->
Log.info $ "IOTask " <> Log.build (show ioTask) <> " exited:" <> Log.buildString (show e)
)
return ()
where
TaskInfo {..} = taskInfo
TaskConfig {..} = taskConfig
taskCmd = concat [
"docker run --rm -i",
" --network=", T.unpack tcNetwork,
Expand Down Expand Up @@ -111,11 +111,13 @@ handleStdout ioTask hStdout hStdin = forever $ do
IO.hFlush hStdin

handleConnectorRequest :: IOTask -> MSG.ConnectorRequest -> IO MSG.ConnectorResponse
handleConnectorRequest ioTask MSG.ConnectorRequest{..} = do
handleConnectorRequest ioTask req@MSG.ConnectorRequest{..} = do
MSG.ConnectorResponse crId <$> E.catch
(handleConnectorMessage ioTask crMessage)
(\(e :: E.ZooException) -> do
Log.warning $ "handleConnectorRequest failed:" <> Log.buildString (show e) <> "ignored"
Log.fatal $ "handleConnectorRequest failed, "
<> "\n\tRequest: " <> Log.build (show req)
<> "\n\tError: " <> Log.buildString (show e)
pure J.Null)

handleConnectorMessage :: IOTask -> MSG.ConnectorMessage -> IO J.Value
Expand Down
8 changes: 8 additions & 0 deletions hstream-io/HStream/IO/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ data IOTask = IOTask
, ioOptions :: IOOptions
}

instance Show IOTask where
show IOTask {taskInfo=TaskInfo{..}, ..}
= "{id=" <> show taskId
<> ", name=" <> show taskName
<> ", type=" <> show taskType
<> ", target=" <> show taskTarget
<> " }"

type ZkUrl = T.Text
type Path = T.Text
data ConnectorMetaConfig
Expand Down
24 changes: 15 additions & 9 deletions hstream-io/HStream/IO/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ createIOTask worker@Worker{..} name typ target cfg = do
createIOTaskFromTaskInfo worker taskId taskInfo options False True True
showIOTask_ worker name

createIOTaskFromTaskInfo :: HasCallStack => Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} ioOptions cleanIfExists createMetaData enableCheck = do
createIOTaskFromTaskInfo
:: HasCallStack
=> Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}
ioOptions cleanIfExists createMetaData enableCheck = do
getIOTask worker taskName >>= \case
Nothing -> pure ()
Just _ -> do
Expand All @@ -103,6 +106,7 @@ createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} ioOptio

when createMetaData $ M.createIOTaskMeta workerHandle taskName taskId taskInfo
C.modifyMVar_ ioTasksM $ \ioTasks -> do
-- FIXME: already check ioTask exist in `getIOTask worker` step, no need check again
case HM.lookup taskName ioTasks of
Just _ -> throwIO $ HE.ConnectorExists taskName
Nothing -> do
Expand All @@ -124,6 +128,7 @@ showIOTask_ worker@Worker{..} name = do
task@IOTask{taskInfo=TaskInfo{..}, ..} <- getIOTask_ worker name
taskOffsets <- C.readMVar taskOffsetsM
M.getIOTaskMeta workerHandle taskId >>= \case
-- FIXME: find another way to handle this inconsistency with memory and meta
Nothing -> throwIO $ HE.ConnectorNotFound name
Just c -> do
dockerStatus <- getDockerStatus task
Expand Down Expand Up @@ -171,9 +176,10 @@ listRecoverableResources worker@Worker{..} = do

recoverTask :: Worker -> T.Text -> IO ()
recoverTask worker@Worker{..} name = do
Log.info $ "recovering task:" <> Log.buildString' name
M.getIOTaskFromName workerHandle name >>= \case
Nothing -> throwIO $ HE.ConnectorNotFound name
Nothing -> do
Log.info $ "can't found task " <> Log.build name <> " in meta, recover task failed"
throwIO $ HE.ConnectorNotFound name
Just (taskId, TaskMeta{taskInfoMeta=taskInfo@TaskInfo{..}}) -> do
let newConnCfg = J.insert "hstream" (J.toJSON hsConfig) connectorConfig
newImage = if options.optFixedConnectorImage
Expand All @@ -185,19 +191,19 @@ recoverTask worker@Worker{..} name = do
<> Log.build taskConfig.tcImage <> " -> " <> Log.build newImage
M.updateTaskConfig workerHandle taskId newTaskConfig
createIOTaskFromTaskInfo worker taskId taskInfo{connectorConfig=newConnCfg, taskConfig=newTaskConfig} options True False False
Log.info $ "recovering task " <> Log.buildString' name <> " success"

-- update config and restart
alterConnectorConfig :: Worker -> T.Text -> T.Text -> IO ()
alterConnectorConfig worker name config = do
updated <- updateConnectorConfig worker name config
when updated $ do
Log.info $ "updated connector config, connector:" <> Log.build name
E.catch
(stopIOTask worker name True)
(\(e :: E.SomeException) -> Log.warning $ "failed to stop io task:" <> Log.buildString (show e))
Log.info $ "paused connector:" <> Log.build name
Log.info $ "pause connector " <> Log.build name <> " for update"
recoverTask worker name
Log.info $ "resumed connector:" <> Log.build name
Log.info $ "resume updated connector " <> Log.build name

updateConnectorConfig :: Worker -> T.Text -> T.Text -> IO Bool
updateConnectorConfig worker name config = do
Expand All @@ -210,7 +216,7 @@ updateConnectorConfig worker name config = do
let mergeCfg (J.Object x) (J.Object y) = J.Object (J.union x y)
let newConnCfg = J.insertWith mergeCfg "connector" (J.toJSON overrided) connectorConfig
M.updateConfig worker.workerHandle taskId newConnCfg
Log.info $ "updated connector config, connector:" <> Log.build name
Log.info $ "update connector config, connector:" <> Log.build name
<> ", new config:" <> Log.buildString' newConnCfg
return True

Expand All @@ -228,7 +234,7 @@ deleteIOTask :: Worker -> T.Text -> IO ()
deleteIOTask worker@Worker{..} taskName = do
E.catch
(stopIOTask worker taskName True)
(\(e :: E.SomeException) -> Log.info $ "try to stop io task:" <> Log.buildString (show e))
(\(e :: E.SomeException) -> Log.fatal $ "stop IOTask " <> Log.build taskName <> " error: " <> Log.buildString (show e))
M.deleteIOTaskMeta workerHandle taskName
C.modifyMVar_ ioTasksM $ return . HM.delete taskName

Expand Down
22 changes: 10 additions & 12 deletions hstream/src/HStream/Server/Handler/Connector.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ createConnectorHandler
-> ServerRequest 'Normal CreateConnectorRequest Connector
-> IO (ServerResponse 'Normal Connector)
createConnectorHandler sc (ServerNormalRequest _ req) = defaultExceptionHandle $ do
Log.debug "Receive Create Connector Request"
validateCreateConnector req
createIOTaskFromRequest sc req >>= returnResp
Log.info $ "Receive Create Connector Request: " <> Log.build (show req)
validateCreateConnector req
createIOTaskFromRequest sc req >>= returnResp

handleCreateConnector :: ServerContext -> G.UnaryHandler CreateConnectorRequest Connector
handleCreateConnector sc _ req = catchDefaultEx $ do
Log.info $ "Receive Create Connector Request: " <> Log.build (show req)
validateCreateConnector req
createIOTaskFromRequest sc req

Expand Down Expand Up @@ -151,7 +152,7 @@ resumeConnectorHandler
-> IO (ServerResponse 'Normal Empty)
resumeConnectorHandler sc@ServerContext{..}
(ServerNormalRequest _metadata ResumeConnectorRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive ResumeConnectorRequest. "
Log.info $ "Receive ResumeConnectorRequest. "
<> "Connector Name: " <> Log.build resumeConnectorRequestName
ServerNode{..} <- lookupResource sc ResConnector resumeConnectorRequestName
unless (serverNodeId == serverID) $
Expand All @@ -161,6 +162,8 @@ resumeConnectorHandler sc@ServerContext{..}

handleResumeConnector :: ServerContext -> G.UnaryHandler ResumeConnectorRequest Empty
handleResumeConnector sc@ServerContext{..} _ ResumeConnectorRequest{..} = catchDefaultEx $ do
Log.info $ "Receive ResumeConnectorRequest. "
<> "Connector Name: " <> Log.build resumeConnectorRequestName
ServerNode{..} <- lookupResource sc ResConnector resumeConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
Expand All @@ -172,7 +175,7 @@ pauseConnectorHandler
-> IO (ServerResponse 'Normal Empty)
pauseConnectorHandler sc@ServerContext{..}
(ServerNormalRequest _metadata PauseConnectorRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive Terminate Connector Request. "
Log.info $ "Receive Terminate Connector Request. "
<> "Connector ID: " <> Log.build pauseConnectorRequestName
ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName
unless (serverNodeId == serverID) $
Expand All @@ -182,6 +185,8 @@ pauseConnectorHandler sc@ServerContext{..}

handlePauseConnector :: ServerContext -> G.UnaryHandler PauseConnectorRequest Empty
handlePauseConnector sc@ServerContext{..} _ PauseConnectorRequest{..} = catchDefaultEx $ do
Log.info $ "Receive Terminate Connector Request. "
<> "Connector ID: " <> Log.build pauseConnectorRequestName
ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
Expand Down Expand Up @@ -219,16 +224,9 @@ createIOTaskFromRequest sc CreateConnectorRequest{..} = do

createIOTask :: ServerContext -> T.Text -> T.Text -> T.Text -> T.Text -> IO Connector
createIOTask sc@ServerContext{..} name typ target cfg = do
-- FIXME: Can we remove this validation ?
validateNameAndThrow ResConnector name
ServerNode{..} <- lookupResource sc ResConnector name
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
Log.info $ "CreateConnector CodeGen"
<> ", connector type: " <> Log.build typ
<> ", connector name: " <> Log.build name
<> ", connector targe: " <> Log.build target
<> ", config: " <> Log.build cfg
IO.createIOTask scIOWorker name typ target cfg

-------------------------------------------------------------------------------
Expand Down

0 comments on commit a5a45c4

Please sign in to comment.