From eee8a2d621408e59eb87b16ccc86624d2d460003 Mon Sep 17 00:00:00 2001 From: Johan Klokkhammer Helsing Date: Sat, 5 Feb 2022 10:31:47 +0100 Subject: [PATCH 1/4] Add ice trickle support on wasm Following the same pattern as the native implementation. Now that both wasm and native support trickling, we're one step closer to supporting cross-platform connections. Issue: #7 --- matchbox_socket/Cargo.toml | 4 +- .../src/webrtc_socket/native/message_loop.rs | 19 +- .../src/webrtc_socket/wasm/message_loop.rs | 227 ++++++++++++++---- 3 files changed, 195 insertions(+), 55 deletions(-) diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index a75fbda9..7b51d81d 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -36,9 +36,9 @@ wasm-bindgen = { version = "0.2", features = [ "serde-serialize" ], default-feat js-sys = { version = "0.3", default-features = false } web-sys = { version = "0.3.22", default-features = false, features = [ "MessageEvent", - "RtcPeerConnection", + "RtcPeerConnection", "RtcPeerConnectionIceEvent", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", - "RtcIceGatheringState", + "RtcIceCandidate", "RtcIceCandidateInit", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", ] } diff --git a/matchbox_socket/src/webrtc_socket/native/message_loop.rs b/matchbox_socket/src/webrtc_socket/native/message_loop.rs index 460ad827..a01a7771 100644 --- a/matchbox_socket/src/webrtc_socket/native/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/message_loop.rs @@ -156,7 +156,15 @@ impl CandidateTrickle { peer_connection: &RTCPeerConnection, candidate: RTCIceCandidate, ) { - let candidate = candidate.to_json().await.unwrap().candidate; + // Can't directly serialize/deserialize the candidate, as + // webrtc-rs' to_json uses snake_case and so isn't compatible + // with browsers + // let candidate = candidate.to_json().await.unwrap(); + // let candidate = serde_json::to_string(&candidate).unwrap(); + + let candidate = candidate.to_json().await.unwrap(); + assert_eq!(candidate.sdp_mline_index, 0); // we're assuming this on the other side + let candidate = candidate.candidate; // Local candidates can only be sent after the remote description if peer_connection.remote_description().await.is_some() { @@ -185,9 +193,18 @@ impl CandidateTrickle { match signal { PeerSignal::IceCandidate(candidate) => { debug!("got an IceCandidate signal! {}", candidate); + // Can't directly serialize/deserialize the candidate, as + // webrtc-rs' to_json uses snake_case and so isn't compatible + // with browsers + // let candidate = serde_json::from_str(&candidate).unwrap(); // todo: error handling + // peer_connection + // .add_ice_candidate(candidate) + // .await?; + // TODO: this looks like it's fixed in webrtc-rs 0.5 peer_connection .add_ice_candidate(RTCIceCandidateInit { candidate, + sdp_mline_index: 0, ..Default::default() }) .await?; diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index bf288a2b..b85efda0 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -1,18 +1,17 @@ -use futures::FutureExt; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{future::FusedFuture, stream::FuturesUnordered, FutureExt, StreamExt}; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_timer::Delay; use futures_util::select; use js_sys::Reflect; use log::{debug, error, warn}; use serde::Serialize; -use std::collections::HashMap; -use std::time::Duration; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType, - RtcIceGatheringState, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, + RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, + RtcSessionDescriptionInit, }; use crate::webrtc_socket::KEEP_ALIVE_INTERVAL; @@ -123,13 +122,104 @@ pub async fn message_loop( debug!("Message loop finished"); } +struct CandidateTrickle { + signal_peer: SignalPeer, + // TODO: is this mutex really needed? just use yet another channel? + pending: futures::lock::Mutex>, +} + +impl CandidateTrickle { + fn new(signal_peer: SignalPeer) -> Self { + Self { + signal_peer, + pending: Default::default(), + } + } + + fn on_local_candidate(&self, peer_connection: RtcPeerConnection, candidate: RtcIceCandidate) { + // Can't directly serialize/deserialize the candidate, as + // webrtc-rs' to_json uses snake_case and so isn't compatible + // with browsers + // let candidate = js_sys::JSON::stringify(&candidate.to_json()) + // .unwrap() + // .as_string() + // .unwrap(); + let candidate = candidate.candidate(); + + // Local candidates can only be sent after the remote description + if peer_connection.remote_description().is_some() { + // Can send local candidate already + debug!("sending IceCandidate signal {}", candidate); + self.signal_peer.send(PeerSignal::IceCandidate(candidate)); + } else { + // Can't send yet, store in pending + debug!("storing pending IceCandidate signal {}", candidate); + // TODO: fix neatly + self.pending.try_lock().unwrap().push(candidate); + } + } + + async fn send_pending_candidates(&self) { + let mut pending = self.pending.lock().await; + for candidate in std::mem::take(&mut *pending) { + self.signal_peer.send(PeerSignal::IceCandidate(candidate)); + } + } + + async fn listen_for_remote_candidates( + peer_connection: RtcPeerConnection, + mut signal_receiver: UnboundedReceiver, + ) -> Result<(), Box> { + while let Some(signal) = signal_receiver.next().await { + match signal { + PeerSignal::IceCandidate(candidate) => { + debug!("got an IceCandidate signal! {}", candidate); + // Can't directly serialize/deserialize the candidate, as + // webrtc-rs' to_json uses snake_case and so isn't compatible + // with browsers + // let candidate = js_sys::JSON::parse(&candidate).unwrap(); + // let candidate = JsCast::unchecked_into::(candidate); + + let mut candidate_init = RtcIceCandidateInit::new(&candidate); + candidate_init.sdp_m_line_index(Some(0)); + + JsFuture::from( + peer_connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some( + &candidate_init, + )), + ) + .await + .efix()?; + } + PeerSignal::Offer(_) => { + warn!("Got an unexpected Offer, while waiting for IceCandidate. Ignoring.") + } + PeerSignal::Answer(_) => { + warn!("Got an unexpected Answer, while waiting for IceCandidate. Ignoring.") + } + } + } + + debug!("stopping ice candidate listening"); + + Ok(()) + } +} + async fn handshake_offer( signal_peer: SignalPeer, mut signal_receiver: UnboundedReceiver, messages_from_peers_tx: UnboundedSender<(PeerId, Packet)>, -) -> Result<(PeerId, RtcDataChannel), Box> { +) -> Result< + ( + PeerId, + RtcDataChannel, + Pin>>>>, + ), + Box, +> { debug!("making offer"); - let conn = create_rtc_peer_connection(); + let (conn, trickle) = create_rtc_peer_connection(signal_peer.clone()); let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); let data_channel = create_data_channel( conn.clone(), @@ -154,11 +244,9 @@ async fn handshake_offer( .await .efix()?; - wait_for_ice_complete(conn.clone()).await; - debug!("created offer for new peer"); - signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp())); + signal_peer.send(PeerSignal::Offer(offer_sdp)); let sdp: String; @@ -174,12 +262,10 @@ async fn handshake_offer( break; } PeerSignal::Offer(_) => { - warn!("Got an unexpected Offer, while waiting for Answer. Ignoring.") + warn!("Got an unexpected Offer, while waiting for Answer. Ignoring."); } PeerSignal::IceCandidate(_) => { - warn!( - "Got an ice candidate message, but ice trickle is not yet supported. Ignoring." - ) + warn!("Got an ice candidate message while waiting for Answer. Ignoring."); } }; } @@ -194,20 +280,41 @@ async fn handshake_offer( .await .efix()?; - debug!("waiting for data channel to open"); - channel_ready_rx.next().await; + trickle.send_pending_candidates().await; + + let mut trickle_fut = + Box::pin(CandidateTrickle::listen_for_remote_candidates(conn, signal_receiver).fuse()); + + let mut channel_ready_fut = channel_ready_rx.next(); + loop { + select! { + _ = channel_ready_fut => break, + trickle = trickle_fut => { + // TODO: this means that the signalling is down, should return an error + error!("{:?}", trickle); + continue; + } + }; + } - Ok((signal_peer.id, data_channel)) + Ok((signal_peer.id, data_channel, trickle_fut)) } async fn handshake_accept( signal_peer: SignalPeer, mut signal_receiver: UnboundedReceiver, messages_from_peers_tx: UnboundedSender<(PeerId, Packet)>, -) -> Result<(PeerId, RtcDataChannel), Box> { +) -> Result< + ( + PeerId, + RtcDataChannel, + Pin>>>>, + ), + Box, +> { debug!("handshake_accept"); - let conn = create_rtc_peer_connection(); + let (conn, trickle) = create_rtc_peer_connection(signal_peer.clone()); let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); let data_channel = create_data_channel( conn.clone(), @@ -257,24 +364,38 @@ async fn handshake_accept( .as_string() .ok_or("")?; + signal_peer.send(PeerSignal::Answer(answer_sdp.clone())); + let answer_description = session_desc_init.sdp(&answer_sdp); JsFuture::from(conn.set_local_description(answer_description)) .await .efix()?; - wait_for_ice_complete(conn.clone()).await; - - let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp()); - signal_peer.send(answer); + trickle.send_pending_candidates().await; + let mut trickle_fut = Box::pin( + CandidateTrickle::listen_for_remote_candidates(conn.clone(), signal_receiver).fuse(), + ); + let mut channel_ready_fut = channel_ready_rx.next(); debug!("waiting for data channel to open"); - channel_ready_rx.next().await; + loop { + select! { + _ = channel_ready_fut => break, + // TODO: this means that the signalling is down, should return an error + trickle = trickle_fut => { + error!("Trickle error {:?}", trickle); + continue; + } + } + } - Ok((signal_peer.id, data_channel)) + Ok((signal_peer.id, data_channel, trickle_fut)) } -fn create_rtc_peer_connection() -> RtcPeerConnection { +fn create_rtc_peer_connection( + signal_peer: SignalPeer, +) -> (RtcPeerConnection, Arc) { #[derive(Serialize)] pub struct IceServerConfig { pub urls: [String; 1], @@ -290,32 +411,25 @@ fn create_rtc_peer_connection() -> RtcPeerConnection { }; let ice_server_config_list = [ice_server_config]; peer_config.ice_servers(&JsValue::from_serde(&ice_server_config_list).unwrap()); - RtcPeerConnection::new_with_configuration(&peer_config).unwrap() -} - -async fn wait_for_ice_complete(conn: RtcPeerConnection) { - if conn.ice_gathering_state() == RtcIceGatheringState::Complete { - debug!("Ice already completed"); - return; - } - - let (mut tx, mut rx) = futures_channel::mpsc::channel(1); - - let conn_clone = conn.clone(); - let onstatechange: Box = Box::new(move |_| { - if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete { - tx.try_send(()).unwrap(); - } - }); - - let onstatechange = Closure::wrap(onstatechange); - - conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref())); - - rx.next().await; + let connection = RtcPeerConnection::new_with_configuration(&peer_config).unwrap(); + let trickle = Arc::new(CandidateTrickle::new(signal_peer)); + + let connection2 = connection.clone(); + let trickle2 = trickle.clone(); + let onicecandidate: Box = + Box::new(move |event: RtcPeerConnectionIceEvent| { + // todo: can this really be None? + if let Some(candidate) = event.candidate() { + trickle2.on_local_candidate(connection2.clone(), candidate); + // todo: maybe just call the api directly? + // or do it in a promise callback? + } + }); + let onicecandidate = Closure::wrap(onicecandidate); + connection.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + onicecandidate.forget(); - conn.set_onicegatheringstatechange(None); - debug!("Ice completed"); + (connection, trickle) } fn create_data_channel( @@ -362,7 +476,16 @@ fn create_data_channel( } // Expect/unwrap is broken in select for some reason :/ -fn check(res: &Result<(PeerId, RtcDataChannel), Box>) { +fn check( + res: &Result< + ( + PeerId, + RtcDataChannel, + Pin>>>>, + ), + Box, + >, +) { // but doing it inside a typed function works fine res.as_ref().expect("handshake failed"); } From 4e390ed8dc8dc15c30aa30eca55329cc749d854d Mon Sep 17 00:00:00 2001 From: Johan Klokkhammer Helsing Date: Sat, 5 Feb 2022 11:41:00 +0100 Subject: [PATCH 2/4] Set negotiated to false on wasm Also set it explicitly on native --- matchbox_socket/src/webrtc_socket/native/message_loop.rs | 1 + matchbox_socket/src/webrtc_socket/wasm/message_loop.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/matchbox_socket/src/webrtc_socket/native/message_loop.rs b/matchbox_socket/src/webrtc_socket/native/message_loop.rs index a01a7771..5bf0948f 100644 --- a/matchbox_socket/src/webrtc_socket/native/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/message_loop.rs @@ -427,6 +427,7 @@ async fn create_data_channel( let config = RTCDataChannelInit { ordered: Some(false), max_retransmits: Some(0), + negotiated: Some(false), id: Some(0), ..Default::default() }; diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index b85efda0..b125736b 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -441,7 +441,7 @@ fn create_data_channel( let mut data_channel_config: RtcDataChannelInit = RtcDataChannelInit::new(); data_channel_config.ordered(false); data_channel_config.max_retransmits(0); - data_channel_config.negotiated(true); + data_channel_config.negotiated(false); data_channel_config.id(0); let channel: RtcDataChannel = From 55c59d8e3b559cb7022aa7291770235adeb25107 Mon Sep 17 00:00:00 2001 From: sapir Date: Tue, 8 Feb 2022 03:29:10 +0200 Subject: [PATCH 3/4] Fix dropped trickle loop future on wasm --- .../src/webrtc_socket/wasm/message_loop.rs | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index b125736b..8202aa78 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -37,6 +37,7 @@ pub async fn message_loop( let mut offer_handshakes = FuturesUnordered::new(); let mut accept_handshakes = FuturesUnordered::new(); + let mut trickle_futs = FuturesUnordered::new(); let mut handshake_signals = HashMap::new(); let mut data_channels: HashMap = HashMap::new(); @@ -51,20 +52,32 @@ pub async fn message_loop( res = offer_handshakes.select_next_some() => { check(&res); - let peer = res.unwrap(); - data_channels.insert(peer.0.clone(), peer.1.clone()); + + let (peer_id, data_channel, trickle_fut) = res.unwrap(); + data_channels.insert(peer_id.clone(), data_channel); + trickle_futs.push(trickle_fut); + debug!("Notifying about new peer"); - new_connected_peers_tx.unbounded_send(peer.0).expect("send failed"); + new_connected_peers_tx.unbounded_send(peer_id).expect("send failed"); }, + res = accept_handshakes.select_next_some() => { // TODO: this could be de-duplicated check(&res); - let peer = res.unwrap(); - data_channels.insert(peer.0.clone(), peer.1.clone()); + + let (peer_id, data_channel, trickle_fut) = res.unwrap(); + data_channels.insert(peer_id.clone(), data_channel); + trickle_futs.push(trickle_fut); + debug!("Notifying about new peer"); - new_connected_peers_tx.unbounded_send(peer.0).expect("send failed"); + new_connected_peers_tx.unbounded_send(peer_id).expect("send failed"); }, + res = trickle_futs.select_next_some() => { + error!("ice candidate trickle loop stopped: {:?}", res); + break; + } + message = events_receiver.next() => { if let Some(event) = message { debug!("{:?}", event); From aebcffc6386cac274bd08885296c3c6a18c69dda Mon Sep 17 00:00:00 2001 From: Johan Klokkhammer Helsing Date: Fri, 11 Feb 2022 08:57:35 +0100 Subject: [PATCH 4/4] wasm socket accept: Use data channel in ondatachannel callback --- matchbox_socket/Cargo.toml | 3 +- .../src/webrtc_socket/wasm/message_loop.rs | 107 +++++++++++++----- 2 files changed, 79 insertions(+), 31 deletions(-) diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index 7b51d81d..bb4cb192 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -39,7 +39,8 @@ web-sys = { version = "0.3.22", default-features = false, features = [ "RtcPeerConnection", "RtcPeerConnectionIceEvent", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", "RtcIceCandidate", "RtcIceCandidateInit", - "RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", + "RtcConfiguration", + "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", "RtcDataChannelEvent" ] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index 8202aa78..adbe568c 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -1,4 +1,4 @@ -use futures::{future::FusedFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{future::FusedFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_timer::Delay; use futures_util::select; @@ -9,9 +9,9 @@ use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ - MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType, - RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, - RtcSessionDescriptionInit, + MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelInit, + RtcDataChannelType, RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, + RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, }; use crate::webrtc_socket::KEEP_ALIVE_INTERVAL; @@ -316,7 +316,7 @@ async fn handshake_offer( async fn handshake_accept( signal_peer: SignalPeer, mut signal_receiver: UnboundedReceiver, - messages_from_peers_tx: UnboundedSender<(PeerId, Packet)>, + from_peer_message_tx: UnboundedSender<(PeerId, Packet)>, ) -> Result< ( PeerId, @@ -328,13 +328,6 @@ async fn handshake_accept( debug!("handshake_accept"); let (conn, trickle) = create_rtc_peer_connection(signal_peer.clone()); - let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); - let data_channel = create_data_channel( - conn.clone(), - messages_from_peers_tx, - signal_peer.id.clone(), - channel_ready_tx, - ); let offer: Option; loop { @@ -390,18 +383,21 @@ async fn handshake_accept( CandidateTrickle::listen_for_remote_candidates(conn.clone(), signal_receiver).fuse(), ); - let mut channel_ready_fut = channel_ready_rx.next(); + let data_channel_fut = + wait_for_data_channel(conn.clone(), signal_peer.id.clone(), from_peer_message_tx).fuse(); + pin_mut!(data_channel_fut); + debug!("waiting for data channel to open"); - loop { + let data_channel = loop { select! { - _ = channel_ready_fut => break, + data_channel = data_channel_fut => break data_channel, // TODO: this means that the signalling is down, should return an error trickle = trickle_fut => { error!("Trickle error {:?}", trickle); continue; } } - } + }; Ok((signal_peer.id, data_channel, trickle_fut)) } @@ -461,20 +457,6 @@ fn create_data_channel( connection.create_data_channel_with_data_channel_dict("webudp", &data_channel_config); channel.set_binary_type(RtcDataChannelType::Arraybuffer); - let channel_onmsg_func: Box = Box::new(move |event: MessageEvent| { - debug!("incoming {:?}", event); - if let Ok(arraybuf) = event.data().dyn_into::() { - let uarray = js_sys::Uint8Array::new(&arraybuf); - let body = uarray.to_vec(); - incoming_tx - .unbounded_send((peer_id.clone(), body.into_boxed_slice())) - .unwrap(); - } - }); - let channel_onmsg_closure = Closure::wrap(channel_onmsg_func); - channel.set_onmessage(Some(channel_onmsg_closure.as_ref().unchecked_ref())); - channel_onmsg_closure.forget(); - let channel_onopen_func: Box = Box::new(move |_| { debug!("Rtc data channel opened :D :D"); channel_ready @@ -485,9 +467,74 @@ fn create_data_channel( channel.set_onopen(Some(channel_onopen_closure.as_ref().unchecked_ref())); channel_onopen_closure.forget(); + setup_data_channel(&channel, peer_id, incoming_tx); + channel } +async fn wait_for_data_channel( + connection: RtcPeerConnection, + peer_id: PeerId, + // new_peer_tx: UnboundedSender, + from_peer_message_tx: UnboundedSender<(PeerId, Packet)>, +) -> RtcDataChannel { + let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1); + + let ondatachannel_func: Box = + Box::new(move |event: RtcDataChannelEvent| { + let channel = event.channel(); + debug!("new data channel {}", channel.label()); + + // Move this callback to setup_data_channel? + let channel2 = channel.clone(); + let mut channel_tx = channel_tx.clone(); + let channel_onopen_func: Box = Box::new(move |_| { + debug!("Data channel ready"); + channel_tx.try_send(channel2.clone()).unwrap(); + // new_peer_tx + // .try_send(peer_id) + // .expect("failed to notify about new peer"); + }); + let channel_onopen_closure = Closure::wrap(channel_onopen_func); + channel.set_onopen(Some(channel_onopen_closure.as_ref().unchecked_ref())); + channel_onopen_closure.forget(); + + setup_data_channel(&channel, peer_id.clone(), from_peer_message_tx.clone()); + }); + let ondatachannel_closure = Closure::wrap(ondatachannel_func); + connection.set_ondatachannel(Some(ondatachannel_closure.as_ref().unchecked_ref())); + ondatachannel_closure.forget(); + + channel_rx.next().await.unwrap() +} + +fn setup_data_channel( + data_channel: &RtcDataChannel, + peer_id: PeerId, + from_peer_message_tx: UnboundedSender<(PeerId, Packet)>, +) { + // TODO: set_onclose + + // TODO: set_onerror + + let data_channel_onmessage_func: Box = + Box::new(move |event: MessageEvent| { + debug!("rx {:?}", event); + if let Ok(arraybuf) = event.data().dyn_into::() { + let uarray = js_sys::Uint8Array::new(&arraybuf); + let body = uarray.to_vec(); + from_peer_message_tx + .unbounded_send((peer_id.clone(), body.into_boxed_slice())) + .unwrap(); + } + }); + let data_channel_onmessage_closure = Closure::wrap(data_channel_onmessage_func); + data_channel.set_onmessage(Some( + data_channel_onmessage_closure.as_ref().unchecked_ref(), + )); + data_channel_onmessage_closure.forget(); +} + // Expect/unwrap is broken in select for some reason :/ fn check( res: &Result<