diff --git a/sea-streamer-file/src/consumer/group.rs b/sea-streamer-file/src/consumer/group.rs index f905827..288d876 100644 --- a/sea-streamer-file/src/consumer/group.rs +++ b/sea-streamer-file/src/consumer/group.rs @@ -69,6 +69,7 @@ struct Subscribers { prefetch_message: usize, } +#[derive(Default)] struct SubscriberMap { senders: HashMap>>, groups: Vec<((ConsumerGroup, StreamKey), Vec)>, @@ -82,7 +83,7 @@ impl Streamers { match ctrl { BgTask::Drop(sid) => { let mut streamers = STREAMERS.lock().await; - streamers.remove(sid); + streamers.remove_subscriber(sid); } } } @@ -159,7 +160,18 @@ impl Streamers { )) } - fn remove(&mut self, sid: Sid) { + /// Returns number of purged streamers + fn purge(&mut self, file_id: FileId) -> usize { + if let Some(handles) = self.streamers.get_mut(&file_id) { + let size = handles.len(); + handles.retain(|(_, h)| !h.ctrl.is_disconnected()); + size - handles.len() + } else { + 0 + } + } + + fn remove_subscriber(&mut self, sid: Sid) { for (_, handles) in self.streamers.iter_mut() { for (_, handle) in handles.iter_mut() { handle.subscribers.remove(sid); @@ -241,6 +253,11 @@ pub(crate) async fn preseek_consumer(file_id: &FileId, sid: Sid) -> Result<(), F streamers.pre_seek(file_id, sid).await } +async fn end_streamer(file_id: FileId) -> usize { + let mut streamers = STREAMERS.lock().await; + streamers.purge(file_id) +} + /// Query info about global Streamer(s) topology pub async fn query_streamer(file_id: &FileId) -> Option> { let streamers = STREAMERS.lock().await; @@ -316,6 +333,10 @@ impl Streamer { } } } + std::mem::drop(ctrl); // disconnect ctrl; so that it can be purged + let source = source.take_source(); + let file = source.end().await; + assert_eq!(end_streamer(file.id()).await, 1); }); Self { @@ -329,11 +350,7 @@ impl Streamer { impl Subscribers { fn new(prefetch_message: usize) -> Self { Self { - subscribers: Arc::new(Mutex::new(SubscriberMap { - senders: Default::default(), - groups: Default::default(), - ungrouped: Default::default(), - })), + subscribers: Default::default(), prefetch_message, } } diff --git a/sea-streamer-file/src/consumer/mod.rs b/sea-streamer-file/src/consumer/mod.rs index ef12273..c1a4996 100644 --- a/sea-streamer-file/src/consumer/mod.rs +++ b/sea-streamer-file/src/consumer/mod.rs @@ -113,6 +113,8 @@ impl ConsumerTrait for FileConsumer { /// Otherwise it will await the next message. fn next(&self) -> Self::NextFuture<'_> { if let Err(TrySendError::Disconnected(_)) = self.ctrl.try_send(CtrlMsg::Read) { + // race: there is a possibility that *after* we enter the receiver future + // ctrl disconnect immediately. it will manifest in the StreamEnded below. NextFuture::Error(Some(StreamErr::Backend(FileErr::StreamEnded))) } else { NextFuture::Future(self.receiver.recv_async()) diff --git a/sea-streamer-file/src/dyn_file.rs b/sea-streamer-file/src/dyn_file.rs index 9d4e883..e51cfe8 100644 --- a/sea-streamer-file/src/dyn_file.rs +++ b/sea-streamer-file/src/dyn_file.rs @@ -1,8 +1,8 @@ use std::{future::Future, pin::Pin}; use crate::{ - ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource, FileSourceFuture, - ReadFrom, + AsyncFile, ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource, + FileSourceFuture, ReadFrom, }; use sea_streamer_types::{export::futures::FutureExt, SeqPos}; @@ -75,6 +75,20 @@ impl DynFileSource { } } + pub async fn end(self) -> AsyncFile { + match self { + Self::Dead => panic!("DynFileSource: Dead"), + Self::FileReader(file) => { + let (file, _, _) = file.end(); + file + } + Self::FileSource(mut src) => { + let (file, _, _, _) = src.end().await; + file + } + } + } + #[inline] pub fn offset(&self) -> u64 { match self {