// Copyright (c) 2020 by Juliusz Chroboczek.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

'use strict';

/**
 * toHex renders a sequence of bytes as a hexadecimal string, using exactly
 * two lowercase digits per byte.
 *
 * @param {number[]|Uint8Array} array - the bytes to format
 * @returns {string} - the hexadecimal representation of array
 */
function toHex(array) {
    // Copying into a Uint8Array coerces every entry to a byte (0..255).
    const bytes = new Uint8Array(array);
    let out = '';
    for(const b of bytes) {
        const digits = b.toString(16);
        out += digits.length < 2 ? '0' + digits : digits;
    }
    return out;
}

/**
 * newRandomId returns a random string of 32 hex digits (16 bytes), drawn
 * from crypto.getRandomValues.
 *
 * @returns {string}
 */
function newRandomId() {
    const bytes = new Uint8Array(16);
    crypto.getRandomValues(bytes);
    return toHex(bytes);
}

// Source of the identifiers handed out by newLocalId.
let localIdCounter = 0;
/**
 * newLocalId returns a string that is unique in this session.  It is the
 * decimal rendering of a counter that is incremented on every call.
 *
 * @returns {string}
 */
function newLocalId() {
    const id = String(localIdCounter);
    localIdCounter += 1;
    return id;
}

/**
 * @typedef {Object} user
 * @property {string} username
 * @property {Array<string>} permissions
 * @property {Object<string,any>} data
 * @property {Object<string,Object<string,boolean>>} streams
 */

/**
 * ServerConnection wraps the websocket connection to the server together
 * with every stream that is carried over it.
 * @constructor
 */
function ServerConnection() {
    /**
     * The id of this connection, chosen at random at construction time.
     *
     * @type {string}
     * @const
     */
    this.id = newRandomId();
    /**
     * The group we are a member of; null until we have joined one.
     *
     * @type {string}
     */
    this.group = null;
    /**
     * The username under which we joined.
     *
     * @type {string}
     */
    this.username = null;
    /**
     * The users of the current group (ourself included), indexed by id.
     *
     * @type {Object<string,user>}
     */
    this.users = {};
    /**
     * The websocket underlying this connection.
     *
     * @type {WebSocket}
     */
    this.socket = null;
    /**
     * All up streams, indexed by stream id.
     *
     * @type {Object<string,Stream>}
     */
    this.up = {};
    /**
     * All down streams, indexed by stream id.
     *
     * @type {Object<string,Stream>}
     */
    this.down = {};
    /**
     * The ICE configuration shared by all streams of this connection.
     *
     * @type {RTCConfiguration}
     */
    this.rtcConfiguration = null;
    /**
     * The permissions that this connection has been granted.
     *
     * @type {Array<string>}
     */
    this.permissions = [];
    /**
     * userdata is a free slot where the application may hang its own
     * data.  The library never looks at it.
     *
     * @type{Object<unknown,unknown>}
     */
    this.userdata = {};

    /* Callbacks */

    /**
     * onconnected is invoked once the connection has been established.
     *
     * @type{(this: ServerConnection) => void}
     */
    this.onconnected = null;
    /**
     * onclose is invoked when the connection is closed.
     *
     * @type{(this: ServerConnection, code: number, reason: string) => void}
     */
    this.onclose = null;
    /**
     * onpeerconnection is invoked before a new peer connection is set up.
     * It returns either null, or an RTCConfiguration that takes precedence
     * over the one received from the server.
     *
     * @type{(this: ServerConnection) => RTCConfiguration}
     */
    this.onpeerconnection = null;
    /**
     * onuser is invoked whenever a user of the group changes.  By the
     * time it runs, the users table is already up to date.
     *
     * @type{(this: ServerConnection, id: string, kind: string) => void}
     */
    this.onuser = null;
    /**
     * onjoined is invoked whenever we join or leave a group, and whenever
     * our permissions in a group change.
     *
     * kind is one of 'join', 'fail', 'change' or 'leave'.
     *
     * @type{(this: ServerConnection, kind: string, group: string, permissions: Array<string>, status: Object<string,any>, data: Object<string,any>, message: string) => void}
     */
    this.onjoined = null;
    /**
     * ondownstream is invoked whenever a down stream is added.  It is the
     * place to install the stream's callbacks; building the UI itself
     * belongs in the stream's ondowntrack callback.
     *
     * @type{(this: ServerConnection, stream: Stream) => void}
     */
    this.ondownstream = null;
    /**
     * onchat is invoked whenever a chat message is received.
     *
     * @type {(this: ServerConnection, id: string, dest: string, username: string, time: number, privileged: boolean, history: boolean, kind: string, message: unknown) => void}
     */
    this.onchat = null;
    /**
     * onusermessage is invoked when an application-specific message is
     * received.  Id is null for messages that originate at the server,
     * and the sender's user id otherwise.
     *
     * 'kind' is typically one of 'error', 'warning', 'info' or 'mute'.  If
     * 'id' is non-null, 'privileged' tells whether the sender was an
     * operator.
     *
     * @type {(this: ServerConnection, id: string, dest: string, username: string, time: number, privileged: boolean, kind: string, message: unknown) => void}
     */
    this.onusermessage = null;
}

