Commit 5e845eb4 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Send FIR if initial keyframe is too old.

parent a189e0ad
...@@ -464,7 +464,7 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error { ...@@ -464,7 +464,7 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error {
} }
last := atomic.LoadUint64(&track.lastPLI) last := atomic.LoadUint64(&track.lastPLI)
now := rtptime.Jiffies() now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/5 { if now >= last && now-last < rtptime.JiffiesPerSec/2 {
return ErrRateLimited return ErrRateLimited
} }
atomic.StoreUint64(&track.lastPLI, now) atomic.StoreUint64(&track.lastPLI, now)
...@@ -492,7 +492,7 @@ func (up *rtpUpConnection) sendFIR(track *rtpUpTrack, increment bool) error { ...@@ -492,7 +492,7 @@ func (up *rtpUpConnection) sendFIR(track *rtpUpTrack, increment bool) error {
} }
last := atomic.LoadUint64(&track.lastFIR) last := atomic.LoadUint64(&track.lastFIR)
now := rtptime.Jiffies() now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/5 { if now >= last && now-last < rtptime.JiffiesPerSec/2 {
return ErrRateLimited return ErrRateLimited
} }
atomic.StoreUint64(&track.lastFIR, now) atomic.StoreUint64(&track.lastFIR, now)
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"sfu/conn" "sfu/conn"
"sfu/packetcache" "sfu/packetcache"
...@@ -208,17 +209,12 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error { ...@@ -208,17 +209,12 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error {
} }
} }
func sendKeyframe(track conn.DownTrack, cache *packetcache.Cache) { func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) {
_, _, kf := cache.Keyframe()
if len(kf) == 0 {
return
}
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
for _, seqno := range kf { for _, seqno := range kf {
bytes := cache.Get(seqno, buf) bytes := cache.Get(seqno, buf)
if(bytes == 0) { if bytes == 0 {
return return
} }
err := packet.Unmarshal(buf[:bytes]) err := packet.Unmarshal(buf[:bytes])
...@@ -237,13 +233,16 @@ func sendKeyframe(track conn.DownTrack, cache *packetcache.Cache) { ...@@ -237,13 +233,16 @@ func sendKeyframe(track conn.DownTrack, cache *packetcache.Cache) {
func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
defer close(writer.done) defer close(writer.done)
codec := track.track.Codec().Name
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
local := make([]conn.DownTrack, 0) local := make([]conn.DownTrack, 0)
// reset whenever a new track is inserted // 3 means we want a new keyframe, 2 means we already sent FIR but
firSent := false // haven't gotten a keyframe yet, 1 means we want a PLI.
kfNeeded := 0
for { for {
select { select {
...@@ -258,7 +257,6 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { ...@@ -258,7 +257,6 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
action.ch <- nil action.ch <- nil
close(action.ch) close(action.ch)
firSent = false
track.mu.Lock() track.mu.Lock()
ntp := track.srNTPTime ntp := track.srNTPTime
rtp := track.srRTPTime rtp := track.srRTPTime
...@@ -270,7 +268,26 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { ...@@ -270,7 +268,26 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
if cname != "" { if cname != "" {
action.track.SetCname(cname) action.track.SetCname(cname)
} }
go sendKeyframe(action.track, track.cache)
found, _, lts := track.cache.Last()
kts, _, kf := track.cache.Keyframe()
if codec == webrtc.VP8 && found && len(kf) > 0 {
if ((lts-kts)&0x80000000) != 0 ||
lts-kts < 2*90000 {
// we got a recent keyframe
go sendKeyframe(
kf,
action.track,
track.cache,
)
} else {
// Request a new keyframe
kfNeeded = 3
}
} else {
// no keyframe yet, one should
// arrive soon. Do nothing.
}
} else { } else {
found := false found := false
for i, t := range local { for i, t := range local {
...@@ -306,12 +323,11 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { ...@@ -306,12 +323,11 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
continue continue
} }
kfNeeded := false
for _, l := range local { for _, l := range local {
err := l.WriteRTP(&packet) err := l.WriteRTP(&packet)
if err != nil { if err != nil {
if err == conn.ErrKeyframeNeeded { if err == conn.ErrKeyframeNeeded {
kfNeeded = true kfNeeded = 1
} else { } else {
continue continue
} }
...@@ -319,12 +335,27 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) { ...@@ -319,12 +335,27 @@ func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
l.Accumulate(uint32(bytes)) l.Accumulate(uint32(bytes))
} }
if kfNeeded { if kfNeeded > 0 {
err := up.sendFIR(track, !firSent) kf := false
switch codec {
case webrtc.VP8:
kf = isVP8Keyframe(&packet)
default:
kf = true
}
if kf {
kfNeeded = 0
}
}
if kfNeeded >= 2 {
err := up.sendFIR(track, kfNeeded >= 3)
if err == ErrUnsupportedFeedback { if err == ErrUnsupportedFeedback {
up.sendPLI(track) up.sendPLI(track)
} }
firSent = true kfNeeded = 2
} else if kfNeeded > 0 {
up.sendPLI(track)
} }
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment