From 499bb649b9bc213fd75a564b86aa6c32778e583b Mon Sep 17 00:00:00 2001 From: Alex Rozgo Date: Thu, 15 Dec 2022 00:36:15 -0600 Subject: [PATCH 1/4] cross native and wasm, ice trickle and data channel creation --- matchbox_socket/Cargo.toml | 2 +- .../src/webrtc_socket/native/message_loop.rs | 61 ++----- .../src/webrtc_socket/wasm/message_loop.rs | 170 +++++++++++++----- 3 files changed, 138 insertions(+), 95 deletions(-) diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index fcb54b42..8de418bf 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -39,7 +39,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [ "MessageEvent", "RtcPeerConnection", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", - "RtcIceGatheringState", + "RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcIceTransportPolicy", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", ] } serde-wasm-bindgen = { version = "0.4" } diff --git a/matchbox_socket/src/webrtc_socket/native/message_loop.rs b/matchbox_socket/src/webrtc_socket/native/message_loop.rs index 1f97ba94..8fde0c28 100644 --- a/matchbox_socket/src/webrtc_socket/native/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/message_loop.rs @@ -317,6 +317,16 @@ async fn handshake_accept( debug!("handshake_accept"); let (connection, trickle) = create_rtc_peer_connection(signal_peer.clone(), config).await?; + let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); + let data_channel = create_data_channel( + &connection, + channel_ready_tx, + signal_peer.id.clone(), + new_peer_tx, + from_peer_message_tx, + ) + .await; + let offer; loop { match signal_receiver.next().await.ok_or("error")? { @@ -347,23 +357,15 @@ async fn handshake_accept( .fuse(), ); - let data_channel_fut = wait_for_data_channel( - &connection, - signal_peer.id.clone(), - new_peer_tx, - from_peer_message_tx, - ) - .fuse(); - pin_mut!(data_channel_fut); - - let data_channel = loop { + let mut channel_ready_fut = channel_ready_rx.next(); + loop { select! { - data_channel = data_channel_fut => break data_channel, + _ = channel_ready_fut => break, // TODO: this means that the signalling is down, should return an // error _ = trickle_fut => continue, }; - }; + } Ok((signal_peer.id, data_channel, trickle_fut)) } @@ -424,6 +426,7 @@ async fn create_data_channel( let config = RTCDataChannelInit { ordered: Some(false), max_retransmits: Some(0), + negotiated: Some(124), ..Default::default() }; @@ -446,40 +449,6 @@ async fn create_data_channel( channel } -async fn wait_for_data_channel( - connection: &RTCPeerConnection, - peer_id: PeerId, - new_peer_tx: UnboundedSender, - from_peer_message_tx: UnboundedSender<(PeerId, Packet)>, -) -> Arc { - let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1); - - connection.on_data_channel(Box::new(move |channel| { - debug!("new data channel"); - let peer_id = peer_id.clone(); - let mut new_peer_tx = new_peer_tx.clone(); - let from_peer_message_tx = from_peer_message_tx.clone(); - let mut channel_tx = channel_tx.clone(); - Box::pin(async move { - let peer_id2 = peer_id.clone(); - let channel2 = Arc::clone(&channel); - - // TODO: register close & error callbacks - channel.on_open(Box::new(move || { - debug!("Data channel ready"); - Box::pin(async move { - new_peer_tx.send(peer_id2).await.unwrap(); - channel_tx.try_send(channel2).unwrap(); - }) - })); - - setup_data_channel(&channel, peer_id, from_peer_message_tx).await; - }) - })); - - channel_rx.next().await.unwrap() -} - async fn setup_data_channel( data_channel: &RTCDataChannel, peer_id: PeerId, diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index 8ab34482..54f89214 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -12,7 +12,8 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType, - RtcIceGatheringState, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, + RtcIceCandidate, RtcIceCandidateInit, RtcIceTransportPolicy, RtcPeerConnection, RtcSdpType, + RtcSessionDescriptionInit, }; use crate::webrtc_socket::KEEP_ALIVE_INTERVAL; @@ -131,6 +132,7 @@ async fn handshake_offer( config: &WebRtcSocketConfig, ) -> Result<(PeerId, RtcDataChannel), Box> { debug!("making offer"); + let conn = create_rtc_peer_connection(config); let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); let data_channel = create_data_channel( @@ -140,30 +142,25 @@ async fn handshake_offer( channel_ready_tx, ); + // Create offer let offer = JsFuture::from(conn.create_offer()).await.efix()?; - let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp")) .efix()? .as_string() .ok_or("")?; - let mut rtc_session_desc_init_dict: RtcSessionDescriptionInit = RtcSessionDescriptionInit::new(RtcSdpType::Offer); - let offer_description = rtc_session_desc_init_dict.sdp(&offer_sdp); - JsFuture::from(conn.set_local_description(offer_description)) .await .efix()?; - - wait_for_ice_complete(conn.clone()).await; - debug!("created offer for new peer"); - signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp())); - let sdp: String; + let mut candidates = vec![]; + // Wait for answer + let sdp: String; loop { let signal = signal_receiver .next() @@ -175,29 +172,77 @@ async fn handshake_offer( sdp = answer; break; } - PeerSignal::Offer(_) => { - warn!("Got an unexpected Offer, while waiting for Answer. Ignoring.") + PeerSignal::IceCandidate(candidate) => { + debug!("got an IceCandidate signal! {}", candidate); + candidates.push(candidate); } - PeerSignal::IceCandidate(_) => { - warn!( - "Got an ice candidate message, but ice trickle is not yet supported. Ignoring." - ) + _ => { + warn!("ignoring other signal!!!"); } }; } + // Set remote description let mut remote_description: RtcSessionDescriptionInit = RtcSessionDescriptionInit::new(RtcSdpType::Answer); - remote_description.sdp(&sdp); - debug!("setting remote description"); JsFuture::from(conn.set_remote_description(&remote_description)) .await .efix()?; + // send ICE candidates to remote peer + let signal_peer_ice = signal_peer.clone(); + let onicecandidate: Box = Box::new(move |event| { + let event = Reflect::get(&event, &JsValue::from_str("candidate")).efix(); + if let Ok(event) = event { + if let Ok(candidate) = event.dyn_into::() { + debug!("sending IceCandidate signal {}", candidate.candidate()); + signal_peer_ice.send(PeerSignal::IceCandidate(candidate.candidate())); + } + } + }); + let onicecandidate = Closure::wrap(onicecandidate); + conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + + // handle pending ICE candidates + for canditate in candidates { + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + + // select for channel ready or ice candidates debug!("waiting for data channel to open"); - channel_ready_rx.next().await; + loop { + select! { + _ = channel_ready_rx.next() => { + debug!("channel ready"); + // wait_for_ice_complete(conn.clone()).await; + break; + } + msg = signal_receiver.next() => { + if let Some(PeerSignal::IceCandidate(candidate)) = msg { + debug!("got an IceCandidate signal! {}", candidate); + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&candidate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + } + }; + } + + conn.set_onicecandidate(None); + + debug!("Ice completed: {:?}", conn.ice_gathering_state()); Ok((signal_peer.id, data_channel)) } @@ -219,6 +264,8 @@ async fn handshake_accept( channel_ready_tx, ); + let mut candidates = vec![]; + let offer: Option; loop { match signal_receiver.next().await.ok_or("error")? { @@ -226,6 +273,10 @@ async fn handshake_accept( offer = Some(o); break; } + PeerSignal::IceCandidate(candidate) => { + debug!("got an IceCandidate signal! {}", candidate); + candidates.push(candidate); + } _ => { warn!("ignoring other signal!!!"); } @@ -266,13 +317,61 @@ async fn handshake_accept( .await .efix()?; - wait_for_ice_complete(conn.clone()).await; - let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp()); signal_peer.send(answer); + // send ICE candidates to remote peer + let signal_peer_ice = signal_peer.clone(); + let onicecandidate: Box = Box::new(move |event| { + let event = Reflect::get(&event, &JsValue::from_str("candidate")).efix(); + if let Ok(event) = event { + if let Ok(candidate) = event.dyn_into::() { + debug!("sending IceCandidate signal {}", candidate.candidate()); + signal_peer_ice.send(PeerSignal::IceCandidate(candidate.candidate())); + } + } + }); + let onicecandidate = Closure::wrap(onicecandidate); + conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + + // handle pending ICE candidates + for canditate in candidates { + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + + // select for channel ready or ice candidates debug!("waiting for data channel to open"); - channel_ready_rx.next().await; + loop { + select! { + _ = channel_ready_rx.next() => { + debug!("channel ready"); + // wait_for_ice_complete(conn.clone()).await; + break; + } + msg = signal_receiver.next() => { + if let Some(PeerSignal::IceCandidate(candidate)) = msg { + debug!("got an IceCandidate signal! {}", candidate); + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&candidate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + } + }; + } + + conn.set_onicecandidate(None); + + debug!("Ice completed: {:?}", conn.ice_gathering_state()); Ok((signal_peer.id, data_channel)) } @@ -297,31 +396,6 @@ fn create_rtc_peer_connection(config: &WebRtcSocketConfig) -> RtcPeerConnection RtcPeerConnection::new_with_configuration(&peer_config).unwrap() } -async fn wait_for_ice_complete(conn: RtcPeerConnection) { - if conn.ice_gathering_state() == RtcIceGatheringState::Complete { - debug!("Ice already completed"); - return; - } - - let (mut tx, mut rx) = futures_channel::mpsc::channel(1); - - let conn_clone = conn.clone(); - let onstatechange: Box = Box::new(move |_| { - if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete { - tx.try_send(()).unwrap(); - } - }); - - let onstatechange = Closure::wrap(onstatechange); - - conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref())); - - rx.next().await; - - conn.set_onicegatheringstatechange(None); - debug!("Ice completed"); -} - fn create_data_channel( connection: RtcPeerConnection, incoming_tx: futures_channel::mpsc::UnboundedSender<(PeerId, Packet)>, @@ -332,7 +406,7 @@ fn create_data_channel( data_channel_config.ordered(false); data_channel_config.max_retransmits(0); data_channel_config.negotiated(true); - data_channel_config.id(0); + data_channel_config.id(124); let channel: RtcDataChannel = connection.create_data_channel_with_data_channel_dict("webudp", &data_channel_config); From efa0b3e3c91fc8b3ee08ebd08422f58e10c3ec36 Mon Sep 17 00:00:00 2001 From: Alex Rozgo Date: Thu, 15 Dec 2022 00:51:57 -0600 Subject: [PATCH 2/4] cleanup unused WebRTC features --- matchbox_socket/Cargo.toml | 2 +- matchbox_socket/src/webrtc_socket/wasm/message_loop.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index 8de418bf..07ba4bef 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -39,7 +39,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [ "MessageEvent", "RtcPeerConnection", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", - "RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcIceTransportPolicy", + "RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", ] } serde-wasm-bindgen = { version = "0.4" } diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index 54f89214..5e668239 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -12,8 +12,7 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType, - RtcIceCandidate, RtcIceCandidateInit, RtcIceTransportPolicy, RtcPeerConnection, RtcSdpType, - RtcSessionDescriptionInit, + RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, }; use crate::webrtc_socket::KEEP_ALIVE_INTERVAL; From 3737f2046cf18042e90bb57c0a109e18fb0798a6 Mon Sep 17 00:00:00 2001 From: Alex Rozgo Date: Thu, 15 Dec 2022 12:38:55 -0600 Subject: [PATCH 3/4] tidy up comments, debug output and naming --- .../src/webrtc_socket/wasm/message_loop.rs | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index 5e668239..c34ee508 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -156,7 +156,7 @@ async fn handshake_offer( debug!("created offer for new peer"); signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp())); - let mut candidates = vec![]; + let mut received_candidates = vec![]; // Wait for answer let sdp: String; @@ -173,10 +173,10 @@ async fn handshake_offer( } PeerSignal::IceCandidate(candidate) => { debug!("got an IceCandidate signal! {}", candidate); - candidates.push(candidate); + received_candidates.push(candidate); } _ => { - warn!("ignoring other signal!!!"); + warn!("ignoring unexpected signal: {signal:?}"); } }; } @@ -205,7 +205,7 @@ async fn handshake_offer( conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); // handle pending ICE candidates - for canditate in candidates { + for canditate in received_candidates { let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate); ice_candidate.sdp_m_line_index(Some(0)); JsFuture::from( @@ -221,7 +221,6 @@ async fn handshake_offer( select! { _ = channel_ready_rx.next() => { debug!("channel ready"); - // wait_for_ice_complete(conn.clone()).await; break; } msg = signal_receiver.next() => { @@ -239,6 +238,10 @@ async fn handshake_offer( }; } + // stop listening for ICE candidates + // TODO: we should support getting new ICE candidates even after connecting, + // since it's possible to return to the ice gathering state + // See: conn.set_onicecandidate(None); debug!("Ice completed: {:?}", conn.ice_gathering_state()); @@ -263,21 +266,26 @@ async fn handshake_accept( channel_ready_tx, ); - let mut candidates = vec![]; + let mut received_candidates = vec![]; let offer: Option; loop { - match signal_receiver.next().await.ok_or("error")? { + let signal = signal_receiver + .next() + .await + .ok_or("Signal server connection lost in the middle of a handshake")?; + + match signal { PeerSignal::Offer(o) => { offer = Some(o); break; } PeerSignal::IceCandidate(candidate) => { debug!("got an IceCandidate signal! {}", candidate); - candidates.push(candidate); + received_candidates.push(candidate); } _ => { - warn!("ignoring other signal!!!"); + warn!("ignoring unexpected signal: {signal:?}"); } } } @@ -334,7 +342,7 @@ async fn handshake_accept( conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); // handle pending ICE candidates - for canditate in candidates { + for canditate in received_candidates { let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate); ice_candidate.sdp_m_line_index(Some(0)); JsFuture::from( @@ -350,7 +358,6 @@ async fn handshake_accept( select! { _ = channel_ready_rx.next() => { debug!("channel ready"); - // wait_for_ice_complete(conn.clone()).await; break; } msg = signal_receiver.next() => { @@ -368,6 +375,10 @@ async fn handshake_accept( }; } + // stop listening for ICE candidates + // TODO: we should support getting new ICE candidates even after connecting, + // since it's possible to return to the ice gathering state + // See: conn.set_onicecandidate(None); debug!("Ice completed: {:?}", conn.ice_gathering_state()); From 079cd51b1ce460ad28b74474835f122732b44c4d Mon Sep 17 00:00:00 2001 From: Alex Rozgo Date: Fri, 16 Dec 2022 11:57:33 -0600 Subject: [PATCH 4/4] constant for data channel id --- matchbox_socket/src/webrtc_socket/mod.rs | 1 + matchbox_socket/src/webrtc_socket/native/message_loop.rs | 4 ++-- matchbox_socket/src/webrtc_socket/wasm/message_loop.rs | 5 ++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/matchbox_socket/src/webrtc_socket/mod.rs b/matchbox_socket/src/webrtc_socket/mod.rs index 52fa2ec6..eb2c8976 100644 --- a/matchbox_socket/src/webrtc_socket/mod.rs +++ b/matchbox_socket/src/webrtc_socket/mod.rs @@ -8,6 +8,7 @@ mod messages; mod signal_peer; const KEEP_ALIVE_INTERVAL: u64 = 10_000; +const DATA_CHANNEL_ID: u16 = 124; // TODO: maybe use cfg-if to make this slightly tidier #[cfg(not(target_arch = "wasm32"))] diff --git a/matchbox_socket/src/webrtc_socket/native/message_loop.rs b/matchbox_socket/src/webrtc_socket/native/message_loop.rs index 8fde0c28..196dc427 100644 --- a/matchbox_socket/src/webrtc_socket/native/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/message_loop.rs @@ -26,7 +26,7 @@ use webrtc::{ use crate::webrtc_socket::{ messages::{PeerEvent, PeerId, PeerRequest, PeerSignal}, signal_peer::SignalPeer, - Packet, WebRtcSocketConfig, KEEP_ALIVE_INTERVAL, + Packet, WebRtcSocketConfig, DATA_CHANNEL_ID, KEEP_ALIVE_INTERVAL, }; pub async fn message_loop( @@ -426,7 +426,7 @@ async fn create_data_channel( let config = RTCDataChannelInit { ordered: Some(false), max_retransmits: Some(0), - negotiated: Some(124), + negotiated: Some(DATA_CHANNEL_ID), ..Default::default() }; diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index c34ee508..f7f7ac00 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -15,11 +15,10 @@ use web_sys::{ RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, }; -use crate::webrtc_socket::KEEP_ALIVE_INTERVAL; use crate::webrtc_socket::{ messages::{PeerEvent, PeerId, PeerRequest, PeerSignal}, signal_peer::SignalPeer, - Packet, WebRtcSocketConfig, + Packet, WebRtcSocketConfig, DATA_CHANNEL_ID, KEEP_ALIVE_INTERVAL, }; pub async fn message_loop( @@ -416,7 +415,7 @@ fn create_data_channel( data_channel_config.ordered(false); data_channel_config.max_retransmits(0); data_channel_config.negotiated(true); - data_channel_config.id(124); + data_channel_config.id(DATA_CHANNEL_ID); let channel: RtcDataChannel = connection.create_data_channel_with_data_channel_dict("webudp", &data_channel_config);