/**
  * @typedef {Object} message
  * @property {string} type
  * @property {string} [kind]
  * @property {string} [id]
  * @property {string} [replace]
  * @property {string} [source]
  * @property {string} [dest]
  * @property {string} [username]
  * @property {string} [password]
  * @property {string} [token]
  * @property {boolean} [privileged]
  * @property {Array<string>} [permissions]
  * @property {Object<string,any>} [status]
  * @property {Object<string,any>} [data]
  * @property {string} [group]
  * @property {unknown} [value]
  * @property {boolean} [noecho]
  * @property {string} [sdp]
  * @property {RTCIceCandidate} [candidate]
  * @property {string} [label]
  * @property {Object<string,Array<string>>|Array<string>} [request]
  * @property {Object<string,any>} [rtcConfiguration]
  */

/**
 * close forcibly closes a server connection.  The socket is closed with
 * code 1000 and the reference to it is dropped; the onclose callback runs
 * once the connection has effectively been closed.
 */
ServerConnection.prototype.close = function() {
    const socket = this.socket;
    if(socket)
        socket.close(1000, 'Close requested by client');
    this.socket = null;
};
/**
 * send sends a message to the server, serialised as JSON.
 *
 * @param {message} m - the message to send.
 * @throws {Error} if there is no socket or the socket is not open.
 */
ServerConnection.prototype.send = function(m) {
    if(!this.socket || this.socket.readyState !== this.socket.OPEN) {
        // send on a closed socket doesn't throw
        throw(new Error('Connection is not open'));
    }
    return this.socket.send(JSON.stringify(m));
};

/**
 * forgetAllUsers empties sc.users, invoking the onuser callback with kind
 * 'delete' for every entry that is removed.
 *
 * @param {ServerConnection} sc
 */
function forgetAllUsers(sc) {
    for(let id in sc.users) {
        delete(sc.users[id]);
        if(sc.onuser)
            sc.onuser.call(sc, id, 'delete');
    }
}

/**
 * handleJoinedMessage applies a 'joined' message from the server to the
 * connection state and then invokes the onjoined callback.
 *
 * For 'leave', the user table, username, permissions and ICE configuration
 * are reset, and the group is cleared after the callback has run, so that
 * a different group may be joined later.  For 'fail' nothing is recorded,
 * since the join did not take place.  Any other kind records the group and
 * the associated data.
 *
 * @param {ServerConnection} sc
 * @param {message} m
 * @throws {Error} if the message designates a group other than the one
 *   we are currently in.
 */
function handleJoinedMessage(sc, m) {
    if(m.kind === 'leave') {
        forgetAllUsers(sc);
        sc.username = null;
        sc.permissions = [];
        sc.rtcConfiguration = null;
    } else if(m.kind !== 'fail') {
        if(sc.group && m.group !== sc.group)
            throw new Error('Joined multiple groups');
        sc.group = m.group;
        sc.username = m.username;
        sc.permissions = m.permissions || [];
        sc.rtcConfiguration = m.rtcConfiguration || null;
    }

    if(sc.onjoined)
        sc.onjoined.call(sc, m.kind, m.group,
                         m.permissions || [],
                         m.status, m.data,
                         m.value || null);

    // Only forget the group once the callback has been told about it;
    // otherwise the close handler would announce the same 'leave' again.
    if(m.kind === 'leave')
        sc.group = null;
}

/**
 * userRecord builds a fresh entry for sc.users from a 'user' message.
 *
 * @param {message} m
 * @returns {user}
 */
function userRecord(m) {
    return {
        username: m.username,
        permissions: m.permissions || [],
        data: m.data || {},
        streams: {},
    };
}

/**
 * handleUserMessage applies a 'user' message ('add', 'change' or 'delete')
 * to sc.users and then invokes the onuser callback.  Unknown kinds are
 * logged and otherwise ignored.
 *
 * @param {ServerConnection} sc
 * @param {message} m
 */
function handleUserMessage(sc, m) {
    switch(m.kind) {
    case 'add':
        if(m.id in sc.users)
            console.warn(`Duplicate user ${m.id} ${m.username}`);
        sc.users[m.id] = userRecord(m);
        break;
    case 'change':
        if(!(m.id in sc.users)) {
            console.warn(`Unknown user ${m.id} ${m.username}`);
            sc.users[m.id] = userRecord(m);
        } else {
            // keep the existing streams table
            sc.users[m.id].username = m.username;
            sc.users[m.id].permissions = m.permissions || [];
            sc.users[m.id].data = m.data || {};
        }
        break;
    case 'delete':
        if(!(m.id in sc.users))
            console.warn(`Unknown user ${m.id} ${m.username}`);
        delete(sc.users[m.id]);
        break;
    default:
        console.warn(`Unknown user action ${m.kind}`);
        return;
    }
    if(sc.onuser)
        sc.onuser.call(sc, m.id, m.kind);
}

/**
 * connect connects to the server.  Any socket that is already attached to
 * this connection is closed first.  The returned promise resolves to this
 * connection once the socket is open and the handshake has been sent; it
 * is rejected if the socket reports an error or closes before that.
 *
 * @param {string} url - The URL to connect to.
 * @returns {Promise<ServerConnection>}
 * @function
 */
