diff --git a/Cargo.lock b/Cargo.lock index 916b426..2ad92f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,6 +328,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.29" @@ -344,6 +359,23 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +[[package]] +name = "futures-executor" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" + [[package]] name = "futures-macro" version = "0.3.29" @@ -373,10 +405,13 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -754,10 +789,12 @@ checksum = "cbaa01d616eb84eb35cd085fdeaa8671dc8d951bdc4a75bfc414466e76b039ce" dependencies = [ "bitflags", "errno", + "futures", "libc", "libloading", "pkg-config", "regex", + "tokio", "windows-sys 0.36.1", ] @@ -962,6 +999,7 @@ dependencies = [ "rtcp", "rtp", "serde", + "webrtc-sdp", "webrtc-util 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1392,6 +1430,16 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "webrtc-sdp" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b27cfe685c697666a76932399d55026bf2a5a205d91d277fd16346f0c65a7c06" +dependencies = [ + "log", + "url", +] + [[package]] name = "webrtc-util" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 2979349..80d7cc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ tokio = { version = "1", features = [ "full" ] } tokio-stream = "0.1" warp = "0.3" futures-util = "0.3" -pcap = "1.0.0" +pcap = { version = "1.0.0", features = ["capture-stream"]} etherparse = "0.13.0" clap = { version = "4", features = ["derive"] } diff --git a/client/Cargo.lock b/client/Cargo.lock index fb4068c..ff239bb 100644 --- a/client/Cargo.lock +++ b/client/Cargo.lock @@ -1724,6 +1724,7 @@ dependencies = [ "rtcp", "rtp", "serde", + "webrtc-sdp", "webrtc-util 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2335,6 +2336,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "webrtc-sdp" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b27cfe685c697666a76932399d55026bf2a5a205d91d277fd16346f0c65a7c06" +dependencies = [ + "log", + "url", +] + [[package]] name = "webrtc-util" version = "0.8.0" diff --git a/client/src/app.rs b/client/src/app.rs index 0861840..5f2d520 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -4,7 +4,7 @@ use eframe::egui; use egui::{ComboBox, Ui}; use ewebsock::{WsEvent, WsMessage, WsReceiver, WsSender}; use log::{error, warn}; -use rtpeeker_common::{Request, Response, Source}; +use rtpeeker_common::{Request, Response, Source, StreamKey}; use packets_table::PacketsTable; use rtcp_packets_table::RtcpPacketsTable; @@ -12,7 +12,7 @@ use rtp_packets_table::RtpPacketsTable; use rtp_streams_table::RtpStreamsTable; use tab::Tab; -use crate::streams::{RefStreams, StreamKey}; +use crate::streams::RefStreams; use rtp_streams_plot::RtpStreamsPlot; mod packets_table; @@ -79,7 +79,7 @@ impl App { let packets_table = PacketsTable::new(streams.clone(), ws_sender.clone()); let rtp_packets_table = RtpPacketsTable::new(streams.clone()); let rtcp_packets_table = RtcpPacketsTable::new(streams.clone()); - let rtp_streams_table = RtpStreamsTable::new(streams.clone()); + let rtp_streams_table = RtpStreamsTable::new(streams.clone(), ws_sender.clone()); let rtp_streams_plot = RtpStreamsPlot::new(streams.clone()); let (tab, selected_source) = get_initial_state(cc); @@ -244,7 +244,7 @@ impl App { }; let Ok(response) = Response::decode(&msg) else { - error!("Failed to decode request message"); + error!("Failed to decode response message"); continue; }; @@ -263,6 +263,12 @@ impl App { } self.sources = sources; } + Response::Sdp(stream_key, sdp) => { + let mut streams = self.streams.borrow_mut(); + if let Some(stream) = streams.streams.get_mut(&stream_key) { + stream.add_sdp(sdp); + } + } } } } diff --git a/client/src/app/rtp_packets_table.rs b/client/src/app/rtp_packets_table.rs index 80f5337..3cab47f 100644 --- a/client/src/app/rtp_packets_table.rs +++ b/client/src/app/rtp_packets_table.rs @@ -1,9 +1,10 @@ use super::is_stream_visible; -use crate::streams::{RefStreams, StreamKey}; +use crate::streams::RefStreams; use eframe::epaint::Color32; use egui::RichText; use egui_extras::{Column, TableBody, TableBuilder}; use rtpeeker_common::packet::SessionPacket; +use rtpeeker_common::StreamKey; use std::collections::HashMap; pub struct RtpPacketsTable { @@ -166,17 +167,7 @@ impl RtpPacketsTable { resp.on_hover_text(rtp_packet.payload_type.to_string()); row.col(|ui| { - // if rtp_packet.previous_packet_is_lost { - // let resp = ui.label( - // RichText::from(format!("{} ⚠", rtp_packet.sequence_number)) - // .color(Color32::GOLD), - // ); - // resp.on_hover_text( - // RichText::from("Previous packet is lost!").color(Color32::GOLD), - // ); - // } else { ui.label(rtp_packet.sequence_number.to_string()); - // } }); row.col(|ui| { diff --git a/client/src/app/rtp_streams_plot.rs b/client/src/app/rtp_streams_plot.rs index fa6c7d4..e236d36 100644 --- a/client/src/app/rtp_streams_plot.rs +++ b/client/src/app/rtp_streams_plot.rs @@ -1,16 +1,19 @@ use self::SettingsXAxis::*; use super::is_stream_visible; use crate::streams::stream::{RtpInfo, Stream}; -use crate::streams::{RefStreams, StreamKey, Streams}; +use crate::streams::{RefStreams, Streams}; use eframe::egui; use eframe::egui::TextBuffer; use eframe::epaint::Color32; -use egui::plot::{Line, MarkerShape, Plot, PlotBounds, PlotPoints, PlotUi, Points}; -use egui::RichText; +use egui::plot::{ + Line, LineStyle, MarkerShape, Plot, PlotBounds, PlotPoint, PlotPoints, PlotUi, Points, Text, +}; use egui::Ui; +use egui::{Align2, RichText}; use rtpeeker_common::packet::SessionPacket; use rtpeeker_common::rtcp::ReceptionReport; use rtpeeker_common::rtp::payload_type::MediaType; +use rtpeeker_common::StreamKey; use rtpeeker_common::{Packet, RtcpPacket, RtpPacket}; use std::cell::Ref; use std::collections::HashMap; @@ -27,6 +30,18 @@ struct PointData { marker_shape: MarkerShape, } +struct StreamSeperatorLine { + x_start: f64, + x_end: f64, + y: f64, +} + +struct StreamText { + x: f64, + y: f64, + on_hover: String, +} + #[derive(Debug, PartialEq, Copy, Clone)] enum SettingsXAxis { RtpTimestamp, @@ -55,14 +70,16 @@ impl Display for SettingsXAxis { pub struct RtpStreamsPlot { streams: RefStreams, points_data: Vec, + stream_separator_lines: Vec, + stream_texts: Vec, x_axis: SettingsXAxis, requires_reset: bool, streams_visibility: HashMap, last_rtp_packets_len: usize, set_plot_bounds: bool, slider_max: i64, - slider_current_min: i64, - slider_current_max: i64, + slider_start: i64, + slider_length: i64, first_draw: bool, } @@ -71,14 +88,16 @@ impl RtpStreamsPlot { Self { streams, points_data: Vec::new(), + stream_separator_lines: Vec::new(), + stream_texts: Vec::new(), x_axis: RtpTimestamp, requires_reset: false, streams_visibility: HashMap::default(), last_rtp_packets_len: 0, set_plot_bounds: false, slider_max: 10000, - slider_current_min: 0, - slider_current_max: 1, + slider_start: 0, + slider_length: 1, first_draw: true, } } @@ -158,16 +177,16 @@ impl RtpStreamsPlot { let set_plot_button_clicked = ui.button("Set plot bounds").clicked(); let (x_min_text, x_max_text) = match self.x_axis { - RtpTimestamp => ("First RTP timestamp", "Last RTP timestamp"), - RawTimestamp => ("First second", "Last second"), - SequenceNumer => ("First sequence number", "Last sequence number"), + RtpTimestamp => ("First RTP timestamp", "Length"), + RawTimestamp => ("First second", "Length"), + SequenceNumer => ("First sequence number", "Length"), }; let max = (self.slider_max as f64 * 1.13) as i64; let x_min_resp = - ui.add(egui::Slider::new(&mut self.slider_current_min, 0..=max).text(x_min_text)); + ui.add(egui::Slider::new(&mut self.slider_start, 0..=max).text(x_min_text)); let x_max_resp = - ui.add(egui::Slider::new(&mut self.slider_current_max, 1..=max).text(x_max_text)); + ui.add(egui::Slider::new(&mut self.slider_length, 1..=max).text(x_max_text)); if set_plot_button_clicked | x_min_resp.dragged() | x_max_resp.dragged() { self.set_plot_bounds = true @@ -209,8 +228,8 @@ impl RtpStreamsPlot { { self.x_axis = setting; self.slider_max = 1; - self.slider_current_max = 1; - self.slider_current_min = 0; + self.slider_length = 1; + self.slider_start = 0; self.requires_reset = true; } }); @@ -289,10 +308,37 @@ impl RtpStreamsPlot { ); } } + for separator in &self.stream_separator_lines { + let StreamSeperatorLine { x_start, x_end, y } = separator; + plot_ui.line( + Line::new(PlotPoints::new(vec![[*x_start, *y], [*x_end, *y]])) + .color(Color32::GRAY) + .style(LineStyle::Solid) + .width(0.5), + ); + } + + for text in &self.stream_texts { + let StreamText { x, y, on_hover } = text; + plot_ui.text( + Text::new( + PlotPoint { x: *x, y: *y }, + RichText::new(on_hover) + .color(Color32::LIGHT_GRAY) + .strong() + .size(12.0), + ) + .anchor(Align2::RIGHT_TOP), + ) + } + if !self.first_draw && self.set_plot_bounds { plot_ui.set_plot_bounds(PlotBounds::from_min_max( - [(self.slider_current_min as f64) - 0.05, -0.5], - [self.slider_current_max as f64, heighest_y * 1.55], + [(self.slider_start as f64) - 0.05, -0.5], + [ + (self.slider_start + self.slider_length) as f64, + heighest_y * 1.55, + ], )); self.set_plot_bounds = false } @@ -303,6 +349,8 @@ impl RtpStreamsPlot { fn refresh_points(&mut self) { self.points_data.clear(); + self.stream_separator_lines.clear(); + self.stream_texts.clear(); let streams = self.streams.borrow(); let mut points_x_and_y_top: Vec<(f64, f64)> = Vec::new(); let mut previous_stream_max_y = 0.0; @@ -312,6 +360,24 @@ impl RtpStreamsPlot { return; } + let this_stream_y_baseline = match self.x_axis { + RtpTimestamp => previous_stream_max_y + 90.0, + RawTimestamp => previous_stream_max_y + 20.0, + SequenceNumer => previous_stream_max_y + 20.0, + }; + self.stream_texts.push(StreamText { + x: 0.0, + y: this_stream_y_baseline, + on_hover: String::from(&format!("{} ({:x}) ", stream.alias, stream.ssrc)), + }); + if let Some((x, _)) = points_x_and_y_top.last() { + self.stream_separator_lines.push(StreamSeperatorLine { + x_start: 0.0, + x_end: *x, + y: (this_stream_y_baseline + previous_stream_max_y) / 2.0, + }) + }; + build_stream_points( &streams, &mut points_x_and_y_top, @@ -320,6 +386,7 @@ impl RtpStreamsPlot { &mut self.points_data, &mut previous_stream_max_y, &mut self.slider_max, + this_stream_y_baseline, ); }); } @@ -334,8 +401,8 @@ fn build_stream_points( points_data: &mut Vec, previous_stream_max_y: &mut f64, slider_max: &mut i64, + this_stream_y_baseline: f64, ) { - let this_stream_y_baseline = *previous_stream_max_y + 0.2 * *previous_stream_max_y; let rtp_packets = &stream.rtp_packets; let rtcp_packets = &stream.rtcp_packets; if rtp_packets.is_empty() { diff --git a/client/src/app/rtp_streams_table.rs b/client/src/app/rtp_streams_table.rs index 4999ad8..f4b3da7 100644 --- a/client/src/app/rtp_streams_table.rs +++ b/client/src/app/rtp_streams_table.rs @@ -1,22 +1,71 @@ +use crate::streams::{stream::Stream, RefStreams}; use egui::plot::{Line, Plot, PlotPoints}; use egui::{TextEdit, Vec2}; use egui_extras::{Column, TableBody, TableBuilder}; +use ewebsock::{WsMessage, WsSender}; +use rtpeeker_common::{Request, StreamKey}; -use crate::streams::{stream::Stream, RefStreams}; +const SDP_PROMPT: &str = "Paste your SDP media section here, e.g. +m=audio 5004 RTP/AVP 96 +c=IN IP4 239.30.22.1 +a=rtpmap:96 L24/48000/2 +a=recvonly +"; pub struct RtpStreamsTable { streams: RefStreams, + ws_sender: WsSender, + sdp_window_open: bool, + chosen_key: Option, + sdp: String, } impl RtpStreamsTable { - pub fn new(streams: RefStreams) -> Self { - Self { streams } + pub fn new(streams: RefStreams, ws_sender: WsSender) -> Self { + Self { + streams, + ws_sender, + sdp_window_open: false, + chosen_key: None, + sdp: String::new(), + } } pub fn ui(&mut self, ctx: &egui::Context) { egui::CentralPanel::default().show(ctx, |ui| { self.build_table(ui); }); + self.build_sdp_window(ctx); + } + + fn build_sdp_window(&mut self, ctx: &egui::Context) { + let Some((_, _, _, ssrc)) = self.chosen_key else { + return; + }; + + let mut send_sdp = false; + + egui::Window::new(format!("SDP - {:x}", ssrc)) + .open(&mut self.sdp_window_open) + .default_width(800.0) + .default_height(800.0) + .vscroll(true) + .show(ctx, |ui| { + TextEdit::multiline(&mut self.sdp) + .hint_text(SDP_PROMPT) + .desired_rows(30) + .desired_width(f32::INFINITY) + .show(ui); + ui.add_space(10.0); + if ui.button(format!("Set SDP for {:x}", ssrc)).clicked() { + send_sdp = true; + } + }); + + if send_sdp { + self.send_sdp_request(); + self.sdp_window_open = false; + } } fn build_table(&mut self, ui: &mut egui::Ui) { @@ -108,11 +157,32 @@ impl RtpStreamsTable { let packet_rate = stream.get_mean_packet_rate(); ui.label(format!("{:.3}", packet_rate)); }); - row.col(|ui| { + let (_, resp) = row.col(|ui| { build_jitter_plot(ui, stream); }); + + resp.context_menu(|ui| { + if ui.button("Set SDP").clicked() { + ui.close_menu(); + self.chosen_key = Some(*key); + self.sdp = String::new(); + self.sdp_window_open = true; + } + }); }); } + + fn send_sdp_request(&mut self) { + let request = Request::ParseSdp(self.chosen_key.unwrap(), self.sdp.clone()); + + let Ok(msg) = request.encode() else { + log::error!("Failed to encode a request message"); + return; + }; + let msg = WsMessage::Binary(msg); + + self.ws_sender.send(msg); + } } fn build_jitter_plot(ui: &mut egui::Ui, stream: &Stream) { diff --git a/client/src/streams.rs b/client/src/streams.rs index fe92da4..59b41de 100644 --- a/client/src/streams.rs +++ b/client/src/streams.rs @@ -1,5 +1,6 @@ use packets::Packets; use rtpeeker_common::packet::SessionPacket; +use rtpeeker_common::StreamKey; use rtpeeker_common::{packet::TransportProtocol, Packet, RtcpPacket}; use std::cell::RefCell; use std::collections::HashMap; @@ -11,7 +12,6 @@ mod packets; pub mod stream; pub type RefStreams = Rc>; -pub type StreamKey = (SocketAddr, SocketAddr, TransportProtocol, u32); #[derive(Debug, Default)] pub struct Streams { diff --git a/client/src/streams/packets.rs b/client/src/streams/packets.rs index 2c4543e..f31aa70 100644 --- a/client/src/streams/packets.rs +++ b/client/src/streams/packets.rs @@ -47,7 +47,7 @@ impl Packets { pub fn id_count(&self) -> usize { match self.packets.last_key_value() { - Some((id, _)) => id + 1, + Some((id, _)) => *id, None => 0, } } diff --git a/client/src/streams/stream.rs b/client/src/streams/stream.rs index e682258..9be4016 100644 --- a/client/src/streams/stream.rs +++ b/client/src/streams/stream.rs @@ -1,7 +1,8 @@ use crate::utils::ntp_to_f64; use rtpeeker_common::packet::TransportProtocol; use rtpeeker_common::rtcp::{source_description::SdesType, SourceDescription}; -use rtpeeker_common::{Packet, RtcpPacket, RtpPacket}; +use rtpeeker_common::rtp::payload_type::PayloadType; +use rtpeeker_common::{Packet, RtcpPacket, RtpPacket, Sdp}; use std::cmp::{max, min}; use std::net::SocketAddr; use std::time::Duration; @@ -55,6 +56,7 @@ pub struct Stream { last_sequence_number: u16, first_time: Duration, last_time: Duration, + sdp: Option, // ntp synchronization pub ntp_rtp: Option<(u64, u32)>, pub estimated_clock_rate: Option, @@ -92,11 +94,17 @@ impl Stream { last_sequence_number: rtp.sequence_number, first_time: packet.timestamp, last_time: packet.timestamp, + sdp: None, ntp_rtp: None, estimated_clock_rate: None, } } + pub fn add_sdp(&mut self, sdp: Sdp) { + self.sdp = Some(sdp); + self.recalculate(); + } + pub fn get_duration(&self) -> Duration { self.last_time.checked_sub(self.first_time).unwrap() } @@ -161,6 +169,22 @@ impl Stream { self.rtcp_packets.push(rtcp_info); } + fn recalculate(&mut self) { + let mut rtp_packets = std::mem::take(&mut self.rtp_packets).into_iter(); + let rtp_info = rtp_packets.next().unwrap(); + self.bytes = rtp_info.bytes; + self.max_jitter = 0.0; + self.sum_jitter = 0.0; + self.jitter_count = 0; + self.first_sequence_number = rtp_info.packet.sequence_number; + self.last_sequence_number = rtp_info.packet.sequence_number; + self.first_time = rtp_info.time; + self.last_time = rtp_info.time; + self.rtp_packets = vec![rtp_info]; + + rtp_packets.for_each(|rtp| self.update_rtp_parameters(rtp)); + } + fn update_rtp_parameters(&mut self, mut rtp_info: RtpInfo) { rtp_info.time_delta = rtp_info.time - self.rtp_packets.last().unwrap().time; @@ -184,8 +208,22 @@ impl Stream { // TODO } + fn get_payload_type(&self, rtp_info: &RtpInfo) -> PayloadType { + let id = &rtp_info.packet.payload_type.id; + + if let Some(sdp) = &self.sdp { + if let Some(pt) = sdp.payload_types.get(id) { + return pt.clone(); + } + }; + + rtp_info.packet.payload_type.clone() + } + fn update_jitter(&mut self, rtp_info: &mut RtpInfo) { - let Some(clock_rate) = rtp_info.packet.payload_type.clock_rate else { + let payload_type = self.get_payload_type(rtp_info); + + let Some(clock_rate) = payload_type.clock_rate else { return; }; diff --git a/common/Cargo.lock b/common/Cargo.lock index 873baeb..a707c00 100644 --- a/common/Cargo.lock +++ b/common/Cargo.lock @@ -130,6 +130,15 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "getrandom" version = "0.2.10" @@ -153,6 +162,16 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -302,6 +321,12 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -445,6 +470,7 @@ dependencies = [ "rtcp", "rtp", "serde", + "webrtc-sdp", "webrtc-util 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -536,6 +562,21 @@ dependencies = [ "syn", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.33.0" @@ -566,18 +607,54 @@ dependencies = [ "syn", ] +[[package]] +name = "unicode-bidi" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "url" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "webrtc-sdp" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b27cfe685c697666a76932399d55026bf2a5a205d91d277fd16346f0c65a7c06" +dependencies = [ + "log", + "url", +] + [[package]] name = "webrtc-util" version = "0.8.0" diff --git a/common/Cargo.toml b/common/Cargo.toml index 07d0df1..e38aab0 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -12,4 +12,5 @@ pcap = "1.0.0" etherparse = "0.13.0" rtp = "0.9" rtcp = { git = "https://github.com/LVala/webrtc.git" } +webrtc-sdp = "0.3" webrtc-util = "0.8.0" diff --git a/common/src/lib.rs b/common/src/lib.rs index 507bf34..5b3ec0b 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,13 +1,19 @@ +use packet::TransportProtocol; use serde::{Deserialize, Serialize}; use std::fmt; +use std::net::SocketAddr; pub use crate::rtcp::RtcpPacket; pub use crate::rtp::RtpPacket; pub use packet::Packet; +pub use sdp::Sdp; pub mod packet; pub mod rtcp; pub mod rtp; +pub mod sdp; + +pub type StreamKey = (SocketAddr, SocketAddr, TransportProtocol, u32); #[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq)] pub enum Source { @@ -49,12 +55,14 @@ pub enum Request { FetchAll, Reparse(usize, packet::SessionProtocol), ChangeSource(Source), + ParseSdp(StreamKey, String), } #[derive(Serialize, Deserialize, Debug)] pub enum Response { Packet(Packet), Sources(Vec), + Sdp(StreamKey, Sdp), } impl Request { diff --git a/common/src/packet.rs b/common/src/packet.rs index b5a409d..9833d8c 100644 --- a/common/src/packet.rs +++ b/common/src/packet.rs @@ -1,5 +1,4 @@ use super::{RtcpPacket, RtpPacket}; -use bincode; use serde::{Deserialize, Serialize}; use std::fmt; use std::net::SocketAddr; @@ -75,12 +74,6 @@ pub struct Packet { pub contents: SessionPacket, } -impl Packet { - pub fn decode(bytes: &[u8]) -> Result { - bincode::deserialize(bytes) - } -} - #[cfg(not(target_arch = "wasm32"))] impl Packet { pub fn build(raw_packet: &pcap::Packet, id: usize) -> Option { @@ -114,22 +107,6 @@ impl Packet { }) } - pub fn encode(&self) -> Result, bincode::Error> { - // TODO: need a nicer way to temporarily get rid of payload field - let wo_payload = Self { - payload: None, - id: self.id, - timestamp: self.timestamp, - length: self.length, - source_addr: self.source_addr, - destination_addr: self.destination_addr, - transport_protocol: self.transport_protocol, - session_protocol: self.session_protocol, - contents: self.contents.clone(), - }; - bincode::serialize(&wo_payload) - } - pub fn guess_payload(&mut self) { // could use port to determine validity // TODO: STUN data, TURN channels, RTCP @@ -185,7 +162,7 @@ fn is_rtp(packet: &RtpPacket) -> bool { if packet.version != 2 { return false; } - if let 72..=76 = packet.payload_type.id { + if let 72..=79 = packet.payload_type.id { return false; } if packet.ssrc == 0 { diff --git a/common/src/rtp/payload_type.rs b/common/src/rtp/payload_type.rs index 85aa31e..53d6016 100644 --- a/common/src/rtp/payload_type.rs +++ b/common/src/rtp/payload_type.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum MediaType { Audio, Video, diff --git a/common/src/sdp.rs b/common/src/sdp.rs new file mode 100644 index 0000000..fd785d0 --- /dev/null +++ b/common/src/sdp.rs @@ -0,0 +1,64 @@ +use crate::rtp::payload_type::PayloadType; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Serialize, Deserialize, Debug)] +pub struct Sdp { + pub payload_types: HashMap, +} + +#[cfg(not(target_arch = "wasm32"))] +impl Sdp { + pub fn build(raw_sdp: String) -> Option { + use crate::rtp::payload_type::MediaType; + use webrtc_sdp::{ + attribute_type::SdpAttribute, media_type::SdpMediaValue, parse_sdp_line, SdpLine, + SdpType, + }; + + let mut lines = raw_sdp.lines(); + + // first line should be media + let Some(first_line) = lines.next() else { + return None; + }; + let Ok(SdpLine { + sdp_type: SdpType::Media(media), + .. + }) = parse_sdp_line(first_line, 0) + else { + return None; + }; + + let media_type = match media.media { + SdpMediaValue::Audio => MediaType::Audio, + SdpMediaValue::Video => MediaType::Video, + _ => { + return None; + } + }; + + let payload_types = lines + .filter_map(|line| { + let Ok(SdpLine { + sdp_type: SdpType::Attribute(SdpAttribute::Rtpmap(rtpmap)), + .. + }) = parse_sdp_line(line, 1) + else { + return None; + }; + + let pt = PayloadType { + id: rtpmap.payload_type, + name: rtpmap.codec_name, + clock_rate: Some(rtpmap.frequency), + media_type, + }; + + Some((pt.id, pt)) + }) + .collect(); + + Some(Self { payload_types }) + } +} diff --git a/src/cmd/run.rs b/src/cmd/run.rs index 5c2f162..6f53993 100644 --- a/src/cmd/run.rs +++ b/src/cmd/run.rs @@ -15,8 +15,8 @@ pub struct Run { #[arg(short, long, num_args = 1..)] interfaces: Vec, /// capture filter string in Wireshark/tcpdump syntax, applies to all sources - #[arg(short, long)] - capture: Option, + #[arg(short, long, default_value_t = String::new())] + capture: String, /// IP address used by the application #[arg(short, long, default_value_t = DEFAULT_IP)] address: IpAddr, @@ -27,34 +27,59 @@ pub struct Run { impl Run { pub async fn run(self) { - if self.files.is_empty() && self.interfaces.is_empty() { + let live_filter = self.create_capture_filter(); + + let mut file_sniffers = get_sniffers(self.files, Sniffer::from_file); + let mut interface_sniffers = get_sniffers(self.interfaces, Sniffer::from_device); + + let file_res = apply_filters(&mut file_sniffers, &self.capture); + let interface_res = apply_filters(&mut interface_sniffers, &live_filter); + + if file_res.is_err() || interface_res.is_err() { + println!("Error: provided capture filter is invalid"); + return; + } + + let sniffers: HashMap<_, _> = file_sniffers + .into_iter() + .chain(interface_sniffers) + .collect(); + + if sniffers.is_empty() { // TODO: use some pretty printing (colors, bold font etc.) - println!("Error: no pcap files or network interfaces were passed"); + println!("Error: no valid sources were passed"); return; } - let file_sniffers = get_sniffers(self.files, Sniffer::from_file, self.capture.clone()); - let interface_sniffers = get_sniffers(self.interfaces, Sniffer::from_device, self.capture); let address = SocketAddr::new(self.address, self.port); + server::run(sniffers, address).await; + } - server::run(interface_sniffers, file_sniffers, address).await; + fn create_capture_filter(&self) -> String { + // to filter out RTPeeker own WebSocket/HTTP messages + let own_filter = if self.address.is_unspecified() { + format!("not port {}", self.port) + } else { + format!("not (host {} and port {})", self.address, self.port) + }; + + if self.capture.is_empty() { + own_filter + } else { + format!("({}) and ({})", own_filter, self.capture) + } } } -fn get_sniffers( - mut sources: Vec, - get_sniffer: F, - filter: Option, -) -> HashMap> +fn get_sniffers(mut sources: Vec, get_sniffer: F) -> HashMap where - T: pcap::Activated, - F: Fn(&str, Option) -> Result, Error>, + F: Fn(&str) -> Result, { sources.sort_unstable(); sources.dedup(); sources .into_iter() - .filter_map(|source| match get_sniffer(&source, filter.clone()) { + .filter_map(|source| match get_sniffer(&source) { Ok(sniffer) => Some((source, sniffer)), Err(err) => { println!( @@ -66,3 +91,13 @@ where }) .collect() } + +fn apply_filters(sniffers: &mut HashMap, filter: &str) -> Result<(), Error> { + for (_, sniffer) in sniffers.iter_mut() { + if let err @ Err(_) = sniffer.apply_filter(filter) { + return err; + } + } + + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index 918d7c6..b9e3bee 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,15 +5,15 @@ use futures_util::{ }; use log::{error, info, warn}; use rtpeeker_common::packet::SessionProtocol; -use rtpeeker_common::Source; -use rtpeeker_common::{Request, Response}; +use rtpeeker_common::{Request, Response, Sdp}; +use rtpeeker_common::{Source, StreamKey}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use tokio::sync::{mpsc, mpsc::Sender, RwLock}; +use tokio::sync::{mpsc, mpsc::UnboundedSender, RwLock}; use warp::ws::{Message, WebSocket}; use warp::Filter; @@ -22,12 +22,12 @@ const WS_PATH: &str = "ws"; static NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::new(1); struct Client { - pub sender: mpsc::Sender, + pub sender: mpsc::UnboundedSender, pub source: Option, } impl Client { - pub fn new(sender: mpsc::Sender) -> Self { + pub fn new(sender: mpsc::UnboundedSender) -> Self { Self { sender, source: None, @@ -39,30 +39,14 @@ type Clients = Arc>>; type Packets = Arc>>; type PacketsMap = Arc>; -pub async fn run( - interface_sniffers: HashMap>, - file_sniffers: HashMap>, - addr: SocketAddr, -) { +pub async fn run(sniffers: HashMap, addr: SocketAddr) { let clients = Clients::default(); let mut source_to_packets = HashMap::new(); // a bit of repetition, but Rust bested me this time - for (file, sniffer) in file_sniffers { + for (_file, sniffer) in sniffers { let packets = Packets::default(); - let source = Source::File(file); - source_to_packets.insert(source, packets.clone()); - - let cloned_clients = clients.clone(); - tokio::task::spawn(async move { - sniff(sniffer, packets, cloned_clients).await; - }); - } - - for (interface, sniffer) in interface_sniffers { - let packets = Packets::default(); - let source = Source::Interface(interface); - source_to_packets.insert(source, packets.clone()); + source_to_packets.insert(sniffer.source.clone(), packets.clone()); let cloned_clients = clients.clone(); tokio::task::spawn(async move { @@ -83,9 +67,9 @@ pub async fn run( }); let routes = ws.or(warp::fs::dir(DIST_DIR)); - println!("RTPeeker running on http://{}/", addr); - warp::serve(routes).run(addr).await; + println!("RTPeeker running on http://{}/", addr); + warp::serve(routes).try_bind(addr).await; } async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: PacketsMap) { @@ -97,11 +81,7 @@ async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: Pa send_pcap_filenames(&client_id, &mut ws_tx, &source_to_packets).await; - // FIXME: something is very wrong here - // if buffer size is > 1, rx.recv always waits - // until channel's buffer is full before receiving - // might be because of blocking sniffers - let (tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::unbounded_channel(); tokio::task::spawn(async move { while let Some(message) = rx.recv().await { @@ -144,11 +124,13 @@ async fn send_pcap_filenames( .await; } -async fn sniff(mut sniffer: Sniffer, packets: Packets, clients: Clients) { - while let Some(result) = sniffer.next_packet() { +async fn sniff(mut sniffer: Sniffer, packets: Packets, clients: Clients) { + while let Some(result) = sniffer.next_packet().await { match result { Ok(mut pack) => { pack.guess_payload(); + // TODO: Packet send via WebSocket contains its + // payload, which is undesired let response = Response::Packet(pack); let Ok(encoded) = response.encode() else { @@ -162,12 +144,9 @@ async fn sniff(mut sniffer: Sniffer, packets: Packets, cl source: Some(source), sender, } if *source == sniffer.source => { - sender - .send(msg.clone()) - .unwrap_or_else(|e| { - error!("Sniffer: error while sending packet: {}", e); - }) - .await; + sender.send(msg.clone()).unwrap_or_else(|e| { + error!("Sniffer: error while sending packet: {}", e); + }); } _ => {} } @@ -179,19 +158,20 @@ async fn sniff(mut sniffer: Sniffer, packets: Packets, cl } } -async fn send_all_packets(client_id: usize, packets: &Packets, ws_tx: &mut Sender) { +async fn send_all_packets( + client_id: usize, + packets: &Packets, + ws_tx: &mut UnboundedSender, +) { for pack in packets.read().await.iter() { let Ok(encoded) = pack.encode() else { error!("Failed to encode packet, client_id: {}", client_id); continue; }; let msg = Message::binary(encoded); - ws_tx - .send(msg) - .unwrap_or_else(|e| { - error!("WebSocket `feed` error: {}, client_id: {}", e, client_id); - }) - .await; + ws_tx.send(msg).unwrap_or_else(|e| { + error!("WebSocket `feed` error: {}, client_id: {}", e, client_id); + }); } info!( @@ -233,12 +213,46 @@ async fn reparse_packet( source: Some(source), sender, } if *source == *cur_source => { - sender - .send(msg.clone()) - .unwrap_or_else(|e| { - error!("Sniffer: error while sending packet: {}", e); - }) - .await; + sender.send(msg.clone()).unwrap_or_else(|e| { + error!("Sniffer: error while sending packet: {}", e); + }); + } + _ => {} + }; + } +} + +async fn parse_sdp( + client_id: usize, + clients: &Clients, + cur_source: &Source, + stream_key: StreamKey, + raw_sdp: String, +) { + let Some(sdp) = Sdp::build(raw_sdp) else { + log::warn!( + "Received invalid SDP for {:?}: {:?}", + cur_source, + stream_key + ); + return; + }; + + let Ok(encoded) = Response::Sdp(stream_key, sdp).encode() else { + error!("Failed to encode sdp, client_id: {}", client_id); + return; + }; + + let msg = Message::binary(encoded); + for (_, client) in clients.read().await.iter() { + match client { + Client { + source: Some(source), + sender, + } if *source == *cur_source => { + sender.send(msg.clone()).unwrap_or_else(|e| { + error!("Sniffer: error while sending sdp: {}", e); + }); } _ => {} }; @@ -309,6 +323,11 @@ async fn handle_messages( send_all_packets(client_id, packets, &mut sender).await; } + Request::ParseSdp(stream_key, sdp) => { + if let Some(source) = &source { + parse_sdp(client_id, clients, source, stream_key, sdp).await; + } + } }; } Err(e) => error!("WebSocket error: {}, client_id: {}", e, client_id), diff --git a/src/sniffer.rs b/src/sniffer.rs index 4bc914b..ad8338e 100644 --- a/src/sniffer.rs +++ b/src/sniffer.rs @@ -1,82 +1,131 @@ +use futures_util::StreamExt; +use pcap::{Capture, PacketCodec, PacketStream}; use rtpeeker_common::{Packet, Source}; -use std::result; #[derive(Debug)] pub enum Error { + CouldntReceivePacket, FileNotFound, DeviceNotFound, DeviceUnavailable, - CouldntReceivePacket, UnsupportedPacketType, InvalidFilter, + PacketStreamUnavailable, } -type Result = result::Result; - -pub struct Sniffer { +struct PacketDecoder { packet_id: usize, - capture: pcap::Capture, +} + +impl PacketDecoder { + pub fn new() -> Self { + Self { packet_id: 1 } + } +} + +impl PacketCodec for PacketDecoder { + type Item = Result; + + fn decode(&mut self, packet: pcap::Packet<'_>) -> Self::Item { + let res = match Packet::build(&packet, self.packet_id) { + Some(packet) => Ok(packet), + None => Err(Error::UnsupportedPacketType), + }; + + self.packet_id += 1; + res + } +} + +// well, it's not technically a Stream... +struct OfflineStream { + capture: Capture, + decoder: PacketDecoder, +} + +impl OfflineStream { + pub fn new(capture: Capture, decoder: PacketDecoder) -> Self { + Self { capture, decoder } + } + + pub fn next(&mut self) -> Option, pcap::Error>> { + let packet = match self.capture.next_packet() { + Err(pcap::Error::NoMorePackets) => return None, + Err(err) => return Some(Err(err)), + Ok(packet) => packet, + }; + + Some(Ok(self.decoder.decode(packet))) + } +} + +enum CaptureType { + Offline(OfflineStream), + Online(PacketStream), +} + +pub struct Sniffer { + capture: CaptureType, pub source: Source, } -impl Sniffer { - pub fn from_file(file: &str, filter: Option) -> Result { - let Ok(mut capture) = pcap::Capture::from_file(file) else { +impl Sniffer { + pub fn from_file(file: &str) -> Result { + let Ok(capture) = pcap::Capture::from_file(file) else { return Err(Error::FileNotFound); }; - if let Some(filter) = filter { - let Ok(_) = capture.filter(&filter, true) else { - return Err(Error::InvalidFilter); - }; - } + let decoder = PacketDecoder::new(); + let stream = OfflineStream::new(capture, decoder); Ok(Self { - packet_id: 1, - capture, + capture: CaptureType::Offline(stream), source: Source::File(file.to_string()), }) } -} -impl Sniffer { - pub fn from_device(device: &str, filter: Option) -> Result { + pub fn from_device(device: &str) -> Result { let Ok(capture) = pcap::Capture::from_device(device) else { return Err(Error::DeviceNotFound); }; - let Ok(mut capture) = capture.immediate_mode(true).open() else { + let Ok(capture) = capture.immediate_mode(true).open() else { return Err(Error::DeviceUnavailable); }; - if let Some(filter) = filter { - let Ok(_) = capture.filter(&filter, true) else { - return Err(Error::InvalidFilter); - }; - } + let Ok(capture) = capture.setnonblock() else { + return Err(Error::DeviceUnavailable); + }; + + let decoder = PacketDecoder::new(); + let Ok(stream) = capture.stream(decoder) else { + return Err(Error::PacketStreamUnavailable); + }; Ok(Self { - packet_id: 1, - capture, + capture: CaptureType::Online(stream), source: Source::Interface(device.to_string()), }) } -} -impl Sniffer { - pub fn next_packet(&mut self) -> Option> { - let packet = match self.capture.next_packet() { - Ok(pack) => pack, - Err(pcap::Error::NoMorePackets) => return None, - Err(_) => return Some(Err(Error::CouldntReceivePacket)), - }; + pub fn apply_filter(&mut self, filter: &str) -> Result<(), Error> { + match self.capture { + CaptureType::Online(ref mut stream) => stream.capture_mut().filter(filter, true), + CaptureType::Offline(ref mut stream) => stream.capture.filter(filter, true), + } + .map_err(|_| Error::InvalidFilter) + } - let res = match Packet::build(&packet, self.packet_id) { - Some(packet) => Ok(packet), - None => Err(Error::UnsupportedPacketType), + pub async fn next_packet(&mut self) -> Option> { + let packet = match self.capture { + CaptureType::Offline(ref mut stream) => stream.next(), + CaptureType::Online(ref mut stream) => stream.next().await, }; - self.packet_id += 1; - Some(res) + match packet { + None => None, + Some(Err(_)) => Some(Err(Error::CouldntReceivePacket)), + Some(Ok(pack)) => Some(pack), + } } }