Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic SDP support #63

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions client/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use eframe::egui;
use egui::{ComboBox, Ui};
use ewebsock::{WsEvent, WsMessage, WsReceiver, WsSender};
use log::{error, warn};
use rtpeeker_common::{Request, Response, Source};
use rtpeeker_common::{Request, Response, Source, StreamKey};

use packets_table::PacketsTable;
use rtcp_packets_table::RtcpPacketsTable;
use rtp_packets_table::RtpPacketsTable;
use rtp_streams_table::RtpStreamsTable;
use tab::Tab;

use crate::streams::{RefStreams, StreamKey};
use crate::streams::RefStreams;
use rtp_streams_plot::RtpStreamsPlot;

mod packets_table;
Expand Down Expand Up @@ -79,7 +79,7 @@ impl App {
let packets_table = PacketsTable::new(streams.clone(), ws_sender.clone());
let rtp_packets_table = RtpPacketsTable::new(streams.clone());
let rtcp_packets_table = RtcpPacketsTable::new(streams.clone());
let rtp_streams_table = RtpStreamsTable::new(streams.clone());
let rtp_streams_table = RtpStreamsTable::new(streams.clone(), ws_sender.clone());
let rtp_streams_plot = RtpStreamsPlot::new(streams.clone());

let (tab, selected_source) = get_initial_state(cc);
Expand Down Expand Up @@ -244,7 +244,7 @@ impl App {
};

let Ok(response) = Response::decode(&msg) else {
error!("Failed to decode request message");
error!("Failed to decode response message");
continue;
};

Expand All @@ -263,6 +263,12 @@ impl App {
}
self.sources = sources;
}
Response::Sdp(stream_key, sdp) => {
let mut streams = self.streams.borrow_mut();
if let Some(stream) = streams.streams.get_mut(&stream_key) {
stream.add_sdp(sdp);
}
}
}
}
}
Expand Down
13 changes: 2 additions & 11 deletions client/src/app/rtp_packets_table.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::is_stream_visible;
use crate::streams::{RefStreams, StreamKey};
use crate::streams::RefStreams;
use eframe::epaint::Color32;
use egui::RichText;
use egui_extras::{Column, TableBody, TableBuilder};
use rtpeeker_common::packet::SessionPacket;
use rtpeeker_common::StreamKey;
use std::collections::HashMap;

