handleExternal support (#9)
As per GitHub's clever cross-reference links, there is another use case for this in haskell-distributed/distributed-process-supervisor#8.
@teh I think the issues being discussed at the bottom of haskell-distributed/distributed-process-supervisor#8 are quite similar to your asks about interactions between warp handlers and managed processes in this library's API. The ability to write this seems pressing to me:

```haskell
demo :: STM.TChan InputType -> Process (ProcessDefinition State)
demo broadcastChan = do
  rc <- liftIO $ atomically $ dupTChan broadcastChan
  return defaultProcess {
      -- other things elided
      infoHandlers = [ handleExternal (readTChan rc) doStuff ]
    }
```

Better still would be the ability to wrap API handlers around an STM channel, such that we can write something like …
@teh take a look at https://github.com/haskell-distributed/distributed-process-client-server/tree/tw/h-ext and let me know if that would meet your needs please? I think the 2-way STM stuff is totally concrete, and the implementation is extremely simple (and based on idioms we use all over the place for handling typed channels). Specifically, this test case exemplifies it:

```haskell
testExternalService :: TestResult Bool -> Process ()
testExternalService result = do
  inChan    <- liftIO newTQueueIO
  replyChan <- liftIO newTQueueIO
  let procDef = statelessProcess {
          apiHandlers = [
            handleExternal
              (readTQueue inChan)
              (\s (m :: String) -> do
                liftIO $ atomically $ writeTQueue replyChan m
                continue s)
          ]
        }
  let txt = "hello 2-way stm foo"
  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  echoTxt <- liftIO $ do
    -- firstly we write something that the server can receive
    atomically $ writeTQueue inChan txt
    -- then sit and wait for it to write something back to us
    atomically $ readTQueue replyChan
  stash result (echoTxt == txt)
  kill pid "done"
```
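The round-trip shape of that test can be seen without any Cloud Haskell machinery at all. Here is a minimal plain-IO sketch (using only the `stm` package) in which a forked thread plays the role of the managed process, echoing whatever arrives on the input queue back on the reply queue; the calling thread does exactly what a warp handler would do:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forever)

main :: IO ()
main = do
  inQ    <- newTQueueIO
  replyQ <- newTQueueIO
  -- stand-in for the managed process: read a request, write it straight back
  _ <- forkIO $ forever $ atomically $ readTQueue inQ >>= writeTQueue replyQ
  atomically $ writeTQueue inQ "hello 2-way stm foo"
  -- blocks until the "server" thread replies
  echo <- atomically $ readTQueue replyQ
  putStrLn echo
```

The blocking `readTQueue` on the reply queue is what makes the interaction synchronous from the caller's point of view, even though the server runs concurrently.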
@hyperthunk thanks for keeping at it! (I guess you mean e20135f specifically?) This isn't quite what I am doing, but it's pretty close!

The boundary between callback-IO and CH is tricky, and the …
@teh, yeah, I get that your usage is a bit different. Here are some thoughts... In terms of (1/2), it's totally up to you how you structure things. I would not use a single channel to go back to the warp handler either, since that would also be a potential bottleneck. For (3), my advice is this: do not fight warp, and do not fight the runtime. The point about this feature in client-server is that it allows you to read a value back in … I'll post a picture in a few mins...
Okay @teh, having spent some time poking around in warp, I think the best thing for what you're doing is to simply fork a new process and wait on the result using STM, and tbh you could just use a …
Thanks! That's the conclusion I came to as well, and that's what my code does. Sorry if I wasn't clearer about that. As long as the request is small and blocking, it's pretty simple. But I am looking forward to …
@teh okay, I've just pushed an improved mechanism for doing synchronous round trips - it'll simplify the server code (making it easier to follow, more transferable during potential refactoring, etc) and allows you to use whatever STM constructs you like to synchronise client and server. Here's the example code from the tests... I'm going to break it down (and we can steal the markup from this comment to set up a doc page around it later on the wiki/website...)

```haskell
data StmServer = StmServer { serverPid  :: ProcessId
                           , writerChan :: TQueue String
                           , readerChan :: TQueue String
                           }
```

We start out by defining a server handle, which is good practise for -client-server apps as per the docs/tutorials. We will use this to interact with the server process. Since we want to resolve it to a process (for monitoring) and, in our test case, to kill it (once we're done), we add the relevant instances from -extras to make that easy:

```haskell
instance Resolvable StmServer where
  resolve = return . Just . serverPid

instance Killable StmServer where
  killProc StmServer{..} = kill serverPid
  exitProc StmServer{..} = exit serverPid
```

The client part of the interaction uses a new function exposed through the client API, `callSTM`:

```haskell
callSTM :: forall s a b . (Addressable s)
        => s
        -> (a -> STM ())
        -> STM b
        -> a
        -> Process (Either ExitReason b)
callSTM server writeAction readAction input = do
  liftIO $ atomically $ writeAction input
  awaitResponse server [ matchSTM readAction (return . Right) ]
```

Back to our code then: we implement the client side of our API using this function, and use the handle to ensure we have the relevant channels in scope:

```haskell
echoStm :: StmServer -> String -> Process (Either ExitReason String)
echoStm StmServer{..} = callSTM serverPid
                                (writeTQueue writerChan)
                                (readTQueue readerChan)
```

Now for our server implementation. Given our input and output channels, we wire them into the server using the new `handleCallExternal` API. Here's our server code now:

```haskell
launchEchoServer :: Process StmServer
launchEchoServer = do
  (inQ, replyQ) <- liftIO $ do
    cIn  <- newTQueueIO
    cOut <- newTQueueIO
    return (cIn, cOut)

  let procDef = statelessProcess {
          apiHandlers = [
            handleCallExternal
              (readTQueue inQ)
              (writeTQueue replyQ)
              (\st (msg :: String) -> reply msg st)
          ]
        }

  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  return $ StmServer pid inQ replyQ
```

Those queues are created in IO and shared between the server's handlers and the handle we give back to clients. Finally, the test case, which simply launches the server, calls it synchronously, and puts the reply/response into our result:

```haskell
testExternalCall :: TestResult Bool -> Process ()
testExternalCall result = do
  let txt = "hello stm-call foo"
  srv <- launchEchoServer
  echoStm srv txt >>= stash result . (== Right txt)
  killProc srv "done"
```

So, there you have it. I'll try and get this merged soon. I think …
Oh, and I should point out that because the …

A key thing you'll want to watch out for with all of this, though, is that if you're sending remote calls/messages then serialisation can incur quite a heavy cost. If you're communicating with other local processes, I strongly suggest using the …
You are productive! :)
@teh, well it might do... The underlying `awaitResponse` looks like this:

```haskell
awaitResponse :: Addressable a
              => a
              -> [Match (Either ExitReason b)]
              -> Process (Either ExitReason b)
awaitResponse addr matches = do
  mPid <- resolve addr
  case mPid of
    Nothing -> return $ Left $ ExitOther "UnresolvedAddress"
    Just p  -> do
      mRef <- P.monitor p
      receiveWait ((matchRef mRef):matches)
  where
    matchRef :: MonitorRef -> Match (Either ExitReason b)
    matchRef r = matchIf (\(ProcessMonitorNotification r' _ _) -> r == r')
                         (\(ProcessMonitorNotification _ _ d) ->
                            return (Left (ExitOther (show d))))
```

Now … In many ways, my suspicion for what you're doing is that you might be better off just using distributed-process-async. If the server process needs to maintain some state - e.g., a database connection, OS resources, etc - then it makes sense to model it as a managed process. If you just want to proxy other Cloud Haskell processes that are doing more complex things and are addressable by name etc, then a stateless managed process will do the trick, but there is a risk of accidental complexity when dealing with cleanup operations.

The usual way to clean up once a managed process is shutting down is to install a shutdown handler. This will run before the process exits, and can be used to ensure any acquired resources are released properly. Since you cannot easily link your warp thread to the managed process (which is running in CH land), a sensible approach would probably be to start the managed process with a …

Something I keep meaning to ask you...
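The essence of `awaitResponse` - block until either a reply arrives or a monitor notification says the server died - can be modelled in plain STM with `orElse`. This is an illustrative sketch only (the `await`, `Reply`, and `Died` names are made up for the example, not library API); a `TVar` stands in for the monitor notification:

```haskell
import Control.Concurrent.STM

data Outcome = Reply String | Died String deriving (Eq, Show)

-- Race a reply source against a "server died" signal; whichever
-- becomes available first wins the orElse.
await :: TQueue String -> TVar (Maybe String) -> STM Outcome
await replies death =
      (Reply <$> readTQueue replies)
  `orElse`
      (readTVar death >>= maybe retry (return . Died))

main :: IO ()
main = do
  replies <- newTQueueIO
  death   <- newTVarIO Nothing
  -- simulate the monitored server dying before it ever replies
  atomically $ writeTVar death (Just "DiedNormal")
  out <- atomically $ await replies death
  print out
```

Because both arms are ordinary STM actions, the same composition trick works for timeouts or any other wake-up condition you can express in STM.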
Oh, and I should point out that cleanup is one of the reasons why you'd normally arrange your process hierarchy in a supervision tree, such that when a branch shuts down, all the server processes perform their cleanup properly before restarting (or shutting down the supervisor, depending on the strategy chosen).
Yes to both questions 1 and 2 (in my current code, which is essentially still the one in haskell-distributed/distributed-process#306 (comment)). Cleanup is an interesting question in general, though AFAICT I don't have anything like open file handles that needs cleaning up directly. I do need to make sure that an async exception kills the …
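On the async-exception point: in plain Haskell threads the standard tool for guaranteeing cleanup when a thread is killed is `bracket`, whose release action runs even when the body is interrupted by `killThread`. A minimal sketch (not Cloud Haskell; the shutdown-handler mechanism in -client-server plays the analogous role for managed processes):

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Exception (bracket)
import Data.IORef

main :: IO ()
main = do
  cleaned <- newIORef False
  tid <- forkIO $
    bracket (return "resource")              -- acquire
            (\_ -> writeIORef cleaned True)  -- release: runs even when killed
            (\_ -> threadDelay maxBound)     -- body: blocks until killed
  threadDelay 100000          -- let the thread enter the bracket
  killThread tid              -- delivers an async ThreadKilled exception
  threadDelay 100000          -- let the release action run
  readIORef cleaned >>= print
```

The same discipline applies whichever side of the warp/CH boundary owns the resource: whoever acquires it should pair the acquisition with an exception-safe release.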
Closing this issue, as the …
So we can compose STM actions inside a managed process definition, allow clients and servers that reside on the same node to communicate over STM channels, queues, etc.
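Composability is the point of accepting an arbitrary `STM a` rather than a fixed channel type. For instance (a sketch using only the `stm` package; the merged-read pattern is illustrative, not taken from the library), `orElse` lets one read action drain two queues, so a single external handler could service both:

```haskell
import Control.Concurrent.STM

main :: IO ()
main = do
  q1 <- newTQueueIO
  q2 <- newTQueueIO
  -- one composite action a single handler could block on
  let readEither = readTQueue q1 `orElse` readTQueue q2
  atomically $ writeTQueue q2 "from q2"
  msg <- atomically readEither  -- q1 is empty, so this falls through to q2
  putStrLn msg
```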