I've got this stream where we unzip files from an incoming Stream. This works:

    archive
      //unzip
      .through(Archive.unzip(blocker))
      .parEvalMapUnordered(5) { entry =>
        entry.body.through(store.put(Path(s"import/$id/${entry.name}"))).compile.drain.as(entry.name)
      }
      .compile
      .toList

While this doesn't:

    archive
      //unzip
      .through(Archive.unzip(blocker))
      .map(entry => entry.body.through(store.put(Path(s"import/$id/${entry.name}"))).as(entry.name))
      .parJoin(5)
      .compile
      .toList

With the latter, it uploads the file to the blob store, but it doesn't yield the |
Ah that's correct :) My guess that |
Check the implementation of store.put -- I'm guessing it doesn't emit anything. E.g., def put(...): Stream[F, Unit] = Stream.eval_(takeAction(...)) emits no output.
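
To make the difference concrete, here is a minimal sketch using pure fs2 streams. It assumes, as guessed above, that the stream returned by store.put emits no elements; `upload` and the file name "a.txt" are hypothetical stand-ins, not the real blob-store API. Since `as` on a Stream only maps elements that are actually emitted, a stream with no output produces no names, whereas appending `Stream.emit` after the upload yields the name once the upload stream completes.

```scala
import fs2.{Pure, Stream}

object EmitNameAfterUpload {
  // Hypothetical stand-in for entry.body.through(store.put(...)):
  // assumed to perform its work but emit no elements.
  val upload: Stream[Pure, Unit] = Stream.empty

  // `as` replaces each emitted element; with nothing emitted, the name never appears.
  val viaAs: List[String] = upload.as("a.txt").toList

  // Discard any upload output, then emit the name after the upload stream finishes.
  val viaAppend: List[String] = (upload.drain ++ Stream.emit("a.txt")).toList
}
```

Under the same assumption, the parJoin variant would likely work if the inner stream were written as `entry.body.through(store.put(...)).drain ++ Stream.emit(entry.name)`. The parEvalMapUnordered variant already works because `.compile.drain.as(entry.name)` maps the resulting effect rather than the stream's elements.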