Skip to content

Commit

Permalink
file: end streamer after EOS
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Sep 15, 2023
1 parent 25d33cf commit 84a7e67
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
31 changes: 24 additions & 7 deletions sea-streamer-file/src/consumer/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct Subscribers {
prefetch_message: usize,
}

#[derive(Default)]
struct SubscriberMap {
senders: HashMap<Sid, Sender<Result<SharedMessage, FileErr>>>,
groups: Vec<((ConsumerGroup, StreamKey), Vec<Sid>)>,
Expand All @@ -82,7 +83,7 @@ impl Streamers {
match ctrl {
BgTask::Drop(sid) => {
let mut streamers = STREAMERS.lock().await;
streamers.remove(sid);
streamers.remove_subscriber(sid);
}
}
}
Expand Down Expand Up @@ -159,7 +160,18 @@ impl Streamers {
))
}

fn remove(&mut self, sid: Sid) {
/// Returns number of purged streamers
fn purge(&mut self, file_id: FileId) -> usize {
if let Some(handles) = self.streamers.get_mut(&file_id) {
let size = handles.len();
handles.retain(|(_, h)| !h.ctrl.is_disconnected());
size - handles.len()
} else {
0
}
}

fn remove_subscriber(&mut self, sid: Sid) {
for (_, handles) in self.streamers.iter_mut() {
for (_, handle) in handles.iter_mut() {
handle.subscribers.remove(sid);
Expand Down Expand Up @@ -241,6 +253,11 @@ pub(crate) async fn preseek_consumer(file_id: &FileId, sid: Sid) -> Result<(), F
streamers.pre_seek(file_id, sid).await
}

async fn end_streamer(file_id: FileId) -> usize {
let mut streamers = STREAMERS.lock().await;
streamers.purge(file_id)
}

/// Query info about global Streamer(s) topology
pub async fn query_streamer(file_id: &FileId) -> Option<Vec<StreamerInfo>> {
let streamers = STREAMERS.lock().await;
Expand Down Expand Up @@ -316,6 +333,10 @@ impl Streamer {
}
}
}
std::mem::drop(ctrl); // disconnect ctrl; so that it can be purged
let source = source.take_source();
let file = source.end().await;
assert_eq!(end_streamer(file.id()).await, 1);
});

Self {
Expand All @@ -329,11 +350,7 @@ impl Streamer {
impl Subscribers {
fn new(prefetch_message: usize) -> Self {
Self {
subscribers: Arc::new(Mutex::new(SubscriberMap {
senders: Default::default(),
groups: Default::default(),
ungrouped: Default::default(),
})),
subscribers: Default::default(),
prefetch_message,
}
}
Expand Down
2 changes: 2 additions & 0 deletions sea-streamer-file/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ impl ConsumerTrait for FileConsumer {
/// Otherwise it will await the next message.
fn next(&self) -> Self::NextFuture<'_> {
if let Err(TrySendError::Disconnected(_)) = self.ctrl.try_send(CtrlMsg::Read) {
// race: there is a possibility that *after* we enter the receiver future
// ctrl disconnect immediately. it will manifest in the StreamEnded below.
NextFuture::Error(Some(StreamErr::Backend(FileErr::StreamEnded)))
} else {
NextFuture::Future(self.receiver.recv_async())
Expand Down
18 changes: 16 additions & 2 deletions sea-streamer-file/src/dyn_file.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{future::Future, pin::Pin};

use crate::{
ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource, FileSourceFuture,
ReadFrom,
AsyncFile, ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource,
FileSourceFuture, ReadFrom,
};
use sea_streamer_types::{export::futures::FutureExt, SeqPos};

Expand Down Expand Up @@ -75,6 +75,20 @@ impl DynFileSource {
}
}

pub async fn end(self) -> AsyncFile {
match self {
Self::Dead => panic!("DynFileSource: Dead"),
Self::FileReader(file) => {
let (file, _, _) = file.end();
file
}
Self::FileSource(mut src) => {
let (file, _, _, _) = src.end().await;
file
}
}
}

#[inline]
pub fn offset(&self) -> u64 {
match self {
Expand Down

0 comments on commit 84a7e67

Please sign in to comment.