Skip to content

Commit

Permalink
Add instant start for WebRTC consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed May 14, 2024
1 parent 32e0ee4 commit a51156c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
48 changes: 35 additions & 13 deletions pkg/core/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ func NewSender(media *Media, codec *Codec) *Sender {
type HandlerFunc func(packet *rtp.Packet)

func (s *Sender) HandleRTP(track *Receiver) {
bufferSize := 100
s.Bind(track)
go s.worker(track)
}

func (s *Sender) Bind(track *Receiver) {
var bufferSize uint16

if GetKind(track.Codec.Name) == KindVideo {
if track.Codec.IsRTP() {
Expand All @@ -123,6 +128,8 @@ func (s *Sender) HandleRTP(track *Receiver) {
} else {
bufferSize = 50
}
} else {
bufferSize = 100
}

buffer := make(chan *rtp.Packet, bufferSize)
Expand All @@ -133,28 +140,43 @@ func (s *Sender) HandleRTP(track *Receiver) {
}
track.senders[s] = buffer
track.mu.Unlock()

s.mu.Lock()
s.receivers = append(s.receivers, track)
s.mu.Unlock()
}

go func() {
// read packets from buffer channel until it will be closed
func (s *Sender) worker(track *Receiver) {
track.mu.Lock()
buffer := track.senders[s]
track.mu.Unlock()

// read packets from buffer channel until it will be closed
if buffer != nil {
for packet := range buffer {
s.bytes += len(packet.Payload)
s.Handler(packet)
}
}

// remove current receiver from list
// it can only happen when receiver close buffer channel
s.mu.Lock()
for i, receiver := range s.receivers {
if receiver == track {
s.receivers = append(s.receivers[:i], s.receivers[i+1:]...)
break
}
// remove current receiver from list
// it can only happen when receiver close buffer channel
s.mu.Lock()
for i, receiver := range s.receivers {
if receiver == track {
s.receivers = append(s.receivers[:i], s.receivers[i+1:]...)
break
}
s.mu.Unlock()
}()
}
s.mu.Unlock()
}

func (s *Sender) Start() {
s.mu.Lock()
for _, track := range s.receivers {
go s.worker(track)
}
s.mu.Unlock()
}

func (s *Sender) Close() {
Expand Down
4 changes: 4 additions & 0 deletions pkg/webrtc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
c.Fire(state)

switch state {
case webrtc.PeerConnectionStateConnected:
for _, sender := range c.senders {
sender.Start()
}
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
// disconnect event comes earlier, than failed
// but it comes only for success connections
Expand Down
4 changes: 2 additions & 2 deletions pkg/webrtc/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv

for _, sender := range c.senders {
if sender.Codec == codec {
sender.HandleRTP(track)
sender.Bind(track)
return nil
}
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
sender.Handler = pcm.RepackG711(false, sender.Handler)
}

sender.HandleRTP(track)
sender.Bind(track)

c.senders = append(c.senders, sender)
return nil
Expand Down

0 comments on commit a51156c

Please sign in to comment.