pub struct RtpPacketsTable {
Expand Down Expand Up @@ -162,17 +163,7 @@ impl RtpPacketsTable {
resp.on_hover_text(rtp_packet.payload_type.to_string());

row.col(|ui| {
// if rtp_packet.previous_packet_is_lost {
// let resp = ui.label(
// RichText::from(format!("{} ⚠", rtp_packet.sequence_number))
// .color(Color32::GOLD),
// );
// resp.on_hover_text(
// RichText::from("Previous packet is lost!").color(Color32::GOLD),
// );
// } else {
ui.label(rtp_packet.sequence_number.to_string());
// }
});

row.col(|ui| {
Expand Down
3 changes: 2 additions & 1 deletion client/src/app/rtp_streams_plot.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use self::SettingsXAxis::*;
use super::is_stream_visible;
use crate::streams::stream::{RtpInfo, Stream};
use crate::streams::{RefStreams, StreamKey, Streams};
use crate::streams::{RefStreams, Streams};
use eframe::egui;
use eframe::egui::TextBuffer;
use eframe::epaint::Color32;
Expand All @@ -13,6 +13,7 @@ use egui::{Align2, RichText};
use rtpeeker_common::packet::SessionPacket;
use rtpeeker_common::rtcp::ReceptionReport;
use rtpeeker_common::rtp::payload_type::MediaType;
use rtpeeker_common::StreamKey;
use rtpeeker_common::{Packet, RtcpPacket, RtpPacket};
use std::cell::Ref;
use std::collections::HashMap;
Expand Down
78 changes: 74 additions & 4 deletions client/src/app/rtp_streams_table.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,71 @@
use crate::streams::{stream::Stream, RefStreams};
use egui::plot::{Line, Plot, PlotPoints};
use egui::{TextEdit, Vec2};
use egui_extras::{Column, TableBody, TableBuilder};
use ewebsock::{WsMessage, WsSender};
use rtpeeker_common::{Request, StreamKey};

use crate::streams::{stream::Stream, RefStreams};
const SDP_PROMPT: &str = "Paste your SDP media section here, e.g.
m=audio 5004 RTP/AVP 96
c=IN IP4 239.30.22.1
a=rtpmap:96 L24/48000/2
a=recvonly
";

pub struct RtpStreamsTable {
streams: RefStreams,
ws_sender: WsSender,
sdp_window_open: bool,
chosen_key: Option<StreamKey>,
sdp: String,
}

impl RtpStreamsTable {
pub fn new(streams: RefStreams) -> Self {
Self { streams }
pub fn new(streams: RefStreams, ws_sender: WsSender) -> Self {
Self {
streams,
ws_sender,
sdp_window_open: false,
chosen_key: None,
sdp: String::new(),
}
}

pub fn ui(&mut self, ctx: &egui::Context) {
egui::CentralPanel::default().show(ctx, |ui| {
self.build_table(ui);
});
self.build_sdp_window(ctx);
}

fn build_sdp_window(&mut self, ctx: &egui::Context) {
let Some((_, _, _, ssrc)) = self.chosen_key else {
return;
};

let mut send_sdp = false;

egui::Window::new(format!("SDP - {:x}", ssrc))
.open(&mut self.sdp_window_open)
.default_width(800.0)
.default_height(800.0)
.vscroll(true)
.show(ctx, |ui| {
TextEdit::multiline(&mut self.sdp)
.hint_text(SDP_PROMPT)
.desired_rows(30)
.desired_width(f32::INFINITY)
.show(ui);
ui.add_space(10.0);
if ui.button(format!("Set SDP for {:x}", ssrc)).clicked() {
send_sdp = true;
}
});

if send_sdp {
self.send_sdp_request();
self.sdp_window_open = false;
}
}

fn build_table(&mut self, ui: &mut egui::Ui) {
Expand Down Expand Up @@ -108,11 +157,32 @@ impl RtpStreamsTable {
let packet_rate = stream.get_mean_packet_rate();
ui.label(format!("{:.3}", packet_rate));
});
row.col(|ui| {
let (_, resp) = row.col(|ui| {
build_jitter_plot(ui, stream);
});

resp.context_menu(|ui| {
if ui.button("Set SDP").clicked() {
ui.close_menu();
self.chosen_key = Some(*key);
self.sdp = String::new();
self.sdp_window_open = true;
}
});
});
}

fn send_sdp_request(&mut self) {
let request = Request::ParseSdp(self.chosen_key.unwrap(), self.sdp.clone());

let Ok(msg) = request.encode() else {
log::error!("Failed to encode a request message");
return;
};
let msg = WsMessage::Binary(msg);

self.ws_sender.send(msg);
}
}

fn build_jitter_plot(ui: &mut egui::Ui, stream: &Stream) {
Expand Down
2 changes: 1 addition & 1 deletion client/src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use packets::Packets;
use rtpeeker_common::packet::SessionPacket;
use rtpeeker_common::StreamKey;
use rtpeeker_common::{packet::TransportProtocol, Packet, RtcpPacket};
use std::cell::RefCell;
use std::collections::HashMap;
Expand All @@ -11,7 +12,6 @@ mod packets;
pub mod stream;

pub type RefStreams = Rc<RefCell<Streams>>;
pub type StreamKey = (SocketAddr, SocketAddr, TransportProtocol, u32);

#[derive(Debug, Default)]
pub struct Streams {
Expand Down
42 changes: 40 additions & 2 deletions client/src/streams/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::utils::ntp_to_f64;
use rtpeeker_common::packet::TransportProtocol;
use rtpeeker_common::rtcp::{source_description::SdesType, SourceDescription};
use rtpeeker_common::{Packet, RtcpPacket, RtpPacket};
use rtpeeker_common::rtp::payload_type::PayloadType;
use rtpeeker_common::{Packet, RtcpPacket, RtpPacket, Sdp};
use std::cmp::{max, min};
use std::net::SocketAddr;
use std::time::Duration;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct Stream {
last_sequence_number: u16,
first_time: Duration,
last_time: Duration,
sdp: Option<Sdp>,
// ntp synchronization
pub ntp_rtp: Option<(u64, u32)>,
pub estimated_clock_rate: Option<f64>,
Expand Down Expand Up @@ -92,11 +94,17 @@ impl Stream {
last_sequence_number: rtp.sequence_number,
first_time: packet.timestamp,
last_time: packet.timestamp,
sdp: None,
ntp_rtp: None,
estimated_clock_rate: None,
}
}

pub fn add_sdp(&mut self, sdp: Sdp) {
self.sdp = Some(sdp);
self.recalculate();
}

pub fn get_duration(&self) -> Duration {
self.last_time.checked_sub(self.first_time).unwrap()
}
Expand Down Expand Up @@ -161,6 +169,22 @@ impl Stream {
self.rtcp_packets.push(rtcp_info);
}

fn recalculate(&mut self) {
let mut rtp_packets = std::mem::take(&mut self.rtp_packets).into_iter();
let rtp_info = rtp_packets.next().unwrap();
self.bytes = rtp_info.bytes;
self.max_jitter = 0.0;
self.sum_jitter = 0.0;
self.jitter_count = 0;
self.first_sequence_number = rtp_info.packet.sequence_number;
self.last_sequence_number = rtp_info.packet.sequence_number;
self.first_time = rtp_info.time;
self.last_time = rtp_info.time;
self.rtp_packets = vec![rtp_info];

rtp_packets.for_each(|rtp| self.update_rtp_parameters(rtp));
}

fn update_rtp_parameters(&mut self, mut rtp_info: RtpInfo) {
rtp_info.time_delta = rtp_info.time - self.rtp_packets.last().unwrap().time;

Expand All @@ -184,8 +208,22 @@ impl Stream {
// TODO
}

fn get_payload_type(&self, rtp_info: &RtpInfo) -> PayloadType {
let id = &rtp_info.packet.payload_type.id;

if let Some(sdp) = &self.sdp {
if let Some(pt) = sdp.payload_types.get(id) {
return pt.clone();
}
};

rtp_info.packet.payload_type.clone()
}

fn update_jitter(&mut self, rtp_info: &mut RtpInfo) {
let Some(clock_rate) = rtp_info.packet.payload_type.clock_rate else {
let payload_type = self.get_payload_type(rtp_info);

let Some(clock_rate) = payload_type.clock_rate else {
return;
};

Expand Down
Loading