Skip to content

Commit

Permalink
Add playout delay header interceptor
Browse files Browse the repository at this point in the history
This interceptor adds the playout delay header extension following
https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
  • Loading branch information
kevmo314 committed Jul 29, 2022
1 parent a82b843 commit cc23b0e
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 0 deletions.
1 change: 1 addition & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ boks1971 <[email protected]>
David Zhao <[email protected]>
Jonathan Müller <[email protected]>
Kevin Caffrey <[email protected]>
Kevin Wang <[email protected]>
Mathis Engelbart <[email protected]>
Sean DuBois <[email protected]>
60 changes: 60 additions & 0 deletions pkg/playoutdelay/header_extension_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package playoutdelay

import (
"github.com/pion/interceptor"
"github.com/pion/rtp"
"time"
)

// HeaderExtensionInterceptorFactory is a interceptor.Factory for a HeaderExtensionInterceptor
type HeaderExtensionInterceptorFactory struct{}

// NewInterceptor constructs a new HeaderExtensionInterceptor
func (h *HeaderExtensionInterceptorFactory) NewInterceptor(id string, minDelay, maxDelay time.Duration) (interceptor.Interceptor, error) {
if minDelay.Milliseconds() < 0 || minDelay.Milliseconds() > 40950 || maxDelay.Milliseconds() < 0 || maxDelay.Milliseconds() > 40950 {
return nil, errPlayoutDelayInvalidValue
}
return &HeaderExtensionInterceptor{minDelay: uint16(minDelay.Milliseconds() / 10), maxDelay: uint16(maxDelay.Milliseconds() / 10)}, nil
}

// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptorFactory
func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptorFactory, error) {
return &HeaderExtensionInterceptorFactory{}, nil
}

// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet
type HeaderExtensionInterceptor struct {
interceptor.NoOp
minDelay, maxDelay uint16
}

const playoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay"

// BindLocalStream returns a writer that adds a rtp.TransportCCExtension
// header with increasing sequence numbers to each outgoing packet.
func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
var hdrExtID uint8
for _, e := range info.RTPHeaderExtensions {
if e.URI == playoutDelayURI {
hdrExtID = uint8(e.ID)
break
}
}
if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID
return writer
}
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
tcc, err := (&PlayoutDelayExtension{
minDelay: h.minDelay,
maxDelay: h.maxDelay,
}).Marshal()
if err != nil {
return 0, err
}
err = header.SetExtension(hdrExtID, tcc)
if err != nil {
return 0, err
}
return writer.Write(header, payload, attributes)
})
}
68 changes: 68 additions & 0 deletions pkg/playoutdelay/header_extension_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package playoutdelay

import (
"sync"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestHeaderExtensionInterceptor(t *testing.T) {
t.Run("add playout delay to each packet", func(t *testing.T) {
factory, err := NewHeaderExtensionInterceptor()
assert.NoError(t, err)

inter, err := factory.NewInterceptor("", 10*time.Millisecond, 20*time.Millisecond)
assert.NoError(t, err)

pChan := make(chan *rtp.Packet, 10*5)
go func() {
// start some parallel streams using the same interceptor to test for race conditions
var wg sync.WaitGroup
num := 10
wg.Add(num)
for i := 0; i < num; i++ {
go func(ch chan *rtp.Packet, id uint16) {
stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{
{
URI: playoutDelayURI,
ID: 1,
},
}}, inter)
defer func() {
wg.Done()
assert.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
select {
case p := <-stream.WrittenRTP():
assert.Equal(t, seqNum, p.SequenceNumber)
ch <- p
case <-time.After(10 * time.Millisecond):
panic("written rtp packet not found")
}
}
}(pChan, uint16(i+1))
}
wg.Wait()
close(pChan)
}()

for p := range pChan {
// Can't check for increasing transport cc sequence number, since we can't ensure ordering between the streams
// on pChan is same as in the interceptor, but at least make sure each packet has a seq nr.
extensionHeader := p.GetExtension(1)
ext := &PlayoutDelayExtension{}
err = ext.Unmarshal(extensionHeader)
assert.NoError(t, err)
assert.Equal(t, uint16(1), ext.minDelay)
assert.Equal(t, uint16(2), ext.maxDelay)
}
})
}
52 changes: 52 additions & 0 deletions pkg/playoutdelay/playout_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Package playoutdelay implements the playout delay header extension.
package playoutdelay

import (
"encoding/binary"
"errors"
)

const (
// transport-wide sequence
playoutDelayExtensionSize = 3
playoutDelayMaxValue = (1 << 12) - 1
)

var (
errPlayoutDelayInvalidValue = errors.New("invalid playout delay value")
errTooSmall = errors.New("playout delay header extension too short")
)

// PlayoutDelayExtension is a extension payload format in
// https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | len=2 | MIN delay | MAX delay |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
type PlayoutDelayExtension struct {
minDelay, maxDelay uint16
}

// Marshal serializes the members to buffer
func (p PlayoutDelayExtension) Marshal() ([]byte, error) {
if p.minDelay >= playoutDelayMaxValue || p.maxDelay >= playoutDelayMaxValue {
return nil, errPlayoutDelayInvalidValue
}

return []byte{
byte(p.minDelay >> 4),
byte(p.minDelay<<4) | byte(p.maxDelay>>8),
byte(p.maxDelay),
}, nil
}

// Unmarshal parses the passed byte slice and stores the result in the members
func (p *PlayoutDelayExtension) Unmarshal(rawData []byte) error {
if len(rawData) < playoutDelayExtensionSize {
return errTooSmall
}
p.minDelay = binary.BigEndian.Uint16(rawData[0:2]) >> 4
p.maxDelay = binary.BigEndian.Uint16(rawData[1:3]) & 0x0FFF
return nil
}

0 comments on commit cc23b0e

Please sign in to comment.