Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cross native and wasm, ice trickle and data channel creation #54

Merged
merged 4 commits into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [
"MessageEvent",
"RtcPeerConnection",
"RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit",
"RtcIceGatheringState",
"RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit",
"RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType",
] }
serde-wasm-bindgen = { version = "0.4" }
Expand Down
61 changes: 15 additions & 46 deletions matchbox_socket/src/webrtc_socket/native/message_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ async fn handshake_accept(
debug!("handshake_accept");
let (connection, trickle) = create_rtc_peer_connection(signal_peer.clone(), config).await?;

let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1);
let data_channel = create_data_channel(
&connection,
channel_ready_tx,
signal_peer.id.clone(),
new_peer_tx,
from_peer_message_tx,
)
.await;
johanhelsing marked this conversation as resolved.
Show resolved Hide resolved

let offer;
loop {
match signal_receiver.next().await.ok_or("error")? {
Expand Down Expand Up @@ -347,23 +357,15 @@ async fn handshake_accept(
.fuse(),
);

let data_channel_fut = wait_for_data_channel(
&connection,
signal_peer.id.clone(),
new_peer_tx,
from_peer_message_tx,
)
.fuse();
pin_mut!(data_channel_fut);

let data_channel = loop {
let mut channel_ready_fut = channel_ready_rx.next();
loop {
select! {
data_channel = data_channel_fut => break data_channel,
_ = channel_ready_fut => break,
// TODO: this means that the signalling is down, should return an
// error
_ = trickle_fut => continue,
};
};
}

Ok((signal_peer.id, data_channel, trickle_fut))
}
Expand Down Expand Up @@ -424,6 +426,7 @@ async fn create_data_channel(
let config = RTCDataChannelInit {
ordered: Some(false),
max_retransmits: Some(0),
negotiated: Some(124),
johanhelsing marked this conversation as resolved.
Show resolved Hide resolved
..Default::default()
};

Expand All @@ -446,40 +449,6 @@ async fn create_data_channel(
channel
}

async fn wait_for_data_channel(
connection: &RTCPeerConnection,
peer_id: PeerId,
new_peer_tx: UnboundedSender<PeerId>,
from_peer_message_tx: UnboundedSender<(PeerId, Packet)>,
) -> Arc<RTCDataChannel> {
let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1);

connection.on_data_channel(Box::new(move |channel| {
debug!("new data channel");
let peer_id = peer_id.clone();
let mut new_peer_tx = new_peer_tx.clone();
let from_peer_message_tx = from_peer_message_tx.clone();
let mut channel_tx = channel_tx.clone();
Box::pin(async move {
let peer_id2 = peer_id.clone();
let channel2 = Arc::clone(&channel);

// TODO: register close & error callbacks
channel.on_open(Box::new(move || {
debug!("Data channel ready");
Box::pin(async move {
new_peer_tx.send(peer_id2).await.unwrap();
channel_tx.try_send(channel2).unwrap();
})
}));

setup_data_channel(&channel, peer_id, from_peer_message_tx).await;
})
}));

channel_rx.next().await.unwrap()
}

async fn setup_data_channel(
data_channel: &RTCDataChannel,
peer_id: PeerId,
Expand Down
169 changes: 121 additions & 48 deletions matchbox_socket/src/webrtc_socket/wasm/message_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType,
RtcIceGatheringState, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit,
RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit,
};

use crate::webrtc_socket::KEEP_ALIVE_INTERVAL;
Expand Down Expand Up @@ -131,6 +131,7 @@ async fn handshake_offer(
config: &WebRtcSocketConfig,
) -> Result<(PeerId, RtcDataChannel), Box<dyn std::error::Error>> {
debug!("making offer");

let conn = create_rtc_peer_connection(config);
let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1);
let data_channel = create_data_channel(
Expand All @@ -140,30 +141,25 @@ async fn handshake_offer(
channel_ready_tx,
);

// Create offer
let offer = JsFuture::from(conn.create_offer()).await.efix()?;

let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp"))
.efix()?
.as_string()
.ok_or("")?;

let mut rtc_session_desc_init_dict: RtcSessionDescriptionInit =
RtcSessionDescriptionInit::new(RtcSdpType::Offer);

let offer_description = rtc_session_desc_init_dict.sdp(&offer_sdp);

JsFuture::from(conn.set_local_description(offer_description))
.await
.efix()?;

wait_for_ice_complete(conn.clone()).await;

debug!("created offer for new peer");

signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp()));

let sdp: String;
let mut candidates = vec![];
rozgo marked this conversation as resolved.
Show resolved Hide resolved

// Wait for answer
let sdp: String;
loop {
let signal = signal_receiver
.next()
Expand All @@ -175,29 +171,77 @@ async fn handshake_offer(
sdp = answer;
break;
}
PeerSignal::Offer(_) => {
warn!("Got an unexpected Offer, while waiting for Answer. Ignoring.")
PeerSignal::IceCandidate(candidate) => {
debug!("got an IceCandidate signal! {}", candidate);
candidates.push(candidate);
}
PeerSignal::IceCandidate(_) => {
warn!(
"Got an ice candidate message, but ice trickle is not yet supported. Ignoring."
)
_ => {
warn!("ignoring other signal!!!");
rozgo marked this conversation as resolved.
Show resolved Hide resolved
}
};
}

// Set remote description
let mut remote_description: RtcSessionDescriptionInit =
RtcSessionDescriptionInit::new(RtcSdpType::Answer);

remote_description.sdp(&sdp);

debug!("setting remote description");
JsFuture::from(conn.set_remote_description(&remote_description))
.await
.efix()?;

// send ICE candidates to remote peer
let signal_peer_ice = signal_peer.clone();
let onicecandidate: Box<dyn FnMut(JsValue)> = Box::new(move |event| {
let event = Reflect::get(&event, &JsValue::from_str("candidate")).efix();
if let Ok(event) = event {
if let Ok(candidate) = event.dyn_into::<RtcIceCandidate>() {
debug!("sending IceCandidate signal {}", candidate.candidate());
signal_peer_ice.send(PeerSignal::IceCandidate(candidate.candidate()));
}
}
});
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));

// handle pending ICE candidates
for canditate in candidates {
let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate);
ice_candidate.sdp_m_line_index(Some(0));
JsFuture::from(
conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)),
)
.await
.efix()?;
}

// select for channel ready or ice candidates
debug!("waiting for data channel to open");
channel_ready_rx.next().await;
loop {
select! {
_ = channel_ready_rx.next() => {
debug!("channel ready");
// wait_for_ice_complete(conn.clone()).await;
rozgo marked this conversation as resolved.
Show resolved Hide resolved
break;
}
msg = signal_receiver.next() => {
if let Some(PeerSignal::IceCandidate(candidate)) = msg {
debug!("got an IceCandidate signal! {}", candidate);
let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&candidate);
ice_candidate.sdp_m_line_index(Some(0));
JsFuture::from(
conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)),
)
.await
.efix()?;
}
}
};
}

conn.set_onicecandidate(None);
johanhelsing marked this conversation as resolved.
Show resolved Hide resolved

debug!("Ice completed: {:?}", conn.ice_gathering_state());

Ok((signal_peer.id, data_channel))
}
Expand All @@ -219,13 +263,19 @@ async fn handshake_accept(
channel_ready_tx,
);

let mut candidates = vec![];
rozgo marked this conversation as resolved.
Show resolved Hide resolved

let offer: Option<String>;
loop {
match signal_receiver.next().await.ok_or("error")? {
PeerSignal::Offer(o) => {
offer = Some(o);
break;
}
PeerSignal::IceCandidate(candidate) => {
debug!("got an IceCandidate signal! {}", candidate);
candidates.push(candidate);
}
_ => {
warn!("ignoring other signal!!!");
}
Expand Down Expand Up @@ -266,13 +316,61 @@ async fn handshake_accept(
.await
.efix()?;

wait_for_ice_complete(conn.clone()).await;

let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp());
signal_peer.send(answer);

// send ICE candidates to remote peer
let signal_peer_ice = signal_peer.clone();
let onicecandidate: Box<dyn FnMut(JsValue)> = Box::new(move |event| {
let event = Reflect::get(&event, &JsValue::from_str("candidate")).efix();
if let Ok(event) = event {
if let Ok(candidate) = event.dyn_into::<RtcIceCandidate>() {
debug!("sending IceCandidate signal {}", candidate.candidate());
signal_peer_ice.send(PeerSignal::IceCandidate(candidate.candidate()));
}
}
});
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));

// handle pending ICE candidates
for canditate in candidates {
let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate);
ice_candidate.sdp_m_line_index(Some(0));
JsFuture::from(
conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)),
)
.await
.efix()?;
}

// select for channel ready or ice candidates
debug!("waiting for data channel to open");
channel_ready_rx.next().await;
loop {
select! {
_ = channel_ready_rx.next() => {
debug!("channel ready");
// wait_for_ice_complete(conn.clone()).await;
rozgo marked this conversation as resolved.
Show resolved Hide resolved
break;
}
msg = signal_receiver.next() => {
if let Some(PeerSignal::IceCandidate(candidate)) = msg {
debug!("got an IceCandidate signal! {}", candidate);
let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&candidate);
ice_candidate.sdp_m_line_index(Some(0));
JsFuture::from(
conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)),
)
.await
.efix()?;
}
}
};
}

conn.set_onicecandidate(None);
rozgo marked this conversation as resolved.
Show resolved Hide resolved

debug!("Ice completed: {:?}", conn.ice_gathering_state());

Ok((signal_peer.id, data_channel))
}
Expand All @@ -297,31 +395,6 @@ fn create_rtc_peer_connection(config: &WebRtcSocketConfig) -> RtcPeerConnection
RtcPeerConnection::new_with_configuration(&peer_config).unwrap()
}

async fn wait_for_ice_complete(conn: RtcPeerConnection) {
if conn.ice_gathering_state() == RtcIceGatheringState::Complete {
debug!("Ice already completed");
return;
}

let (mut tx, mut rx) = futures_channel::mpsc::channel(1);

let conn_clone = conn.clone();
let onstatechange: Box<dyn FnMut(JsValue)> = Box::new(move |_| {
if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete {
tx.try_send(()).unwrap();
}
});

let onstatechange = Closure::wrap(onstatechange);

conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref()));

rx.next().await;

conn.set_onicegatheringstatechange(None);
debug!("Ice completed");
}

fn create_data_channel(
connection: RtcPeerConnection,
incoming_tx: futures_channel::mpsc::UnboundedSender<(PeerId, Packet)>,
Expand All @@ -332,7 +405,7 @@ fn create_data_channel(
data_channel_config.ordered(false);
data_channel_config.max_retransmits(0);
data_channel_config.negotiated(true);
data_channel_config.id(0);
data_channel_config.id(124);

let channel: RtcDataChannel =
connection.create_data_channel_with_data_channel_dict("webudp", &data_channel_config);
Expand Down