From 2d82b7a92efb66b1c6b2678266dc592019bd380a Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 29 Dec 2023 09:55:43 +0100 Subject: [PATCH] rtp: a lock for rtp_source --- src/rtp/fb.c | 1 + src/rtp/member.c | 1 + src/rtp/pkt.c | 1 + src/rtp/rr.c | 1 + src/rtp/rtcp.c | 1 + src/rtp/rtcp.h | 4 ++++ src/rtp/rtp.c | 1 + src/rtp/sdes.c | 1 + src/rtp/sess.c | 48 +++++++++++++++++++++++++++++++++++++++++++++--- src/rtp/source.c | 13 +++++++++++++ 10 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/rtp/fb.c b/src/rtp/fb.c index 9dd376af9..e541034fd 100644 --- a/src/rtp/fb.c +++ b/src/rtp/fb.c @@ -12,6 +12,7 @@ #include #include #include +#include #include "rtcp.h" diff --git a/src/rtp/member.c b/src/rtp/member.c index 1a3bd78e5..0d3e6c7ad 100644 --- a/src/rtp/member.c +++ b/src/rtp/member.c @@ -12,6 +12,7 @@ #include #include #include +#include #include "rtcp.h" diff --git a/src/rtp/pkt.c b/src/rtp/pkt.c index dffb054d1..b73abe255 100644 --- a/src/rtp/pkt.c +++ b/src/rtp/pkt.c @@ -12,6 +12,7 @@ #include #include #include +#include #include "rtcp.h" diff --git a/src/rtp/rr.c b/src/rtp/rr.c index 9d39de878..3b2bc4ede 100644 --- a/src/rtp/rr.c +++ b/src/rtp/rr.c @@ -13,6 +13,7 @@ #include #include #include +#include #include "rtcp.h" diff --git a/src/rtp/rtcp.c b/src/rtp/rtcp.c index 17c3d20fb..620e3ab5a 100644 --- a/src/rtp/rtcp.c +++ b/src/rtp/rtcp.c @@ -11,6 +11,7 @@ #include #include #include +#include #include "rtcp.h" diff --git a/src/rtp/rtcp.h b/src/rtp/rtcp.h index d85404866..17861ff85 100644 --- a/src/rtp/rtcp.h +++ b/src/rtp/rtcp.h @@ -47,6 +47,8 @@ struct rtp_source { uint32_t last_rtp_ts; /**< Last RTP timestamp */ uint32_t psent; /**< RTP packets sent */ uint32_t osent; /**< RTP octets sent */ + + mtx_t *lock; /**< Lock for this struct */ }; /** RTP Member */ @@ -71,6 +73,8 @@ void source_calc_jitter(struct rtp_source *s, uint32_t rtp_ts, uint32_t arrival); int source_calc_lost(const struct rtp_source *s); uint8_t source_calc_fraction_lost(struct rtp_source *s); +int source_lock(struct rtp_source *s); +int source_unlock(struct rtp_source *s); /* RR (Reception report) */ int rtcp_rr_alloc(struct rtcp_rr **rrp, size_t count); diff --git a/src/rtp/rtp.c b/src/rtp/rtp.c index 76d73cf5f..16b87ff11 100644 --- a/src/rtp/rtp.c +++ b/src/rtp/rtp.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include "rtcp.h" diff --git a/src/rtp/sdes.c b/src/rtp/sdes.c index 711d8a908..b7bb7eb88 100644 --- a/src/rtp/sdes.c +++ b/src/rtp/sdes.c @@ -11,6 +11,7 @@ #include #include #include +#include #include "rtcp.h" diff --git a/src/rtp/sess.c b/src/rtp/sess.c index 4d6e5f565..9aa832f33 100644 --- a/src/rtp/sess.c +++ b/src/rtp/sess.c @@ -84,6 +84,14 @@ static void sess_destructor(void *data) } +static void source_destructor(void *data) +{ + struct rtp_source *s = data; + + mem_deref(s->lock); +} + + static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src) { struct rtp_member *mbr; @@ -178,6 +186,7 @@ static void handle_incoming_sr(struct rtcp_sess *sess, if (mbr->s) { /* Save time when SR was received */ + source_lock(mbr->s); mbr->s->sr_recv = tmr_jiffies(); /* Save NTP timestamp from SR */ @@ -186,6 +195,7 @@ static void handle_incoming_sr(struct rtcp_sess *sess, mbr->s->rtp_ts = msg->r.sr.rtp_ts; mbr->s->psent = msg->r.sr.psent; mbr->s->osent = msg->r.sr.osent; + source_unlock(mbr->s); } for (i=0; ihdr.count; i++) @@ -392,12 +402,14 @@ static bool sender_apply_handler(struct le *le, void *arg) /* Initialise the members */ rr.ssrc = mbr->src; + source_lock(s); rr.fraction = source_calc_fraction_lost(s); rr.lost = source_calc_lost(s); rr.last_seq = s->cycles | s->max_seq; rr.jitter = s->jitter >> 4; rr.lsr = calc_lsr(&s->last_sr); rr.dlsr = calc_dlsr(s->sr_recv); + source_unlock(s); return 0 != rtcp_rr_encode(mb, &rr); } @@ -578,20 +590,31 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr, } if (!mbr->s) { - mbr->s = mem_zalloc(sizeof(*mbr->s), NULL); - if (!mbr->s) { + int err; + + mbr->s = mem_zalloc(sizeof(*mbr->s), source_destructor); + if (!mbr->s) + err = ENOMEM; + else + err = mutex_alloc(&mbr->s->lock); + + if (err) { DEBUG_NOTICE("could not add sender: 0x%08x\n", hdr->ssrc); + mbr->s = mem_deref(mbr->s); return; } /* first packet - init sequence number */ + source_lock(mbr->s); source_init_seq(mbr->s, hdr->seq); /* probation not used */ sa_cpy(&mbr->s->rtp_peer, peer); + source_unlock(mbr->s); ++sess->senderc; } + source_lock(mbr->s); if (!source_update_seq(mbr->s, hdr->seq)) { DEBUG_WARNING("rtp_update_seq() returned 0\n"); } @@ -612,6 +635,7 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr, mbr->s->last_rtp_ts = hdr->ts; mbr->s->rtp_rx_bytes += payload_size; + source_unlock(mbr->s); } @@ -650,10 +674,12 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats) return 0; } + source_lock(mbr->s); stats->rx.sent = mbr->s->received; stats->rx.lost = source_calc_lost(mbr->s); stats->rx.jit = sess->srate_rx ? 1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0; + source_unlock(mbr->s); return 0; } @@ -663,18 +689,34 @@ static bool debug_handler(struct le *le, void *arg) { const struct rtp_member *mbr = le->data; struct re_printf *pf = arg; + struct mbuf *mb = NULL; int err; err = re_hprintf(pf, " member 0x%08x: lost=%d Jitter=%.1fms" " RTT=%.1fms\n", mbr->src, mbr->cum_lost, (double)mbr->jit/1000, (double)mbr->rtt/1000); + if (err) + return true; + if (mbr->s) { - err |= re_hprintf(pf, + mb = mbuf_alloc(64); + if (!mb) + return true; + + source_lock(mbr->s); + err = mbuf_printf(mb, " IP=%J psent=%u rcvd=%u\n", &mbr->s->rtp_peer, mbr->s->psent, mbr->s->received); + source_unlock(mbr->s); + if (err) + goto out; + + re_hprintf(pf, "%b", mb->buf, mb->pos); } +out: + mem_deref(mb); return err != 0; } diff --git a/src/rtp/source.c b/src/rtp/source.c index c6a324229..996623242 100644 --- a/src/rtp/source.c +++ b/src/rtp/source.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "rtcp.h" @@ -175,3 +176,15 @@ uint8_t source_calc_fraction_lost(struct rtp_source *s) return fraction; } + + +int source_lock(struct rtp_source *s) +{ + return mtx_lock(s->lock); +} + + +int source_unlock(struct rtp_source *s) +{ + return mtx_unlock(s->lock); +}