Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NADA implementation #117

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ boks1971 <[email protected]>
David Zhao <[email protected]>
Jonathan Müller <[email protected]>
Kevin Caffrey <[email protected]>
Kevin Wang <[email protected]>
Mathis Engelbart <[email protected]>
Sean DuBois <[email protected]>
1 change: 0 additions & 1 deletion interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type Factory interface {
// Interceptor can be used to add functionality to you PeerConnections by modifying any incoming/outgoing rtp/rtcp
// packets, or sending your own packets as needed.
type Interceptor interface {

// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
// change in the future. The returned method will be called once per packet batch.
BindRTCPReader(reader RTCPReader) RTCPReader
Expand Down
3 changes: 1 addition & 2 deletions pkg/nack/retainable_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
}
}

type noOpPacketFactory struct {
}
type noOpPacketFactory struct{}

func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte) (*retainablePacket, error) {
return &retainablePacket{
Expand Down
5 changes: 5 additions & 0 deletions pkg/nada/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# [RFC 8698] NADA: A Unified Congestion Control Scheme for Real-Time Media

Notes:

* The receiver in this implementation assumes a monotonically ordered sequence of packets.
107 changes: 107 additions & 0 deletions pkg/nada/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package nada

import "time"

// Bits represents a unit of one bit.
type Bits uint32

// BitsPerSecond represents a unit of one bit per second.
type BitsPerSecond float64

const (
// Kbps represents 1 kbps.
Kbps = BitsPerSecond(1_000)
// Mbps represents 1 Mbps.
Mbps = BitsPerSecond(1_000_000)
)

// Config represents the configuration of a NADA bandwidth estimator.
type Config struct {
// Weight of priority of the flow
Priority float64
// Minimum rate of the application supported by the media encoder
MinimumRate BitsPerSecond // RMIN
// Maximum rate of the application supported by media encoder
MaximumRate BitsPerSecond // RMAX
// Reference congestion level
ReferenceCongestionLevel time.Duration // XREF
// Scaling parameter for gradual rate update calculation
Kappa float64
// Scaling parameter for gradual rate update calculation
Eta float64
// Upper bound of RTT in gradual rate update calculation
Tau time.Duration
// Target feedback interval
Delta time.Duration

// Observation window in time for calculating packet summary statistics at receiver
LogWindow time.Duration // LOGWIN
// Threshold for determining queuing delay build up at receiver
QueueingDelayThreshold time.Duration
// Bound on filtering delay
FilteringDelay time.Duration // DFILT
// Upper bound on rate increase ratio for accelerated ramp-up
GammaMax float64
// Upper bound on self-inflicted queueing delay during ramp up
QueueBound time.Duration // QBOUND

// Multiplier for self-scaling the expiration threshold of the last observed loss
// (loss_exp) based on measured average loss interval (loss_int)
LossMultiplier float64 // MULTILOSS
// Delay threshold for invoking non-linear warping
DelayThreshold time.Duration // QTH
// Scaling parameter in the exponent of non-linear warping
Lambda float64

// Reference packet loss ratio
ReferencePacketLossRatio float64 // PLRREF
// Reference packet marking ratio
ReferencePacketMarkingRatio float64 // PMRREF
// Reference delay penalty for loss when lacket loss ratio is at least PLRREF
ReferenceDelayLoss time.Duration // DLOSS
// Reference delay penalty for ECN marking when packet marking is at PMRREF
ReferenceDelayMarking time.Duration // DMARK

// Frame rate of incoming video
FrameRate float64 // FRAMERATE
// Scaling parameter for modulating outgoing sending rate
BetaSending float64
// Scaling parameter for modulating video encoder target rate
BetaVideoEncoder float64
// Smoothing factor in exponential smoothing of packet loss and marking rate
Alpha float64
}

// DefaultConfig returns the default configuration recommended by the specification.
func DefaultConfig() Config {
return Config{
Priority: 1.0,
MinimumRate: 150 * Kbps,
MaximumRate: 1500 * Kbps,
ReferenceCongestionLevel: 10 * time.Millisecond,
Kappa: 0.5,
Eta: 2.0,
Tau: 500 * time.Millisecond,
Delta: 100 * time.Millisecond,

LogWindow: 500 * time.Millisecond,
QueueingDelayThreshold: 10 * time.Millisecond,
FilteringDelay: 120 * time.Millisecond,
GammaMax: 0.5,
QueueBound: 50 * time.Millisecond,

LossMultiplier: 7.0,
DelayThreshold: 50 * time.Millisecond,
Lambda: 0.5,

ReferencePacketLossRatio: 0.01,
ReferencePacketMarkingRatio: 0.01,
ReferenceDelayLoss: 10 * time.Millisecond,
ReferenceDelayMarking: 2 * time.Millisecond,

FrameRate: 30.0,
BetaSending: 0.1,
BetaVideoEncoder: 0.1,
Alpha: 0.1,
}
}
23 changes: 23 additions & 0 deletions pkg/nada/ecn/ecn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Package ecn provides ExplicitCongestionNotification (ECN) support.
package ecn

import (
"errors"
"syscall"
)

var errNoECN = errors.New("no ECN control message")

// CheckExplicitCongestionNotification checks if the given oob data includes an ECN bit set.
func CheckExplicitCongestionNotification(oob []byte) (uint8, error) {
ctrlMsgs, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return 0, err
}
for _, ctrlMsg := range ctrlMsgs {
if ctrlMsg.Header.Type == syscall.IP_TOS {
return (ctrlMsg.Data[0] & 0x3), nil
}
}
return 0, errNoECN
}
11 changes: 11 additions & 0 deletions pkg/nada/ecn/ecn_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ecn

import (
"net"
)

// EnableExplicitCongestionNotification enables ECN on the given connection.
func EnableExplicitCongestionNotification(conn *net.UDPConn) error {
// noop.
return nil
}
17 changes: 17 additions & 0 deletions pkg/nada/ecn/ecn_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ecn

import (
"net"
"reflect"
"syscall"
)

// EnableExplicitCongestionNotification enables ECN on the given connection.
func EnableExplicitCongestionNotification(conn *net.UDPConn) error {
ptrVal := reflect.ValueOf(*conn)
fdmember := reflect.Indirect(ptrVal).FieldByName("fd")
pfdmember := reflect.Indirect(fdmember).FieldByName("pfd")
netfdmember := reflect.Indirect(pfdmember).FieldByName("Sysfd")
fd := int(netfdmember.Int())
return syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_RECVTOS, 1)
}
2 changes: 2 additions & 0 deletions pkg/nada/nada.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package nada provides an implementation of the NADA congestion control algorithm.
package nada
118 changes: 118 additions & 0 deletions pkg/nada/packet_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package nada

