Commit 2347417f authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Merge branch 'modular' into master

parents 714a0939 709a6857
...@@ -3,3 +3,4 @@ data/*.pem ...@@ -3,3 +3,4 @@ data/*.pem
sfu sfu
passwd passwd
groups/*.json groups/*.json
static/*.d.ts
...@@ -3,7 +3,8 @@ ...@@ -3,7 +3,8 @@
// This is not open source software. Copy it, and I'll break into your // This is not open source software. Copy it, and I'll break into your
// house and tell your three year-old that Santa doesn't exist. // house and tell your three year-old that Santa doesn't exist.
package main // Package conn defines interfaces for connections and tracks.
package conn
import ( import (
"errors" "errors"
...@@ -15,29 +16,33 @@ import ( ...@@ -15,29 +16,33 @@ import (
var ErrConnectionClosed = errors.New("connection is closed") var ErrConnectionClosed = errors.New("connection is closed")
var ErrKeyframeNeeded = errors.New("keyframe needed") var ErrKeyframeNeeded = errors.New("keyframe needed")
type upConnection interface { // Type Up represents a connection in the client to server direction.
addLocal(downConnection) error type Up interface {
delLocal(downConnection) bool AddLocal(Down) error
DelLocal(Down) bool
Id() string Id() string
Label() string Label() string
} }
type upTrack interface { // Type UpTrack represents a track in the client to server direction.
addLocal(downTrack) error type UpTrack interface {
delLocal(downTrack) bool AddLocal(DownTrack) error
DelLocal(DownTrack) bool
Label() string Label() string
Codec() *webrtc.RTPCodec Codec() *webrtc.RTPCodec
// get a recent packet. Returns 0 if the packet is not in cache. // get a recent packet. Returns 0 if the packet is not in cache.
getRTP(seqno uint16, result []byte) uint16 GetRTP(seqno uint16, result []byte) uint16
} }
type downConnection interface { // Type Down represents a connection in the server to client direction.
type Down interface {
GetMaxBitrate(now uint64) uint64 GetMaxBitrate(now uint64) uint64
} }
type downTrack interface { // Type DownTrack represents a track in the server to client direction.
type DownTrack interface {
WriteRTP(packat *rtp.Packet) error WriteRTP(packat *rtp.Packet) error
Accumulate(bytes uint32) Accumulate(bytes uint32)
setTimeOffset(ntp uint64, rtp uint32) SetTimeOffset(ntp uint64, rtp uint32)
setCname(string) SetCname(string)
} }
package main package disk
import ( import (
crand "crypto/rand"
"errors" "errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "encoding/hex"
"sync" "sync"
"time" "time"
...@@ -14,10 +15,15 @@ import ( ...@@ -14,10 +15,15 @@ import (
"github.com/pion/rtp/codecs" "github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/samplebuilder" "github.com/pion/webrtc/v3/pkg/media/samplebuilder"
"sfu/conn"
"sfu/group"
) )
type diskClient struct { var Directory string
group *group
type Client struct {
group *group.Group
id string id string
mu sync.Mutex mu sync.Mutex
...@@ -25,45 +31,37 @@ type diskClient struct { ...@@ -25,45 +31,37 @@ type diskClient struct {
closed bool closed bool
} }
var idCounter struct {
mu sync.Mutex
counter int
}
func newId() string { func newId() string {
idCounter.mu.Lock() b := make([]byte, 16)
defer idCounter.mu.Unlock() crand.Read(b)
return hex.EncodeToString(b)
s := strconv.FormatInt(int64(idCounter.counter), 16)
idCounter.counter++
return s
} }
func NewDiskClient(g *group) *diskClient { func New(g *group.Group) *Client {
return &diskClient{group: g, id: newId()} return &Client{group: g, id: newId()}
} }
func (client *diskClient) Group() *group { func (client *Client) Group() *group.Group {
return client.group return client.group
} }
func (client *diskClient) Id() string { func (client *Client) Id() string {
return client.id return client.id
} }
func (client *diskClient) Credentials() clientCredentials { func (client *Client) Credentials() group.ClientCredentials {
return clientCredentials{"RECORDING", ""} return group.ClientCredentials{"RECORDING", ""}
} }
func (client *diskClient) SetPermissions(perms clientPermissions) { func (client *Client) SetPermissions(perms group.ClientPermissions) {
return return
} }
func (client *diskClient) pushClient(id, username string, add bool) error { func (client *Client) PushClient(id, username string, add bool) error {
return nil return nil
} }
func (client *diskClient) Close() error { func (client *Client) Close() error {
client.mu.Lock() client.mu.Lock()
defer client.mu.Unlock() defer client.mu.Unlock()
...@@ -75,13 +73,13 @@ func (client *diskClient) Close() error { ...@@ -75,13 +73,13 @@ func (client *diskClient) Close() error {
return nil return nil
} }
func (client *diskClient) kick(message string) error { func (client *Client) kick(message string) error {
err := client.Close() err := client.Close()
delClient(client) group.DelClient(client)
return err return err
} }
func (client *diskClient) pushConn(id string, conn upConnection, tracks []upTrack, label string) error { func (client *Client) PushConn(id string, up conn.Up, tracks []conn.UpTrack, label string) error {
client.mu.Lock() client.mu.Lock()
defer client.mu.Unlock() defer client.mu.Unlock()
...@@ -95,11 +93,11 @@ func (client *diskClient) pushConn(id string, conn upConnection, tracks []upTrac ...@@ -95,11 +93,11 @@ func (client *diskClient) pushConn(id string, conn upConnection, tracks []upTrac
delete(client.down, id) delete(client.down, id)
} }
if conn == nil { if up == nil {
return nil return nil
} }
directory := filepath.Join(recordingsDir, client.group.name) directory := filepath.Join(Directory, client.group.Name())
err := os.MkdirAll(directory, 0700) err := os.MkdirAll(directory, 0700)
if err != nil { if err != nil {
return err return err
...@@ -109,12 +107,12 @@ func (client *diskClient) pushConn(id string, conn upConnection, tracks []upTrac ...@@ -109,12 +107,12 @@ func (client *diskClient) pushConn(id string, conn upConnection, tracks []upTrac
client.down = make(map[string]*diskConn) client.down = make(map[string]*diskConn)
} }
down, err := newDiskConn(directory, label, conn, tracks) down, err := newDiskConn(directory, label, up, tracks)
if err != nil { if err != nil {
return err return err
} }
client.down[conn.Id()] = down client.down[up.Id()] = down
return nil return nil
} }
...@@ -125,7 +123,7 @@ type diskConn struct { ...@@ -125,7 +123,7 @@ type diskConn struct {
mu sync.Mutex mu sync.Mutex
file *os.File file *os.File
remote upConnection remote conn.Up
tracks []*diskTrack tracks []*diskTrack
width, height uint32 width, height uint32
} }
...@@ -150,7 +148,7 @@ func (conn *diskConn) reopen() error { ...@@ -150,7 +148,7 @@ func (conn *diskConn) reopen() error {
} }
func (conn *diskConn) Close() error { func (conn *diskConn) Close() error {
conn.remote.delLocal(conn) conn.remote.DelLocal(conn)
conn.mu.Lock() conn.mu.Lock()
tracks := make([]*diskTrack, 0, len(conn.tracks)) tracks := make([]*diskTrack, 0, len(conn.tracks))
...@@ -164,7 +162,7 @@ func (conn *diskConn) Close() error { ...@@ -164,7 +162,7 @@ func (conn *diskConn) Close() error {
conn.mu.Unlock() conn.mu.Unlock()
for _, t := range tracks { for _, t := range tracks {
t.remote.delLocal(t) t.remote.DelLocal(t)
} }
return nil return nil
} }
...@@ -196,7 +194,7 @@ func openDiskFile(directory, label string) (*os.File, error) { ...@@ -196,7 +194,7 @@ func openDiskFile(directory, label string) (*os.File, error) {
} }
type diskTrack struct { type diskTrack struct {
remote upTrack remote conn.UpTrack
conn *diskConn conn *diskConn
writer webm.BlockWriteCloser writer webm.BlockWriteCloser
...@@ -206,7 +204,7 @@ type diskTrack struct { ...@@ -206,7 +204,7 @@ type diskTrack struct {
origin uint64 origin uint64
} }
func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrack) (*diskConn, error) { func newDiskConn(directory, label string, up conn.Up, remoteTracks []conn.UpTrack) (*diskConn, error) {
conn := diskConn{ conn := diskConn{
directory: directory, directory: directory,
label: label, label: label,
...@@ -231,10 +229,10 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac ...@@ -231,10 +229,10 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac
conn: &conn, conn: &conn,
} }
conn.tracks = append(conn.tracks, track) conn.tracks = append(conn.tracks, track)
remote.addLocal(track) remote.AddLocal(track)
} }
err := up.addLocal(&conn) err := up.AddLocal(&conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -242,10 +240,10 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac ...@@ -242,10 +240,10 @@ func newDiskConn(directory, label string, up upConnection, remoteTracks []upTrac
return &conn, nil return &conn, nil
} }
func (t *diskTrack) setTimeOffset(ntp uint64, rtp uint32) { func (t *diskTrack) SetTimeOffset(ntp uint64, rtp uint32) {
} }
func (t *diskTrack) setCname(string) { func (t *diskTrack) SetCname(string) {
} }
func clonePacket(packet *rtp.Packet) *rtp.Packet { func clonePacket(packet *rtp.Packet) *rtp.Packet {
...@@ -310,7 +308,7 @@ func (t *diskTrack) WriteRTP(packet *rtp.Packet) error { ...@@ -310,7 +308,7 @@ func (t *diskTrack) WriteRTP(packet *rtp.Packet) error {
if t.writer == nil { if t.writer == nil {
if !keyframe { if !keyframe {
return ErrKeyframeNeeded return conn.ErrKeyframeNeeded
} }
return nil return nil
} }
......
package main package group
type clientCredentials struct { import (
"sfu/conn"
)
type ClientCredentials struct {
Username string `json:"username,omitempty"` Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
} }
type clientPermissions struct { type ClientPermissions struct {
Op bool `json:"op,omitempty"` Op bool `json:"op,omitempty"`
Present bool `json:"present,omitempty"` Present bool `json:"present,omitempty"`
Record bool `json:"record,omitempty"` Record bool `json:"record,omitempty"`
} }
type client interface { type Client interface {
Group() *group Group() *Group
Id() string Id() string
Credentials() clientCredentials Credentials() ClientCredentials
SetPermissions(clientPermissions) SetPermissions(ClientPermissions)
pushConn(id string, conn upConnection, tracks []upTrack, label string) error PushConn(id string, conn conn.Up, tracks []conn.UpTrack, label string) error
pushClient(id, username string, add bool) error PushClient(id, username string, add bool) error
} }
type kickable interface { type Kickable interface {
kick(message string) error Kick(message string) error
} }
This diff is collapsed.
// Copyright (c) 2020 by Juliusz Chroboczek. package rtpconn
// This is not open source software. Copy it, and I'll break into your
// house and tell your three year-old that Santa doesn't exist.
package main
import ( import (
"errors" "errors"
...@@ -14,14 +9,16 @@ import ( ...@@ -14,14 +9,16 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"sfu/conn"
"sfu/estimator" "sfu/estimator"
"sfu/group"
"sfu/jitter" "sfu/jitter"
"sfu/packetcache" "sfu/packetcache"
"sfu/rtptime" "sfu/rtptime"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
) )
type bitrate struct { type bitrate struct {
...@@ -71,7 +68,7 @@ type iceConnection interface { ...@@ -71,7 +68,7 @@ type iceConnection interface {
type rtpDownTrack struct { type rtpDownTrack struct {
track *webrtc.Track track *webrtc.Track
remote upTrack remote conn.UpTrack
maxBitrate *bitrate maxBitrate *bitrate
rate *estimator.Estimator rate *estimator.Estimator
stats *receiverStats stats *receiverStats
...@@ -91,26 +88,26 @@ func (down *rtpDownTrack) Accumulate(bytes uint32) { ...@@ -91,26 +88,26 @@ func (down *rtpDownTrack) Accumulate(bytes uint32) {
down.rate.Accumulate(bytes) down.rate.Accumulate(bytes)
} }
func (down *rtpDownTrack) setTimeOffset(ntp uint64, rtp uint32) { func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) {
atomic.StoreUint64(&down.remoteNTPTime, ntp) atomic.StoreUint64(&down.remoteNTPTime, ntp)
atomic.StoreUint32(&down.remoteRTPTime, rtp) atomic.StoreUint32(&down.remoteRTPTime, rtp)
} }
func (down *rtpDownTrack) setCname(cname string) { func (down *rtpDownTrack) SetCname(cname string) {
down.cname.Store(cname) down.cname.Store(cname)
} }
type rtpDownConnection struct { type rtpDownConnection struct {
id string id string
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
remote upConnection remote conn.Up
tracks []*rtpDownTrack tracks []*rtpDownTrack
maxREMBBitrate *bitrate maxREMBBitrate *bitrate
iceCandidates []*webrtc.ICECandidateInit iceCandidates []*webrtc.ICECandidateInit
} }
func newDownConn(c client, id string, remote upConnection) (*rtpDownConnection, error) { func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) {
pc, err := c.Group().API().NewPeerConnection(iceConfiguration()) pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -193,7 +190,7 @@ type rtpUpTrack struct { ...@@ -193,7 +190,7 @@ type rtpUpTrack struct {
mu sync.Mutex mu sync.Mutex
cname string cname string
local []downTrack local []conn.DownTrack
srTime uint64 srTime uint64
srNTPTime uint64 srNTPTime uint64
srRTPTime uint32 srRTPTime uint32
...@@ -201,17 +198,17 @@ type rtpUpTrack struct { ...@@ -201,17 +198,17 @@ type rtpUpTrack struct {
type localTrackAction struct { type localTrackAction struct {
add bool add bool
track downTrack track conn.DownTrack
} }
func (up *rtpUpTrack) notifyLocal(add bool, track downTrack) { func (up *rtpUpTrack) notifyLocal(add bool, track conn.DownTrack) {
select { select {
case up.localCh <- localTrackAction{add, track}: case up.localCh <- localTrackAction{add, track}:
case <-up.readerDone: case <-up.readerDone:
} }
} }
func (up *rtpUpTrack) addLocal(local downTrack) error { func (up *rtpUpTrack) AddLocal(local conn.DownTrack) error {
up.mu.Lock() up.mu.Lock()
for _, t := range up.local { for _, t := range up.local {
if t == local { if t == local {
...@@ -226,7 +223,7 @@ func (up *rtpUpTrack) addLocal(local downTrack) error { ...@@ -226,7 +223,7 @@ func (up *rtpUpTrack) addLocal(local downTrack) error {
return nil return nil
} }
func (up *rtpUpTrack) delLocal(local downTrack) bool { func (up *rtpUpTrack) DelLocal(local conn.DownTrack) bool {
up.mu.Lock() up.mu.Lock()
for i, l := range up.local { for i, l := range up.local {
if l == local { if l == local {
...@@ -240,15 +237,15 @@ func (up *rtpUpTrack) delLocal(local downTrack) bool { ...@@ -240,15 +237,15 @@ func (up *rtpUpTrack) delLocal(local downTrack) bool {
return false return false
} }
func (up *rtpUpTrack) getLocal() []downTrack { func (up *rtpUpTrack) getLocal() []conn.DownTrack {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock() defer up.mu.Unlock()
local := make([]downTrack, len(up.local)) local := make([]conn.DownTrack, len(up.local))
copy(local, up.local) copy(local, up.local)
return local return local
} }
func (up *rtpUpTrack) getRTP(seqno uint16, result []byte) uint16 { func (up *rtpUpTrack) GetRTP(seqno uint16, result []byte) uint16 {
return up.cache.Get(seqno, result) return up.cache.Get(seqno, result)
} }
...@@ -278,7 +275,7 @@ type rtpUpConnection struct { ...@@ -278,7 +275,7 @@ type rtpUpConnection struct {
mu sync.Mutex mu sync.Mutex
tracks []*rtpUpTrack tracks []*rtpUpTrack
local []downConnection local []conn.Down
} }
func (up *rtpUpConnection) getTracks() []*rtpUpTrack { func (up *rtpUpConnection) getTracks() []*rtpUpTrack {
...@@ -297,7 +294,7 @@ func (up *rtpUpConnection) Label() string { ...@@ -297,7 +294,7 @@ func (up *rtpUpConnection) Label() string {
return up.label return up.label
} }
func (up *rtpUpConnection) addLocal(local downConnection) error { func (up *rtpUpConnection) AddLocal(local conn.Down) error {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock() defer up.mu.Unlock()
for _, t := range up.local { for _, t := range up.local {
...@@ -309,7 +306,7 @@ func (up *rtpUpConnection) addLocal(local downConnection) error { ...@@ -309,7 +306,7 @@ func (up *rtpUpConnection) addLocal(local downConnection) error {
return nil return nil
} }
func (up *rtpUpConnection) delLocal(local downConnection) bool { func (up *rtpUpConnection) DelLocal(local conn.Down) bool {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock() defer up.mu.Unlock()
for i, l := range up.local { for i, l := range up.local {
...@@ -321,10 +318,10 @@ func (up *rtpUpConnection) delLocal(local downConnection) bool { ...@@ -321,10 +318,10 @@ func (up *rtpUpConnection) delLocal(local downConnection) bool {
return false return false
} }
func (up *rtpUpConnection) getLocal() []downConnection { func (up *rtpUpConnection) getLocal() []conn.Down {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock() defer up.mu.Unlock()
local := make([]downConnection, len(up.local)) local := make([]conn.Down, len(up.local))
copy(local, up.local) copy(local, up.local)
return local return local
} }
...@@ -370,8 +367,8 @@ func (up *rtpUpConnection) complete() bool { ...@@ -370,8 +367,8 @@ func (up *rtpUpConnection) complete() bool {
return true return true
} }
func newUpConn(c client, id string) (*rtpUpConnection, error) { func newUpConn(c group.Client, id string) (*rtpUpConnection, error) {
pc, err := c.Group().API().NewPeerConnection(iceConfiguration()) pc, err := c.Group().API().NewPeerConnection(group.IceConfiguration())
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -396,10 +393,10 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) { ...@@ -396,10 +393,10 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) {
return nil, err return nil, err
} }
conn := &rtpUpConnection{id: id, pc: pc} up := &rtpUpConnection{id: id, pc: pc}
pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) { pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) {
conn.mu.Lock() up.mu.Lock()
mid := getTrackMid(pc, remote) mid := getTrackMid(pc, remote)
if mid == "" { if mid == "" {
...@@ -407,7 +404,7 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) { ...@@ -407,7 +404,7 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) {
return return
} }
label, ok := conn.labels[mid] label, ok := up.labels[mid]
if !ok { if !ok {
log.Printf("Couldn't get track's label") log.Printf("Couldn't get track's label")
isvideo := remote.Kind() == webrtc.RTPCodecTypeVideo isvideo := remote.Kind() == webrtc.RTPCodecTypeVideo
...@@ -428,34 +425,34 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) { ...@@ -428,34 +425,34 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) {
readerDone: make(chan struct{}), readerDone: make(chan struct{}),
} }
conn.tracks = append(conn.tracks, track) up.tracks = append(up.tracks, track)
go readLoop(conn, track) go readLoop(up, track)
go rtcpUpListener(conn, track, receiver) go rtcpUpListener(up, track, receiver)
complete := conn.complete() complete := up.complete()
var tracks []upTrack var tracks []conn.UpTrack
if(complete) { if complete {
tracks = make([]upTrack, len(conn.tracks)) tracks = make([]conn.UpTrack, len(up.tracks))
for i, t := range conn.tracks { for i, t := range up.tracks {
tracks[i] = t tracks[i] = t
} }
} }
// pushConn might need to take the lock // pushConn might need to take the lock
conn.mu.Unlock() up.mu.Unlock()
if complete { if complete {
clients := c.Group().getClients(c) clients := c.Group().GetClients(c)
for _, cc := range clients { for _, cc := range clients {
cc.pushConn(conn.id, conn, tracks, conn.label) cc.PushConn(up.id, up, tracks, up.label)
} }
go rtcpUpSender(conn) go rtcpUpSender(up)
} }
}) })
return conn, nil return up, nil
} }
func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { func readLoop(conn *rtpUpConnection, track *rtpUpTrack) {
...@@ -606,7 +603,7 @@ func sendRecovery(p *rtcp.TransportLayerNack, track *rtpDownTrack) { ...@@ -606,7 +603,7 @@ func sendRecovery(p *rtcp.TransportLayerNack, track *rtpDownTrack) {
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
for _, nack := range p.Nacks { for _, nack := range p.Nacks {
for _, seqno := range nack.PacketList() { for _, seqno := range nack.PacketList() {
l := track.remote.getRTP(seqno, buf) l := track.remote.GetRTP(seqno, buf)
if l == 0 { if l == 0 {
continue continue
} }
...@@ -650,7 +647,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei ...@@ -650,7 +647,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
track.srRTPTime = p.RTPTime track.srRTPTime = p.RTPTime
track.mu.Unlock() track.mu.Unlock()
for _, l := range local { for _, l := range local {
l.setTimeOffset(p.NTPTime, p.RTPTime) l.SetTimeOffset(p.NTPTime, p.RTPTime)
} }
case *rtcp.SourceDescription: case *rtcp.SourceDescription:
for _, c := range p.Chunks { for _, c := range p.Chunks {
...@@ -665,7 +662,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei ...@@ -665,7 +662,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
track.cname = i.Text track.cname = i.Text
track.mu.Unlock() track.mu.Unlock()
for _, l := range local { for _, l := range local {
l.setCname(i.Text) l.SetCname(i.Text)
} }
} }
} }
...@@ -749,8 +746,8 @@ func sendUpRTCP(conn *rtpUpConnection) error { ...@@ -749,8 +746,8 @@ func sendUpRTCP(conn *rtpUpConnection) error {
rate = r rate = r
} }
} }
if rate < minBitrate { if rate < group.MinBitrate {
rate = minBitrate rate = group.MinBitrate
} }
var ssrcs []uint32 var ssrcs []uint32
......
package main package rtpconn
import ( import (
"sort" "sort"
...@@ -6,76 +6,20 @@ import ( ...@@ -6,76 +6,20 @@ import (
"time" "time"
"sfu/rtptime" "sfu/rtptime"
"sfu/stats"
) )
type groupStats struct { func (c *webClient) GetStats() *stats.Client {
name string
clients []clientStats
}
type clientStats struct {
id string
up, down []connStats
}
type connStats struct {
id string
maxBitrate uint64
tracks []trackStats
}
type trackStats struct {
bitrate uint64
maxBitrate uint64
loss uint8
rtt time.Duration
jitter time.Duration
}
func getGroupStats() []groupStats {
names := getGroupNames()
gs := make([]groupStats, 0, len(names))
for _, name := range names {
g := getGroup(name)
if g == nil {
continue
}
clients := g.getClients(nil)
stats := groupStats{
name: name,
clients: make([]clientStats, 0, len(clients)),
}
for _, c := range clients {
c, ok := c.(*webClient)
if ok {
cs := getClientStats(c)
stats.clients = append(stats.clients, cs)
}
}
sort.Slice(stats.clients, func(i, j int) bool {
return stats.clients[i].id < stats.clients[j].id
})
gs = append(gs, stats)
}
sort.Slice(gs, func(i, j int) bool {
return gs[i].name < gs[j].name
})
return gs
}
func getClientStats(c *webClient) clientStats {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
cs := clientStats{ cs := stats.Client{
id: c.id, Id: c.id,
} }
for _, up := range c.up { for _, up := range c.up {
conns := connStats{ conns := stats.Conn{
id: up.id, Id: up.id,
} }
tracks := up.getTracks() tracks := up.getTracks()
for _, t := range tracks { for _, t := range tracks {
...@@ -87,23 +31,23 @@ func getClientStats(c *webClient) clientStats { ...@@ -87,23 +31,23 @@ func getClientStats(c *webClient) clientStats {
jitter := time.Duration(t.jitter.Jitter()) * jitter := time.Duration(t.jitter.Jitter()) *
(time.Second / time.Duration(t.jitter.HZ())) (time.Second / time.Duration(t.jitter.HZ()))
rate, _ := t.rate.Estimate() rate, _ := t.rate.Estimate()
conns.tracks = append(conns.tracks, trackStats{ conns.Tracks = append(conns.Tracks, stats.Track{
bitrate: uint64(rate) * 8, Bitrate: uint64(rate) * 8,
loss: loss, Loss: loss,
jitter: jitter, Jitter: jitter,
}) })
} }
cs.up = append(cs.up, conns) cs.Up = append(cs.Up, conns)
} }
sort.Slice(cs.up, func(i, j int) bool { sort.Slice(cs.Up, func(i, j int) bool {
return cs.up[i].id < cs.up[j].id return cs.Up[i].Id < cs.Up[j].Id
}) })
jiffies := rtptime.Jiffies() jiffies := rtptime.Jiffies()
for _, down := range c.down { for _, down := range c.down {
conns := connStats{ conns := stats.Conn{
id: down.id, Id: down.id,
maxBitrate: down.GetMaxBitrate(jiffies), MaxBitrate: down.GetMaxBitrate(jiffies),
} }
for _, t := range down.tracks { for _, t := range down.tracks {
rate, _ := t.rate.Estimate() rate, _ := t.rate.Estimate()
...@@ -112,19 +56,19 @@ func getClientStats(c *webClient) clientStats { ...@@ -112,19 +56,19 @@ func getClientStats(c *webClient) clientStats {
loss, jitter := t.stats.Get(jiffies) loss, jitter := t.stats.Get(jiffies)
j := time.Duration(jitter) * time.Second / j := time.Duration(jitter) * time.Second /
time.Duration(t.track.Codec().ClockRate) time.Duration(t.track.Codec().ClockRate)
conns.tracks = append(conns.tracks, trackStats{ conns.Tracks = append(conns.Tracks, stats.Track{
bitrate: uint64(rate) * 8, Bitrate: uint64(rate) * 8,
maxBitrate: t.maxBitrate.Get(jiffies), MaxBitrate: t.maxBitrate.Get(jiffies),
loss: uint8(uint32(loss) * 100 / 256), Loss: uint8(uint32(loss) * 100 / 256),
rtt: rtt, Rtt: rtt,
jitter: j, Jitter: j,
}) })
} }
cs.down = append(cs.down, conns) cs.Down = append(cs.Down, conns)
} }
sort.Slice(cs.down, func(i, j int) bool { sort.Slice(cs.Down, func(i, j int) bool {
return cs.down[i].id < cs.down[j].id return cs.Down[i].Id < cs.Down[j].Id
}) })
return cs return &cs
} }
package main package rtpconn
import ( import (
"errors" "errors"
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
"sfu/conn"
"sfu/packetcache" "sfu/packetcache"
"sfu/rtptime" "sfu/rtptime"
) )
...@@ -43,7 +44,7 @@ func sqrt(n int) int { ...@@ -43,7 +44,7 @@ func sqrt(n int) int {
} }
// add adds or removes a track from a writer pool // add adds or removes a track from a writer pool
func (wp *rtpWriterPool) add(track downTrack, add bool) error { func (wp *rtpWriterPool) add(track conn.DownTrack, add bool) error {
n := 4 n := 4
if wp.count > 16 { if wp.count > 16 {
n = sqrt(wp.count) n = sqrt(wp.count)
...@@ -166,7 +167,7 @@ var ErrUnknownTrack = errors.New("unknown track") ...@@ -166,7 +167,7 @@ var ErrUnknownTrack = errors.New("unknown track")
type writerAction struct { type writerAction struct {
add bool add bool
track downTrack track conn.DownTrack
maxTracks int maxTracks int
ch chan error ch chan error
} }
...@@ -192,7 +193,7 @@ func newRtpWriter(conn *rtpUpConnection, track *rtpUpTrack) *rtpWriter { ...@@ -192,7 +193,7 @@ func newRtpWriter(conn *rtpUpConnection, track *rtpUpTrack) *rtpWriter {
} }
// add adds or removes a track from a writer. // add adds or removes a track from a writer.
func (writer *rtpWriter) add(track downTrack, add bool, max int) error { func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error {
ch := make(chan error, 1) ch := make(chan error, 1)
select { select {
case writer.action <- writerAction{add, track, max, ch}: case writer.action <- writerAction{add, track, max, ch}:
...@@ -208,13 +209,13 @@ func (writer *rtpWriter) add(track downTrack, add bool, max int) error { ...@@ -208,13 +209,13 @@ func (writer *rtpWriter) add(track downTrack, add bool, max int) error {
} }
// rtpWriterLoop is the main loop of an rtpWriter. // rtpWriterLoop is the main loop of an rtpWriter.
func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack) { func rtpWriterLoop(writer *rtpWriter, up *rtpUpConnection, track *rtpUpTrack) {
defer close(writer.done) defer close(writer.done)
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
local := make([]downTrack, 0) local := make([]conn.DownTrack, 0)
// reset whenever a new track is inserted // reset whenever a new track is inserted
firSent := false firSent := false
...@@ -239,10 +240,10 @@ func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack) ...@@ -239,10 +240,10 @@ func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack)
cname := track.cname cname := track.cname
track.mu.Unlock() track.mu.Unlock()
if ntp != 0 { if ntp != 0 {
action.track.setTimeOffset(ntp, rtp) action.track.SetTimeOffset(ntp, rtp)
} }
if cname != "" { if cname != "" {
action.track.setCname(cname) action.track.SetCname(cname)
} }
} else { } else {
found := false found := false
...@@ -283,7 +284,7 @@ func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack) ...@@ -283,7 +284,7 @@ func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack)
for _, l := range local { for _, l := range local {
err := l.WriteRTP(&packet) err := l.WriteRTP(&packet)
if err != nil { if err != nil {
if err == ErrKeyframeNeeded { if err == conn.ErrKeyframeNeeded {
kfNeeded = true kfNeeded = true
} }
continue continue
...@@ -292,9 +293,9 @@ func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack) ...@@ -292,9 +293,9 @@ func rtpWriterLoop(writer *rtpWriter, conn *rtpUpConnection, track *rtpUpTrack)
} }
if kfNeeded { if kfNeeded {
err := conn.sendFIR(track, !firSent) err := up.sendFIR(track, !firSent)
if err == ErrUnsupportedFeedback { if err == ErrUnsupportedFeedback {
conn.sendPLI(track) up.sendPLI(track)
} }
firSent = true firSent = true
} }
......
This diff is collapsed.
...@@ -14,14 +14,14 @@ import ( ...@@ -14,14 +14,14 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"syscall" "syscall"
"sfu/disk"
"sfu/group"
) )
var httpAddr string var httpAddr string
var staticRoot string var staticRoot string
var dataDir string var dataDir string
var groupsDir string
var recordingsDir string
var iceFilename string
func main() { func main() {
var cpuprofile, memprofile, mutexprofile string var cpuprofile, memprofile, mutexprofile string
...@@ -31,9 +31,9 @@ func main() { ...@@ -31,9 +31,9 @@ func main() {
"web server root `directory`") "web server root `directory`")
flag.StringVar(&dataDir, "data", "./data/", flag.StringVar(&dataDir, "data", "./data/",
"data `directory`") "data `directory`")
flag.StringVar(&groupsDir, "groups", "./groups/", flag.StringVar(&group.Directory, "groups", "./groups/",
"group description `directory`") "group description `directory`")
flag.StringVar(&recordingsDir, "recordings", "./recordings/", flag.StringVar(&disk.Directory, "recordings", "./recordings/",
"recordings `directory`") "recordings `directory`")
flag.StringVar(&cpuprofile, "cpuprofile", "", flag.StringVar(&cpuprofile, "cpuprofile", "",
"store CPU profile in `file`") "store CPU profile in `file`")
...@@ -81,9 +81,9 @@ func main() { ...@@ -81,9 +81,9 @@ func main() {
}() }()
} }
iceFilename = filepath.Join(dataDir, "ice-servers.json") group.IceFilename = filepath.Join(dataDir, "ice-servers.json")
go readPublicGroups() go group.ReadPublicGroups()
webserver() webserver()
terminate := make(chan os.Signal, 1) terminate := make(chan os.Signal, 1)
......
package stats
import (
"sort"
"time"
"sfu/group"
)
type GroupStats struct {
Name string
Clients []*Client
}
type Client struct {
Id string
Up, Down []Conn
}
type Statable interface {
GetStats() *Client
}
type Conn struct {
Id string
MaxBitrate uint64
Tracks []Track
}
type Track struct {
Bitrate uint64
MaxBitrate uint64
Loss uint8
Rtt time.Duration
Jitter time.Duration
}
func GetGroups() []GroupStats {
names := group.GetNames()
gs := make([]GroupStats, 0, len(names))
for _, name := range names {
g := group.Get(name)
if g == nil {
continue
}
clients := g.GetClients(nil)
stats := GroupStats{
Name: name,
Clients: make([]*Client, 0, len(clients)),
}
for _, c := range clients {
s, ok := c.(Statable)
if ok {
cs := s.GetStats()
stats.Clients = append(stats.Clients, cs)
} else {
stats.Clients = append(stats.Clients,
&Client{Id: c.Id()},
)
}
}
sort.Slice(stats.Clients, func(i, j int) bool {
return stats.Clients[i].Id < stats.Clients[j].Id
})
gs = append(gs, stats)
}
sort.Slice(gs, func(i, j int) bool {
return gs[i].Name < gs[j].Name
})
return gs
}
...@@ -18,6 +18,11 @@ import ( ...@@ -18,6 +18,11 @@ import (
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"sfu/disk"
"sfu/group"
"sfu/rtpconn"
"sfu/stats"
) )
var server *http.Server var server *http.Server
...@@ -47,8 +52,8 @@ func webserver() { ...@@ -47,8 +52,8 @@ func webserver() {
IdleTimeout: 120 * time.Second, IdleTimeout: 120 * time.Second,
} }
server.RegisterOnShutdown(func() { server.RegisterOnShutdown(func() {
rangeGroups(func (g *group) bool { group.Range(func(g *group.Group) bool {
go g.shutdown("server is shutting down") go g.Shutdown("server is shutting down")
return true return true
}) })
}) })
...@@ -139,7 +144,7 @@ func groupHandler(w http.ResponseWriter, r *http.Request) { ...@@ -139,7 +144,7 @@ func groupHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
g, err := addGroup(name, nil) g, err := group.Add(name, nil)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
notFound(w) notFound(w)
...@@ -168,7 +173,7 @@ func publicHandler(w http.ResponseWriter, r *http.Request) { ...@@ -168,7 +173,7 @@ func publicHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
g := getPublicGroups() g := group.GetPublic()
e := json.NewEncoder(w) e := json.NewEncoder(w)
e.Encode(g) e.Encode(g)
return return
...@@ -222,7 +227,7 @@ func statsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -222,7 +227,7 @@ func statsHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
stats := getGroupStats() ss := stats.GetGroups()
fmt.Fprintf(w, "<!DOCTYPE html>\n<html><head>\n") fmt.Fprintf(w, "<!DOCTYPE html>\n<html><head>\n")
fmt.Fprintf(w, "<title>Stats</title>\n") fmt.Fprintf(w, "<title>Stats</title>\n")
...@@ -239,51 +244,51 @@ func statsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -239,51 +244,51 @@ func statsHandler(w http.ResponseWriter, r *http.Request) {
return err return err
} }
printTrack := func(w io.Writer, t trackStats) { printTrack := func(w io.Writer, t stats.Track) {
fmt.Fprintf(w, "<tr><td></td><td></td><td></td>") fmt.Fprintf(w, "<tr><td></td><td></td><td></td>")
fmt.Fprintf(w, "<td>") fmt.Fprintf(w, "<td>")
printBitrate(w, t.bitrate, t.maxBitrate) printBitrate(w, t.Bitrate, t.MaxBitrate)
fmt.Fprintf(w, "</td>") fmt.Fprintf(w, "</td>")
fmt.Fprintf(w, "<td>%d%%</td>", fmt.Fprintf(w, "<td>%d%%</td>",
t.loss, t.Loss,
) )
fmt.Fprintf(w, "<td>") fmt.Fprintf(w, "<td>")
if t.rtt > 0 { if t.Rtt > 0 {
fmt.Fprintf(w, "%v", t.rtt) fmt.Fprintf(w, "%v", t.Rtt)
} }
if t.jitter > 0 { if t.Jitter > 0 {
fmt.Fprintf(w, "&#177;%v", t.jitter) fmt.Fprintf(w, "&#177;%v", t.Jitter)
} }
fmt.Fprintf(w, "</td>") fmt.Fprintf(w, "</td>")
fmt.Fprintf(w, "</tr>") fmt.Fprintf(w, "</tr>")
} }
for _, gs := range stats { for _, gs := range ss {
fmt.Fprintf(w, "<p>%v</p>\n", html.EscapeString(gs.name)) fmt.Fprintf(w, "<p>%v</p>\n", html.EscapeString(gs.Name))
fmt.Fprintf(w, "<table>") fmt.Fprintf(w, "<table>")
for _, cs := range gs.clients { for _, cs := range gs.Clients {
fmt.Fprintf(w, "<tr><td>%v</td></tr>\n", cs.id) fmt.Fprintf(w, "<tr><td>%v</td></tr>\n", cs.Id)
for _, up := range cs.up { for _, up := range cs.Up {
fmt.Fprintf(w, "<tr><td></td><td>Up</td><td>%v</td>", fmt.Fprintf(w, "<tr><td></td><td>Up</td><td>%v</td>",
up.id) up.Id)
if up.maxBitrate > 0 { if up.MaxBitrate > 0 {
fmt.Fprintf(w, "<td>%v</td>", fmt.Fprintf(w, "<td>%v</td>",
up.maxBitrate) up.MaxBitrate)
} }
fmt.Fprintf(w, "</tr>\n") fmt.Fprintf(w, "</tr>\n")
for _, t := range up.tracks { for _, t := range up.Tracks {
printTrack(w, t) printTrack(w, t)
} }
} }
for _, down := range cs.down { for _, down := range cs.Down {
fmt.Fprintf(w, "<tr><td></td><td>Down</td><td> %v</td>", fmt.Fprintf(w, "<tr><td></td><td>Down</td><td> %v</td>",
down.id) down.Id)
if down.maxBitrate > 0 { if down.MaxBitrate > 0 {
fmt.Fprintf(w, "<td>%v</td>", fmt.Fprintf(w, "<td>%v</td>",
down.maxBitrate) down.MaxBitrate)
} }
fmt.Fprintf(w, "</tr>\n") fmt.Fprintf(w, "</tr>\n")
for _, t := range down.tracks { for _, t := range down.Tracks {
printTrack(w, t) printTrack(w, t)
} }
} }
...@@ -302,7 +307,7 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -302,7 +307,7 @@ func wsHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
go func() { go func() {
err := startClient(conn) err := rtpconn.StartClient(conn)
if err != nil { if err != nil {
log.Printf("client: %v", err) log.Printf("client: %v", err)
} }
...@@ -322,7 +327,7 @@ func recordingsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -322,7 +327,7 @@ func recordingsHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
f, err := os.Open(filepath.Join(recordingsDir, pth)) f, err := os.Open(filepath.Join(disk.Directory, pth))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
notFound(w) notFound(w)
...@@ -389,7 +394,7 @@ func handleGroupAction(w http.ResponseWriter, r *http.Request, group string) { ...@@ -389,7 +394,7 @@ func handleGroupAction(w http.ResponseWriter, r *http.Request, group string) {
return return
} }
err := os.Remove( err := os.Remove(
filepath.Join(recordingsDir, group+"/"+filename), filepath.Join(disk.Directory, group+"/"+filename),
) )
if err != nil { if err != nil {
if os.IsPermission(err) { if os.IsPermission(err) {
...@@ -409,8 +414,8 @@ func handleGroupAction(w http.ResponseWriter, r *http.Request, group string) { ...@@ -409,8 +414,8 @@ func handleGroupAction(w http.ResponseWriter, r *http.Request, group string) {
} }
} }
func checkGroupPermissions(w http.ResponseWriter, r *http.Request, group string) bool { func checkGroupPermissions(w http.ResponseWriter, r *http.Request, groupname string) bool {
desc, err := getDescription(group) desc, err := group.GetDescription(groupname)
if err != nil { if err != nil {
return false return false
} }
...@@ -420,7 +425,7 @@ func checkGroupPermissions(w http.ResponseWriter, r *http.Request, group string) ...@@ -420,7 +425,7 @@ func checkGroupPermissions(w http.ResponseWriter, r *http.Request, group string)
return false return false
} }
p, err := getPermission(desc, clientCredentials{user, pass}) p, err := desc.GetPermission(group.ClientCredentials{user, pass})
if err != nil || !p.Record { if err != nil || !p.Record {
return false return false
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment