From 5364304b2b0442baa5746d325c2b2695572f410b Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 1 Sep 2023 13:45:47 +0200 Subject: [PATCH] config,stream,rtprecv: remove RX thread temporarily --- docs/examples/config | 1 - include/baresip.h | 7 -- src/config.c | 16 +-- src/core.h | 2 - src/rtprecv.c | 246 +------------------------------------------ src/stream.c | 20 ---- 6 files changed, 5 insertions(+), 287 deletions(-) diff --git a/docs/examples/config b/docs/examples/config index d93950eb59..cdd5f827b5 100644 --- a/docs/examples/config +++ b/docs/examples/config @@ -72,7 +72,6 @@ video_jitter_buffer_delay 5-10 # (min. frames)-(max. packets) rtp_stats no #rtp_timeout 60 #avt_bundle no -#rtp_rxmode main # main,thread # Network #dns_server 1.1.1.1:53 diff --git a/include/baresip.h b/include/baresip.h index e2bc47f665..9a4bbc85ed 100644 --- a/include/baresip.h +++ b/include/baresip.h @@ -317,12 +317,6 @@ enum audio_mode { AUDIO_MODE_THREAD, /**< Use dedicated thread */ }; -/** RTP receive mode */ -enum rtp_receive_mode { - RECEIVE_MODE_MAIN = 0, /**< RTP RX is processed in main thread */ - RECEIVE_MODE_THREAD, /**< RTP RX is processed in separate thread */ -}; - /** SIP User-Agent */ struct config_sip { char uuid[64]; /**< Universally Unique Identifier */ @@ -401,7 +395,6 @@ struct config_avt { bool rtp_stats; /**< Enable RTP statistics */ uint32_t rtp_timeout; /**< RTP Timeout in seconds (0=off) */ bool bundle; /**< Media Multiplexing (BUNDLE) */ - enum rtp_receive_mode rxmode; /**< RTP RX processing mode */ }; /** Network Configuration */ diff --git a/src/config.c b/src/config.c index 5a6eb00f32..1a6202d111 100644 --- a/src/config.c +++ b/src/config.c @@ -98,8 +98,7 @@ static struct config core_config = { }, false, 0, - false, - RECEIVE_MODE_MAIN, + false }, /* Network */ @@ -314,7 +313,6 @@ static const char *net_af_str(int af) int config_parse_conf(struct config *cfg, const struct conf *conf) { struct vidsz size = {0, 0}; - struct pl rxmode; struct pl txmode; struct pl jbtype; struct pl tr; @@ -472,14 +470,6 @@ int config_parse_conf(struct config *cfg, const struct conf *conf) (void)conf_get_u32(conf, "rtp_timeout", &cfg->avt.rtp_timeout); (void)conf_get_bool(conf, "avt_bundle", &cfg->avt.bundle); - if (0 == conf_get(conf, "rtp_rxmode", &rxmode)) { - - if (0 == pl_strcasecmp(&rxmode, "thread")) { - cfg->avt.rxmode = RECEIVE_MODE_THREAD; - warning("rtp_rxmode thread is currently " - "experimental\n"); - } - } if (err) { warning("config: configure parse error (%m)\n", err); @@ -581,7 +571,6 @@ int config_print(struct re_printf *pf, const struct config *cfg) "rtp_stats\t\t%s\n" "rtp_timeout\t\t%u # in seconds\n" "avt_bundle\t\t%s\n" - "rtp_rxmode\t\t\t%s\n" "\n" "# Network\n" "net_interface\t\t%s\n" @@ -636,8 +625,6 @@ int config_print(struct re_printf *pf, const struct config *cfg) cfg->avt.rtp_stats ? "yes" : "no", cfg->avt.rtp_timeout, cfg->avt.bundle ? "yes" : "no", - cfg->avt.rxmode == RECEIVE_MODE_THREAD ? "thread" : - "main", cfg->net.ifname, net_af_str(cfg->net.af) @@ -848,7 +835,6 @@ static int core_config_template(struct re_printf *pf, const struct config *cfg) "rtp_stats\t\tno\n" "#rtp_timeout\t\t60\n" "#avt_bundle\t\tno\n" - "#rtp_rxmode\t\tmain\n" "\n# Network\n" "#dns_server\t\t1.1.1.1:53\n" "#dns_server\t\t1.0.0.1:53\n" diff --git a/src/core.h b/src/core.h index 9d3cbb99f1..f03a1a3704 100644 --- a/src/core.h +++ b/src/core.h @@ -481,7 +481,5 @@ void rtprecv_set_enable(struct rtp_receiver *rx, bool enable); int rtprecv_get_ssrc(struct rtp_receiver *rx, uint32_t *ssrc); void rtprecv_enable_mux(struct rtp_receiver *rx, bool enable); int rtprecv_debug(struct re_printf *pf, const struct rtp_receiver *rx); -int rtprecv_start_thread(struct rtp_receiver *rx); void rtprecv_mnat_connected_handler(const struct sa *raddr1, const struct sa *raddr2, void *arg); -bool rtprecv_running(const struct rtp_receiver *rx); diff --git a/src/rtprecv.c b/src/rtprecv.c index 61bc44e335..2bd5f3921d 100644 --- a/src/rtprecv.c +++ b/src/rtprecv.c @@ -30,7 +30,6 @@ struct rtp_receiver { uint32_t pseq; /**< Sequence number for incoming RTP */ bool pseq_set; /**< True if sequence number is set */ bool rtp_estab; /**< True if RTP stream established */ - RE_ATOMIC bool run; /**< True if RX thread is running */ mtx_t *mtx; /**< Mutex protects above fields */ /* Unprotected data */ @@ -41,155 +40,10 @@ struct rtp_receiver { stream_rtpestab_h *rtpestabh; /**< RTP established handler */ void *arg; /**< Stream argument */ void *sessarg; /**< Session argument */ - thrd_t thr; /**< RX thread */ - struct tmr tmr; /**< Timer for stopping RX thread */ int pt; /**< Previous payload type */ }; -enum work_type { - WORK_RTCP, - WORK_RTPESTAB, - WORK_PTCHANGED, - WORK_MNATCONNH, -}; - - -struct work { - enum work_type type; - struct rtp_receiver *rx; - union { - struct rtcp_msg *rtcp; - struct { - uint8_t pt; - struct mbuf *mb; - } pt; - struct { - struct sa raddr1; - struct sa raddr2; - } mnat; - } u; -}; - - -static void async_work_main(int err, void *arg); -static void work_destructor(void *arg); - - -/* - * functions that run in RX thread (if "rxmode thread" is configured) - */ - - -static void pass_rtcp_work(struct rtp_receiver *rx, struct rtcp_msg *msg) -{ - struct work *w; - - if (!re_atomic_rlx(&rx->run)) { - stream_process_rtcp(rx->strm, msg); - return; - } - - w = mem_zalloc(sizeof(*w), work_destructor); - if (!w) - return; - - w->type = WORK_RTCP; - w->rx = rx; - w->u.rtcp = mem_ref(msg); - re_thread_async_main_id((intptr_t)rx, NULL, async_work_main, w); -} - - -static int pass_pt_work(struct rtp_receiver *rx, uint8_t pt, struct mbuf *mb) -{ - struct work *w; - - if (!re_atomic_rlx(&rx->run)) - return rx->pth(pt, mb, rx->arg); - - w = mem_zalloc(sizeof(*w), work_destructor); - w->type = WORK_PTCHANGED; - w->rx = rx; - w->u.pt.pt = pt; - w->u.pt.mb = mbuf_dup(mb); - - return re_thread_async_main_id((intptr_t)rx, NULL, async_work_main, w); -} - - -static void pass_rtpestab_work(struct rtp_receiver *rx) -{ - struct work *w; - - if (!re_atomic_rlx(&rx->run)) { - rx->rtpestabh(rx->strm, rx->sessarg); - return; - } - - w = mem_zalloc(sizeof(*w), work_destructor); - w->type = WORK_RTPESTAB; - w->rx = rx; - - re_thread_async_main_id((intptr_t)rx, NULL, async_work_main, w); -} - - -static void pass_mnat_work(struct rtp_receiver *rx, const struct sa *raddr1, - const struct sa *raddr2) -{ - struct work *w; - - if (!re_atomic_rlx(&rx->run)) { - stream_mnat_connected(rx->strm, raddr1, raddr2); - return; - } - - w = mem_zalloc(sizeof(*w), work_destructor); - w->type = WORK_MNATCONNH; - w->rx = rx; - sa_cpy(&w->u.mnat.raddr1, raddr1); - sa_cpy(&w->u.mnat.raddr2, raddr2); - - re_thread_async_main_id((intptr_t)rx, NULL, async_work_main, w); -} - - -static void rtprecv_check_stop(void *arg) -{ - struct rtp_receiver *rx = arg; - - if (re_atomic_rlx(&rx->run)) - tmr_start(&rx->tmr, 10, rtprecv_check_stop, rx); - else - re_cancel(); -} - - -static int rtprecv_thread(void *arg) -{ - struct rtp_receiver *rx = arg; - int err; - - re_thread_init(); - tmr_start(&rx->tmr, 10, rtprecv_check_stop, rx); - - err = udp_thread_attach(rtp_sock(rx->rtp)); - if (err) - return err; - - err = udp_thread_attach(rtcp_sock(rx->rtp)); - if (err) - return err; - - err = re_main(NULL); - - tmr_cancel(&rx->tmr); - re_thread_close(); - return err; -} - - static int lostcalc(struct rtp_receiver *rx, uint16_t seq) { const uint16_t delta = seq - rx->pseq; @@ -344,7 +198,7 @@ void rtprecv_decode(const struct sa *src, const struct rtp_header *hdr, debug("stream: incoming rtp for '%s' established, " "receiving from %J\n", rx->name, src); rx->rtp_estab = true; - pass_rtpestab_work(rx); + rx->rtpestabh(rx->strm, rx->sessarg); } } @@ -373,7 +227,7 @@ void rtprecv_decode(const struct sa *src, const struct rtp_header *hdr, if (hdr->pt != rx->pt) { rx->pt = hdr->pt; - err = pass_pt_work(rx, hdr->pt, mb); + err = rx->pth(hdr->pt, mb, rx->arg); if (err && err != ENODATA) return; } @@ -420,7 +274,7 @@ void rtprecv_handle_rtcp(const struct sa *src, struct rtcp_msg *msg, rx->ts_last = tmr_jiffies(); mtx_unlock(rx->mtx); - pass_rtcp_work(rx, msg); + stream_process_rtcp(rx->strm, msg); } @@ -431,14 +285,10 @@ void rtprecv_mnat_connected_handler(const struct sa *raddr1, MAGIC_CHECK(rx); - pass_mnat_work(rx, raddr1, raddr2); + stream_mnat_connected(rx->strm, raddr1, raddr2); } -/* - * functions that run in main thread - */ - void rtprecv_set_socket(struct rtp_receiver *rx, struct rtp_sock *rtp) { mtx_lock(rx->mtx); @@ -562,13 +412,6 @@ static void destructor(void *arg) { struct rtp_receiver *rx = arg; - if (re_atomic_rlx(&rx->run)) { - re_atomic_rlx_set(&rx->run, false); - thrd_join(rx->thr, NULL); - } - - re_thread_async_main_cancel((intptr_t)rx); - mem_deref(rx->metric); mem_deref(rx->name); mem_deref(rx->mtx); @@ -640,41 +483,6 @@ int rtprecv_alloc(struct rtp_receiver **rxp, } -int rtprecv_start_thread(struct rtp_receiver *rx) -{ - int err; - - if (!rx) - return EINVAL; - - if (re_atomic_rlx(&rx->run)) - return 0; - - re_atomic_rlx_set(&rx->run, true); - err = thread_create_name(&rx->thr, - "RX thread", - rtprecv_thread, rx); - if (err) { - re_atomic_rlx_set(&rx->run, false); - } - else { - udp_thread_detach(rtp_sock(rx->rtp)); - udp_thread_detach(rtcp_sock(rx->rtp)); - } - - return err; -} - - -bool rtprecv_running(const struct rtp_receiver *rx) -{ - if (!rx) - return false; - - return re_atomic_rlx(&rx->run); -} - - void rtprecv_set_handlers(struct rtp_receiver *rx, stream_rtpestab_h *rtpestabh, void *arg) { @@ -693,49 +501,3 @@ struct metric *rtprecv_metric(struct rtp_receiver *rx) /* it is allowed to return metric because it is thread safe */ return rx->metric; } - - -static void work_destructor(void *arg) -{ - struct work *w = arg; - - switch (w->type) { - case WORK_RTCP: - mem_deref(w->u.rtcp); - break; - case WORK_PTCHANGED: - mem_deref(w->u.pt.mb); - break; - default: - break; - } -} - - -static void async_work_main(int err, void *arg) -{ - struct work *w = arg; - struct rtp_receiver *rx = w->rx; - (void)err; - - switch (w->type) { - case WORK_RTCP: - stream_process_rtcp(rx->strm, w->u.rtcp); - break; - case WORK_PTCHANGED: - rx->pth(w->u.pt.pt, w->u.pt.mb, rx->arg); - break; - case WORK_RTPESTAB: - rx->rtpestabh(rx->strm, rx->sessarg); - break; - case WORK_MNATCONNH: - stream_mnat_connected(rx->strm, - &w->u.mnat.raddr1, - &w->u.mnat.raddr2); - break; - default: - break; - } - - mem_deref(w); -} diff --git a/src/stream.c b/src/stream.c index 715507ede4..d02b6d8a4a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -224,13 +224,6 @@ int stream_enable_tx(struct stream *strm, bool enable) } -static void stream_start_receiver(void *arg) -{ - struct stream *s = arg; - rtprecv_start_thread(s->rx); -} - - /** * Enable RX stream * @@ -258,19 +251,6 @@ int stream_enable_rx(struct stream *strm, bool enable) debug("stream: enable %s RTP receiver\n", media_name(strm->type)); rtprecv_set_enable(strm->rx, true); - if (strm->rtp && strm->cfg.rxmode == RECEIVE_MODE_THREAD && - strm->type == MEDIA_AUDIO && !rtprecv_running(strm->rx)) { - if (stream_bundle(strm)) { - warning("stream: rtp_rxmode thread was disabled " - "because it is not supported in combination " - "with avt_bundle\n"); - } - else { - tmr_start(&strm->rxm.tmr_rec, 1, stream_start_receiver, - strm); - } - } - return 0; }