Commit aa876bcd authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Propagate CNAME.

parent 5a91a7aa
...@@ -41,4 +41,5 @@ type downTrack interface { ...@@ -41,4 +41,5 @@ type downTrack interface {
WriteRTP(packat *rtp.Packet) error WriteRTP(packat *rtp.Packet) error
Accumulate(bytes uint32) Accumulate(bytes uint32)
setTimeOffset(ntp uint64, rtp uint32) setTimeOffset(ntp uint64, rtp uint32)
setCname(string)
} }
...@@ -216,6 +216,9 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac ...@@ -216,6 +216,9 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac
func (t *diskTrack) setTimeOffset(ntp uint64, rtp uint32) { func (t *diskTrack) setTimeOffset(ntp uint64, rtp uint32) {
} }
func (t *diskTrack) setCname(string) {
}
func clonePacket(packet *rtp.Packet) *rtp.Packet { func clonePacket(packet *rtp.Packet) *rtp.Packet {
buf, err := packet.Marshal() buf, err := packet.Marshal()
if err != nil { if err != nil {
......
...@@ -79,6 +79,7 @@ type rtpDownTrack struct { ...@@ -79,6 +79,7 @@ type rtpDownTrack struct {
srNTPTime uint64 srNTPTime uint64
remoteNTPTime uint64 remoteNTPTime uint64
remoteRTPTime uint32 remoteRTPTime uint32
cname atomic.Value
rtt uint64 rtt uint64
} }
...@@ -95,6 +96,10 @@ func (down *rtpDownTrack) setTimeOffset(ntp uint64, rtp uint32) { ...@@ -95,6 +96,10 @@ func (down *rtpDownTrack) setTimeOffset(ntp uint64, rtp uint32) {
atomic.StoreUint32(&down.remoteRTPTime, rtp) atomic.StoreUint32(&down.remoteRTPTime, rtp)
} }
func (down *rtpDownTrack) setCname(cname string) {
down.cname.Store(cname)
}
type rtpDownConnection struct { type rtpDownConnection struct {
id string id string
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
...@@ -187,6 +192,7 @@ type rtpUpTrack struct { ...@@ -187,6 +192,7 @@ type rtpUpTrack struct {
writerDone chan struct{} writerDone chan struct{}
mu sync.Mutex mu sync.Mutex
cname string
local []downTrack local []downTrack
srTime uint64 srTime uint64
srNTPTime uint64 srNTPTime uint64
...@@ -362,7 +368,7 @@ func getTrackMid(pc *webrtc.PeerConnection, track *webrtc.Track) string { ...@@ -362,7 +368,7 @@ func getTrackMid(pc *webrtc.PeerConnection, track *webrtc.Track) string {
// called locked // called locked
func (up *rtpUpConnection) complete() bool { func (up *rtpUpConnection) complete() bool {
for mid, _ := range up.labels { for mid := range up.labels {
found := false found := false
for _, t := range up.tracks { for _, t := range up.tracks {
m := getTrackMid(up.pc, t.track) m := getTrackMid(up.pc, t.track)
...@@ -566,10 +572,14 @@ func writeLoop(conn *rtpUpConnection, track *rtpUpTrack, ch <-chan packetIndex) ...@@ -566,10 +572,14 @@ func writeLoop(conn *rtpUpConnection, track *rtpUpTrack, ch <-chan packetIndex)
track.mu.Lock() track.mu.Lock()
ntp := track.srNTPTime ntp := track.srNTPTime
rtp := track.srRTPTime rtp := track.srRTPTime
cname := track.cname
track.mu.Unlock() track.mu.Unlock()
if ntp != 0 { if ntp != 0 {
action.track.setTimeOffset(ntp, rtp) action.track.setTimeOffset(ntp, rtp)
} }
if cname != "" {
action.track.setCname(cname)
}
} else { } else {
found := false found := false
for i, t := range local { for i, t := range local {
...@@ -691,7 +701,7 @@ func sendFIR(pc *webrtc.PeerConnection, ssrc uint32, seqno uint8) error { ...@@ -691,7 +701,7 @@ func sendFIR(pc *webrtc.PeerConnection, ssrc uint32, seqno uint8) error {
return pc.WriteRTCP([]rtcp.Packet{ return pc.WriteRTCP([]rtcp.Packet{
&rtcp.FullIntraRequest{ &rtcp.FullIntraRequest{
FIR: []rtcp.FIREntry{ FIR: []rtcp.FIREntry{
rtcp.FIREntry{ {
SSRC: ssrc, SSRC: ssrc,
SequenceNumber: seqno, SequenceNumber: seqno,
}, },
...@@ -716,7 +726,7 @@ func sendNACK(pc *webrtc.PeerConnection, ssrc uint32, first uint16, bitmap uint1 ...@@ -716,7 +726,7 @@ func sendNACK(pc *webrtc.PeerConnection, ssrc uint32, first uint16, bitmap uint1
&rtcp.TransportLayerNack{ &rtcp.TransportLayerNack{
MediaSSRC: ssrc, MediaSSRC: ssrc,
Nacks: []rtcp.NackPair{ Nacks: []rtcp.NackPair{
rtcp.NackPair{ {
first, first,
rtcp.PacketBitmap(bitmap), rtcp.PacketBitmap(bitmap),
}, },
...@@ -763,6 +773,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei ...@@ -763,6 +773,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
now := rtptime.Jiffies() now := rtptime.Jiffies()
for _, p := range ps { for _, p := range ps {
local := track.getLocal()
switch p := p.(type) { switch p := p.(type) {
case *rtcp.SenderReport: case *rtcp.SenderReport:
track.mu.Lock() track.mu.Lock()
...@@ -773,11 +784,26 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei ...@@ -773,11 +784,26 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
track.srNTPTime = p.NTPTime track.srNTPTime = p.NTPTime
track.srRTPTime = p.RTPTime track.srRTPTime = p.RTPTime
track.mu.Unlock() track.mu.Unlock()
local := track.getLocal()
for _, l := range local { for _, l := range local {
l.setTimeOffset(p.NTPTime, p.RTPTime) l.setTimeOffset(p.NTPTime, p.RTPTime)
} }
case *rtcp.SourceDescription: case *rtcp.SourceDescription:
for _, c := range p.Chunks {
if c.Source != track.track.SSRC() {
continue
}
for _, i := range c.Items {
if i.Type != rtcp.SDESCNAME {
continue
}
track.mu.Lock()
track.cname = i.Text
track.mu.Unlock()
for _, l := range local {
l.setCname(i.Text)
}
}
}
} }
} }
...@@ -910,30 +936,46 @@ func sendSR(conn *rtpDownConnection) error { ...@@ -910,30 +936,46 @@ func sendSR(conn *rtpDownConnection) error {
remoteNTP := atomic.LoadUint64(&t.remoteNTPTime) remoteNTP := atomic.LoadUint64(&t.remoteNTPTime)
remoteRTP := atomic.LoadUint32(&t.remoteRTPTime) remoteRTP := atomic.LoadUint32(&t.remoteRTPTime)
if remoteNTP == 0 { if remoteNTP != 0 {
// we never got a remote SR for this track srTime := rtptime.NTPToTime(remoteNTP)
continue d := now.Sub(srTime)
if d > 0 && d < time.Hour {
delay := rtptime.FromDuration(
d, clockrate,
)
nowRTP = remoteRTP + uint32(delay)
}
p, b := t.rate.Totals()
packets = append(packets,
&rtcp.SenderReport{
SSRC: t.track.SSRC(),
NTPTime: nowNTP,
RTPTime: nowRTP,
PacketCount: p,
OctetCount: b,
})
atomic.StoreUint64(&t.srTime, jiffies)
atomic.StoreUint64(&t.srNTPTime, nowNTP)
} }
srTime := rtptime.NTPToTime(remoteNTP)
d := now.Sub(srTime) cname, ok := t.cname.Load().(string)
if d > 0 && d < time.Hour { if ok {
delay := rtptime.FromDuration( item := rtcp.SourceDescriptionItem{
d, clockrate, Type: rtcp.SDESCNAME,
Text: cname,
}
packets = append(packets,
&rtcp.SourceDescription{
Chunks: []rtcp.SourceDescriptionChunk{
{
Source: t.track.SSRC(),
Items: []rtcp.SourceDescriptionItem{item},
},
},
},
) )
nowRTP = remoteRTP + uint32(delay)
} }
p, b := t.rate.Totals()
packets = append(packets,
&rtcp.SenderReport{
SSRC: t.track.SSRC(),
NTPTime: nowNTP,
RTPTime: nowRTP,
PacketCount: p,
OctetCount: b,
})
atomic.StoreUint64(&t.srTime, jiffies)
atomic.StoreUint64(&t.srNTPTime, nowNTP)
} }
if len(packets) == 0 { if len(packets) == 0 {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment