This repository has been archived by the owner on Sep 3, 2024. It is now read-only.

Child start handling for Process () is broken. #8

Closed
hyperthunk opened this issue Feb 18, 2017 · 20 comments

@hyperthunk
Member

hyperthunk commented Feb 18, 2017

First reported as part of haskell-distributed/distributed-process-platform#77, there is a serious fault in the ToChildStart instance for Process ().

Not only do we potentially spin up and subsequently leak starter processes, but the whole premise of this approach is wrong, since it can lead to a supervisor waiting indefinitely for a child to start. This breaks the contract between parent and child processes and goes against the design principles of supervision as laid out in the original OTP implementation.

I propose we remove this instance and leave it to implementors to define, but also that we remove StarterPid from the data type, since we have no clean solutions for using these without hitting the issues mentioned above.

Specifically, spawning should be asynchronous, and the indication that a child has died should come from monitor signals; once the child has spawned, monitoring should be established before the child's code has a chance to proceed (and potentially crash before monitoring is properly set up).
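
For concreteness, a minimal sketch of that ordering (illustrative only, not the supervisor's actual code; spawnMonitoredChild is a hypothetical helper):

import Control.Distributed.Process

-- Establish monitoring before the child's body is allowed to run, so even a
-- crash during startup surfaces as a monitor signal rather than silence.
spawnMonitoredChild :: Process () -> Process (ProcessId, MonitorRef)
spawnMonitoredChild body = do
  child <- spawnLocal $ do
    () <- expect          -- block until the parent says "go"
    body
  ref <- monitor child    -- monitoring is set up first...
  send child ()           -- ...only then may the child proceed
  return (child, ref)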

Some things I think we can/should rule out...

The original fix (from @tavisrudd before the repos were split)

Here's a playback of the commentary there... When the StarterProcess ChildStart variant is used with dynamic supervision, each cycle of startNewChild + terminateChild/deleteChild leaks a process. The proposed fix was to kill the starter process, for which we have an ID.

This was broken for two reasons. The first is that we do not evaluate toChildStart in the supervisor process, thus killing the starter process means we can no longer restart the child. The second reason is that we break location transparency, as per this part of the thread:

... So it is safe for us to kill this ProcessId from our point of view, because we're deleting the child spec (and therefore won't require this "re-starter process" to continue living). But what if the code that created this re-starter tries to then send it to another supervisor? The ProcessId given to that supervisor will be dead/stale and the supervisor will reject it (when trying to start the child spec) with StartFailureDied DiedUnknownId, which is confusing. (UPDATE: note that I'd missed the fact that this supervisor won't be able to start children either...)

At first glance, I thought what we actually wanted here is a finalizer that is guaranteed to kill off the re-starter process "at some point" after the ChildSpec becomes unreachable (and is gc'ed). However, the System.Mem.Weak documentation isn't clear on whether or not this is guaranteed for an ADT such as ProcessId. In particular, this comment:

WARNING: weak pointers to ordinary non-primitive Haskell types are particularly fragile, because the compiler is free to optimise away or duplicate the underlying data structure. Therefore attempting to place a finalizer on an ordinary Haskell type may well result in the finalizer running earlier than you expected. This is not a problem for caches and memo tables where early finalization is benign.
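
For concreteness, the finalizer idea would have looked something like the sketch below (attachKillFinalizer is a hypothetical helper; per the warning above, a finalizer on an ordinary value such as ProcessId may run earlier than expected, which is part of why this route was rejected):

import Control.Distributed.Process (ProcessId, kill)
import Control.Distributed.Process.Node (LocalNode, runProcess)
import System.Mem.Weak (Weak, mkWeakPtr)

-- Attach a finalizer to the pid held in the ChildStart so the re-starter is
-- killed once the datum becomes garbage.
attachKillFinalizer :: LocalNode -> ProcessId -> IO (Weak ProcessId)
attachKillFinalizer node pid =
  mkWeakPtr pid (Just (runProcess node (kill pid "starter unreachable")))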

The alternative to this would be to document this behaviour of ChildSpec, explaining that once deleted from a supervisor, the ChildSpec becomes invalid. But I really really dislike this idea. The problem is that the ChildSpec is now behaving like a shared, mutable (unsafe) data structure. Even if you serialize the ChildSpec and send it to another node, if the same ChildSpec (or even another ChildSpec that shares the same ToChildStart thunk!) is removed from any supervisor in the system, we've just invalidated that same data structure across all nodes, because the ProcessId is no longer valid. That just seems wrong to me, no matter how much documentation we throw at it.

One approach that might work here would be to convert the StarterProcess constructor to take a Weak ProcessId and in the ToChildStart instance create a weak reference to the process id and create a finalizer that kills the process once all its clients - the supervisors using it - go away, i.e., the ChildStart datum becomes garbage. Problem solved? Well no.....

Firstly, we'd need to test that this finalization business works properly for a Weak ProcessId. You've already written profiling code to detect the leak, so that shouldn't be too difficult, though relying on finalization will probably mean having to increase the time bounds of the tests to give the System.Mem.Weak infrastructure time to do the cleanup - maybe even forcing a GC at some point.

Secondly, and this is a serious problem: finalisation is useless if/when the ChildStart datum escapes the local node. Which means, in practice, that you can't serialize and send it remotely, otherwise this whole finalization thing will go wrong - we'll never detect that a remote peer is using the ChildStart at all (i.e., we will lose track of it as soon as it's gone over the wire) and therefore - assuming that finalizing a ProcessId works at all - we'll end up killing the re-starter process whilst remote supervisors are still using it. Nasty. Confusing. Bug. :(

You can see now why I was a fussy little kitty when @roman and I were debating how to support local children in the original ticket that introduced StarterProcess. This issue is fraught with edge cases and we're changing a piece of critical fault-tolerance infrastructure, so we can't afford to screw it up.

I think we have three choices here, as I see it - feel free to suggest alternatives though, as always:

  1. remove StarterProcess and make people use the Closure Process constructor instead
  2. re-work StarterProcess to reference count its clients/supervisors
  3. remove the ToChildStart instance for Process () and export StarterProcess then make people implement this themselves

The idea of (1) is that we're avoiding the issue by making the API less friendly. Not a great option IMO.

UPDATE: I actually think (1) is what we should do now. I went on to say the following, which I'll recant shortly...

The idea behind (2) is that the re-starter process will keep some internal state to track each time a new supervisor tries to use it. Each time a supervisor deletes a ChildSpec that uses this re-starter pid, instead of kill restarterPid ... they should exit restarterPid Shutdown and the re-starter process should catchExit expecting Shutdown and decrement its supervisor/reference count each time. Once that ref-count reaches zero, it terminates. This is still vulnerable to the problem I outlined above though, so I think it's not really viable. We cannot provide an API to the general population that is open to abuse like this.

The idea of (3) is that we either find a way to expose that re-starter pid or just make people implement the ToChildStart instance themselves. The point is, if you've written your code so that you know which re-starter is associated with which ChildSpec, and you know (in your own code) that you're no longer using the ChildSpec or the StarterProcess, then you - the one who knows what is going on - are best placed to terminate that process yourself when you're sure it is no longer needed. This puts the responsibility where it really (IMHO) belongs - in the hands of the application developers.

The problem with (2) and (3) is that we introduce a huge degree of complexity for very little benefit. Despite the original report in https://cloud-haskell.atlassian.net/browse/DPP-81, it is not difficult to turn a Process () expression into a closure without template haskell, and therefore we're jumping through hoops for hardly any reason at all. But there's more: the way that StarterProcess works is fundamentally broken, because the supervisor has to interact with this other known process to spawn its children and get back a process id and monitor ref. That is broken in a fundamental way: if we do not own the code for that starter process then we've no way of ensuring we won't block indefinitely waiting for a reply. Even if we do, if the starter resides on a foreign node, network congestion could cause almost indefinite blocking (even in the presence of heartbeats), so this is a big no-no. We cannot have the supervisor process blocked waiting like that - spawning a child has to be asynchronous, and there needs to be a guarantee that monitoring has been set up correctly before the child proceeds to run, which requires a wrapper that we need to have written. We cannot leave this up to 3rd parties.

Because of this (above), we have to have a thunk that we can execute, which we can wrap in the relevant spawn/monitor code for safety. Since we cannot send Process a over the wire, only Closure (Process a) is possible - allowing for the CreateHandle instance obviously, which takes Closure (SupervisorPid -> Process (ChildPid, Message)) - and we cannot operate in terms of Process a. A concession would be to break up ChildSpec into those used to define the supervisor and those used for dynamic supervision, allowing Process () in a child spec used to boot the supervisor but not in subsequent calls to addChild etc. I really do not like this idea however, since not only does it create an imbalanced API, it could also prevent us from sending supervisor setup data over the wire (since it could potentially contain Process () instead of Closure (Process ()) thunks), and that is a pretty huge disadvantage.

The final concession we might make, would be to separate out the supervisor runtime implementation into two parts, one that handles local children and another for remote. I still don't like this idea though, because we end up with a fractured API, but I will consider it if people shout asking for Process () to be supported as a ToChildStart instance again. The approach I would take to achieve this would involve (a) breaking up the server so that the child start handling could be modified, (b) segregating the client APIs and having two supervisor client handles, one for local only and another for remote only supervisors. The client handle for a local only supervisor would carry an STM.TChan used for sending unserialisable thunks to the server. This would require us to implement haskell-distributed/distributed-process-client-server#9 first.

@hyperthunk
Member Author

Some background (from Jira):

When running addChild/removeChild in a loop using the Supervisor API, we noticed a memory leak. I don't know if this is related to #DPP-72.
We have tried to replace the StateT monad being used in ManagedProcess to be Strict, and created a NFData instance for the Supervisor State, and that doesn't seem to stop the leak. We will add more ps files with profiling as we go.
Here is the code we are using for the test:
https://gist.github.com/roman/7878922
Attached is one ps
Attachments:
maestro-tester.ps (98 kB, 09/Dec/13 10:17 PM)
memory-leak-on-supervisor.ps (15 kB, 09/Dec/13 7:15 PM)
supervisor-test-4-cores-3000-ev.ps (26 kB, 09/Dec/13 10:21 PM)
hyperthunk Tim Watson [Administrator] added a comment - 09/Dec/13 9:07 PM
When running addChild/removeChild in a loop using the Supervisor API, we noticed a memory leak. I don't know if this is related to #DPP-72.
It could be, if DPP-72 is/was due to a leak in Closure or similar. I haven't nailed that bug so it's hard to tell.
We have tried to replace the StateT monad being used in ManagedProcess to be Strict, and created a NFData instance for the Supervisor State, and that doesn't seem to stop the leak.
I very much doubt there's a leak in ManagedProcess, but there could be one in Supervisor. If the leak is related to the use of Closure then you could try using a StarterProcess child start instead of creating a remotable/serializable closure.
BTW does it still leak if you use viaSend instead of viaClosure?
We will add more ps files with profiling as we go.
Please do!
Here is the code we are using for the test: https://gist.github.com/roman/7878922
Hmn, that's not exactly an isolated test, but I'll take a look when I get a chance. It's also quite hard to see whether or not that memory profile is really incorrect, since you do create a lot of serializable objects during the test run - closures, messages sent between them, etc etc - what happens if you make the supervisor sit doing nothing for 10 - 20 seconds after all that activity? Presumably something needs to happen to trigger a gc, though I'm not entirely sure what that something should be.
BTW, as I mentioned in the thread which spawned DPP-72, this is unlikely to be something specific to ManagedProcess or (IMO) Supervisor, and we no longer use the Async module(s) to make RPC calls to MP servers. Regardless, the problem we saw there could certainly still exist.
Are you able to run a biographical profile (for drag,void) followed by a retainer profile? Those would be really useful.
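
One way to distinguish a genuine leak from garbage the RTS simply hasn't collected yet is to force a collection from the test harness before sampling - a sketch with arbitrary timings (settleThenCollect is a hypothetical helper, not part of the original test code):

import Control.Concurrent (threadDelay)
import Control.Distributed.Process (Process)
import Control.Monad.IO.Class (liftIO)
import System.Mem (performGC)

-- Let the node sit idle, then force a GC before reading heap statistics.
settleThenCollect :: Process ()
settleThenCollect = liftIO $ do
  threadDelay (15 * 1000 * 1000)  -- do nothing for ~15 seconds
  performGC
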
roman Roman Gonzalez added a comment - 09/Dec/13 10:16 PM - edited
Hey Tim,
Thanks for the quick reply. Quick notes:

  1. We were printing the statistics of the Supervisor on viaClosure, and we didn't see any restarts happening on our tests. So I think the problem is not there, also the viaSend has a leak
  2.1) If we leave the Process monad running after hammering the supervisor, we see a step down in memory (I'll attach a ps file soon), so yes, it might not be a leak but rather the GC doesn't kick in until the hammering has stopped. This is the behavior when we have +RTS -Nx where x > 1
  2.2) When +RTS -N4 the behavior is different, we see a leak going on.
  3. I think there is a race condition going on somewhere (my first suspect would be the Chan Transport), I get a "thread blocked indefinitely in an MVar operation" error after a while, also if I change the transport to TCP, it would just print a Supervisor message "killed-by=pid://0:11,reason=TerminatedBySupervisormaestro-tester" and hang. This happens when running with -threaded and -Nx where x > 1
  4. I'm not able to run biographical nor retainer profile, for some reason I get a segfault.
roman Roman Gonzalez added a comment - 09/Dec/13 10:17 PM
This is running 5000 times addChild/terminate-deleteChild with -N1
roman Roman Gonzalez added a comment - 09/Dec/13 10:21 PM
4 cores on 3000 times
roman Roman Gonzalez added a comment - 09/Dec/13 10:29 PM
Updated code https://gist.github.com/roman/7878922/36245e58e08e48cbe53f4eebf1f185eac05a48aa
hyperthunk Tim Watson [Administrator] added a comment - 10/Dec/13 1:15 AM - edited
We were printing the statistics of the Supervisor on viaClosure, and we didn't see any restarts happening on our tests. So I think the problem is not there, also the viaSend has a leak
Right of course - that makes sense, since you've chosen Transient children, they'll only be restarted if they fail abnormally.
If we leave the Process monad running after hammering the supervisor, we see a step down in memory (I'll attach a ps file soon), so yes, it might not be a leak but rather the GC doesn't kick in until the hammering has stopped. This is the behavior when we have +RTS -Nx where x > 1
I suspect that's what you're really seeing. Once the various thunks are obviously no longer needed for any kind of evaluation, the references to bytestrings will become garbage and all that PINNED memory will be reclaimed. What's interesting is that Supervisor generally uses the unsafe messaging primitives in order to avoid Serialization (and therefore bytestring) overheads when the sender and receiver reside on the same local node. So I wouldn't have expected to see leaking bytestrings as we did in DPP-72 - perhaps there's something else at work here.
I think there is a race condition going on somewhere (my first suspect would be the Chan Transport), I get a "thread blocked indefinitely in an MVar operation" error after a while
That just indicates that the only thread which was going to write to an MVar (as far as the RTS can tell) has died, leaving another thread stuck trying to read it. Presumably the test process crashed for some reason, leaving the outer thread stuck trying to read resultMVar.
Where does that remoteTable come from BTW? That doesn't look right at all - you should be merging the Platform's remoteTable (assuming you don't need the one from distributed-process also) with your own (from SupervisorTest.Helpers).
also if I change the transport to TCP, it would just print a Supervisor message "killed-by=pid://0:11,reason=TerminatedBySupervisormaestro-tester" and hang. This happens when running with -threaded and -Nx where x > 1
Erm, where exactly is that happening? Presumably you're calling putStrLn $ show exception somewhere when you detect an exit to make that happen? Note that you've made that exit happen by calling terminateChild - the code ends up in childShutdown which uses kill pid "TerminatedBySupervisor" when it sees TerminateImmediately. So that message is not abnormal - assuming you're the one printing it out...
I'm not able to run biographical nor retainer profile, for some reason I get a segfault.
I have the same problem (on OSX) - need to get a linux machine to play on, but work's busy atm so haven't had time to.
hyperthunk Tim Watson [Administrator] added a comment - 10/Dec/13 1:16 AM
Oh and ... what OS are you actually on, what version of GHC and which version of the CH code are you running?
roman Roman Gonzalez added a comment - 10/Dec/13 6:23 PM
Hey Tim,
I'm doing all these tests on Linux Gentoo, GHC v7.6.3, and the CH development branch.
I'll try to have a cabal project set up so you can try it out easily (instead of having gists lying around).
hyperthunk Tim Watson [Administrator] added a comment - 10/Dec/13 6:28 PM
Cool, thanks Roman Gonzalez, much appreciated!
roman Roman Gonzalez added a comment - 10/Dec/13 8:25 PM
Tim,
https://github.com/roman/distributed-process-platform-profiling
Here is a project I set up, you have to run make leaking_examples to get started, it should put a ps file in the out directory. Let me know if you need any help.
Cheers.
Roman.-
hyperthunk Tim Watson [Administrator] added a comment - 10/Dec/13 9:51 PM
Ok thanks - I doubt I'll get to it before the end of the week, but I will take a look.
hyperthunk Tim Watson [Administrator] added a comment - 19/Dec/13 9:19 PM
Ok thanks - I doubt I'll get to it before the end of the week, but I will take a look.
I'm afraid that with the rapid approach of the holiday season, I'm unlikely to get into this before the new year. Any free time I do have for Cloud Haskell will need to go into other release preparation and bugfix/feature finalizing activities. I will try and dedicate some time to investigating this before releasing though.
hyperthunk Tim Watson [Administrator] added a comment - 18/Jan/14 10:10 AM
Hi Roman Gonzalez, I've managed to build the project but profiling seems to be broken for me on OSX. Once I get that fixed I'll dig into this properly.
hyperthunk Tim Watson [Administrator] added a comment - 03/Feb/14 9:50 PM
Urgh, I'm really sorry but I've not had time to look into this yet. It may have to wait until after the release, because I'm just completely swamped at the moment.
Have you made any further progress in your investigations?
tavisrudd Tavis Rudd added a comment - 05/Mar/14 1:25 AM
Tim, there was a dependency issue in the Makefile of Roman's profile test project. Maybe that's what you ran into on OSX. I've pushed a fix for it.
We were still seeing the issue a few weeks ago when we tested against the recent changes in DPP's development branch.
hyperthunk Tim Watson [Administrator] added a comment - 05/Mar/14 10:45 PM
Thanks Tavis Rudd, I'll try and get this tested over the weekend. I've fixed a number of issues in Supervisor recently, so I'll be interested to see how this fares. TBH I'm still not convinced we're seeing an actual leak yet, but I won't dismiss the problem until we've made absolutely sure one way or the other.
Thanks for your patience - sorry it's taken so long to get around to this!
tavisrudd Tavis Rudd added a comment - 08/Mar/14 9:48 PM
It's possible that the leak we're seeing in our real code is related to what you mentioned in https://cloud-haskell.atlassian.net/browse/DPP-81?focusedCommentId=14015&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14015 re restarters from ChildStart.
hyperthunk Tim Watson [Administrator] added a comment - 12/Mar/14 9:55 AM
No, I've fixed that in HEAD (development) by ensuring that the links are created. When I finally get around to testing this, I suspect we'll find that it really is not a leak at all, but I just haven't found time for it yet. Sorry - as soon as I can, I will, promise!
tavisrudd Tavis Rudd added a comment - 18/Mar/14 9:31 PM
We've found the cause of the leak. The StarterProcess restarterPid variant of ChildStart is leaking restarter processes when used with dynamic supervision. They are eventually cleaned up when the supervisor terminates, but for long-running dynamic pools under a supervisor that does not exit, the combo of startNewChild / terminateChild+deleteChild leaks a process per iteration. We're working on a patch.
hyperthunk Tim Watson [Administrator] added a comment - 18/Mar/14 9:48 PM
We've found the cause of the leak.... We're working on a patch.
Tavis Rudd - you guys rock! Thanks to you all for your work investigating and tracking this down.
tavisrudd Tavis Rudd added a comment - 18/Mar/14 10:50 PM
We think the solution is to clean up the restarter in handleDeleteChild, but it's in RestrictedProcess so it seems we can't kill the restarter without converting it to unrestricted Process. Would that mess things up?

@hyperthunk
Member Author

hyperthunk commented Feb 18, 2017

Going back to the original ask which caused us to introduce ToChildStart for Process (), it would've been solved much more easily at the distributed-process-client-server level:

I would like to ask for your advice on a design problem I'm facing with this library. Basically I need an actor that communicates with an external PubSub mechanism and translates those PubSub messages to Process () messages. This is pretty straightforward with a simple Process () action; however, I want to have a supervisor for it in case of failure. The problem is that this PubSub mechanism keeps a state with TVars that are not Serializable, so I can't pass this PubSub context to the Process () action through a Closure. Would it be feasible to have a new extra ChildSpec type (say data LocalChildSpec) that is not Serializable and would be used on the run/start functions only? (no startChild from external calls would be possible using LocalChildSpec)

runPubSubActor :: PubSub -> Process ()
runPubSubActor = ...

init :: PubSub -> Process ()
init pubSubCtx = do
  -- replacing Supervisor.run
  runLocal (restartAll ..) [pubSubActorSpec]
  where
    pubSubActorSpec = LocalChildSpec
      { childKey = "pubsub-bridge", ..., childRun = (runPubSubActor pubSubCtx) }

The complexities that I can foresee coming from this change are that we need to cover both LocalChildSpec and ChildSpec in the specs Seq field of the Supervisor State, and some adaptation of the "*addChild" functions needs to be done. For sure there are other complexities that could arise from this decision, but I can't see them clearly at this point. I want to try it out, but I want your advice first to know if this is a good idea or not.

I think now that haskell-distributed/distributed-process-client-server#9 would solve this nicely, since you could encapsulate the STM objects in the server definition and use handleExternal (or whatever we come up with to support that notion) to interact with them. We will face similar issues about serialisation there, however the client-server definitions have never been meant to be serialisable, so I'm fairly confident we don't have an issue there.

@tdietert
Contributor

tdietert commented May 15, 2018

After spending a few hours searching, I cannot for the life of me find out how to "turn a Process () expression into a closure without template haskell", as per:

it is not difficult to turn a Process () expression into a closure without template haskell
I have tried:

class IsService a where
  serviceProc :: a -> Process b
  serviceName :: a -> [Char]

mkServiceClosure :: (IsService a, Serializable b) => a -> Closure (Process b)
mkServiceClosure s = mkClosure ()
  where
    (mkClosure, remoteTable) = mkClosureValSingle (serviceName s) $ \() -> serviceProc s

Ignoring the fact that the String supplied will have to be the same across all CH nodes (taking care of this with a typeclass grouping a name with a function, instead of using TH), this is not working. I get the following "Overlapping Instances" error:

    • Overlapping instances for Control.Distributed.Process.Internal.Closure.Explicit.MkTDict
                                  (Process b)
        arising from a use of ‘mkClosureValSingle’
      Matching instances:
        instance Control.Distributed.Process.Internal.Closure.Explicit.MkTDict
                   a
          -- Defined in ‘Control.Distributed.Process.Internal.Closure.Explicit’
        instance Serializable b =>
                 Control.Distributed.Process.Internal.Closure.Explicit.MkTDict
                   (Process b)
          -- Defined in ‘Control.Distributed.Process.Internal.Closure.Explicit’
    • In the expression: mkClosureValSingle (serviceName s)
      In the expression:
        mkClosureValSingle (serviceName s) $ \ () -> serviceProc s
      In a pattern binding:
        (mkClosure, remoteTable)
          = mkClosureValSingle (serviceName s) $ \ () -> serviceProc s
   |
73 |     (mkClosure, remoteTable) = mkClosureValSingle (serviceName s) $ \() -> serviceProc s
   |     

Coming from https://hackage.haskell.org/package/distributed-process-0.7.3/docs/Control-Distributed-Process-Closure.html#t:MkTDict

@tdietert
Contributor

For future reference, there is a detailed outline about how to make Process a values into Closure (Process a) values found at the top of the Control.Distributed.Static module documentation.
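
For anyone landing here later, a minimal sketch of the staticLabel / registerStatic route that module describes (the label and names here are illustrative; the same label must be registered in the RemoteTable on every node):

import Control.Distributed.Process (Process, getSelfPid, say)
import Control.Distributed.Static
  ( Closure, RemoteTable, Static
  , registerStatic, staticClosure, staticLabel )
import Data.Rank1Dynamic (toDynamic)

-- The local action we want a supervisor (or spawn) to be able to start.
myWorker :: Process ()
myWorker = getSelfPid >>= \us -> say ("worker started: " ++ show us)

-- Give it a stable label...
myWorkerStatic :: Static (Process ())
myWorkerStatic = staticLabel "$myWorker"

-- ...register that label when building the node's RemoteTable...
registerMyWorker :: RemoteTable -> RemoteTable
registerMyWorker = registerStatic "$myWorker" (toDynamic myWorker)

-- ...and wrap it as the Closure (Process ()) a ChildSpec can carry.
myWorkerClosure :: Closure (Process ())
myWorkerClosure = staticClosure myWorkerStatic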

@mcfilib

mcfilib commented Oct 25, 2018

Is anyone (@hyperthunk @tdietert?) able to summarise where this is at and if there's a path forward?

@tdietert
Contributor

@mcfilib the best I can do is point you to my workaround; converting a Process () value to a closure and then creating a childspec from the resulting Closure (Process ()) value: https://github.com/adjoint-io/uplink_dev/blob/master/src/Network/P2P/Service.hs#L295

@hyperthunk
Member Author

I'll take a look soon, I've been away from the project a while but will try and get back to you ASAP...

@hyperthunk
Member Author

Okay folks, this is a bit convoluted. So I previously fixed this bug by refactoring the ToChildStart implementation. It used to rely on a process outside of the supervisor (running in the calling client, believe it or not), which was rather flakey. I've since removed that implementation altogether, since it was bug ridden, and haven't replaced it.

The current implementation works fine; there aren't any races in ToChildStart handling, nor leaks that I'm aware of. There is, however, one downside to insisting on a Closure, which is that you've got to wrap your child process code in a closure. Since this is bread and butter for Cloud Haskell, I don't really see it as a problem, however it does make this library perhaps a little less accessible to newcomers.
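
For illustration, the usual Template Haskell route looks roughly like this (names are illustrative; the generated __remoteTable must be merged into the node's remote table at startup):

{-# LANGUAGE TemplateHaskell #-}
module Child where

import Control.Distributed.Process (Closure, Process, say)
import Control.Distributed.Process.Closure (mkStaticClosure, remotable)
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Distributed.Static (RemoteTable)

myChild :: Process ()
myChild = say "child running"

remotable ['myChild]

-- Use this table when creating the LocalNode.
myRemoteTable :: RemoteTable
myRemoteTable = __remoteTable initRemoteTable

-- A Closure (Process ()) suitable for a ChildSpec.
myChildClosure :: Closure (Process ())
myChildClosure = $(mkStaticClosure 'myChild)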

Now I personally think we can close this bug, however I'm happy to discuss further.

The original code looked like this:

instance ToChildStart (Process ()) where
  toChildStart proc = do
      starterPid <- spawnLocal $ do
        -- note [linking]: the first time we see the supervisor's pid,
        -- we must link to it, but only once, otherwise we simply waste
        -- time and resources creating duplicate links
        (supervisor, _, sendPidPort) <- expectTriple
        link supervisor
        spawnIt proc supervisor sendPidPort
        tcsProcLoop proc
      return (StarterProcess starterPid)
    where
      tcsProcLoop :: Process () -> Process ()
      tcsProcLoop p = forever' $ do
        (supervisor, _, sendPidPort) <- expectTriple
        spawnIt p supervisor sendPidPort

      spawnIt :: Process ()
              -> SupervisorPid
              -> SendPort ChildPid
              -> Process ()
      spawnIt proc' supervisor sendPidPort = do
        supervisedPid <- spawnLocal $ do
          link supervisor
          self <- getSelfPid
          (proc' `catches` [ Handler $ filterInitFailures supervisor self
                           , Handler $ logFailure supervisor self ])
            `catchesExit` [\_ m -> handleMessageIf m (== ExitShutdown)
                                                    (\_ -> return ())]
        sendChan sendPidPort supervisedPid

As you can probably tell by correlating that with my original bug report, those pids can leak...

@hyperthunk
Member Author

@mcfilib I'm London based, so if you're not too far off in the UK, we could do a co-lo and talk through any issues on this and other platform libraries you're working on/with. I don't have the ability to contribute much by way of coding at the moment (though I will get back to this eventually), but I'm more than happy to help get people up to speed with what's there.

@hyperthunk
Member Author

@mcfilib the best I can do is point you to my workaround; converting a Process () value to a closure and then creating a childspec from the resulting Closure (Process ()) value: https://github.com/adjoint-io/uplink_dev/blob/master/src/Network/P2P/Service.hs#L295

Yep, that's a deliberate design decision. You cannot pass a Process () remotely, therefore ChildStart (Process ()) cannot be part of the public API of this library. We tried to work around that previously by having code that executed on the client and then passed the ProcessId to the supervisor, but that's risky. The various issues are documented in this thread. I opted to remove that code, and leave it to API consumers to simply wrap their code in a Closure (which is common practice in Cloud Haskell anyway) and avoid the issues altogether.

Note that it is possible to work around this issue, sort of. There are two places where I cheat and use STM to communicate with intra-node peer processes, but obviously this isn't a good idea for the Supervisor module, since you end up with a supervisor that cannot support remote clients. Worse than that, if the API supported this notion (using STM) for local clients but not remote ones, then there's no type safe way to enforce the notion that remote clients can't use your supervisor if they're supplying a Process () thunk using STM, and yet a runtime failure of the supervisor would ensue should they try.

@hyperthunk
Member Author

Here are some notable examples of using STM to communicate with intra-node (but not truly remote) processes. They can be useful for coordinating with code that runs outside the Process monad.

The first is in distributed-process-client-server. This code allows us to wrap an input handler for an arbitrary STM action, which will be executed in the server loop along with all the inbound message handlers supplied in the server definition.

The salient bits of the documentation start here. A few snippets of code:

demoExternal = do
  inChan <- liftIO newTQueueIO
  replyQ <- liftIO newTQueueIO
  let procDef = statelessProcess {
                    apiHandlers = [
                      handleExternal
                        (readTQueue inChan)
                        (\s (m :: String) -> do
                            liftIO $ atomically $ writeTQueue replyQ m
                            continue s)
                    ]
                    }
  let txt = "hello 2-way stm foo"
  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  echoTxt <- liftIO $ do
    -- firstly we write something that the server can receive
    atomically $ writeTQueue inChan txt
    -- then sit and wait for it to write something back to us
    atomically $ readTQueue replyQ

  say (show $ echoTxt == txt)

and

data StmServer = StmServer { serverPid  :: ProcessId
                           , writerChan :: TQueue String
                           , readerChan :: TQueue String
                           }

instance Resolvable StmServer where
  resolve = return . Just . serverPid

echoStm :: StmServer -> String -> Process (Either ExitReason String)
echoStm StmServer{..} = callSTM serverPid
                                (writeTQueue writerChan)
                                (readTQueue  readerChan)

launchEchoServer :: CallHandler () String String -> Process StmServer
launchEchoServer handler = do
  (inQ, replyQ) <- liftIO $ do
    cIn <- newTQueueIO
    cOut <- newTQueueIO
    return (cIn, cOut)

  let procDef = statelessProcess {
                  apiHandlers = [
                    handleCallExternal
                      (readTQueue inQ)
                      (writeTQueue replyQ)
                      handler
                  ]
                }

  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  return $ StmServer pid inQ replyQ

testExternalCall :: TestResult Bool -> Process ()
testExternalCall result = do
  let txt = "hello stm-call foo"
  srv <- launchEchoServer (\st (msg :: String) -> reply msg st)
  echoStm srv txt >>= stash result . (== Right txt)

And finally we've got this, which uses the PCopy facility from distributed-process-extras to cheat similarly when dealing with a simple broadcast exchange pattern. The salient code checks to see if our caller is on the same node as us, and reverts to using STM and zero-copy data passing. This is an optimisation only, since copying is rather expensive in Cloud Haskell currently. Also note that this means the broadcast exchange process will not be well behaved in a supervision tree if it's not wrapped by an initialisation process, since any client STM handles could potentially leak (I think) and certainly won't be valid after it dies, though actually I would hope the distributed-process-client-server implementation would catch that and deal with it properly. (Note that configureExchange is an API layered on top of distributed-process-client-server.)

broadcastClient :: Exchange -> Process (InputStream Message)
broadcastClient ex@Exchange{..} = do
  myNode <- getSelfNode
  us     <- getSelfPid
  if processNodeId pid == myNode -- see [note: pcopy]
     then do (sp, rp) <- newChan
             configureExchange ex $ pCopy (BindSTM us sp)
             mRef <- P.monitor pid
             P.finally (receiveWait [ matchChanP rp
                                    , handleServerFailure mRef ])
                       (P.unmonitor mRef)
     else do (sp, rp) <- newChan :: Process (Channel Message)
             configureExchange ex $ BindPort us sp
             mRef <- P.monitor pid
             P.finally (receiveWait [
                           match (\(_ :: BindOk)   -> return $ newInputStream $ Left rp)
                         , match (\(f :: BindFail) -> die f)
                         , handleServerFailure mRef
                         ])
                       (P.unmonitor mRef)

I believe that I used PCopy as an optimisation in distributed-process-registry somewhere too.

IIRC I was also considering using handleExternal in the https://github.com/haskell-distributed/distributed-process-task/blob/pool/src/Control/Distributed/Process/Task/Pool.hs implementation, or at least the managed process behind it (https://github.com/haskell-distributed/distributed-process-task/blob/pool/src/Control/Distributed/Process/Task/Pool/Internal/Process.hs).

@hyperthunk
Member Author

So... Does anyone object to this issue being closed, since the offending code is no longer present?

@hyperthunk hyperthunk self-assigned this Nov 6, 2018
@mcfilib

mcfilib commented Nov 6, 2018

@hyperthunk @tdietert thank you both for your replies - immensely helpful. I don't have any objections to this being closed (genuinely wrote closured there and corrected myself).

@hyperthunk thank you too for the offer of an irl. I'm in Cardiff at the minute and I think I have everything I need but I'll let you know if that changes.

@hyperthunk
Member Author

Although this issue is now closed, please consider the proposal in haskell-distributed/distributed-process#400. I may implement this in a branch, once I've resolved the outstanding pull request and updated dependencies to support the latest stack ecosystem.

@hyperthunk
Member Author

For the benefit of future readers, and @tdietert in particular, with the new static pointers capability, you can make your closures very easily...

awaitsRegistration :: Process ()
awaitsRegistration = do
  self <- getSelfPid
  nid <- expect :: Process NodeId
  runUntilRegistered nid self
  say $ regName ++ " registered to " ++ show self
  expect :: Process ()
  where
    runUntilRegistered nid us = do
      whereisRemoteAsync nid regName
      receiveWait [
          matchIf (\(WhereIsReply n (Just p)) -> n == regName && p == us)
                  (const $ return ())
        ]

regName :: String
regName = "testRegisterRemote"

awaitsRegStatic :: Static (Process ())
awaitsRegStatic = staticPtr $ static awaitsRegistration

awaitsRegClosure :: Closure (Process ())
awaitsRegClosure = staticClosure awaitsRegStatic

testRegistryMonitoring :: LocalNode -> NodeId -> Assertion
testRegistryMonitoring node nid =
  runProcess node $ do
    pid <- spawn nid awaitsRegClosure
    -- etc etc...

@mcfilib

mcfilib commented Nov 26, 2018

@hyperthunk nice!

Where does this capability come from? How do we get our hands on it?

@hyperthunk
Member Author

I'm going to be looking at rolling it into a release shortly. Various bits of distributed-process get an update to handle the static pointer support that's already available in distributed-static. :)
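
For orientation, the minimal ingredients of that route look like this (a sketch; the StaticPointers extension and the distributed-static imports are the point, the names are illustrative):

{-# LANGUAGE StaticPointers #-}
import Control.Distributed.Process (Process, say)
import Control.Distributed.Static (Closure, Static, staticClosure, staticPtr)

-- 'static' is a keyword enabled by GHC's StaticPointers extension (7.10+);
-- staticPtr and staticClosure come from distributed-static.
hello :: Process ()
hello = say "hello"

helloStatic :: Static (Process ())
helloStatic = staticPtr (static hello)

helloClosure :: Closure (Process ())
helloClosure = staticClosure helloStatic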

@kaizhang

I'm going to be looking at rolling it into a release shortly. Various bits of distributed-process get an update to handle the static pointer support that's already available in distributed-static. :)

I'm eager to see this! Is there a separate issue to track its progress?

@hyperthunk
Member Author

hyperthunk commented Jan 14, 2019 via email

@tdietert
Contributor

For the benefit of future readers, and @tdietert in particular, with the new static pointers capability, you can make your closures very easily...

awaitsRegistration :: Process ()
awaitsRegistration = do
  self <- getSelfPid
  nid <- expect :: Process NodeId
  runUntilRegistered nid self
  say $ regName ++ " registered to " ++ show self
  expect :: Process ()
  where
    runUntilRegistered nid us = do
      whereisRemoteAsync nid regName
      receiveWait [
          matchIf (\(WhereIsReply n (Just p)) -> n == regName && p == us)
                  (const $ return ())
        ]

regName :: String
regName = "testRegisterRemote"

awaitsRegStatic :: Static (Process ())
awaitsRegStatic = staticPtr $ static awaitsRegistration

awaitsRegClosure :: Closure (Process ())
awaitsRegClosure = staticClosure awaitsRegStatic

testRegistryMonitoring :: LocalNode -> NodeId -> Assertion
testRegistryMonitoring node nid =
  runProcess node $ do
    pid <- spawn nid awaitsRegClosure
    -- etc etc...

@hyperthunk where does the static function in awaitsRegStatic come from?
