diff --git a/pkg/core/track.go b/pkg/core/track.go index 1faae309..72e47074 100644 --- a/pkg/core/track.go +++ b/pkg/core/track.go @@ -113,7 +113,12 @@ func NewSender(media *Media, codec *Codec) *Sender { type HandlerFunc func(packet *rtp.Packet) func (s *Sender) HandleRTP(track *Receiver) { - bufferSize := 100 + s.Bind(track) + go s.worker(track) +} + +func (s *Sender) Bind(track *Receiver) { + var bufferSize uint16 if GetKind(track.Codec.Name) == KindVideo { if track.Codec.IsRTP() { @@ -123,6 +128,8 @@ func (s *Sender) HandleRTP(track *Receiver) { } else { bufferSize = 50 } + } else { + bufferSize = 100 } buffer := make(chan *rtp.Packet, bufferSize) @@ -133,28 +140,43 @@ func (s *Sender) HandleRTP(track *Receiver) { } track.senders[s] = buffer track.mu.Unlock() + s.mu.Lock() s.receivers = append(s.receivers, track) s.mu.Unlock() +} - go func() { - // read packets from buffer channel until it will be closed +func (s *Sender) worker(track *Receiver) { + track.mu.Lock() + buffer := track.senders[s] + track.mu.Unlock() + + // read packets from buffer channel until it will be closed + if buffer != nil { for packet := range buffer { s.bytes += len(packet.Payload) s.Handler(packet) } + } - // remove current receiver from list - // it can only happen when receiver close buffer channel - s.mu.Lock() - for i, receiver := range s.receivers { - if receiver == track { - s.receivers = append(s.receivers[:i], s.receivers[i+1:]...) - break - } + // remove current receiver from list + // it can only happen when receiver close buffer channel + s.mu.Lock() + for i, receiver := range s.receivers { + if receiver == track { + s.receivers = append(s.receivers[:i], s.receivers[i+1:]...) + break } - s.mu.Unlock() - }() + } + s.mu.Unlock() +} + +func (s *Sender) Start() { + s.mu.Lock() + for _, track := range s.receivers { + go s.worker(track) + } + s.mu.Unlock() } func (s *Sender) Close() { diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 64835353..0e10874e 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -120,6 +120,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { c.Fire(state) switch state { + case webrtc.PeerConnectionStateConnected: + for _, sender := range c.senders { + sender.Start() + } case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed: // disconnect event comes earlier, than failed // but it comes only for success connections diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 784b93fe..9d96ef59 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -20,7 +20,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv for _, sender := range c.senders { if sender.Codec == codec { - sender.HandleRTP(track) + sender.Bind(track) return nil } } @@ -77,7 +77,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv sender.Handler = pcm.RepackG711(false, sender.Handler) } - sender.HandleRTP(track) + sender.Bind(track) c.senders = append(c.senders, sender) return nil