Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split streams from peers + add methods to close peer calls #108

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions core/client/lemverse.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Template.lemverse.onCreated(function () {
Tracker.nonreactive(() => {
if (userProximitySensor.nearUsersCount() === 0) userStreams.destroyStream(streamTypes.main);
else if (!user.profile.shareAudio) userStreams.audio(false);
else if (user.profile.shareAudio) {
else {
userStreams.createStream().then(() => {
userStreams.audio(true);
userProximitySensor.callProximityStartedForAllNearUsers();
Expand All @@ -177,12 +177,13 @@ Template.lemverse.onCreated(function () {
if (!user) return;
Tracker.nonreactive(() => {
if (userProximitySensor.nearUsersCount() === 0) userStreams.destroyStream(streamTypes.main);
else if (!user.profile.shareVideo) userStreams.video(false);
else if (user.profile.shareVideo) {
else if (!user.profile.shareVideo) {
userStreams.video(false);
} else {
const forceNewStream = userStreams.shouldCreateNewStream(streamTypes.main, true, true);
userStreams.createStream(forceNewStream).then(() => {
userStreams.video(true);
userProximitySensor.callProximityStartedForAllNearUsers();
peer.call(Object.values(userProximitySensor.nearUsers));
});
}

Expand All @@ -201,9 +202,13 @@ Template.lemverse.onCreated(function () {
if (user.profile.shareScreen) meet.shareScreen();
else meet.unshareScreen();
} else if (user.profile.shareScreen) {
userStreams.createScreenStream().then(() => userStreams.screen(true));
userStreams.createScreenStream().then(() => {
userStreams.screen(true);
peer.call(Object.values(userProximitySensor.nearUsers));
});
} else {
userStreams.screen(false);
peer.closePeerCalls(false, streamTypes.screen);
}
});
});
Expand Down
159 changes: 80 additions & 79 deletions core/client/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,7 @@ peer = {
_.each(this.calls, call => this.close(call.peer, Meteor.settings.public.peer.delayBeforeClosingCall, 'close-all'));
},

closeCall(userId, origin) {
debug(`closeCall: start (${origin})`, { userId });

let activeCallsCount = 0;
const _close = (remote, user, type) => {
const callsSource = remote ? this.remoteCalls : this.calls;
const call = callsSource[`${user}-${type}`];
if (call) {
activeCallsCount++;
call.close();
}

delete callsSource[`${user}-${type}`];
};

this.unlockCall(userId, true);
_close(false, userId, streamTypes.main);
_close(false, userId, streamTypes.screen);
_close(true, userId, streamTypes.main);
_close(true, userId, streamTypes.screen);
this.cancelWaitingCallAction(userId);

const debutText = activeCallsCount ? 'closeCall: call was active' : 'closeCall: call was inactive';
debug(debutText, { sourceAmount: activeCallsCount });

_removeUserFromRemoteStreamsList(userId) {
let streamsByUsers = this.remoteStreamsByUsers.get();
streamsByUsers.map(usr => {
if (usr._id === userId) {
Expand All @@ -119,6 +95,20 @@ peer = {
// We clean up remoteStreamsByUsers table by deleting all the users who have neither webcam or screen sharing active
streamsByUsers = streamsByUsers.filter(usr => usr.main.srcObject !== undefined || usr.screen.srcObject !== undefined || usr.waitingCallAnswer);
this.remoteStreamsByUsers.set(streamsByUsers);
},

closeCall(userId, origin) {
debug(`closeCall: start (${origin})`, { userId });

this.unlockCall(userId, true);
this.cancelWaitingCallAction(userId);

let closedPeerCallsCount = 0;
closedPeerCallsCount += this._closeUserPeerCalls(true, userId);
closedPeerCallsCount += this._closeUserPeerCalls(false, userId);
if (closedPeerCallsCount) audioManager.play('webrtc-out.mp3', 0.2);

this._removeUserFromRemoteStreamsList(userId);

if (!this.hasActiveStreams()) {
userStreams.destroyStream(streamTypes.main);
Expand All @@ -130,12 +120,7 @@ peer = {
delete this.callStartDates[userId];
}

$(`.js-video-${userId}-user`).remove();
debug('closeCall: call closed successfully', { userId });

if (!activeCallsCount) return;

audioManager.play('webrtc-out.mp3', 0.2);
},

close(userId, timeout = 0, origin = null) {
Expand All @@ -155,6 +140,35 @@ peer = {
};
},

closePeerCalls(remote, type) {
const callsSource = remote ? this.remoteCalls : this.calls;
const callEntries = Object.entries(callsSource);
const typeKey = `-${type}`;

callEntries.forEach(([key, call]) => {
if (key.indexOf(typeKey) === -1) return;

call.close();
delete callsSource[key];
});
},

_closeUserPeerCalls(remote, userId) {
const callsSource = remote ? this.remoteCalls : this.calls;
const callEntries = Object.entries(callsSource);
let closedCount = 0;

callEntries.forEach(([key, call]) => {
if (key.indexOf(userId) === -1) return;

call.close();
delete callsSource[key];
closedCount++;
});

return closedCount;
},

createPeerCall(peer, user, stream, streamType) {
debug(`createPeerCall: calling remote user`, { user: user._id, streamType });
if (!stream) { error(`createPeerCall: stream is undefined`, { user, stream }); return; }
Expand Down Expand Up @@ -213,63 +227,42 @@ peer = {
delete this.peerInstance;
},

/**
* To add a track it is necessary to renegotiate the connection with the remote user.
* @see https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/addTrack)
*/
updatePeersStream(stream, type) {
debug('updatePeersStream: start', { stream, type });

const callEntries = Object.entries(this.calls);

if (type === streamTypes.main) {
debug(`updatePeersStream: main stream ${stream.id}`, { stream });
const audioTrack = stream.getAudioTracks()[0];
const videoTrack = stream.getVideoTracks()[0];

// note: to add a track it is necessary to renegotiate the connection with the remote user (https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/addTrack)
callEntries.forEach(([key, call]) => {
if (key.indexOf('-screen') !== -1) return;
const senders = call.peerConnection.getSenders();

const existingSenderAudioTrack = senders.find(sender => sender.track.kind === 'audio');
if (existingSenderAudioTrack) {
if (audioTrack) existingSenderAudioTrack.replaceTrack(audioTrack);
else call.peerConnection.removeTrack(existingSenderAudioTrack);
} else if (audioTrack) call.peerConnection.addTrack(audioTrack);

const existingSenderVideoTrack = senders.find(sender => sender.track.kind === 'video');
if (existingSenderVideoTrack) {
if (videoTrack) existingSenderVideoTrack.replaceTrack(videoTrack);
else call.peerConnection.removeTrack(existingSenderVideoTrack);
} else if (videoTrack) call.peerConnection.addTrack(videoTrack);

if (!existingSenderAudioTrack || !existingSenderVideoTrack) debug(`updatePeersStream: stream main track added for user`, { key });
else debug(`updatePeersStream: stream main track updated for user`, { key });
});
} else if (type === streamTypes.screen) {
debug(`updatePeersStream: screen share stream ${stream.id}`, { stream });
const screenTrack = stream.getVideoTracks()[0];

callEntries.forEach(([key, call]) => {
if (key.indexOf('-screen') === -1) return;
const senders = call.peerConnection.getSenders();
let trackUpdated = false;

senders.forEach(sender => {
if (sender.track.id === screenTrack.id || sender.track.kind !== 'video') return;
sender.replaceTrack(screenTrack);
trackUpdated = true;
});

if (trackUpdated) debug(`updatePeersStream: stream main track updated for user ${key}`);
});
}
const typeKey = `-${streamTypes.screen}`;
const audioTrack = stream.getAudioTracks()[0];
const videoTrack = stream.getVideoTracks()[0];

Object.entries(this.calls).forEach(([key, call]) => {
if (key.indexOf(typeKey) === -1) return;
const senders = call.peerConnection.getSenders();

const existingSenderAudioTrack = senders.find(sender => sender.track.kind === 'audio');
if (existingSenderAudioTrack) {
if (audioTrack) existingSenderAudioTrack.replaceTrack(audioTrack);
else call.peerConnection.removeTrack(existingSenderAudioTrack);
} else if (audioTrack) call.peerConnection.addTrack(audioTrack);

const existingSenderVideoTrack = senders.find(sender => sender.track.kind === 'video');
if (existingSenderVideoTrack) {
if (videoTrack) existingSenderVideoTrack.replaceTrack(videoTrack);
else call.peerConnection.removeTrack(existingSenderVideoTrack);
} else if (videoTrack) call.peerConnection.addTrack(videoTrack);
});
},

onProximityStarted(nearUsers) {
call(users) {
if (!this.isEnabled()) return;

const user = Meteor.user();
if (user?.profile.guest) return; // disable proximity sensor for guest user

nearUsers.forEach(nearUser => {
users.forEach(nearUser => {
if (this.isCallInState(nearUser._id, callAction.open)) return;
this.cancelWaitingCallAction(nearUser._id);

Expand All @@ -289,13 +282,21 @@ peer = {
});
},

onProximityEnded(users) {
hangUp(users, origin = 'hang-up') {
users.forEach(user => {
if (this.lockedCalls[user._id]) return;
this.close(user._id, Meteor.settings.public.peer.delayBeforeClosingCall, 'proximity-ended');
this.close(user._id, Meteor.settings.public.peer.delayBeforeClosingCall, origin);
});
},

onProximityStarted(nearUsers) {
this.call(nearUsers);
},

onProximityEnded(users) {
this.hangUp(users, 'proximity-ended');
},

isCallInState(userId, state) {
return this.waitingCallActions[userId]?.action === state;
},
Expand Down
23 changes: 7 additions & 16 deletions core/client/user-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,20 @@ userStreams = {

screen(enabled) {
const { instance: screenStream } = this.streams.screen;
if (screenStream && !enabled) {
this.stopTracks(screenStream);
this.streams.screen.instance = undefined;
_.each(peer.calls, (call, key) => {
if (key.indexOf('-screen') === -1) return;
if (Meteor.user().options?.debug) log('me -> you screen ****** I stop sharing screen, call closing', key);
call.close();
delete peer.calls[key];
});
} else if (enabled) userProximitySensor.callProximityStartedForAllNearUsers();
if (!screenStream || enabled) return;

this.stopTracks(screenStream);
this.streams.screen.instance = undefined;
},

destroyStream(type) {
const debug = Meteor.user()?.options?.debug;
const { instance: stream } = type === streamTypes.main ? this.streams.main : this.streams.screen;
if (!stream) return;

const debug = Meteor.user({ fields: { 'options.debug': 1 } })?.options?.debug;
if (debug) log('destroyStream: start', { stream, type });
if (!stream) {
if (debug) log('destroyStream: cancelled (stream was not alive)');
return;
}

this.stopTracks(stream);

if (stream === this.streams.main.instance) this.streams.main.instance = undefined;
else if (stream === this.streams.screen.instance) this.streams.screen.instance = undefined;
},
Expand Down