Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Sniffer::next_packet non-blocking #62

Merged
merged 5 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ tokio = { version = "1", features = [ "full" ] }
tokio-stream = "0.1"
warp = "0.3"
futures-util = "0.3"
pcap = "1.0.0"
pcap = { version = "1.0.0", features = ["capture-stream"]}
etherparse = "0.13.0"
clap = { version = "4", features = ["derive"] }

Expand Down
2 changes: 1 addition & 1 deletion client/src/streams/packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Packets {

pub fn id_count(&self) -> usize {
match self.packets.last_key_value() {
Some((id, _)) => id + 1,
Some((id, _)) => *id,
None => 0,
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ fn is_rtp(packet: &RtpPacket) -> bool {
if packet.version != 2 {
return false;
}
if let 72..=76 = packet.payload_type.id {
if let 72..=79 = packet.payload_type.id {
return false;
}
if packet.ssrc == 0 {
Expand Down
29 changes: 15 additions & 14 deletions src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ impl Run {
let mut file_sniffers = get_sniffers(self.files, Sniffer::from_file);
let mut interface_sniffers = get_sniffers(self.interfaces, Sniffer::from_device);

if file_sniffers.is_empty() && interface_sniffers.is_empty() {
// TODO: use some pretty printing (colors, bold font etc.)
println!("Error: no valid sources were passed");
return;
}

let file_res = apply_filters(&mut file_sniffers, &self.capture);
let interface_res = apply_filters(&mut interface_sniffers, &live_filter);

Expand All @@ -46,8 +40,19 @@ impl Run {
return;
}

let sniffers: HashMap<_, _> = file_sniffers
.into_iter()
.chain(interface_sniffers)
.collect();

if sniffers.is_empty() {
// TODO: use some pretty printing (colors, bold font etc.)
println!("Error: no valid sources were passed");
return;
}

let address = SocketAddr::new(self.address, self.port);
server::run(interface_sniffers, file_sniffers, address).await;
server::run(sniffers, address).await;
}

fn create_capture_filter(&self) -> String {
Expand All @@ -66,10 +71,9 @@ impl Run {
}
}

fn get_sniffers<T, F>(mut sources: Vec<String>, get_sniffer: F) -> HashMap<String, Sniffer<T>>
fn get_sniffers<F>(mut sources: Vec<String>, get_sniffer: F) -> HashMap<String, Sniffer>
where
T: pcap::Activated,
F: Fn(&str) -> Result<Sniffer<T>, Error>,
F: Fn(&str) -> Result<Sniffer, Error>,
{
sources.sort_unstable();
sources.dedup();
Expand All @@ -88,10 +92,7 @@ where
.collect()
}

fn apply_filters<T>(sniffers: &mut HashMap<String, Sniffer<T>>, filter: &str) -> Result<(), Error>
where
T: pcap::Activated,
{
fn apply_filters(sniffers: &mut HashMap<String, Sniffer>, filter: &str) -> Result<(), Error> {
for (_, sniffer) in sniffers.iter_mut() {
if let err @ Err(_) = sniffer.apply_filter(filter) {
return err;
Expand Down
71 changes: 23 additions & 48 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc, mpsc::Sender, RwLock};
use tokio::sync::{mpsc, mpsc::UnboundedSender, RwLock};
use warp::ws::{Message, WebSocket};
use warp::Filter;

Expand All @@ -22,12 +22,12 @@ const WS_PATH: &str = "ws";
static NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::new(1);

struct Client {
pub sender: mpsc::Sender<Message>,
pub sender: mpsc::UnboundedSender<Message>,
pub source: Option<Source>,
}

impl Client {
pub fn new(sender: mpsc::Sender<Message>) -> Self {
pub fn new(sender: mpsc::UnboundedSender<Message>) -> Self {
Self {
sender,
source: None,
Expand All @@ -39,30 +39,14 @@ type Clients = Arc<RwLock<HashMap<usize, Client>>>;
type Packets = Arc<RwLock<Vec<Response>>>;
type PacketsMap = Arc<HashMap<Source, Packets>>;

pub async fn run(
interface_sniffers: HashMap<String, Sniffer<pcap::Active>>,
file_sniffers: HashMap<String, Sniffer<pcap::Offline>>,
addr: SocketAddr,
) {
pub async fn run(sniffers: HashMap<String, Sniffer>, addr: SocketAddr) {
let clients = Clients::default();
let mut source_to_packets = HashMap::new();

// a bit of repetition, but Rust bested me this time
for (file, sniffer) in file_sniffers {
for (_file, sniffer) in sniffers {
let packets = Packets::default();
let source = Source::File(file);
source_to_packets.insert(source, packets.clone());

let cloned_clients = clients.clone();
tokio::task::spawn(async move {
sniff(sniffer, packets, cloned_clients).await;
});
}

for (interface, sniffer) in interface_sniffers {
let packets = Packets::default();
let source = Source::Interface(interface);
source_to_packets.insert(source, packets.clone());
source_to_packets.insert(sniffer.source.clone(), packets.clone());

let cloned_clients = clients.clone();
tokio::task::spawn(async move {
Expand Down Expand Up @@ -97,11 +81,7 @@ async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: Pa

send_pcap_filenames(&client_id, &mut ws_tx, &source_to_packets).await;

// FIXME: something is very wrong here
// if buffer size is > 1, rx.recv always waits
// until channel's buffer is full before receiving
// might be because of blocking sniffers
let (tx, mut rx) = mpsc::channel(1);
let (tx, mut rx) = mpsc::unbounded_channel();

tokio::task::spawn(async move {
while let Some(message) = rx.recv().await {
Expand Down Expand Up @@ -144,8 +124,8 @@ async fn send_pcap_filenames(
.await;
}

async fn sniff<T: pcap::Activated>(mut sniffer: Sniffer<T>, packets: Packets, clients: Clients) {
while let Some(result) = sniffer.next_packet() {
async fn sniff(mut sniffer: Sniffer, packets: Packets, clients: Clients) {
while let Some(result) = sniffer.next_packet().await {
match result {
Ok(mut pack) => {
pack.guess_payload();
Expand All @@ -162,12 +142,9 @@ async fn sniff<T: pcap::Activated>(mut sniffer: Sniffer<T>, packets: Packets, cl
source: Some(source),
sender,
} if *source == sniffer.source => {
sender
.send(msg.clone())
.unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
})
.await;
sender.send(msg.clone()).unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
});
}
_ => {}
}
Expand All @@ -179,19 +156,20 @@ async fn sniff<T: pcap::Activated>(mut sniffer: Sniffer<T>, packets: Packets, cl
}
}

async fn send_all_packets(client_id: usize, packets: &Packets, ws_tx: &mut Sender<Message>) {
async fn send_all_packets(
client_id: usize,
packets: &Packets,
ws_tx: &mut UnboundedSender<Message>,
) {
for pack in packets.read().await.iter() {
let Ok(encoded) = pack.encode() else {
error!("Failed to encode packet, client_id: {}", client_id);
continue;
};
let msg = Message::binary(encoded);
ws_tx
.send(msg)
.unwrap_or_else(|e| {
error!("WebSocket `feed` error: {}, client_id: {}", e, client_id);
})
.await;
ws_tx.send(msg).unwrap_or_else(|e| {
error!("WebSocket `feed` error: {}, client_id: {}", e, client_id);
});
}

info!(
Expand Down Expand Up @@ -233,12 +211,9 @@ async fn reparse_packet(
source: Some(source),
sender,
} if *source == *cur_source => {
sender
.send(msg.clone())
.unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
})
.await;
sender.send(msg.clone()).unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
});
}
_ => {}
};
Expand Down
Loading