ServerConnection.prototype.connect = async function(url) {
    let sc = this;
    if(sc.socket) {
        sc.socket.close(1000, 'Reconnecting');
        sc.socket = null;
    }

    let socket = new WebSocket(url);
    sc.socket = socket;

    return await new Promise((resolve, reject) => {
        socket.onerror = function(e) {
            reject(e);
        };
        socket.onopen = function(e) {
            sc.send({
                type: 'handshake',
                id: sc.id,
            });
            if(sc.onconnected)
                sc.onconnected.call(sc);
            resolve(sc);
        };
        socket.onclose = function(e) {
            sc.permissions = [];
            for(let id in sc.up) {
                let c = sc.up[id];
                c.close();
            }
            for(let id in sc.down) {
                let c = sc.down[id];
                c.close();
            }
            forgetAllUsers(sc);
            if(sc.group && sc.onjoined)
                sc.onjoined.call(sc, 'leave', sc.group, [], {}, {}, '');
            sc.group = null;
            sc.username = null;
            if(sc.onclose)
                sc.onclose.call(sc, e.code, e.reason);
            // No effect if the promise has already been resolved.
            reject(new Error('websocket close ' + e.code + ' ' + e.reason));
        };
        socket.onmessage = function(e) {
            let m = JSON.parse(e.data);
            switch(m.type) {
            case 'handshake':
                break;
            case 'offer':
                sc.gotOffer(m.id, m.label, m.source, m.username,
                            m.sdp, m.replace);
                break;
            case 'answer':
                sc.gotAnswer(m.id, m.sdp);
                break;
            case 'renegotiate':
                sc.gotRenegotiate(m.id);
                break;
            case 'close':
                sc.gotClose(m.id);
                break;
            case 'abort':
                sc.gotAbort(m.id);
                break;
            case 'ice':
                sc.gotRemoteIce(m.id, m.candidate);
                break;
            case 'joined':
                handleJoinedMessage(sc, m);
                break;
            case 'user':
                handleUserMessage(sc, m);
                break;
            case 'chat':
            case 'chathistory':
                if(sc.onchat)
                    sc.onchat.call(
                        sc, m.source, m.dest, m.username, m.time, m.privileged,
                        m.type === 'chathistory', m.kind, m.value,
                    );
                break;
            case 'usermessage':
                if(sc.onusermessage)
                    sc.onusermessage.call(
                        sc, m.source, m.dest, m.username, m.time,
                        m.privileged, m.kind, m.value,
                    );
                break;
            case 'ping':
                sc.send({
                    type: 'pong',
                });
                break;
            case 'pong':
                /* nothing */
                break;
            default:
                console.warn('Unexpected server message', m.type);
                return;
            }
        };
    });
};

/**
 * join requests to join a group.  The onjoined callback will be called
 * when we've effectively joined.
 *
 * credentials is either a password string, or an object whose type field
 * is 'password', 'token' or 'authServer'.  For 'authServer', a token is
 * first obtained by POSTing the location, username and password as JSON
 * to credentials.authServer.
 *
 * @param {string} group - The name of the group to join.
 * @param {string} username - the username to join as.
 * @param {string|Object} credentials - password or authServer.
 * @param {Object<string,any>} [data] - the initial associated data.
 * @throws {Error} if the credentials type is unknown, if the authorisation
 *   server replies with an error status, or if the connection is not open.
 */
ServerConnection.prototype.join = async function(group, username, credentials, data) {
    let m = {
        type: 'join',
        kind: 'join',
        group: group,
        username: username,
    };
    if((typeof credentials) === 'string') {
        m.password = credentials;
    } else {
        switch(credentials.type) {
        case 'password':
            m.password = credentials.password;
            break;
        case 'token':
            m.token = credentials.token;
            break;
        case 'authServer': {
            let r = await fetch(credentials.authServer, {
                method: "POST",
                headers: {
                    "Content-Type": "application/json",
                },
                body: JSON.stringify({
                    location: credentials.location,
                    username: username,
                    password: credentials.password,
                }),
            });
            if(!r.ok)
                throw new Error(
                    `The authorisation server said: ${r.status} ${r.statusText}`,
                );
            m.token = await r.text();
            break;
        }
        default:
            throw new Error(`Unknown credentials type ${credentials.type}`);
        }
    }

    if(data)
        m.data = data;

    this.send(m);
};
/**
 * leave leaves a group.  The onjoined callback will be called when we've
 * effectively left.
 *
 * @param {string} group - The name of the group to leave.
 */
ServerConnection.prototype.leave = function(group) {
    this.send({
        type: 'join',
        kind: 'leave',
        group: group,
    });
};

/**
 * request sets the list of requested tracks
 *
 * @param {Object<string,Array<string>>} what
 *     - A dictionary that maps labels to a sequence of 'audio', 'video'
 *       or 'video-low'.  An entry with an empty label '' provides the default.
 */
ServerConnection.prototype.request = function(what) {
    this.send({
        type: 'request',
        request: what,
    });
};

/**
 * findByLocalId finds an active up stream with the given localId.
 * It returns null if none was found, or if localId is empty.
 *
 * @param {string} localId
 * @returns {Stream}
 */
ServerConnection.prototype.findByLocalId = function(localId) {
    if(!localId)
        return null;

    for(let id in this.up) {
        let s = this.up[id];
        if(s.localId === localId)
            return s;
    }
    return null;
};

/**
 * getRTCConfiguration returns the RTCConfiguration that should be used
 * with this peer connection.  This usually comes from the server, but may
 * be overridden by the onpeerconnection callback.
 *
 * @returns {RTCConfiguration}
 */
ServerConnection.prototype.getRTCConfiguration = function() {
    if(this.onpeerconnection) {
        let conf = this.onpeerconnection.call(this);
        // A callback without a return statement yields undefined rather
        // than null; treat both as "no override" so that the server's
        // configuration is not silently discarded.
        if(conf !== null && conf !== undefined)
            return conf;
    }
    return this.rtcConfiguration;
};
/**
 * newUpStream creates a new up stream, registers it in this connection's
 * table of up streams and installs the peer connection's event handlers.
 *
 * @param {string} [localId]
 *   - The local id of the stream to create.  If a stream already exists with
 *     the same local id, it is replaced with the new stream.
 * @returns {Stream}
 */
ServerConnection.prototype.newUpStream = function(localId) {
    const conn = this;
    const streamId = newRandomId();
    if(conn.up[streamId])
        throw new Error('Eek!');

    if(typeof RTCPeerConnection === 'undefined')
        throw new Error("This browser doesn't support WebRTC");

    const peer = new RTCPeerConnection(conn.getRTCConfiguration());
    if(!peer)
        throw new Error("Couldn't create peer connection");

    // If a stream with this local id exists, close it and remember its id
    // so that the new stream is announced as its replacement.
    let replacedId = null;
    if(localId) {
        const previous = conn.findByLocalId(localId);
        if(previous) {
            replacedId = previous.id;
            previous.close(true);
        }
    }

    const stream = new Stream(
        conn, streamId, localId || newLocalId(), peer, true,
    );
    if(replacedId)
        stream.replace = replacedId;
    conn.up[streamId] = stream;

    peer.onnegotiationneeded = async () => {
        await stream.negotiate();
    };

    peer.onicecandidate = event => {
        if(event.candidate)
            stream.gotLocalIce(event.candidate);
    };

    peer.oniceconnectionstatechange = () => {
        const state = peer.iceConnectionState;
        if(stream.onstatus)
            stream.onstatus.call(stream, state);
        if(peer.iceConnectionState === 'failed')
            stream.restartIce();
    };

    // Tracks are not expected on an up stream; log any that show up.
    peer.ontrack = console.error;
    return stream;
};

/**
 * chat sends a chat message to the server.  The server will normally echo
 * the message back to the client.
 *
 * @param {string} kind
 *     -  The kind of message, either '', 'me' or an application-specific type.
 * @param {string} dest - The id to send the message to, empty for broadcast.
 * @param {string} value - The text of the message.
 */
ServerConnection.prototype.chat = function(kind, dest, value) {
    const message = {
        type: 'chat',
        source: this.id,
        dest,
        username: this.username,
        kind,
        value,
    };
    this.send(message);
};
/**
 * userAction sends a request to act on a user.
 *
 * @param {string} kind - One of "op", "unop", "kick", "present", "unpresent".
 * @param {string} dest - The id of the user to act upon.
 * @param {any} [value] - An action-dependent parameter.
 */
ServerConnection.prototype.userAction = function(kind, dest, value) {
    const message = {
        type: 'useraction',
        source: this.id,
        dest,
        username: this.username,
        kind,
        value,
    };
    this.send(message);
};

/**
 * userMessage sends an application-specific message to a user.
 * It resembles a chat message, except that it is not saved in the chat
 * history.
 *
 * @param {string} kind - The kind of application-specific message.
 * @param {string} dest - The id to send the message to, empty for broadcast.
 * @param {unknown} [value] - An optional parameter.
 * @param {boolean} [noecho] - If set, don't echo back the message to the sender.
 */
ServerConnection.prototype.userMessage = function(kind, dest, value, noecho) {
    const message = {
        type: 'usermessage',
        source: this.id,
        dest,
        username: this.username,
        kind,
        value,
        noecho,
    };
    this.send(message);
};

/**
 * groupAction sends a request to act on the current group.
 *
 * @param {string} kind
 *     - One of 'clearchat', 'lock', 'unlock', 'record' or 'unrecord'.
 * @param {string} [message] - An optional user-readable message.
 */
ServerConnection.prototype.groupAction = function(kind, message) {
    const request = {
        type: 'groupaction',
        source: this.id,
        kind,
        username: this.username,
        value: message,
    };
    this.send(request);
};
/**
 * gotOffer is called when we receive an offer from the server.  Don't call this.
 *
 * It creates the down stream if it does not exist yet (taking over the
 * local id of the stream being replaced, if any), announces it through
 * ondownstream, and answers the offer.  If answering fails, the stream's
 * onerror callback is invoked and the stream is aborted.
 *
 * @param {string} id
 * @param {string} label
 * @param {string} source
 * @param {string} username
 * @param {string} sdp
 * @param {string} replace
 * @function
 */
ServerConnection.prototype.gotOffer = async function(id, label, source, username, sdp, replace) {
    const conn = this;
    // Tells the server that we refuse the offered stream.
    const refuse = () => {
        conn.send({
            type: 'abort',
            id: id,
        });
    };

    if(conn.up[id]) {
        console.error("Duplicate connection id");
        refuse();
        return;
    }

    let inheritedLocalId = null;
    if(replace) {
        const replaced = conn.down[replace];
        if(replaced) {
            inheritedLocalId = replaced.localId;
            replaced.close(true);
        } else {
            console.error("Replacing unknown stream");
        }
    }

    let stream = conn.down[id];
    if(stream && inheritedLocalId)
        console.error("Replacing duplicate stream");

    if(!stream) {
        let pc;
        try {
            pc = new RTCPeerConnection(conn.getRTCConfiguration());
        } catch(e) {
            console.error(e);
            refuse();
            return;
        }
        const created = new Stream(
            conn, id, inheritedLocalId || newLocalId(), pc, false,
        );
        created.label = label;
        conn.down[id] = created;
        stream = created;

        pc.onicecandidate = event => {
            if(event.candidate)
                created.gotLocalIce(event.candidate);
        };

        pc.oniceconnectionstatechange = () => {
            if(created.onstatus)
                created.onstatus.call(created, pc.iceConnectionState);
            // For a down stream, it is the server that must renegotiate.
            if(pc.iceConnectionState === 'failed') {
                conn.send({
                    type: 'renegotiate',
                    id: id,
                });
            }
        };

        pc.ontrack = event => {
            if(event.streams.length < 1) {
                console.error("Got track with no stream");
                return;
            }
            created.stream = event.streams[0];
            const changed = recomputeUserStreams(conn, source);
            if(created.ondowntrack) {
                created.ondowntrack.call(
                    created, event.track, event.transceiver, event.streams[0],
                );
            }
            if(changed && conn.onuser)
                conn.onuser.call(conn, source, "change");
        };
    }

    stream.source = source;
    stream.username = username;

    if(conn.ondownstream)
        conn.ondownstream.call(conn, stream);

    try {
        await stream.pc.setRemoteDescription({
            type: 'offer',
            sdp: sdp,
        });

        await stream.flushRemoteIceCandidates();

        const answer = await stream.pc.createAnswer();
        if(!answer)
            throw new Error("Didn't create answer");
        await stream.pc.setLocalDescription(answer);
        conn.send({
            type: 'answer',
            id: id,
            sdp: stream.pc.localDescription.sdp,
        });
    } catch(e) {
        // Abort even if the error callback itself throws.
        try {
            if(stream.onerror)
                stream.onerror.call(stream, e);
        } finally {
            stream.abort();
        }
        return;
    }

    stream.localDescriptionSent = true;
    stream.flushLocalIceCandidates();
    if(stream.onnegotiationcompleted)
        stream.onnegotiationcompleted.call(stream);
};

/**
 * gotAnswer is called when we receive an answer from the server.  Don't
 * call this.
 *
 * If the answer cannot be applied, the stream's onerror callback is invoked
 * and the stream is closed.
 *
 * @param {string} id
 * @param {string} sdp
 * @function
 */
ServerConnection.prototype.gotAnswer = async function(id, sdp) {
    const stream = this.up[id];
    if(!stream)
        throw new Error('unknown up stream');

    const description = {
        type: 'answer',
        sdp: sdp,
    };
    try {
        await stream.pc.setRemoteDescription(description);
    } catch(e) {
        // Close even if the error callback itself throws.
        try {
            if(stream.onerror)
                stream.onerror.call(stream, e);
        } finally {
            stream.close();
        }
        return;
    }
    await stream.flushRemoteIceCandidates();
    if(stream.onnegotiationcompleted)
        stream.onnegotiationcompleted.call(stream);
};

/**
 * gotRenegotiate is called when we receive a renegotiation request from
 * the server.  Don't call this.
 *
 * @param {string} id
 * @function
 */
ServerConnection.prototype.gotRenegotiate = function(id) {
    const stream = this.up[id];
    if(!stream)
        throw new Error('unknown up stream');
    stream.restartIce();
};
/**
 * gotClose is called when we receive a close request from the server.
 * Don't call this.  A request for an unknown down stream is logged and
 * ignored.
 *
 * @param {string} id
 */
ServerConnection.prototype.gotClose = function(id) {
    const stream = this.down[id];
    if(stream) {
        stream.close();
        return;
    }
    console.warn('unknown down stream', id);
};

/**
 * gotAbort is called when we receive an abort message from the server.
 * Don't call this.
 *
 * @param {string} id
 */
ServerConnection.prototype.gotAbort = function(id) {
    const stream = this.up[id];
    if(!stream)
        throw new Error('unknown up stream');
    stream.close();
};

/**
 * gotRemoteIce is called when we receive an ICE candidate from the server.
 * Don't call this.
 *
 * The candidate is applied at once if the stream already has a remote
 * description, and buffered in remoteIceCandidates otherwise.
 *
 * @param {string} id
 * @param {RTCIceCandidate} candidate
 * @function
 */
ServerConnection.prototype.gotRemoteIce = async function(id, candidate) {
    // Up streams take precedence over down streams with the same id.
    const stream = this.up[id] || this.down[id];
    if(!stream)
        throw new Error('unknown stream');
    if(!stream.pc.remoteDescription) {
        stream.remoteIceCandidates.push(candidate);
        return;
    }
    await stream.pc.addIceCandidate(candidate).catch(console.warn);
};

/**
 * Stream encapsulates a MediaStream, a set of tracks.
 *
 * A stream is said to go "up" if it is from the client to the server, and
 * "down" otherwise.
 *
 * @param {ServerConnection} sc
 * @param {string} id
 * @param {string} localId
 * @param {RTCPeerConnection} pc
 * @param {boolean} up - whether the stream goes from client to server
 *
 * @constructor
 */
function Stream(sc, id, localId, pc, up) {
    /**
     * The ServerConnection that this stream belongs to.
     *
     * @type {ServerConnection}
     * @const
     */
    this.sc = sc;
    /**
     * The id of this stream.
     *
     * @type {string}
     * @const
     */
    this.id = id;
    /**
     * The local id of this stream.
     *
     * @type {string}
     * @const
     */
    this.localId = localId;
    /**
     * True if the stream goes in the client->server direction.
     *
     * @type {boolean}
     * @const
     */
    this.up = up;
    /**
     * For down streams, the id of the client that created the stream.
     *
     * @type {string}
     */
    this.source = null;
    /**
     * For down streams, the username of the client who created the stream.
     *
     * @type {string}
     */
    this.username = null;
    /**
     * The associated RTCPeerConnection.  This is null before the stream
     * is connected, and may change over time.
     *
     * @type {RTCPeerConnection}
     */
    this.pc = pc;
    /**
     * The associated MediaStream.  This is null before the stream is
     * connected, and may change over time.
     *
     * @type {MediaStream}
     */
    this.stream = null;
    /**
     * The label that the originator assigned to this stream.
     *
     * @type {string}
     */
    this.label = null;
    /**
     * The id of the stream that this one is currently replacing.
     *
     * @type {string}
     */
    this.replace = null;
    /**
     * True once a local description has been sent.
     *
     * @type {boolean}
     */
    this.localDescriptionSent = false;
    /**
     * Local ICE candidates waiting to be sent.  They are flushed by
     * flushLocalIceCandidates after we send a local description.
     *
     * @type {RTCIceCandidate[]}
     */
    this.localIceCandidates = [];
    /**
     * Remote ICE candidates waiting to be applied.  They are flushed by
     * flushRemoteIceCandidates when we get a remote SDP description.
     *
     * @type {RTCIceCandidate[]}
     */
    this.remoteIceCandidates = [];
    /**
     * The statistics last computed by the stats handler.  This is
     * a dictionary indexed by track id, with each value a dictionary of
     * statistics.
     *
     * @type {Object<string,unknown>}
     */
    this.stats = {};
    /**
     * The id of the periodic handler that computes statistics, as
     * returned by setInterval.
     *
     * @type {number}
     */
    this.statsHandler = null;
    /**
     * userdata is a free slot where the application may hang its own
     * data.  The library never looks at it.
     *
     * @type{Object<unknown,unknown>}
     */
    this.userdata = {};

    /* Callbacks */

    /**
     * onclose is invoked when the stream is closed.  Replace is true
     * if the stream is being replaced by another one with the same id.
     *
     * @type{(this: Stream, replace: boolean) => void}
     */
    this.onclose = null;
    /**
     * onerror is invoked whenever a fatal error occurs.  The stream will
     * then be closed, and onclose called normally.
     *
     * @type{(this: Stream, error: unknown) => void}
     */
    this.onerror = null;
    /**
     * onnegotiationcompleted is invoked whenever negotiation or
     * renegotiation has completed.
     *
     * @type{(this: Stream) => void}
     */
    this.onnegotiationcompleted = null;
    /**
     * ondowntrack is invoked whenever a new track is added to a stream.
     * If the stream parameter differs from its previous value, then it
     * indicates that the old stream has been discarded.
     *
     * @type{(this: Stream, track: MediaStreamTrack, transceiver: RTCRtpTransceiver, stream: MediaStream) => void}
     */
    this.ondowntrack = null;
    /**
     * onstatus is invoked whenever the status of the stream changes.
     *
     * @type{(this: Stream, status: string) => void}
     */
    this.onstatus = null;
    /**
     * onstats is invoked when new statistics about the connection are
     * available.
     *
     * @type{(this: Stream, stats: Object<unknown,unknown>) => void}
     */
    this.onstats = null;
}
/**
 * setStream sets the stream of an upwards connection, and invokes the
 * connection's onuser callback if this changed our own set of streams.
 *
 * @param {MediaStream} stream
 */
Stream.prototype.setStream = function(stream) {
    const conn = this.sc;
    this.stream = stream;
    if(recomputeUserStreams(conn, conn.id) && conn.onuser)
        conn.onuser.call(conn, conn.id, "change");
};

/**
 * close closes a stream.
 *
 * For streams in the up direction, this may be called at any time.  For
 * streams in the down direction, this will be called automatically when
 * the server signals that it is closing a stream.
 *
 * Closing an already closed stream only logs a warning.
 *
 * @param {boolean} [replace]
 *    - true if the stream is being replaced by another one with the same id
 */
Stream.prototype.close = function(replace) {
    const stream = this;
    const conn = stream.sc;

    if(!conn) {
        console.warn('Closing closed stream');
        return;
    }

    if(stream.statsHandler) {
        clearInterval(stream.statsHandler);
        stream.statsHandler = null;
    }

    stream.pc.close();

    // The server only needs to be told about up streams that it has
    // heard of and that are not being replaced.
    const mustNotify =
          stream.up && !replace && stream.localDescriptionSent;
    if(mustNotify) {
        try {
            conn.send({
                type: 'close',
                id: stream.id,
            });
        } catch(e) {
        }
    }

    const table = stream.up ? conn.up : conn.down;
    const userid = stream.up ? conn.id : stream.source;
    if(table[stream.id] === stream)
        delete(table[stream.id]);
    else
        console.warn('Closing unknown stream');

    if(recomputeUserStreams(conn, userid) && conn.onuser)
        conn.onuser.call(conn, userid, "change");
    stream.sc = null;

    if(stream.onclose)
        stream.onclose.call(stream, replace);
};
/**
 * recomputeUserStreams recomputes the user.streams table for a given user.
 * It returns true if anything changed.
 *
 * For ourself, the table is built from all up streams.  For any other
 * user, it is built from the down streams whose source is that user; the
 * streams of other users are left out.  Streams that have no MediaStream
 * attached yet are skipped.
 *
 * @param {ServerConnection} sc
 * @param {string} id - the id of the user whose table is recomputed
 * @returns {boolean}
 */
function recomputeUserStreams(sc, id) {
    const user = sc.users[id];
    if(!user) {
        console.warn("recomputing streams for unknown user");
        return false;
    }

    const ours = id === sc.id;
    const candidates = ours ? sc.up : sc.down;
    const previous = user.streams;
    const current = {};
    for(const streamId in candidates) {
        const c = candidates[streamId];
        // sc.down holds the streams of every remote user; only those
        // that originate from this user belong in their table.
        if(!ours && c.source !== id)
            continue;
        if(!c.stream)
            continue;
        if(!current[c.label])
            current[c.label] = {};
        c.stream.getTracks().forEach(t => {
            current[c.label][t.kind] = true;
        });
    }
    user.streams = current;

    return JSON.stringify(previous) !== JSON.stringify(current);
}

/**
 * abort requests that the server close a down stream.
 *
 * @throws {Error} if called on an up stream.
 */
Stream.prototype.abort = function() {
    let c = this;
    if(c.up)
        throw new Error("Abort called on an up stream");
    c.sc.send({
        type: 'abort',
        id: c.id,
    });
};

/**
 * gotLocalIce is called when we get a local ICE candidate.  Don't call this.
 *
 * The candidate is sent at once if a local description has already been
 * sent, and buffered in localIceCandidates otherwise.
 *
 * @param {RTCIceCandidate} candidate
 * @function
 */
Stream.prototype.gotLocalIce = function(candidate) {
    let c = this;
    if(c.localDescriptionSent)
        c.sc.send({type: 'ice',
                   id: c.id,
                   candidate: candidate,
                  });
    else
        c.localIceCandidates.push(candidate);
};
/**
 * flushLocalIceCandidates sends every buffered local ICE candidate to the
 * server and empties the buffer.  A candidate that cannot be sent is
 * logged and dropped.  It is called when we send an offer.
 *
 * @function
 */
Stream.prototype.flushLocalIceCandidates = function () {
    const stream = this;
    const pending = stream.localIceCandidates;
    stream.localIceCandidates = [];
    for(const candidate of pending) {
        try {
            stream.sc.send({
                type: 'ice',
                id: stream.id,
                candidate: candidate,
            });
        } catch(e) {
            console.warn(e);
        }
    }
    stream.localIceCandidates = [];
};

/**
 * flushRemoteIceCandidates applies every buffered remote ICE candidate to
 * the peer connection and empties the buffer.  A candidate that is
 * rejected is logged.  It is called automatically when we get a remote
 * description.
 *
 * @function
 */
Stream.prototype.flushRemoteIceCandidates = async function () {
    const stream = this;
    const pending = stream.remoteIceCandidates;
    stream.remoteIceCandidates = [];
    /** @type {Array.<Promise<void>>} */
    const additions = pending.map(
        candidate => stream.pc.addIceCandidate(candidate).catch(console.warn),
    );
    return await Promise.all(additions);
};

/**
 * negotiate negotiates or renegotiates an up stream.  It is called
 * automatically when required.  If the client requires renegotiation, it
 * is probably better to call restartIce which will cause negotiate to be
 * called asynchronously.
 *
 * @function
 * @param {boolean} [restartIce] - Whether to restart ICE.
 */
