Stream pipeline correctly connected by take! hangs #147

Open
lxsameer opened this issue Oct 9, 2017 · 6 comments

@lxsameer

lxsameer commented Oct 9, 2017

Hi,
First of all, kudos.
I have several components, each of which has an input stream and an output stream.
I connected the components' inputs and outputs like this:

input of component A connected to output of component A.
output of component A connected to input of component B.
input of B connected to output of B
output of B connected to input of C
and finally input of C connected to output of C.

I confirmed this by walking the pipeline using the downstream function. Here is the output of the walk:

STREAM: manifold.stream.default.Stream@2db12c5b  ->  manifold.stream.default.Stream@5572f9d1

STREAM: manifold.stream.default.Stream@5572f9d1  ->  manifold.stream.default.Stream@656ed510

STREAM: manifold.stream.default.Stream@656ed510  ->  manifold.stream.default.Stream@8d5cdb0

STREAM: manifold.stream.default.Stream@8d5cdb0  ->  manifold.stream.default.Stream@46a7703a

STREAM: manifold.stream.default.Stream@46a7703a  ->  manifold.stream.default.Stream@60399d13

The problem is that when I put! something into any of these streams and then try to take the value from either that stream or one of its downstreams, take! hangs and try-take! returns the timeout value. The put! return value, however, derefs to true.

My component functions are fairly simple:

  (fn [component]
    ;; component is a map.
    (let [input  (hcomp/input component)
          output (hcomp/output component)]
      (println "INPUT: " (str input))
      (println "OUTPUT: " (str output))
      (stream/connect input output)
      component))

If I remove the call to connect in this function (so no pipeline is created), I can take a value from the same stream I put it into.

NOTE: I tried to debug this issue in the latest Manifold version. I found that the producers in https://github.com/ztellman/manifold/blob/master/src/manifold/stream/default.clj#L204 are empty for the downstream stream. My guess is that the stream's put method returns true without actually adding the value to the producers, but I haven't confirmed this.

@ztellman
Collaborator

I just tried to reproduce your issue based on your description:

> (def a (s/stream))

> (def b (s/stream))

> (def c (s/stream))

> (s/put-all! a [1 2 3])
<< … >>

> (s/connect a b)
nil

> (s/connect b c)
nil

> (->> c s/stream->seq (take 3))
(1 2 3)

This seems to work as expected. Can you expand on how my example differs from what you're doing?

@lxsameer
Author

Thanks for the quick response. A friend in Clojurians pointed out that once I connect several streams together, I can only consume messages from the final output. So in my example I can only consume from C's output, which works just fine.
So it was my bad and my misunderstanding 🙏.
But it would be a good idea to update the docs with a note about this.
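
For readers who land on this thread later, the behaviour described above can be reproduced with a minimal sketch like the following. It assumes three plain unbuffered streams wired as a -> b -> c, as in the reproduction earlier in the thread; the `:msg` value, the 100 ms timeouts, and the var names `accepted`, `from-b`, and `from-c` are arbitrary choices for illustration.

```clojure
(require '[manifold.stream :as s])

;; Three unbuffered streams wired as a -> b -> c.
(def a (s/stream))
(def b (s/stream))
(def c (s/stream))

(s/connect a b)
(s/connect b c)

;; The put! is accepted because the a -> b connection is already
;; waiting to take from a.
(def accepted @(s/put! a :msg))

;; The b -> c connection has already consumed :msg from b, so a direct
;; take from b finds nothing and falls through to the timeout value.
(def from-b @(s/try-take! b ::drained 100 ::timeout))

;; The end of the pipeline is where the message is waiting.
(def from-c @(s/try-take! c ::drained 100 ::timeout))
```

In this sketch `accepted` is true, `from-b` is the timeout value, and `from-c` is the message, which matches the symptoms reported in the original post.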

@dm3
Contributor

dm3 commented Oct 10, 2017

Hey, I was the one who answered the question.

@lxsameer - would be interesting to hear how you thought the streams worked and what your initial design was based on your mental model. Probably you aren't the only one making the same assumptions. Will be easier to clarify the documentation after we clear this up.

@lxsameer
Author

I read the docs on aleph.io many times, and my assumption was that when you connect a series of streams to each other, you can still inspect each stream along the way. For example, if I connect A -> B -> C, then after putting a value into A I should be able to take it from B without affecting the pipeline (I should still be able to get it from C too). This matters in my use case because I connect the components' inputs and outputs dynamically, based on a workflow graph.

@ztellman
Collaborator

If you connect B to multiple streams, the messages will be sent to both, but all operators in manifold.stream operate via side effects. If I'm understanding what you're saying, you seem to be expecting them to work like Clojure's seqs, which don't disappear when transformed elsewhere.

If things did work this way, then each stream in your software would effectively hold onto every message forever, eventually leading to a memory leak. This happens fairly often with large lazy sequences, where you mistakenly hold onto the head of the sequence. This is why Manifold mimics the semantics of core.async, Java queues, and other similar abstractions by removing messages once they're consumed.
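
One way to get the "inspect B along the way" behaviour within these semantics is to give B a second downstream, as mentioned above. The following is only a sketch under stated assumptions: `tap` is a hypothetical name for an extra inspection stream, and its buffer size of 16 is arbitrary.

```clojure
(require '[manifold.stream :as s])

(def a (s/stream))
(def b (s/stream))
(def c (s/stream))

;; `tap` is a hypothetical extra stream used only for inspection.
;; The buffer size of 16 is an arbitrary choice for this sketch.
(def tap (s/stream 16))

(s/connect a b)
(s/connect b c)
(s/connect b tap) ; b now has two downstreams: c and tap

(s/put! a :msg)

;; Each downstream is sent the message, so reading from `tap` does not
;; take anything away from `c`.
(def from-c   @(s/try-take! c   ::drained 1000 ::timeout))
(def from-tap @(s/try-take! tap ::drained 1000 ::timeout))
```

Messages sent to `tap` are still removed once consumed, so this does not reintroduce the retention problem described above. A tap that nobody drains will eventually fill its buffer, though, and how that affects the rest of the pipeline is worth checking against the Manifold documentation before relying on it.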

@lxsameer
Author

Thanks @ztellman @dm3.
I understand the flow now. I'd still suggest adding a small note to the docs so that future users understand the concept better and don't make the same mistake I did.
