Commit 2da8faa8 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Implement per-stream requests.

parent 515c4a5c
......@@ -213,6 +213,16 @@ restart by sending a `renegotiate` message:
}
```
At any time after answering, the client may change the set of streams
being offered by sending a 'requestStream' request:
```javascript
{
type: 'answerStream'
id: id,
request: [audio, video]
}
```
## Closing streams
The offerer may close a stream at any time by sending a `close` message.
......
......@@ -149,6 +149,7 @@ type rtpDownConnection struct {
remote conn.Up
iceCandidates []*webrtc.ICECandidateInit
negotiationNeeded int
requested []string
mu sync.Mutex
tracks []*rtpDownTrack
......@@ -432,6 +433,7 @@ func (up *rtpUpTrack) hasRtcpFb(tpe, parameter string) bool {
type rtpUpConnection struct {
id string
client group.Client
label string
userId string
username string
......@@ -585,7 +587,7 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon
}
}
up := &rtpUpConnection{id: id, label: label, pc: pc}
up := &rtpUpConnection{id: id, client: c, label: label, pc: pc}
pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
up.mu.Lock()
......
......@@ -141,7 +141,7 @@ type clientMessage struct {
SDP string `json:"sdp,omitempty"`
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
Label string `json:"label,omitempty"`
Request map[string][]string `json:"request,omitempty"`
Request interface{} `json:"request,omitempty"`
RTCConfiguration *webrtc.Configuration `json:"rtcConfiguration,omitempty"`
}
......@@ -638,6 +638,52 @@ func gotICE(c *webClient, candidate *webrtc.ICECandidateInit, id string) error {
return conn.addICECandidate(candidate)
}
var errBadType = errors.New("bad type")
func toStringArray(r interface{}) ([]string, error) {
if r == nil {
return nil, nil
}
rr, ok := r.([]interface{})
if !ok {
return nil, errBadType
}
if rr == nil {
return nil, nil
}
rrr := make([]string, len(rr))
for i, s := range rr {
rrr[i], ok = s.(string)
if !ok {
return nil, errBadType
}
}
return rrr, nil
}
func parseRequested(r interface{}) (map[string][]string, error) {
if r == nil {
return nil, nil
}
rr, ok := r.(map[string]interface{})
if !ok {
return nil, errBadType
}
if rr == nil {
return nil, nil
}
rrr := make(map[string][]string)
for k, v := range rr {
vv, err := toStringArray(v)
if err != nil {
return nil, err
}
rrr[k] = vv
}
return rrr, nil
}
func (c *webClient) setRequested(requested map[string][]string) error {
if c.group == nil {
return errors.New("attempted to request with no group joined")
......@@ -648,6 +694,16 @@ func (c *webClient) setRequested(requested map[string][]string) error {
return nil
}
func (c *webClient) setRequestedStream(down *rtpDownConnection, requested []string) error {
var remoteClient group.Client
remote, ok := down.remote.(*rtpUpConnection)
if ok {
remoteClient = remote.client
}
down.requested = requested
return remoteClient.RequestConns(c, c.group, remote.id)
}
func (c *webClient) RequestConns(target group.Client, g *group.Group, id string) error {
return c.action(requestConnsAction{g, target, id})
}
......@@ -659,13 +715,17 @@ func requestConns(target group.Client, g *group.Group, id string) {
}
}
func requestedTracks(c *webClient, up conn.Up, tracks []conn.UpTrack) []conn.UpTrack {
r, ok := c.requested[up.Label()]
if !ok {
r, ok = c.requested[""]
}
if !ok || len(r) == 0 {
return nil
func requestedTracks(c *webClient, override []string, up conn.Up, tracks []conn.UpTrack) []conn.UpTrack {
r := override
if r == nil {
var ok bool
r, ok = c.requested[up.Label()]
if !ok {
r, ok = c.requested[""]
}
if !ok || len(r) == 0 {
return nil
}
}
var audio, video, videoLow bool
......@@ -894,8 +954,18 @@ func handleAction(c *webClient, a interface{}) error {
return nil
}
var tracks []conn.UpTrack
var override []string
if a.conn != nil {
tracks = requestedTracks(c, a.conn, a.tracks)
var old *rtpDownConnection
if a.replace != "" {
old = getDownConn(c, a.replace)
} else {
old = getDownConn(c, a.conn.Id())
}
if old != nil {
override = old.requested
}
tracks = requestedTracks(c, override, a.conn, a.tracks)
}
if len(tracks) == 0 {
......@@ -938,6 +1008,7 @@ func handleAction(c *webClient, a interface{}) error {
case requestConnsAction:
g := c.group
if g == nil || a.group != g {
log.Printf("Misdirected pushConns")
return nil
}
for _, u := range c.up {
......@@ -1241,7 +1312,21 @@ func handleClientMessage(c *webClient, m clientMessage) error {
}
}
case "request":
return c.setRequested(m.Request)
requested, err := parseRequested(m.Request)
if err != nil {
return err
}
return c.setRequested(requested)
case "requestStream":
down := getDownConn(c, m.Id)
if down == nil {
return ErrUnknownId
}
requested, err := toStringArray(m.Request)
if err != nil {
return err
}
c.setRequestedStream(down, requested)
case "offer":
if m.Id == "" {
return errEmptyId
......
......@@ -214,7 +214,7 @@ function ServerConnection() {
* @property {string} [sdp]
* @property {RTCIceCandidate} [candidate]
* @property {string} [label]
* @property {Object<string,Array<string>>} [request]
* @property {Object<string,Array<string>>|Array<string>} [request]
* @property {Object<string,any>} [rtcConfiguration]
*/
......@@ -440,11 +440,11 @@ ServerConnection.prototype.leave = function(group) {
};
/**
* request sets the list of requested media types.
* request sets the list of requested tracks
*
* @param {Object<string,Array<string>>} what
* - A dictionary that maps labels to a sequence of 'audio' and 'video'.
* An entry with an empty label '' provides the default.
* - A dictionary that maps labels to a sequence of 'audio', 'video'
* or 'video-low. An entry with an empty label '' provides the default.
*/
ServerConnection.prototype.request = function(what) {
this.send({
......@@ -1219,6 +1219,21 @@ Stream.prototype.restartIce = function () {
c.negotiate(true);
};
/**
* request sets the list of tracks. If this is not called, or called with
* a null argument, then the default is provided by ServerConnection.request.
*
* @param {Array<string>} what - a sequence of 'audio', 'video' or 'video-low'.
*/
Stream.prototype.request = function(what) {
let c = this;
c.sc.send({
type: 'requestStream',
id: c.id,
request: what,
});
};
/**
* updateStats is called periodically, if requested by setStatsInterval,
* in order to recompute stream statistics and invoke the onstats handler.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment