From 48779ed808d7247dcc7b97be824a71c63d4d93be Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Sat, 2 Apr 2022 16:04:10 -0400 Subject: [PATCH] Add NADA implementation This change adds the NADA congestion control implementation. The binding to cc is left TODO. --- AUTHORS.txt | 1 + interceptor.go | 1 - pkg/nack/retainable_packet.go | 3 +- pkg/nada/README.md | 5 ++ pkg/nada/config.go | 107 ++++++++++++++++++++++++++ pkg/nada/ecn/ecn.go | 23 ++++++ pkg/nada/ecn/ecn_darwin.go | 11 +++ pkg/nada/ecn/ecn_linux.go | 17 +++++ pkg/nada/nada.go | 2 + pkg/nada/packet_stream.go | 118 +++++++++++++++++++++++++++++ pkg/nada/receiver.go | 126 +++++++++++++++++++++++++++++++ pkg/nada/report.go | 75 ++++++++++++++++++ pkg/nada/sender.go | 118 +++++++++++++++++++++++++++++ pkg/nada/sender_receiver_test.go | 52 +++++++++++++ 14 files changed, 656 insertions(+), 3 deletions(-) create mode 100644 pkg/nada/README.md create mode 100644 pkg/nada/config.go create mode 100644 pkg/nada/ecn/ecn.go create mode 100644 pkg/nada/ecn/ecn_darwin.go create mode 100644 pkg/nada/ecn/ecn_linux.go create mode 100644 pkg/nada/nada.go create mode 100644 pkg/nada/packet_stream.go create mode 100644 pkg/nada/receiver.go create mode 100644 pkg/nada/report.go create mode 100644 pkg/nada/sender.go create mode 100644 pkg/nada/sender_receiver_test.go diff --git a/AUTHORS.txt b/AUTHORS.txt index 0b351d58..dde64a1f 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -13,5 +13,6 @@ boks1971 David Zhao Jonathan Müller Kevin Caffrey +Kevin Wang Mathis Engelbart Sean DuBois diff --git a/interceptor.go b/interceptor.go index a143fdd0..9955178b 100644 --- a/interceptor.go +++ b/interceptor.go @@ -17,7 +17,6 @@ type Factory interface { // Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp // packets, or sending your own packets as needed. type Interceptor interface { - // BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might // change in the future. The returned method will be called once per packet batch. BindRTCPReader(reader RTCPReader) RTCPReader diff --git a/pkg/nack/retainable_packet.go b/pkg/nack/retainable_packet.go index a27fff2c..20518d1a 100644 --- a/pkg/nack/retainable_packet.go +++ b/pkg/nack/retainable_packet.go @@ -60,8 +60,7 @@ func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) { } } -type noOpPacketFactory struct { -} +type noOpPacketFactory struct{} func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte) (*retainablePacket, error) { return &retainablePacket{ diff --git a/pkg/nada/README.md b/pkg/nada/README.md new file mode 100644 index 00000000..9d5b9897 --- /dev/null +++ b/pkg/nada/README.md @@ -0,0 +1,5 @@ +# [RFC 8698] NADA: A Unified Congestion Control Scheme for Real-Time Media + +Notes: + +* The receiver in this implementation assumes a monotonically ordered sequence of packets. \ No newline at end of file diff --git a/pkg/nada/config.go b/pkg/nada/config.go new file mode 100644 index 00000000..786fc525 --- /dev/null +++ b/pkg/nada/config.go @@ -0,0 +1,107 @@ +package nada + +import "time" + +// Bits represents a unit of one bit. +type Bits uint32 + +// BitsPerSecond represents a unit of one bit per second. +type BitsPerSecond float64 + +const ( + // Kbps represents 1 kbps. + Kbps = BitsPerSecond(1_000) + // Mbps represents 1 Mbps. + Mbps = BitsPerSecond(1_000_000) +) + +// Config represents the configuration of a NADA bandwidth estimator. +type Config struct { + // Weight of priority of the flow + Priority float64 + // Minimum rate of the application supported by the media encoder + MinimumRate BitsPerSecond // RMIN + // Maximum rate of the application supported by media encoder + MaximumRate BitsPerSecond // RMAX + // Reference congestion level + ReferenceCongestionLevel time.Duration // XREF + // Scaling parameter for gradual rate update calculation + Kappa float64 + // Scaling parameter for gradual rate update calculation + Eta float64 + // Upper bound of RTT in gradual rate update calculation + Tau time.Duration + // Target feedback interval + Delta time.Duration + + // Observation window in time for calculating packet summary statistics at receiver + LogWindow time.Duration // LOGWIN + // Threshold for determining queuing delay build up at receiver + QueueingDelayThreshold time.Duration + // Bound on filtering delay + FilteringDelay time.Duration // DFILT + // Upper bound on rate increase ratio for accelerated ramp-up + GammaMax float64 + // Upper bound on self-inflicted queueing delay during ramp up + QueueBound time.Duration // QBOUND + + // Multiplier for self-scaling the expiration threshold of the last observed loss + // (loss_exp) based on measured average loss interval (loss_int) + LossMultiplier float64 // MULTILOSS + // Delay threshold for invoking non-linear warping + DelayThreshold time.Duration // QTH + // Scaling parameter in the exponent of non-linear warping + Lambda float64 + + // Reference packet loss ratio + ReferencePacketLossRatio float64 // PLRREF + // Reference packet marking ratio + ReferencePacketMarkingRatio float64 // PMRREF + // Reference delay penalty for loss when lacket loss ratio is at least PLRREF + ReferenceDelayLoss time.Duration // DLOSS + // Reference delay penalty for ECN marking when packet marking is at PMRREF + ReferenceDelayMarking time.Duration // DMARK + + // Frame rate of incoming video + FrameRate float64 // FRAMERATE + // Scaling parameter for modulating outgoing sending rate + BetaSending float64 + // Scaling parameter for modulating video encoder target rate + BetaVideoEncoder float64 + // Smoothing factor in exponential smoothing of packet loss and marking rate + Alpha float64 +} + +// DefaultConfig returns the default configuration recommended by the specification. +func DefaultConfig() Config { + return Config{ + Priority: 1.0, + MinimumRate: 150 * Kbps, + MaximumRate: 1500 * Kbps, + ReferenceCongestionLevel: 10 * time.Millisecond, + Kappa: 0.5, + Eta: 2.0, + Tau: 500 * time.Millisecond, + Delta: 100 * time.Millisecond, + + LogWindow: 500 * time.Millisecond, + QueueingDelayThreshold: 10 * time.Millisecond, + FilteringDelay: 120 * time.Millisecond, + GammaMax: 0.5, + QueueBound: 50 * time.Millisecond, + + LossMultiplier: 7.0, + DelayThreshold: 50 * time.Millisecond, + Lambda: 0.5, + + ReferencePacketLossRatio: 0.01, + ReferencePacketMarkingRatio: 0.01, + ReferenceDelayLoss: 10 * time.Millisecond, + ReferenceDelayMarking: 2 * time.Millisecond, + + FrameRate: 30.0, + BetaSending: 0.1, + BetaVideoEncoder: 0.1, + Alpha: 0.1, + } +} diff --git a/pkg/nada/ecn/ecn.go b/pkg/nada/ecn/ecn.go new file mode 100644 index 00000000..9d9cd351 --- /dev/null +++ b/pkg/nada/ecn/ecn.go @@ -0,0 +1,23 @@ +// Package ecn provides ExplicitCongestionNotification (ECN) support. +package ecn + +import ( + "errors" + "syscall" +) + +var errNoECN = errors.New("no ECN control message") + +// CheckExplicitCongestionNotification checks if the given oob data includes an ECN bit set. +func CheckExplicitCongestionNotification(oob []byte) (uint8, error) { + ctrlMsgs, err := syscall.ParseSocketControlMessage(oob) + if err != nil { + return 0, err + } + for _, ctrlMsg := range ctrlMsgs { + if ctrlMsg.Header.Type == syscall.IP_TOS { + return (ctrlMsg.Data[0] & 0x3), nil + } + } + return 0, errNoECN +} diff --git a/pkg/nada/ecn/ecn_darwin.go b/pkg/nada/ecn/ecn_darwin.go new file mode 100644 index 00000000..d9aed280 --- /dev/null +++ b/pkg/nada/ecn/ecn_darwin.go @@ -0,0 +1,11 @@ +package ecn + +import ( + "net" +) + +// EnableExplicitCongestionNotification enables ECN on the given connection. +func EnableExplicitCongestionNotification(conn *net.UDPConn) error { + // noop. + return nil +} diff --git a/pkg/nada/ecn/ecn_linux.go b/pkg/nada/ecn/ecn_linux.go new file mode 100644 index 00000000..bc514e13 --- /dev/null +++ b/pkg/nada/ecn/ecn_linux.go @@ -0,0 +1,17 @@ +package ecn + +import ( + "net" + "reflect" + "syscall" +) + +// EnableExplicitCongestionNotification enables ECN on the given connection. +func EnableExplicitCongestionNotification(conn *net.UDPConn) error { + ptrVal := reflect.ValueOf(*conn) + fdmember := reflect.Indirect(ptrVal).FieldByName("fd") + pfdmember := reflect.Indirect(fdmember).FieldByName("pfd") + netfdmember := reflect.Indirect(pfdmember).FieldByName("Sysfd") + fd := int(netfdmember.Int()) + return syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_RECVTOS, 1) +} diff --git a/pkg/nada/nada.go b/pkg/nada/nada.go new file mode 100644 index 00000000..a0b3cc71 --- /dev/null +++ b/pkg/nada/nada.go @@ -0,0 +1,2 @@ +// Package nada provides an implementation of the NADA congestion control algorithm. +package nada diff --git a/pkg/nada/packet_stream.go b/pkg/nada/packet_stream.go new file mode 100644 index 00000000..828ec577 --- /dev/null +++ b/pkg/nada/packet_stream.go @@ -0,0 +1,118 @@ +package nada + +import ( + "errors" + "fmt" + "sync" + "time" +) + +type packet struct { + ts time.Time + seq uint16 + ecn bool + size Bits + queueingDelay bool +} + +// String returns a string representation of the packet. +func (p *packet) String() string { + return fmt.Sprintf("%v@%v", p.seq, p.ts.Nanosecond()%1000) +} + +type packetStream struct { + sync.Mutex + + window time.Duration + packets []*packet + markCount uint16 + totalSize Bits + queueingDelayCount uint16 +} + +func newPacketStream(window time.Duration) *packetStream { + return &packetStream{ + window: window, + } +} + +var errTimeOrder = errors.New("invalid packet timestamp ordering") + +// add writes a packet to the underlying stream. +func (ps *packetStream) add(ts time.Time, seq uint16, ecn bool, size Bits, queueingDelay bool) error { + ps.Lock() + defer ps.Unlock() + + if len(ps.packets) > 0 && ps.packets[len(ps.packets)-1].ts.After(ts) { + return errTimeOrder + } + // check if the packet seq already exists. + for _, p := range ps.packets { + if p.seq == seq { + return errTimeOrder + } + } + ps.packets = append(ps.packets, &packet{ + ts: ts, + seq: seq, + ecn: ecn, + size: size, + queueingDelay: queueingDelay, + }) + if ecn { + ps.markCount++ + } + ps.totalSize += size + if queueingDelay { + ps.queueingDelayCount++ + } + return nil +} + +// prune removes packets that are older than the window and returns the loss and marking rate. +func (ps *packetStream) prune(now time.Time) (loss float64, marking float64, receivingRate BitsPerSecond, hasQueueingDelay bool) { + ps.Lock() + defer ps.Unlock() + + startTS := now.Add(-ps.window) + start := 0 + for ; start < len(ps.packets) && ps.packets[start].ts.Before(startTS); start++ { + // decrement mark count if ecn. + if ps.packets[start].ecn { + ps.markCount-- + } + ps.totalSize -= ps.packets[start].size + if ps.packets[start].queueingDelay { + ps.queueingDelayCount-- + } + } + if start > 0 { + ps.packets = ps.packets[start:] + } + seqs := make([]uint16, len(ps.packets)) + for i, p := range ps.packets { + seqs[i] = p.seq + } + begin, end := getSeqRange(seqs) + loss = 1 - float64(len(ps.packets))/float64(end-begin+1) + marking = float64(ps.markCount) / float64(end-begin+1) + return loss, marking, BitsPerSecond(float64(ps.totalSize) / ps.window.Seconds()), ps.queueingDelayCount > 0 +} + +func getSeqRange(seqs []uint16) (uint16, uint16) { + minDelta := 0 + maxDelta := 0 + seq0 := seqs[0] + for _, seq := range seqs { + delta := int(seq - seq0) + if seq-seq0 >= 16384 { + delta -= (1 << 16) + if delta < minDelta { + minDelta = delta + } + } else if delta > maxDelta { + maxDelta = delta + } + } + return seq0 + uint16(minDelta), seq0 + uint16(maxDelta) +} diff --git a/pkg/nada/receiver.go b/pkg/nada/receiver.go new file mode 100644 index 00000000..6ec00870 --- /dev/null +++ b/pkg/nada/receiver.go @@ -0,0 +1,126 @@ +package nada + +import ( + "math" + "time" +) + +// Receiver represents a receiver of a NADA bandwidth estimator. +type Receiver struct { + config Config + BaselineDelay time.Duration // d_base + EstimatedQueuingDelay time.Duration // d_queue + EstimatedPacketLossRatio float64 + EstimatedPacketECNMarkingRatio float64 + ReceivingRate BitsPerSecond + LastTimestamp time.Time + CurrentTimestamp time.Time + RecommendedRateAdaptionMode RateAdaptionMode + + packetStream *packetStream +} + +// NewReceiver creates a new NADA receiver. +func NewReceiver(now time.Time, config Config) *Receiver { + return &Receiver{ + config: config, + BaselineDelay: time.Duration(1<<63 - 1), + EstimatedPacketLossRatio: 0.0, + EstimatedPacketECNMarkingRatio: 0.0, + ReceivingRate: 0.0, + LastTimestamp: now, + CurrentTimestamp: now, + packetStream: newPacketStream(config.LogWindow), + } +} + +// OnReceiveMediaPacket implements the media receive algorithm. +func (r *Receiver) OnReceiveMediaPacket(now time.Time, sent time.Time, seq uint16, ecn bool, size Bits) error { + // obtain current timestamp t_curr from system clock + r.CurrentTimestamp = now + + // obtain from packet header sending time stamp tSent + tSent := sent + + // obtain one-way delay measurement: dFwd = t_curr - t_sent + dFwd := r.CurrentTimestamp.Sub(tSent) + + // update baseline delay: d_base = min(d_base, d_fwd) + if dFwd < r.BaselineDelay { + r.BaselineDelay = dFwd + } + + // update queuing delay: d_queue = d_fwd - d_base + r.EstimatedQueuingDelay = dFwd - r.BaselineDelay + + if err := r.packetStream.add(now, seq, ecn, size, r.EstimatedQueuingDelay > r.config.QueueingDelayThreshold); err != nil { + return err + } + + pLossInst, pMarkInst, rRecvInst, hasQueueingDelay := r.packetStream.prune(now) + + // update packet loss ratio estimate p_loss + // r.config.α*p_loss_inst + (1-r.config.α)*r.EstimatedPacketLossRatio + r.EstimatedPacketLossRatio = r.config.Alpha*(pLossInst-r.EstimatedPacketLossRatio) + r.EstimatedPacketLossRatio + + // update packet marking ratio estimate p_mark + // r.config.α*p_mark_inst + (1-r.config.α)*r.EstimatedPacketECNMarkingRatio + r.EstimatedPacketECNMarkingRatio = r.config.Alpha*(pMarkInst-r.EstimatedPacketECNMarkingRatio) + r.EstimatedPacketECNMarkingRatio + + // update measurement of receiving rate r_recv + r.ReceivingRate = rRecvInst + + // update recommended rate adaption mode. + if pLossInst == 0 && !hasQueueingDelay { + r.RecommendedRateAdaptionMode = RateAdaptionModeAcceleratedRampUp + } else { + r.RecommendedRateAdaptionMode = RateAdaptionModeGradualUpdate + } + + return nil +} + +// BuildFeedbackReport creates a new feedback packet. +func (r *Receiver) BuildFeedbackReport() *FeedbackReport { + // calculate non-linear warping of delay d_tilde if packet loss exists + equivalentDelay := r.equivalentDelay() + + // calculate current aggregate congestion signal x_curr + aggregatedCongestionSignal := equivalentDelay + + scale(r.config.ReferenceDelayMarking, math.Pow(r.EstimatedPacketECNMarkingRatio/r.config.ReferencePacketMarkingRatio, 2)) + + scale(r.config.ReferenceDelayLoss, math.Pow(r.EstimatedPacketLossRatio/r.config.ReferencePacketLossRatio, 2)) + + // determine mode of rate adaptation for sender: rmode + rmode := r.RecommendedRateAdaptionMode + + // update t_last = t_curr + r.LastTimestamp = r.CurrentTimestamp + + // send feedback containing values of: rmode, x_curr, and r_recv + return &FeedbackReport{ + RecommendedRateAdaptionMode: rmode, + AggregatedCongestionSignal: aggregatedCongestionSignal, + ReceivingRate: r.ReceivingRate, + } +} + +func scale(t time.Duration, x float64) time.Duration { + return time.Duration(float64(t) * x) +} + +// d_tilde computes d_tilde as described by +// +// / d_queue, if d_queue= (1>>15) || xCurr < 0 { + return nil, errInvalidReport + } + binary.BigEndian.PutUint16(rawPacket[0:2], uint16(xCurr)) + if r.ReceivingRate >= (1<<32) || r.ReceivingRate < 0 { + return nil, errInvalidReport + } + binary.BigEndian.PutUint32(rawPacket[2:6], uint32(r.ReceivingRate)) + if r.RecommendedRateAdaptionMode { + rawPacket[0] |= 1 << 7 + } + return rawPacket, nil +} + +// Unmarshal creates a report given a byte slice +func (r *FeedbackReport) Unmarshal(rawPacket []byte) error { + if len(rawPacket) != 6 { + return errInvalidReport + } + r.RecommendedRateAdaptionMode = (rawPacket[0] & (1 << 7)) != 0 + xCurr := binary.BigEndian.Uint16(rawPacket[0:2]) + xCurr &= 0x7FFF + r.AggregatedCongestionSignal = time.Duration(xCurr) * 100 * time.Microsecond + r.ReceivingRate = BitsPerSecond(binary.BigEndian.Uint32(rawPacket[2:6])) + return nil +} diff --git a/pkg/nada/sender.go b/pkg/nada/sender.go new file mode 100644 index 00000000..20418b6c --- /dev/null +++ b/pkg/nada/sender.go @@ -0,0 +1,118 @@ +package nada + +import ( + "math" + "time" +) + +// Sender represents a NADA sender. +type Sender struct { + config Config + ReferenceRate BitsPerSecond // r_ref + SenderEstimatedRoundTripTime time.Duration // rtt + PreviousAggregateCongestionSignal time.Duration // x_prev + LastTimestamp time.Time + CurrentTimestamp time.Time +} + +// NewSender creates a new sender in the NADA estimation pair. +func NewSender(now time.Time, config Config) *Sender { + return &Sender{ + config: config, + ReferenceRate: config.MinimumRate, + SenderEstimatedRoundTripTime: 0, + PreviousAggregateCongestionSignal: 0, + LastTimestamp: now, + CurrentTimestamp: now, + } +} + +// UpdateEstimatedRoundTripTime sets the estimated round trip time from an external source. +// This can be calculated from the sender report and receiver reports on the sender. +// +// See https://github.com/versatica/mediasoup/issues/73 +func (s *Sender) UpdateEstimatedRoundTripTime(rtt time.Duration) { + s.SenderEstimatedRoundTripTime = time.Duration((s.config.Alpha * float64(s.SenderEstimatedRoundTripTime)) + ((1 - s.config.Alpha) * float64(rtt))) +} + +// OnReceiveFeedbackReport updates the sender with the given NADA feedback report. +func (s *Sender) OnReceiveFeedbackReport(now time.Time, report *FeedbackReport) { + // obtain current timestamp from system clock: t_curr + s.CurrentTimestamp = now + + // measure feedback interval: delta = t_curr - t_last + delta := s.CurrentTimestamp.Sub(s.LastTimestamp) + + if report.RecommendedRateAdaptionMode { + // update r_ref following gradual update rules + // + // In gradual update mode, the rate r_ref is updated as: + // + // x_offset = x_curr - PRIO*XREF*RMAX/r_ref (5) + // + // x_diff = x_curr - x_prev (6) + // + // delta x_offset + // r_ref = r_ref - KAPPA*-------*------------*r_ref + // TAU TAU + // + // x_diff + // - KAPPA*ETA*---------*r_ref (7) + // TAU + + xOffset := report.AggregatedCongestionSignal - scale(s.config.ReferenceCongestionLevel, s.config.Priority*float64(s.config.MaximumRate)/float64(s.ReferenceRate)) + xDiff := report.AggregatedCongestionSignal - s.PreviousAggregateCongestionSignal + + s.ReferenceRate = BitsPerSecond(float64(s.ReferenceRate) * + (1 - + (s.config.Kappa * (float64(delta) / float64(s.config.Tau)) * (float64(xOffset) / float64(s.config.Tau))) - + (s.config.Kappa * s.config.Eta * (float64(xDiff) / float64(s.config.Tau))))) + } else { + // update r_ref following accelerated ramp-up rules + // + // In accelerated ramp-up mode, the rate r_ref is updated as follows: + // + // QBOUND + // gamma = min(GAMMA_MAX, ------------------) (3) + // rtt+DELTA+DFILT + // + // r_ref = max(r_ref, (1+gamma) r_recv) (4) + + gamma := math.Min(s.config.GammaMax, float64(s.config.QueueBound)/float64(s.SenderEstimatedRoundTripTime+s.config.Delta+s.config.FilteringDelay)) + s.ReferenceRate = BitsPerSecond(math.Max(float64(s.ReferenceRate), (1+gamma)*float64(report.ReceivingRate))) + } + + // clip rate r_ref within the range of minimum rate (RMIN) and maximum rate (RMAX). + if s.ReferenceRate < s.config.MinimumRate { + s.ReferenceRate = s.config.MinimumRate + } + if s.ReferenceRate > s.config.MaximumRate { + s.ReferenceRate = s.config.MaximumRate + } + + // x_prev = x_curr + s.PreviousAggregateCongestionSignal = report.AggregatedCongestionSignal + + // t_last = t_curr + s.LastTimestamp = s.CurrentTimestamp +} + +// GetTargetRate returns the target rate for the sender. +func (s *Sender) GetTargetRate(bufferLen uint) BitsPerSecond { + // r_diff_v = min(0.05*r_ref, BETA_V*8*buffer_len*FPS). (11) + // r_vin = max(RMIN, r_ref - r_diff_v). (13) + + rDiffV := math.Min(0.05*float64(s.ReferenceRate), s.config.BetaVideoEncoder*8*float64(bufferLen)*(s.config.FrameRate)) + rVin := math.Max(float64(s.config.MinimumRate), float64(s.ReferenceRate)-rDiffV) + return BitsPerSecond(rVin) +} + +// GetSendingRate returns the sending rate for the sender. +func (s *Sender) GetSendingRate(bufferLen uint) BitsPerSecond { + // r_diff_s = min(0.05*r_ref, BETA_S*8*buffer_len*FPS). (12) + // r_send = min(RMAX, r_ref + r_diff_s). (14) + + rDiffS := math.Min(0.05*float64(s.ReferenceRate), s.config.BetaSending*8*float64(bufferLen)*(s.config.FrameRate)) + rSend := math.Min(float64(s.config.MaximumRate), float64(s.ReferenceRate)+rDiffS) + return BitsPerSecond(rSend) +} diff --git a/pkg/nada/sender_receiver_test.go b/pkg/nada/sender_receiver_test.go new file mode 100644 index 00000000..57009e31 --- /dev/null +++ b/pkg/nada/sender_receiver_test.go @@ -0,0 +1,52 @@ +package nada + +import ( + "log" + "testing" + "time" +) + +func TestSenderReceiver_Simple(t *testing.T) { + t0 := time.Now() + sender := NewSender(t0, DefaultConfig()) + receiver := NewReceiver(t0, DefaultConfig()) + + // send some data at 1 Mbps. + seq := uint16(0) + for ; seq < uint16(2_000); seq++ { + t1 := t0.Add(time.Duration(seq) * time.Millisecond) + t2 := t1.Add(25 * time.Millisecond) + if err := receiver.OnReceiveMediaPacket(t2, t1, seq, false, 1000); err != nil { + t.Fatal(err) + } + + if seq%100 == 0 { + report := receiver.BuildFeedbackReport() + log.Printf("%v", report) + sender.OnReceiveFeedbackReport(t2, report) + + // get the estimated bandwidth. + log.Printf("%d %v %v", seq, sender.GetSendingRate(0), sender.GetTargetRate(0)) + } + } + + // then introduce 25% loss. + for ; seq < uint16(4_000); seq++ { + t1 := t0.Add(time.Duration(seq) * time.Millisecond) + t2 := t1.Add(25 * time.Millisecond) + if seq%4 != 0 { + if err := receiver.OnReceiveMediaPacket(t2, t1, seq, false, 1000); err != nil { + t.Fatal(err) + } + } + + if seq%100 == 0 { + report := receiver.BuildFeedbackReport() + log.Printf("%v", report) + sender.OnReceiveFeedbackReport(t2, report) + + // get the estimated bandwidth. + log.Printf("%d %v %v", seq, sender.GetSendingRate(0), sender.GetTargetRate(0)) + } + } +}