1 // Copyright (c) 2020 by Juliusz Chroboczek.
  2 
  3 // Permission is hereby granted, free of charge, to any person obtaining a copy
  4 // of this software and associated documentation files (the "Software"), to deal
  5 // in the Software without restriction, including without limitation the rights
  6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7 // copies of the Software, and to permit persons to whom the Software is
  8 // furnished to do so, subject to the following conditions:
  9 //
 10 // The above copyright notice and this permission notice shall be included in
 11 // all copies or substantial portions of the Software.
 12 //
 13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
 16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 19 // THE SOFTWARE.
 20 
 21 'use strict';
 22 
 23 /**
 24  * toHex formats an array as a hexadecimal string.
 25  *
 26  * @param {number[]|Uint8Array} array - the array to format
 27  * @returns {string} - the hexadecimal representation of array
 28  */
 29 function toHex(array) {
 30     let a = new Uint8Array(array);
 31     function hex(x) {
 32         let h = x.toString(16);
 33         if(h.length < 2)
 34             h = '0' + h;
 35         return h;
 36     }
 37     return a.reduce((x, y) => x + hex(y), '');
 38 }
 39 
 40 /**
 41  * newRandomId returns a random string of 32 hex digits (16 bytes).
 42  *
 43  * @returns {string}
 44  */
 45 function newRandomId() {
 46     let a = new Uint8Array(16);
 47     crypto.getRandomValues(a);
 48     return toHex(a);
 49 }
 50 
 51 let localIdCounter = 0;
 52 
 53 /**
 54  * newLocalId returns a string that is unique in this session.
 55  *
 56  * @returns {string}
 57  */
 58 function newLocalId() {
 59     let id = `${localIdCounter}`
 60     localIdCounter++;
 61     return id;
 62 }
 63 
 64 /**
 65  * @typedef {Object} user
 66  * @property {string} username
 67  * @property {Array<string>} permissions
 68  * @property {Object<string,any>} data
 69  * @property {Object<string,Object<string,boolean>>} streams
 70  */
 71 
 72 /**
 73  * ServerConnection encapsulates a websocket connection to the server and
 74  * all the associated streams.
 75  * @constructor
 76  */
 77 function ServerConnection() {
 78     /**
 79      * The id of this connection.
 80      *
 81      * @type {string}
 82      * @const
 83      */
 84     this.id = newRandomId();
 85     /**
 86      * The group that we have joined, or null if we haven't joined yet.
 87      *
 88      * @type {string}
 89      */
 90     this.group = null;
 91     /**
 92      * The username we joined as.
 93      *
 94      * @type {string}
 95      */
 96     this.username = null;
 97     /**
 98      * The set of users in this group, including ourself.
 99      *
100      * @type {Object<string,user>}
101      */
102     this.users = {};
103     /**
104      * The underlying websocket.
105      *
106      * @type {WebSocket}
107      */
108     this.socket = null;
109     /**
110      * The set of all up streams, indexed by their id.
111      *
112      * @type {Object<string,Stream>}
113      */
114     this.up = {};
115     /**
116      * The set of all down streams, indexed by their id.
117      *
118      * @type {Object<string,Stream>}
119      */
120     this.down = {};
121     /**
122      * The ICE configuration used by all associated streams.
123      *
124      * @type {RTCConfiguration}
125      */
126     this.rtcConfiguration = null;
127     /**
128      * The permissions granted to this connection.
129      *
130      * @type {Array<string>}
131      */
132     this.permissions = [];
133     /**
134      * userdata is a convenient place to attach data to a ServerConnection.
135      * It is not used by the library.
136      *
137      * @type{Object<unknown,unknown>}
138      */
139     this.userdata = {};
140 
141     /* Callbacks */
142 
143     /**
144      * onconnected is called when the connection has been established
145      *
146      * @type{(this: ServerConnection) => void}
147      */
148     this.onconnected = null;
149     /**
150      * onclose is called when the connection is closed
151      *
152      * @type{(this: ServerConnection, code: number, reason: string) => void}
153      */
154     this.onclose = null;
155     /**
156      * onpeerconnection is called before we establish a new peer connection.
157      * It may either return null, or a new RTCConfiguration that overrides
158      * the value obtained from the server.
159      *
160      * @type{(this: ServerConnection) => RTCConfiguration}
161      */
162     this.onpeerconnection = null;
163     /**
164      * onuser is called whenever a user in the group changes.  The users
165      * array has already been updated.
166      *
167      * @type{(this: ServerConnection, id: string, kind: string) => void}
168      */
169     this.onuser = null;
170     /**
171      * onjoined is called whenever we join or leave a group or whenever the
172      * permissions we have in a group change.
173      *
174      * kind is one of 'join', 'fail', 'change' or 'leave'.
175      *
176      * @type{(this: ServerConnection, kind: string, group: string, permissions: Array<string>, status: Object<string,any>, data: Object<string,any>, message: string) => void}
177      */
178     this.onjoined = null;
179     /**
180      * ondownstream is called whenever a new down stream is added.  It
181      * should set up the stream's callbacks; actually setting up the UI
182      * should be done in the stream's ondowntrack callback.
183      *
184      * @type{(this: ServerConnection, stream: Stream) => void}
185      */
186     this.ondownstream = null;
187     /**
188      * onchat is called whenever a new chat message is received.
189      *
190      * @type {(this: ServerConnection, id: string, dest: string, username: string, time: number, privileged: boolean, history: boolean, kind: string, message: unknown) => void}
191      */
192     this.onchat = null;
193     /**
194      * onusermessage is called when an application-specific message is
195      * received.  Id is null when the message originated at the server,
196      * a user-id otherwise.
197      *
198      * 'kind' is typically one of 'error', 'warning', 'info' or 'mute'.  If
199      * 'id' is non-null, 'privileged' indicates whether the message was
200      * sent by an operator.
201      *
202      * @type {(this: ServerConnection, id: string, dest: string, username: string, time: number, privileged: boolean, kind: string, message: unknown) => void}
203      */
204     this.onusermessage = null;
205 }
206 
207 /**
208   * @typedef {Object} message
209   * @property {string} type
210   * @property {string} [kind]
211   * @property {string} [id]
212   * @property {string} [replace]
213   * @property {string} [source]
214   * @property {string} [dest]
215   * @property {string} [username]
216   * @property {string} [password]
217   * @property {string} [token]
218   * @property {boolean} [privileged]
219   * @property {Array<string>} [permissions]
220   * @property {Object<string,any>} [status]
221   * @property {Object<string,any>} [data]
222   * @property {string} [group]
223   * @property {unknown} [value]
224   * @property {boolean} [noecho]
225   * @property {string} [sdp]
226   * @property {RTCIceCandidate} [candidate]
227   * @property {string} [label]
228   * @property {Object<string,Array<string>>|Array<string>} [request]
229   * @property {Object<string,any>} [rtcConfiguration]
230   */
231 
232 /**
233  * close forcibly closes a server connection.  The onclose callback will
234  * be called when the connection is effectively closed.
235  */
236 ServerConnection.prototype.close = function() {
237     this.socket && this.socket.close(1000, 'Close requested by client');
238     this.socket = null;
239 };
240 
241 /**
242   * send sends a message to the server.
243   * @param {message} m - the message to send.
244   */
245 ServerConnection.prototype.send = function(m) {
246     if(!this.socket || this.socket.readyState !== this.socket.OPEN) {
247         // send on a closed socket doesn't throw
248         throw(new Error('Connection is not open'));
249     }
250     return this.socket.send(JSON.stringify(m));
251 };
252 
253 /**
254  * connect connects to the server.
255  *
256  * @param {string} url - The URL to connect to.
257  * @returns {Promise<ServerConnection>}
258  * @function
259  */
260 ServerConnection.prototype.connect = async function(url) {
261     let sc = this;
262     if(sc.socket) {
263         sc.socket.close(1000, 'Reconnecting');
264         sc.socket = null;
265     }
266 
267     sc.socket = new WebSocket(url);
268 
269     return await new Promise((resolve, reject) => {
270         this.socket.onerror = function(e) {
271             reject(e);
272         };
273         this.socket.onopen = function(e) {
274             sc.send({
275                 type: 'handshake',
276                 id: sc.id,
277             });
278             if(sc.onconnected)
279                 sc.onconnected.call(sc);
280             resolve(sc);
281         };
282         this.socket.onclose = function(e) {
283             sc.permissions = [];
284             for(let id in sc.up) {
285                 let c = sc.up[id];
286                 c.close();
287             }
288             for(let id in sc.down) {
289                 let c = sc.down[id];
290                 c.close();
291             }
292             for(let id in sc.users) {
293                 delete(sc.users[id]);
294                 if(sc.onuser)
295                     sc.onuser.call(sc, id, 'delete');
296             }
297             if(sc.group && sc.onjoined)
298                 sc.onjoined.call(sc, 'leave', sc.group, [], {}, {}, '');
299             sc.group = null;
300             sc.username = null;
301             if(sc.onclose)
302                 sc.onclose.call(sc, e.code, e.reason);
303             reject(new Error('websocket close ' + e.code + ' ' + e.reason));
304         };
305         this.socket.onmessage = function(e) {
306             let m = JSON.parse(e.data);
307             switch(m.type) {
308             case 'handshake':
309                 break;
310             case 'offer':
311                 sc.gotOffer(m.id, m.label, m.source, m.username,
312                             m.sdp, m.replace);
313                 break;
314             case 'answer':
315                 sc.gotAnswer(m.id, m.sdp);
316                 break;
317             case 'renegotiate':
318                 sc.gotRenegotiate(m.id);
319                 break;
320             case 'close':
321                 sc.gotClose(m.id);
322                 break;
323             case 'abort':
324                 sc.gotAbort(m.id);
325                 break;
326             case 'ice':
327                 sc.gotRemoteIce(m.id, m.candidate);
328                 break;
329             case 'joined':
330                 if(sc.group) {
331                     if(m.group !== sc.group) {
332                         throw new Error('Joined multiple groups');
333                     }
334                 } else {
335                     sc.group = m.group;
336                 }
337                 sc.username = m.username;
338                 sc.permissions = m.permissions || [];
339                 sc.rtcConfiguration = m.rtcConfiguration || null;
340                 if(m.kind == 'leave') {
341                     for(let id in sc.users) {
342                         delete(sc.users[id]);
343                         if(sc.onuser)
344                             sc.onuser.call(sc, id, 'delete');
345                     }
346                 }
347                 if(sc.onjoined)
348                     sc.onjoined.call(sc, m.kind, m.group,
349                                      m.permissions || [],
350                                      m.status, m.data,
351                                      m.value || null);
352                 break;
353             case 'user':
354                 switch(m.kind) {
355                 case 'add':
356                     if(m.id in sc.users)
357                         console.warn(`Duplicate user ${m.id} ${m.username}`);
358                     sc.users[m.id] = {
359                         username: m.username,
360                         permissions: m.permissions || [],
361                         data: m.data || {},
362                         streams: {},
363                     };
364                     break;
365                 case 'change':
366                     if(!(m.id in sc.users)) {
367                         console.warn(`Unknown user ${m.id} ${m.username}`);
368                         sc.users[m.id] = {
369                             username: m.username,
370                             permissions: m.permissions || [],
371                             data: m.data || {},
372                             streams: {},
373                         };
374                     } else {
375                         sc.users[m.id].username = m.username;
376                         sc.users[m.id].permissions = m.permissions || [];
377                         sc.users[m.id].data = m.data || {};
378                     }
379                     break;
380                 case 'delete':
381                     if(!(m.id in sc.users))
382                         console.warn(`Unknown user ${m.id} ${m.username}`);
383                     delete(sc.users[m.id]);
384                     break;
385                 default:
386                     console.warn(`Unknown user action ${m.kind}`);
387                     return;
388                 }
389                 if(sc.onuser)
390                     sc.onuser.call(sc, m.id, m.kind);
391                 break;
392             case 'chat':
393             case 'chathistory':
394                 if(sc.onchat)
395                     sc.onchat.call(
396                         sc, m.source, m.dest, m.username, m.time, m.privileged,
397                         m.type === 'chathistory', m.kind, m.value,
398                     );
399                 break;
400             case 'usermessage':
401                 if(sc.onusermessage)
402                     sc.onusermessage.call(
403                         sc, m.source, m.dest, m.username, m.time,
404                         m.privileged, m.kind, m.value,
405                     );
406                 break;
407             case 'ping':
408                 sc.send({
409                     type: 'pong',
410                 });
411                 break;
412             case 'pong':
413                 /* nothing */
414                 break;
415             default:
416                 console.warn('Unexpected server message', m.type);
417                 return;
418             }
419         };
420     });
421 };
422 
423 /**
424  * join requests to join a group.  The onjoined callback will be called
425  * when we've effectively joined.
426  *
427  * @param {string} group - The name of the group to join.
428  * @param {string} username - the username to join as.
429  * @param {string|Object} credentials - password or authServer.
430  * @param {Object<string,any>} [data] - the initial associated data.
431  */
432 ServerConnection.prototype.join = async function(group, username, credentials, data) {
433     let m = {
434         type: 'join',
435         kind: 'join',
436         group: group,
437         username: username,
438     };
439     if((typeof credentials) === 'string') {
440         m.password = credentials;
441     } else {
442         switch(credentials.type) {
443         case 'password':
444             m.password = credentials.password;
445             break;
446         case 'token':
447             m.token = credentials.token;
448             break;
449         case 'authServer':
450             let r = await fetch(credentials.authServer, {
451                 method: "POST",
452                 headers: {
453                     "Content-Type": "application/json",
454                 },
455                 body: JSON.stringify({
456                     location: credentials.location,
457                     username: username,
458                     password: credentials.password,
459                 }),
460             });
461             if(!r.ok)
462                 throw new Error(
463                     `The authorisation server said: ${r.status} ${r.statusText}`,
464                 );
465             m.token = await r.text();
466             break;
467         default:
468             throw new Error(`Unknown credentials type ${credentials.type}`);
469         }
470     }
471 
472     if(data)
473         m.data = data;
474 
475     this.send(m);
476 };
477 
478 /**
479  * leave leaves a group.  The onjoined callback will be called when we've
480  * effectively left.
481  *
482  * @param {string} group - The name of the group to join.
483  */
484 ServerConnection.prototype.leave = function(group) {
485     this.send({
486         type: 'join',
487         kind: 'leave',
488         group: group,
489     });
490 };
491 
492 /**
493  * request sets the list of requested tracks
494  *
495  * @param {Object<string,Array<string>>} what
496  *     - A dictionary that maps labels to a sequence of 'audio', 'video'
497  *       or 'video-low.  An entry with an empty label '' provides the default.
498  */
499 ServerConnection.prototype.request = function(what) {
500     this.send({
501         type: 'request',
502         request: what,
503     });
504 };
505 
506 /**
507  * findByLocalId finds an active connection with the given localId.
508  * It returns null if none was find.
509  *
510  * @param {string} localId
511  * @returns {Stream}
512  */
513 ServerConnection.prototype.findByLocalId = function(localId) {
514     if(!localId)
515         return null;
516 
517     let sc = this;
518 
519     for(let id in sc.up) {
520         let s = sc.up[id];
521         if(s.localId === localId)
522             return s;
523     }
524     return null;
525 }
526 
527 /**
528  * getRTCConfiguration returns the RTCConfiguration that should be used
529  * with this peer connection.  This usually comes from the server, but may
530  * be overridden by the onpeerconnection callback.
531  *
532  * @returns {RTCConfiguration}
533  */
534 ServerConnection.prototype.getRTCConfiguration = function() {
535     if(this.onpeerconnection) {
536         let conf = this.onpeerconnection.call(this);
537         if(conf !== null)
538             return conf;
539     }
540     return this.rtcConfiguration;
541 }
542 
543 /**
544  * newUpStream requests the creation of a new up stream.
545  *
546  * @param {string} [localId]
547  *   - The local id of the stream to create.  If a stream already exists with
548  *     the same local id, it is replaced with the new stream.
549  * @returns {Stream}
550  */
551 ServerConnection.prototype.newUpStream = function(localId) {
552     let sc = this;
553     let id = newRandomId();
554     if(sc.up[id])
555         throw new Error('Eek!');
556 
557     if(typeof RTCPeerConnection === 'undefined')
558         throw new Error("This browser doesn't support WebRTC");
559 
560 
561     let pc = new RTCPeerConnection(sc.getRTCConfiguration());
562     if(!pc)
563         throw new Error("Couldn't create peer connection");
564 
565     let oldId = null;
566     if(localId) {
567         let old = sc.findByLocalId(localId);
568         oldId = old && old.id;
569         if(old)
570             old.close(true);
571     }
572 
573     let c = new Stream(this, id, localId || newLocalId(), pc, true);
574     if(oldId)
575         c.replace = oldId;
576     sc.up[id] = c;
577 
578     pc.onnegotiationneeded = async e => {
579             await c.negotiate();
580     };
581 
582     pc.onicecandidate = e => {
583         if(!e.candidate)
584             return;
585         c.gotLocalIce(e.candidate);
586     };
587 
588     pc.oniceconnectionstatechange = e => {
589         if(c.onstatus)
590             c.onstatus.call(c, pc.iceConnectionState);
591         if(pc.iceConnectionState === 'failed')
592             c.restartIce();
593     };
594 
595     pc.ontrack = console.error;
596     return c;
597 };
598 
599 /**
600  * chat sends a chat message to the server.  The server will normally echo
601  * the message back to the client.
602  *
603  * @param {string} kind
604  *     -  The kind of message, either '', 'me' or an application-specific type.
605  * @param {string} dest - The id to send the message to, empty for broadcast.
606  * @param {string} value - The text of the message.
607  */
608 ServerConnection.prototype.chat = function(kind, dest, value) {
609     this.send({
610         type: 'chat',
611         source: this.id,
612         dest: dest,
613         username: this.username,
614         kind: kind,
615         value: value,
616     });
617 };
618 
619 /**
620  * userAction sends a request to act on a user.
621  *
622  * @param {string} kind - One of "op", "unop", "kick", "present", "unpresent".
623  * @param {string} dest - The id of the user to act upon.
624  * @param {any} [value] - An action-dependent parameter.
625  */
626 ServerConnection.prototype.userAction = function(kind, dest, value) {
627     this.send({
628         type: 'useraction',
629         source: this.id,
630         dest: dest,
631         username: this.username,
632         kind: kind,
633         value: value,
634     });
635 };
636 
637 /**
638  * userMessage sends an application-specific message to a user.
639  * This is similar to a chat message, but is not saved in the chat history.
640  *
641  * @param {string} kind - The kind of application-specific message.
642  * @param {string} dest - The id to send the message to, empty for broadcast.
643  * @param {unknown} [value] - An optional parameter.
644  * @param {boolean} [noecho] - If set, don't echo back the message to the sender.
645  */
646 ServerConnection.prototype.userMessage = function(kind, dest, value, noecho) {
647     this.send({
648         type: 'usermessage',
649         source: this.id,
650         dest: dest,
651         username: this.username,
652         kind: kind,
653         value: value,
654         noecho: noecho,
655     });
656 };
657 
658 /**
659  * groupAction sends a request to act on the current group.
660  *
661  * @param {string} kind
662  *     - One of 'clearchat', 'lock', 'unlock', 'record' or 'unrecord'.
663  * @param {string} [message] - An optional user-readable message.
664  */
665 ServerConnection.prototype.groupAction = function(kind, message) {
666     this.send({
667         type: 'groupaction',
668         source: this.id,
669         kind: kind,
670         username: this.username,
671         value: message,
672     });
673 };
674 
675 /**
676  * gotOffer is called when we receive an offer from the server.  Don't call this.
677  *
678  * @param {string} id
679  * @param {string} label
680  * @param {string} source
681  * @param {string} username
682  * @param {string} sdp
683  * @param {string} replace
684  * @function
685  */
686 ServerConnection.prototype.gotOffer = async function(id, label, source, username, sdp, replace) {
687     let sc = this;
688 
689     if(sc.up[id]) {
690         console.error("Duplicate connection id");
691         sc.send({
692             type: 'abort',
693             id: id,
694         });
695         return;
696     }
697 
698     let oldLocalId = null;
699 
700     if(replace) {
701         let old = sc.down[replace];
702         if(old) {
703             oldLocalId = old.localId;
704             old.close(true);
705         } else
706             console.error("Replacing unknown stream");
707     }
708 
709     let c = sc.down[id];
710     if(c && oldLocalId)
711         console.error("Replacing duplicate stream");
712 
713     if(!c) {
714         let pc;
715         try {
716             pc = new RTCPeerConnection(sc.getRTCConfiguration());
717         } catch(e) {
718             console.error(e);
719             sc.send({
720                 type: 'abort',
721                 id: id,
722             });
723             return;
724         }
725         c = new Stream(this, id, oldLocalId || newLocalId(), pc, false);
726         c.label = label;
727         sc.down[id] = c;
728 
729         c.pc.onicecandidate = function(e) {
730             if(!e.candidate)
731                 return;
732             c.gotLocalIce(e.candidate);
733         };
734 
735         pc.oniceconnectionstatechange = e => {
736             if(c.onstatus)
737                 c.onstatus.call(c, pc.iceConnectionState);
738             if(pc.iceConnectionState === 'failed') {
739                 sc.send({
740                     type: 'renegotiate',
741                     id: id,
742                 });
743             }
744         };
745 
746         c.pc.ontrack = function(e) {
747             if(e.streams.length < 1) {
748                 console.error("Got track with no stream");
749                 return;
750             }
751             c.stream = e.streams[0];
752             let changed = recomputeUserStreams(sc, source);
753             if(c.ondowntrack) {
754                 c.ondowntrack.call(
755                     c, e.track, e.transceiver, e.streams[0],
756                 );
757             }
758             if(changed && sc.onuser)
759                 sc.onuser.call(sc, source, "change");
760         };
761     }
762 
763     c.source = source;
764     c.username = username;
765 
766     if(sc.ondownstream)
767         sc.ondownstream.call(sc, c);
768 
769     try {
770         await c.pc.setRemoteDescription({
771             type: 'offer',
772             sdp: sdp,
773         });
774 
775         await c.flushRemoteIceCandidates();
776 
777         let answer = await c.pc.createAnswer();
778         if(!answer)
779             throw new Error("Didn't create answer");
780         await c.pc.setLocalDescription(answer);
781         this.send({
782             type: 'answer',
783             id: id,
784             sdp: c.pc.localDescription.sdp,
785         });
786     } catch(e) {
787         try {
788             if(c.onerror)
789                 c.onerror.call(c, e);
790         } finally {
791             c.abort();
792         }
793         return;
794     }
795 
796     c.localDescriptionSent = true;
797     c.flushLocalIceCandidates();
798     if(c.onnegotiationcompleted)
799         c.onnegotiationcompleted.call(c);
800 };
801 
802 /**
803  * gotAnswer is called when we receive an answer from the server.  Don't
804  * call this.
805  *
806  * @param {string} id
807  * @param {string} sdp
808  * @function
809  */
810 ServerConnection.prototype.gotAnswer = async function(id, sdp) {
811     let c = this.up[id];
812     if(!c)
813         throw new Error('unknown up stream');
814     try {
815         await c.pc.setRemoteDescription({
816             type: 'answer',
817             sdp: sdp,
818         });
819     } catch(e) {
820         try {
821             if(c.onerror)
822                 c.onerror.call(c, e);
823         } finally {
824             c.close();
825         }
826         return;
827     }
828     await c.flushRemoteIceCandidates();
829     if(c.onnegotiationcompleted)
830         c.onnegotiationcompleted.call(c);
831 };
832 
833 /**
834  * gotRenegotiate is called when we receive a renegotiation request from
835  * the server.  Don't call this.
836  *
837  * @param {string} id
838  * @function
839  */
840 ServerConnection.prototype.gotRenegotiate = function(id) {
841     let c = this.up[id];
842     if(!c)
843         throw new Error('unknown up stream');
844     c.restartIce();
845 };
846 
847 /**
848  * gotClose is called when we receive a close request from the server.
849  * Don't call this.
850  *
851  * @param {string} id
852  */
853 ServerConnection.prototype.gotClose = function(id) {
854     let c = this.down[id];
855     if(!c) {
856         console.warn('unknown down stream', id);
857         return;
858     }
859     c.close();
860 };
861 
862 /**
863  * gotAbort is called when we receive an abort message from the server.
864  * Don't call this.
865  *
866  * @param {string} id
867  */
868 ServerConnection.prototype.gotAbort = function(id) {
869     let c = this.up[id];
870     if(!c)
871         throw new Error('unknown up stream');
872     c.close();
873 };
874 
875 /**
876  * gotRemoteIce is called when we receive an ICE candidate from the server.
877  * Don't call this.
878  *
879  * @param {string} id
880  * @param {RTCIceCandidate} candidate
881  * @function
882  */
883 ServerConnection.prototype.gotRemoteIce = async function(id, candidate) {
884     let c = this.up[id];
885     if(!c)
886         c = this.down[id];
887     if(!c)
888         throw new Error('unknown stream');
889     if(c.pc.remoteDescription)
890         await c.pc.addIceCandidate(candidate).catch(console.warn);
891     else
892         c.remoteIceCandidates.push(candidate);
893 };
894 
895 /**
896  * Stream encapsulates a MediaStream, a set of tracks.
897  *
898  * A stream is said to go "up" if it is from the client to the server, and
899  * "down" otherwise.
900  *
901  * @param {ServerConnection} sc
902  * @param {string} id
903  * @param {string} localId
904  * @param {RTCPeerConnection} pc
905  *
906  * @constructor
907  */
908 function Stream(sc, id, localId, pc, up) {
909     /**
910      * The associated ServerConnection.
911      *
912      * @type {ServerConnection}
913      * @const
914      */
915     this.sc = sc;
916     /**
917      * The id of this stream.
918      *
919      * @type {string}
920      * @const
921      */
922     this.id = id;
923     /**
924      * The local id of this stream.
925      *
926      * @type {string}
927      * @const
928      */
929     this.localId = localId;
930     /**
931      * Indicates whether the stream is in the client->server direction.
932      *
933      * @type {boolean}
934      * @const
935      */
936     this.up = up;
937     /**
938      * For down streams, the id of the client that created the stream.
939      *
940      * @type {string}
941      */
942     this.source = null;
943     /**
944      * For down streams, the username of the client who created the stream.
945      *
946      * @type {string}
947      */
948     this.username = null;
949     /**
950      * The associated RTCPeerConnection.  This is null before the stream
951      * is connected, and may change over time.
952      *
953      * @type {RTCPeerConnection}
954      */
955     this.pc = pc;
956     /**
957      * The associated MediaStream.  This is null before the stream is
958      * connected, and may change over time.
959      *
960      * @type {MediaStream}
961      */
962     this.stream = null;
963     /**
964      * The label assigned by the originator to this stream.
965      *
966      * @type {string}
967      */
968     this.label = null;
969     /**
970      * The id of the stream that we are currently replacing.
971      *
972      * @type {string}
973      */
974     this.replace = null;
975     /**
976      * Indicates whether we have already sent a local description.
977      *
978      * @type {boolean}
979      */
980     this.localDescriptionSent = false;
981     /**
982      * Buffered local ICE candidates.  This will be flushed by
983      * flushLocalIceCandidates after we send a local description.
984      *
985      * @type {RTCIceCandidate[]}
986      */
987     this.localIceCandidates = [];
988     /**
989      * Buffered remote ICE candidates.  This will be flushed by
990      * flushRemoteIceCandidates when we get a remote SDP description.
991      *
992      * @type {RTCIceCandidate[]}
993      */
994     this.remoteIceCandidates = [];
995     /**
996      * The statistics last computed by the stats handler.  This is
997      * a dictionary indexed by track id, with each value a dictionary of
998      * statistics.
999      *
1000      * @type {Object<string,unknown>}
1001      */
1002     this.stats = {};
1003     /**
1004      * The id of the periodic handler that computes statistics, as
1005      * returned by setInterval.
1006      *
1007      * @type {number}
1008      */
1009     this.statsHandler = null;
1010     /**
1011      * userdata is a convenient place to attach data to a Stream.
1012      * It is not used by the library.
1013      *
1014      * @type{Object<unknown,unknown>}
1015      */
1016     this.userdata = {};
1017 
1018     /* Callbacks */
1019 
1020     /**
1021      * onclose is called when the stream is closed.  Replace will be true
1022      * if the stream is being replaced by another one with the same id.
1023      *
1024      * @type{(this: Stream, replace: boolean) => void}
1025      */
1026     this.onclose = null;
1027     /**
1028      * onerror is called whenever a fatal error occurs.  The stream will
1029      * then be closed, and onclose called normally.
1030      *
1031      * @type{(this: Stream, error: unknown) => void}
1032      */
1033     this.onerror = null;
1034     /**
1035      * onnegotiationcompleted is called whenever negotiation or
1036      * renegotiation has completed.
1037      *
1038      * @type{(this: Stream) => void}
1039      */
1040     this.onnegotiationcompleted = null;
1041     /**
1042      * ondowntrack is called whenever a new track is added to a stream.
1043      * If the stream parameter differs from its previous value, then it
1044      * indicates that the old stream has been discarded.
1045      *
1046      * @type{(this: Stream, track: MediaStreamTrack, transceiver: RTCRtpTransceiver, stream: MediaStream) => void}
1047      */
1048     this.ondowntrack = null;
1049     /**
1050      * onstatus is called whenever the status of the stream changes.
1051      *
1052      * @type{(this: Stream, status: string) => void}
1053      */
1054     this.onstatus = null;
1055     /**
1056      * onstats is called when we have new statistics about the connection
1057      *
1058      * @type{(this: Stream, stats: Object<unknown,unknown>) => void}
1059      */
1060     this.onstats = null;
1061 }
1062 
1063 /**
1064  * setStream sets the stream of an upwards connection.
1065  *
1066  * @param {MediaStream} stream
1067  */
1068 Stream.prototype.setStream = function(stream) {
1069     let c = this;
1070     c.stream = stream;
1071     let changed = recomputeUserStreams(c.sc, c.sc.id);
1072     if(changed && c.sc.onuser)
1073         c.sc.onuser.call(c.sc, c.sc.id, "change");
1074 }
1075 
1076 /**
1077  * close closes a stream.
1078  *
1079  * For streams in the up direction, this may be called at any time.  For
1080  * streams in the down direction, this will be called automatically when
1081  * the server signals that it is closing a stream.
1082  *
1083  * @param {boolean} [replace]
1084  *    - true if the stream is being replaced by another one with the same id
1085  */
1086 Stream.prototype.close = function(replace) {
1087     let c = this;
1088 
1089     if(!c.sc) {
1090         console.warn('Closing closed stream');
1091         return;
1092     }
1093 
1094     if(c.statsHandler) {
1095         clearInterval(c.statsHandler);
1096         c.statsHandler = null;
1097     }
1098 
1099     c.pc.close();
1100 
1101     if(c.up && !replace && c.localDescriptionSent) {
1102         try {
1103             c.sc.send({
1104                 type: 'close',
1105                 id: c.id,
1106             });
1107         } catch(e) {
1108         }
1109     }
1110 
1111     let userid;
1112     if(c.up) {
1113         userid = c.sc.id;
1114         if(c.sc.up[c.id] === c)
1115             delete(c.sc.up[c.id]);
1116         else
1117             console.warn('Closing unknown stream');
1118     } else {
1119         userid = c.source;
1120         if(c.sc.down[c.id] === c)
1121             delete(c.sc.down[c.id]);
1122         else
1123             console.warn('Closing unknown stream');
1124     }
1125     let changed = recomputeUserStreams(c.sc, userid);
1126     if(changed && c.sc.onuser)
1127         c.sc.onuser.call(c.sc, userid, "change");
1128     c.sc = null;
1129 
1130     if(c.onclose)
1131         c.onclose.call(c, replace);
1132 };
1133 
1134 /**
1135  * recomputeUserStreams recomputes the user.streams array for a given user.
1136  * It returns true if anything changed.
1137  *
1138  * @param {ServerConnection} sc
1139  * @param {string} id
1140  * @returns {boolean}
1141  */
1142 function recomputeUserStreams(sc, id) {
1143     let user = sc.users[id];
1144     if(!user) {
1145         console.warn("recomputing streams for unknown user");
1146         return false;
1147     }
1148 
1149     let streams = id === sc.id ? sc.up : sc.down;
1150     let old = user.streams;
1151     user.streams = {};
1152     for(id in streams) {
1153         let c = streams[id];
1154         if(!c.stream)
1155             continue;
1156         if(!user.streams[c.label])
1157             user.streams[c.label] = {};
1158         c.stream.getTracks().forEach(t => {
1159             user.streams[c.label][t.kind] = true;
1160         });
1161     }
1162 
1163     return JSON.stringify(old) != JSON.stringify(user.streams);
1164 }
1165 
1166 /**
1167  * abort requests that the server close a down stream.
1168  */
1169 Stream.prototype.abort = function() {
1170     let c = this;
1171     if(c.up)
1172         throw new Error("Abort called on an up stream");
1173     c.sc.send({
1174         type: 'abort',
1175         id: c.id,
1176     });
1177 };
1178 
1179 /**
1180  * gotLocalIce is Called when we get a local ICE candidate.  Don't call this.
1181  *
1182  * @param {RTCIceCandidate} candidate
1183  * @function
1184  */
1185 Stream.prototype.gotLocalIce = function(candidate) {
1186     let c = this;
1187     if(c.localDescriptionSent)
1188         c.sc.send({type: 'ice',
1189                    id: c.id,
1190                    candidate: candidate,
1191                   });
1192     else
1193         c.localIceCandidates.push(candidate);
1194 };
1195 
1196 /**
1197  * flushLocalIceCandidates flushes any buffered local ICE candidates.
1198  * It is called when we send an offer.
1199  *
1200  * @function
1201  */
1202 Stream.prototype.flushLocalIceCandidates = function () {
1203     let c = this;
1204     let candidates = c.localIceCandidates;
1205     c.localIceCandidates = [];
1206     candidates.forEach(candidate => {
1207         try {
1208             c.sc.send({type: 'ice',
1209                        id: c.id,
1210                        candidate: candidate,
1211                       });
1212         } catch(e) {
1213             console.warn(e);
1214         }
1215     });
1216     c.localIceCandidates = [];
1217 };
1218 
1219 /**
1220  * flushRemoteIceCandidates flushes any buffered remote ICE candidates.  It is
1221  * called automatically when we get a remote description.
1222  *
1223  * @function
1224  */
1225 Stream.prototype.flushRemoteIceCandidates = async function () {
1226     let c = this;
1227     let candidates = c.remoteIceCandidates;
1228     c.remoteIceCandidates = [];
1229     /** @type {Array.<Promise<void>>} */
1230     let promises = [];
1231     candidates.forEach(candidate => {
1232         promises.push(c.pc.addIceCandidate(candidate).catch(console.warn));
1233     });
1234     return await Promise.all(promises);
1235 };
1236 
1237 /**
1238  * negotiate negotiates or renegotiates an up stream.  It is called
1239  * automatically when required.  If the client requires renegotiation, it
1240  * is probably better to call restartIce which will cause negotiate to be
1241  * called asynchronously.
1242  *
1243  * @function
1244  * @param {boolean} [restartIce] - Whether to restart ICE.
1245  */
1246 Stream.prototype.negotiate = async function (restartIce) {
1247     let c = this;
1248     if(!c.up)
1249         throw new Error('not an up stream');
1250 
1251     let options = {};
1252     if(restartIce)
1253         options = {iceRestart: true};
1254     let offer = await c.pc.createOffer(options);
1255     if(!offer)
1256         throw(new Error("Didn't create offer"));
1257     await c.pc.setLocalDescription(offer);
1258 
1259     c.sc.send({
1260         type: 'offer',
1261         source: c.sc.id,
1262         username: c.sc.username,
1263         kind: this.localDescriptionSent ? 'renegotiate' : '',
1264         id: c.id,
1265         replace: this.replace,
1266         label: c.label,
1267         sdp: c.pc.localDescription.sdp,
1268     });
1269     this.localDescriptionSent = true;
1270     this.replace = null;
1271     c.flushLocalIceCandidates();
1272 };
1273 
1274 /**
1275  * restartIce causes an ICE restart on a stream.  For up streams, it is
1276  * called automatically when ICE signals that the connection has failed,
1277  * but may also be called by the application.  For down streams, it
1278  * requests that the server perform an ICE restart.  In either case,
1279  * it returns immediately, negotiation will happen asynchronously.
1280  */
1281 
1282 Stream.prototype.restartIce = function () {
1283     let c = this;
1284     if(!c.up) {
1285         c.sc.send({
1286             type: 'renegotiate',
1287             id: c.id,
1288         });
1289         return;
1290     }
1291 
1292     if('restartIce' in c.pc) {
1293         try {
1294             c.pc.restartIce();
1295             return;
1296         } catch(e) {
1297             console.warn(e);
1298         }
1299     }
1300 
1301     // negotiate is async, but this returns immediately.
1302     c.negotiate(true);
1303 };
1304 
1305 /**
1306  * request sets the list of tracks.  If this is not called, or called with
1307  * a null argument, then the default is provided by ServerConnection.request.
1308  *
1309  * @param {Array<string>} what - a sequence of 'audio', 'video' or 'video-low'.
1310  */
1311 Stream.prototype.request = function(what) {
1312     let c = this;
1313     c.sc.send({
1314         type: 'requestStream',
1315         id: c.id,
1316         request: what,
1317     });
1318 };
1319 
1320 /**
1321  * updateStats is called periodically, if requested by setStatsInterval,
1322  * in order to recompute stream statistics and invoke the onstats handler.
1323  *
1324  * @function
1325  */
1326 Stream.prototype.updateStats = async function() {
1327     let c = this;
1328     let old = c.stats;
1329     /** @type{Object<string,unknown>} */
1330     let stats = {};
1331 
1332     let transceivers = c.pc.getTransceivers();
1333     for(let i = 0; i < transceivers.length; i++) {
1334         let t = transceivers[i];
1335         let stid = t.sender.track && t.sender.track.id;
1336         let rtid = t.receiver.track && t.receiver.track.id;
1337 
1338         let report = null;
1339         if(stid) {
1340             try {
1341                 report = await t.sender.getStats();
1342             } catch(e) {
1343             }
1344         }
1345 
1346         if(report) {
1347             for(let r of report.values()) {
1348                 if(stid && r.type === 'outbound-rtp') {
1349                     let id = stid;
1350                     // Firefox doesn't implement rid, use ssrc
1351                     // to discriminate simulcast tracks.
1352                     id = id + '-' + r.ssrc;
1353                     if(!('bytesSent' in r))
1354                         continue;
1355                     if(!stats[id])
1356                         stats[id] = {};
1357                     stats[id][r.type] = {};
1358                     stats[id][r.type].timestamp = r.timestamp;
1359                     stats[id][r.type].bytesSent = r.bytesSent;
1360                     if(old[id] && old[id][r.type])
1361                         stats[id][r.type].rate =
1362                         ((r.bytesSent - old[id][r.type].bytesSent) * 1000 /
1363                          (r.timestamp - old[id][r.type].timestamp)) * 8;
1364                 }
1365             }
1366         }
1367 
1368         report = null;
1369         if(rtid) {
1370             try {
1371                 report = await t.receiver.getStats();
1372             } catch(e) {
1373                 console.error(e);
1374             }
1375         }
1376 
1377         if(report) {
1378             for(let r of report.values()) {
1379                 if(rtid && r.type === 'track') {
1380                     if(!('totalAudioEnergy' in r))
1381                         continue;
1382                     if(!stats[rtid])
1383                         stats[rtid] = {};
1384                     stats[rtid][r.type] = {};
1385                     stats[rtid][r.type].timestamp = r.timestamp;
1386                     stats[rtid][r.type].totalAudioEnergy = r.totalAudioEnergy;
1387                     if(old[rtid] && old[rtid][r.type])
1388                         stats[rtid][r.type].audioEnergy =
1389                         (r.totalAudioEnergy - old[rtid][r.type].totalAudioEnergy) * 1000 /
1390                         (r.timestamp - old[rtid][r.type].timestamp);
1391                 }
1392             }
1393         }
1394     }
1395 
1396     c.stats = stats;
1397 
1398     if(c.onstats)
1399         c.onstats.call(c, c.stats);
1400 };
1401 
1402 /**
1403  * setStatsInterval sets the interval in milliseconds at which the onstats
1404  * handler will be called.  This is only useful for up streams.
1405  *
1406  * @param {number} ms - The interval in milliseconds.
1407  */
1408 Stream.prototype.setStatsInterval = function(ms) {
1409     let c = this;
1410     if(c.statsHandler) {
1411         clearInterval(c.statsHandler);
1412         c.statsHandler = null;
1413     }
1414 
1415     if(ms <= 0)
1416         return;
1417 
1418     c.statsHandler = setInterval(() => {
1419         c.updateStats();
1420     }, ms);
1421 };
1422