Stream.prototype.negotiate = async function (restartIce) {
    const stream = this;
    if(!stream.up)
        throw new Error('not an up stream');

    const options = restartIce ? {iceRestart: true} : {};
    const offer = await stream.pc.createOffer(options);
    if(!offer)
        throw(new Error("Didn't create offer"));
    await stream.pc.setLocalDescription(offer);

    // Any offer after the first one is a renegotiation.
    const kind = stream.localDescriptionSent ? 'renegotiate' : '';
    stream.sc.send({
        type: 'offer',
        source: stream.sc.id,
        username: stream.sc.username,
        kind: kind,
        id: stream.id,
        replace: stream.replace,
        label: stream.label,
        sdp: stream.pc.localDescription.sdp,
    });
    stream.localDescriptionSent = true;
    stream.replace = null;
    stream.flushLocalIceCandidates();
};

/**
 * restartIce causes an ICE restart on a stream.  For up streams, it is
 * called automatically when ICE signals that the connection has failed,
 * but may also be called by the application.  For down streams, it
 * requests that the server perform an ICE restart.  In either case,
 * it returns immediately, negotiation will happen asynchronously.
 */
Stream.prototype.restartIce = function () {
    const stream = this;
    if(!stream.up) {
        stream.sc.send({
            type: 'renegotiate',
            id: stream.id,
        });
        return;
    }

    // Use the peer connection's own restartIce when it has one, and fall
    // back to an explicit renegotiation if it is missing or throws.
    if('restartIce' in stream.pc) {
        try {
            stream.pc.restartIce();
            return;
        } catch(e) {
            console.warn(e);
        }
    }

    // negotiate is async, but this returns immediately.
    stream.negotiate(true);
};

/**
 * request sets the list of tracks.  If this is not called, or called with
 * a null argument, then the default is provided by ServerConnection.request.
 *
 * @param {Array<string>} what - a sequence of 'audio', 'video' or 'video-low'.
 */
Stream.prototype.request = function(what) {
    const message = {
        type: 'requestStream',
        id: this.id,
        request: what,
    };
    this.sc.send(message);
};

/**
 * updateStats is called periodically, if requested by setStatsInterval,
 * in order to recompute stream statistics and invoke the onstats handler.
 */
1323 * 1324 * @function 1325 */ 1326 Stream.prototype.updateStats = async function() { 1327 let c = this; 1328 let old = c.stats; 1329 /** @type{Object<string,unknown>} */ 1330 let stats = {}; 1331 1332 let transceivers = c.pc.getTransceivers(); 1333 for(let i = 0; i < transceivers.length; i++) { 1334 let t = transceivers[i]; 1335 let stid = t.sender.track && t.sender.track.id; 1336 let rtid = t.receiver.track && t.receiver.track.id; 1337 1338 let report = null; 1339 if(stid) { 1340 try { 1341 report = await t.sender.getStats(); 1342 } catch(e) { 1343 } 1344 } 1345 1346 if(report) { 1347 for(let r of report.values()) { 1348 if(stid && r.type === 'outbound-rtp') { 1349 let id = stid; 1350 // Firefox doesn't implement rid, use ssrc 1351 // to discriminate simulcast tracks. 1352 id = id + '-' + r.ssrc; 1353 if(!('bytesSent' in r)) 1354 continue; 1355 if(!stats[id]) 1356 stats[id] = {}; 1357 stats[id][r.type] = {}; 1358 stats[id][r.type].timestamp = r.timestamp; 1359 stats[id][r.type].bytesSent = r.bytesSent; 1360 if(old[id] && old[id][r.type]) 1361 stats[id][r.type].rate = 1362 ((r.bytesSent - old[id][r.type].bytesSent) * 1000 / 1363 (r.timestamp - old[id][r.type].timestamp)) * 8; 1364 } 1365 } 1366 } 1367 1368 report = null; 1369 if(rtid) { 1370 try { 1371 report = await t.receiver.getStats(); 1372 } catch(e) { 1373 console.error(e); 1374 } 1375 } 1376 1377 if(report) { 1378 for(let r of report.values()) { 1379 if(rtid && r.type === 'track') { 1380 if(!('totalAudioEnergy' in r)) 1381 continue; 1382 if(!stats[rtid]) 1383 stats[rtid] = {}; 1384 stats[rtid][r.type] = {}; 1385 stats[rtid][r.type].timestamp = r.timestamp; 1386 stats[rtid][r.type].totalAudioEnergy = r.totalAudioEnergy; 1387 if(old[rtid] && old[rtid][r.type]) 1388 stats[rtid][r.type].audioEnergy = 1389 (r.totalAudioEnergy - old[rtid][r.type].totalAudioEnergy) * 1000 / 1390 (r.timestamp - old[rtid][r.type].timestamp); 1391 } 1392 } 1393 } 1394 } 1395 1396 c.stats = stats; 1397 1398 if(c.onstats) 1399 
c.onstats.call(c, c.stats); 1400 }; 1401 1402 /** 1403 * setStatsInterval sets the interval in milliseconds at which the onstats 1404 * handler will be called. This is only useful for up streams. 1405 * 1406 * @param {number} ms - The interval in milliseconds. 1407 */ 1408 Stream.prototype.setStatsInterval = function(ms) { 1409 let c = this; 1410 if(c.statsHandler) { 1411 clearInterval(c.statsHandler); 1412 c.statsHandler = null; 1413 } 1414 1415 if(ms <= 0) 1416 return; 1417 1418 c.statsHandler = setInterval(() => { 1419 c.updateStats(); 1420 }, ms); 1421 }; 1422