import (
"errors"
"fmt"
"sync"
"time"
)

type packet struct {
ts time.Time
seq uint16
ecn bool
size Bits
queueingDelay bool
}

// String returns a string representation of the packet.
func (p *packet) String() string {
return fmt.Sprintf("%v@%v", p.seq, p.ts.Nanosecond()%1000)
}

type packetStream struct {
sync.Mutex

window time.Duration
packets []*packet
markCount uint16
totalSize Bits
queueingDelayCount uint16
}

func newPacketStream(window time.Duration) *packetStream {
return &packetStream{
window: window,
}
}

var errTimeOrder = errors.New("invalid packet timestamp ordering")

// add writes a packet to the underlying stream.
func (ps *packetStream) add(ts time.Time, seq uint16, ecn bool, size Bits, queueingDelay bool) error {
ps.Lock()
defer ps.Unlock()

if len(ps.packets) > 0 && ps.packets[len(ps.packets)-1].ts.After(ts) {
return errTimeOrder
}
// check if the packet seq already exists.
for _, p := range ps.packets {
if p.seq == seq {
return errTimeOrder
}
}
ps.packets = append(ps.packets, &packet{
ts: ts,
seq: seq,
ecn: ecn,
size: size,
queueingDelay: queueingDelay,
})
if ecn {
ps.markCount++
}
ps.totalSize += size
if queueingDelay {
ps.queueingDelayCount++
}
return nil
}

// prune removes packets that are older than the window and returns the loss and marking rate.
func (ps *packetStream) prune(now time.Time) (loss float64, marking float64, receivingRate BitsPerSecond, hasQueueingDelay bool) {
ps.Lock()
defer ps.Unlock()

startTS := now.Add(-ps.window)
start := 0
for ; start < len(ps.packets) && ps.packets[start].ts.Before(startTS); start++ {
// decrement mark count if ecn.
if ps.packets[start].ecn {
ps.markCount--
}
ps.totalSize -= ps.packets[start].size
if ps.packets[start].queueingDelay {
ps.queueingDelayCount--
}
}
if start > 0 {
ps.packets = ps.packets[start:]
}
seqs := make([]uint16, len(ps.packets))
for i, p := range ps.packets {
seqs[i] = p.seq
}
begin, end := getSeqRange(seqs)
loss = 1 - float64(len(ps.packets))/float64(end-begin+1)
marking = float64(ps.markCount) / float64(end-begin+1)
return loss, marking, BitsPerSecond(float64(ps.totalSize) / ps.window.Seconds()), ps.queueingDelayCount > 0
}

func getSeqRange(seqs []uint16) (uint16, uint16) {
minDelta := 0
maxDelta := 0
seq0 := seqs[0]
for _, seq := range seqs {
delta := int(seq - seq0)
if seq-seq0 >= 16384 {
delta -= (1 << 16)
if delta < minDelta {
minDelta = delta
}
} else if delta > maxDelta {
maxDelta = delta
}
}
return seq0 + uint16(minDelta), seq0 + uint16(maxDelta)
}
Loading