Commit 5e130122 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Forward NACKs to sender in case of double loss.

We already send NACKs when a packet is missing.  Under high packet loss,
however, the recovery packet might get lost two.  Forward receiver NACKs
to the sender, but only after a delay and after checking that the packet
has not arrived in the meantime.
parent 962c675d
...@@ -32,6 +32,7 @@ type UpTrack interface { ...@@ -32,6 +32,7 @@ type UpTrack interface {
Codec() *webrtc.RTPCodec Codec() *webrtc.RTPCodec
// get a recent packet. Returns 0 if the packet is not in cache. // get a recent packet. Returns 0 if the packet is not in cache.
GetRTP(seqno uint16, result []byte) uint16 GetRTP(seqno uint16, result []byte) uint16
Nack(conn Up, seqnos []uint16) error
} }
// Type Down represents a connection in the server to client direction. // Type Down represents a connection in the server to client direction.
......
...@@ -190,10 +190,11 @@ type rtpUpTrack struct { ...@@ -190,10 +190,11 @@ type rtpUpTrack struct {
mu sync.Mutex mu sync.Mutex
cname string cname string
local []conn.DownTrack
srTime uint64 srTime uint64
srNTPTime uint64 srNTPTime uint64
srRTPTime uint32 srRTPTime uint32
local []conn.DownTrack
bufferedNACKs []uint16
} }
type localTrackAction struct { type localTrackAction struct {
...@@ -538,13 +539,15 @@ func sendNACK(pc *webrtc.PeerConnection, ssrc uint32, first uint16, bitmap uint1 ...@@ -538,13 +539,15 @@ func sendNACK(pc *webrtc.PeerConnection, ssrc uint32, first uint16, bitmap uint1
return pc.WriteRTCP([]rtcp.Packet{packet}) return pc.WriteRTCP([]rtcp.Packet{packet})
} }
func sendRecovery(p *rtcp.TransportLayerNack, track *rtpDownTrack) { func gotNACK(conn *rtpDownConnection, track *rtpDownTrack, p *rtcp.TransportLayerNack) {
var unhandled []uint16
var packet rtp.Packet var packet rtp.Packet
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
for _, nack := range p.Nacks { for _, nack := range p.Nacks {
for _, seqno := range nack.PacketList() { for _, seqno := range nack.PacketList() {
l := track.remote.GetRTP(seqno, buf) l := track.remote.GetRTP(seqno, buf)
if l == 0 { if l == 0 {
unhandled = append(unhandled, seqno)
continue continue
} }
err := packet.Unmarshal(buf[:l]) err := packet.Unmarshal(buf[:l])
...@@ -559,6 +562,38 @@ func sendRecovery(p *rtcp.TransportLayerNack, track *rtpDownTrack) { ...@@ -559,6 +562,38 @@ func sendRecovery(p *rtcp.TransportLayerNack, track *rtpDownTrack) {
track.rate.Accumulate(uint32(l)) track.rate.Accumulate(uint32(l))
} }
} }
if len(unhandled) == 0 {
return
}
track.remote.Nack(conn.remote, unhandled)
}
func (track *rtpUpTrack) Nack(conn conn.Up, nacks []uint16) error {
track.mu.Lock()
defer track.mu.Unlock()
doit := len(track.bufferedNACKs) == 0
outer:
for _, nack := range nacks {
for _, seqno := range track.bufferedNACKs {
if seqno == nack {
continue outer
}
}
track.bufferedNACKs = append(track.bufferedNACKs, nack)
}
if doit {
up, ok := conn.(*rtpUpConnection)
if !ok {
log.Printf("Nack: unexpected type %T", conn)
return errors.New("unexpected connection type")
}
go nackWriter(up, track)
}
return nil
} }
func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPReceiver) { func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPReceiver) {
...@@ -938,7 +973,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT ...@@ -938,7 +973,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT
} }
} }
case *rtcp.TransportLayerNack: case *rtcp.TransportLayerNack:
sendRecovery(p, track) gotNACK(conn, track, p)
} }
} }
} }
......
...@@ -3,6 +3,7 @@ package rtpconn ...@@ -3,6 +3,7 @@ package rtpconn
import ( import (
"errors" "errors"
"log" "log"
"sort"
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
...@@ -360,3 +361,55 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { ...@@ -360,3 +361,55 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
} }
} }
} }
// nackWriter is called when bufferedNACKs becomes non-empty. It decides
// which nacks to ship out.
func nackWriter(conn *rtpUpConnection, track *rtpUpTrack) {
// a client might send us a NACK for a packet that has already
// been nacked by the reader loop. Give recovery a chance.
time.Sleep(100 * time.Millisecond)
track.mu.Lock()
nacks := track.bufferedNACKs
track.bufferedNACKs = nil
track.mu.Unlock()
// drop any nacks before the last keyframe
var cutoff uint16
found, seqno, _ := track.cache.KeyframeSeqno()
if found {
cutoff = seqno
} else {
last, lastSeqno, _ := track.cache.Last()
if !last {
// NACK on a fresh track? Give up.
return
}
// no keyframe, use an arbitrary cutoff
cutoff = lastSeqno - 256
}
i := 0
for i < len(nacks) {
if ((nacks[i] - cutoff) & 0x8000) != 0 {
// earlier than the cutoff, drop
nacks = append(nacks[:i], nacks[i+1:]...)
continue
}
l := track.cache.Get(nacks[i], nil)
if l > 0 {
// the packet arrived in the meantime
nacks = append(nacks[:i], nacks[i+1:]...)
continue
}
i++
}
sort.Slice(nacks, func(i, j int) bool {
return nacks[i]-cutoff < nacks[j]-cutoff
})
for _, nack := range nacks {
conn.sendNACK(track, nack, 0)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment