Skip to content

Commit

Permalink
wasm socket accept: Use data channel in ondatachannel callback
Browse files Browse the repository at this point in the history
  • Loading branch information
johanhelsing committed Apr 24, 2022
1 parent 55c59d8 commit aebcffc
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 31 deletions.
3 changes: 2 additions & 1 deletion matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ web-sys = { version = "0.3.22", default-features = false, features = [
"RtcPeerConnection", "RtcPeerConnectionIceEvent",
"RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit",
"RtcIceCandidate", "RtcIceCandidateInit",
"RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType",
"RtcConfiguration",
"RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", "RtcDataChannelEvent"
] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
107 changes: 77 additions & 30 deletions matchbox_socket/src/webrtc_socket/wasm/message_loop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{future::FusedFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{future::FusedFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_timer::Delay;
use futures_util::select;
Expand All @@ -9,9 +9,9 @@ use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType,
RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType,
RtcSessionDescriptionInit,
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelInit,
RtcDataChannelType, RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection,
RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit,
};

use crate::webrtc_socket::KEEP_ALIVE_INTERVAL;
Expand Down Expand Up @@ -316,7 +316,7 @@ async fn handshake_offer(
async fn handshake_accept(
signal_peer: SignalPeer,
mut signal_receiver: UnboundedReceiver<PeerSignal>,
messages_from_peers_tx: UnboundedSender<(PeerId, Packet)>,
from_peer_message_tx: UnboundedSender<(PeerId, Packet)>,
) -> Result<
(
PeerId,
Expand All @@ -328,13 +328,6 @@ async fn handshake_accept(
debug!("handshake_accept");

let (conn, trickle) = create_rtc_peer_connection(signal_peer.clone());
let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1);
let data_channel = create_data_channel(
conn.clone(),
messages_from_peers_tx,
signal_peer.id.clone(),
channel_ready_tx,
);

let offer: Option<String>;
loop {
Expand Down Expand Up @@ -390,18 +383,21 @@ async fn handshake_accept(
CandidateTrickle::listen_for_remote_candidates(conn.clone(), signal_receiver).fuse(),
);

let mut channel_ready_fut = channel_ready_rx.next();
let data_channel_fut =
wait_for_data_channel(conn.clone(), signal_peer.id.clone(), from_peer_message_tx).fuse();
pin_mut!(data_channel_fut);

debug!("waiting for data channel to open");
loop {
let data_channel = loop {
select! {
_ = channel_ready_fut => break,
data_channel = data_channel_fut => break data_channel,
// TODO: this means that the signalling is down, should return an error
trickle = trickle_fut => {
error!("Trickle error {:?}", trickle);
continue;
}
}
}
};

Ok((signal_peer.id, data_channel, trickle_fut))
}
Expand Down Expand Up @@ -461,20 +457,6 @@ fn create_data_channel(
connection.create_data_channel_with_data_channel_dict("webudp", &data_channel_config);
channel.set_binary_type(RtcDataChannelType::Arraybuffer);

let channel_onmsg_func: Box<dyn FnMut(MessageEvent)> = Box::new(move |event: MessageEvent| {
debug!("incoming {:?}", event);
if let Ok(arraybuf) = event.data().dyn_into::<js_sys::ArrayBuffer>() {
let uarray = js_sys::Uint8Array::new(&arraybuf);
let body = uarray.to_vec();
incoming_tx
.unbounded_send((peer_id.clone(), body.into_boxed_slice()))
.unwrap();
}
});
let channel_onmsg_closure = Closure::wrap(channel_onmsg_func);
channel.set_onmessage(Some(channel_onmsg_closure.as_ref().unchecked_ref()));
channel_onmsg_closure.forget();

let channel_onopen_func: Box<dyn FnMut(JsValue)> = Box::new(move |_| {
debug!("Rtc data channel opened :D :D");
channel_ready
Expand All @@ -485,9 +467,74 @@ fn create_data_channel(
channel.set_onopen(Some(channel_onopen_closure.as_ref().unchecked_ref()));
channel_onopen_closure.forget();

setup_data_channel(&channel, peer_id, incoming_tx);

channel
}

async fn wait_for_data_channel(
connection: RtcPeerConnection,
peer_id: PeerId,
// new_peer_tx: UnboundedSender<PeerId>,
from_peer_message_tx: UnboundedSender<(PeerId, Packet)>,
) -> RtcDataChannel {
let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1);

let ondatachannel_func: Box<dyn FnMut(RtcDataChannelEvent)> =
Box::new(move |event: RtcDataChannelEvent| {
let channel = event.channel();
debug!("new data channel {}", channel.label());

// Move this callback to setup_data_channel?
let channel2 = channel.clone();
let mut channel_tx = channel_tx.clone();
let channel_onopen_func: Box<dyn FnMut(JsValue)> = Box::new(move |_| {
debug!("Data channel ready");
channel_tx.try_send(channel2.clone()).unwrap();
// new_peer_tx
// .try_send(peer_id)
// .expect("failed to notify about new peer");
});
let channel_onopen_closure = Closure::wrap(channel_onopen_func);
channel.set_onopen(Some(channel_onopen_closure.as_ref().unchecked_ref()));
channel_onopen_closure.forget();

setup_data_channel(&channel, peer_id.clone(), from_peer_message_tx.clone());
});
let ondatachannel_closure = Closure::wrap(ondatachannel_func);
connection.set_ondatachannel(Some(ondatachannel_closure.as_ref().unchecked_ref()));
ondatachannel_closure.forget();

channel_rx.next().await.unwrap()
}

fn setup_data_channel(
data_channel: &RtcDataChannel,
peer_id: PeerId,
from_peer_message_tx: UnboundedSender<(PeerId, Packet)>,
) {
// TODO: set_onclose

// TODO: set_onerror

let data_channel_onmessage_func: Box<dyn FnMut(MessageEvent)> =
Box::new(move |event: MessageEvent| {
debug!("rx {:?}", event);
if let Ok(arraybuf) = event.data().dyn_into::<js_sys::ArrayBuffer>() {
let uarray = js_sys::Uint8Array::new(&arraybuf);
let body = uarray.to_vec();
from_peer_message_tx
.unbounded_send((peer_id.clone(), body.into_boxed_slice()))
.unwrap();
}
});
let data_channel_onmessage_closure = Closure::wrap(data_channel_onmessage_func);
data_channel.set_onmessage(Some(
data_channel_onmessage_closure.as_ref().unchecked_ref(),
));
data_channel_onmessage_closure.forget();
}

// Expect/unwrap is broken in select for some reason :/
fn check(
res: &Result<
Expand Down

0 comments on commit aebcffc

Please sign in to comment.