Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented flush function, without remote changes considered. #89

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/replicate/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pin-project.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, default-features = false }
tokio-serde.workspace = true
tokio-util.workspace = true
url.workspace = true
tokio-serde = { workspace = true, features = ["json"] }
tokio-util = { workspace = true, features = ["codec"] }
url = { workspace = true, features = ["serde"] }
uuid = { workspace = true, features = ["v4", "serde"] }
146 changes: 144 additions & 2 deletions crates/replicate/common/src/data_model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ pub struct LocalChanges {
/// The *pending* version of [`LocalChanges`]. These are built up internally as the
/// local data model is mutated. At any time, the data model can be flushed and
/// [`PendingLocalChanges`] are converted to [`LocalChanges`].
///
/// The reason that we have this type in addition to [`LocalChanges`] is because
/// [`PendingLocalChanges`] refers to the states in the data model and therefore to
/// read the value of a mutated entity's state requires access to [`DataModel::data`],
/// whereas [`LocalChanges`] is self-contained and doesn't require any access to the
/// [`DataModel`]. It is important for [`LocalChanges`] to be self-contained because it
/// is going to be sent to a separate task and won't necessarily have read access
/// anymore to the overall data model.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
struct PendingLocalChanges {
spawns: EntityMap<State>,
Expand Down Expand Up @@ -198,6 +206,8 @@ impl DataModel {
let deleted_data = self.data.remove(&entity).ok_or(EntityNotPresent)?;
let insert_result = self.pending.despawns.insert(entity, deleted_data.state);
debug_assert!(insert_result.is_none(), "sanity: can't despawn twice");
// No point sending mutations when we already send despawn state
self.pending.mutations.remove(&entity);

Ok(())
}
Expand Down Expand Up @@ -320,9 +330,24 @@ impl DataModel {
pub fn flush(
&mut self,
_remote_changes: &RemoteChanges,
_local_changes: &mut LocalChanges,
local_changes: &mut LocalChanges,
) {
todo!()
// TODO: For now, we aren't actually applying any remote changes, because the
// server isn't implemented.

local_changes.mutations.clear();
local_changes.despawns.clear();
local_changes.mutations.clear();

local_changes.spawns.extend(self.pending.spawns.drain());
local_changes.despawns.extend(self.pending.despawns.drain());
local_changes
.mutations
.extend(self.pending.mutations.iter().map(|(&entity, &mutation)| {
let state = self.get(entity).expect("entity should be present").clone();
(entity, (mutation, state))
}));
self.pending.mutations.clear();
}
}

Expand Down Expand Up @@ -457,6 +482,123 @@ mod test_dm {
assert_eq!(dm.pending.despawns.len(), 0);
assert_eq!(dm.pending.mutations.len(), 0);
}

#[test]
fn test_flush_no_remote() {
fn assert_dm_matches_local(dm: &DataModel, expected_local: &LocalChanges) {
let pending = &dm.pending;
// Sanity check, make sure that pending and data model are consistent.
pending
.spawns
.iter()
// Only check for existence on items that weren't despawned
.filter(|(e, _bytes)| !pending.despawns.contains_key(e))
.for_each(|(&entity, _bytes)| {
assert!(dm.get(entity).is_ok(), "entity {entity:?}");
});
pending.despawns.iter().for_each(|(&entity, _bytes)| {
assert_eq!(dm.get(entity), Err(EntityNotPresent), "entity {entity:?}");
});
pending.mutations.iter().for_each(|(&entity, _mutation)| {
assert!(dm.get(entity).is_ok(), "entity {entity:?}")
});

// Actually compare pending to expected
assert_eq!(pending.spawns, expected_local.spawns);
assert_eq!(pending.despawns, expected_local.despawns);
assert_eq!(pending.mutations.len(), expected_local.mutations.len());
pending.mutations.iter().for_each(|(&entity, &mutation)| {
let l_entry = &expected_local.mutations[&entity];
assert_eq!(l_entry.0, mutation, "entity: {entity:?}");
assert_eq!(l_entry.1, dm.get(entity).unwrap(), "entity: {entity:?}");
});

// compare data model states to expected
let expected_states = {
let mut states = expected_local.spawns.clone();
states.extend(
expected_local
.mutations
.iter()
.map(|(&e, (_state_mutation, bytes))| (e, bytes.clone())),
);
states.retain(|e, _bytes| !expected_local.despawns.contains_key(e));
states
};
assert_eq!(dm.data.len(), expected_states.len());
for (e, entity_data) in dm.data.iter() {
assert_eq!(entity_data.state, expected_states[&e], "entity: {e:?}");
}
}

let mut dm = DataModel::default();
let mut expected_local = LocalChanges::default();

fn check(dm: &DataModel, expected_local: &LocalChanges) {
assert_dm_matches_local(dm, expected_local);
let mut local_from_flush = LocalChanges::default();
let remote = RemoteChanges::default();
// Clone to avoid clearing pending local changes from datamodel needed in later steps.
dm.clone().flush(&remote, &mut local_from_flush);
assert_eq!(&local_from_flush, expected_local);
}

// Spawn e0
let s0_a = State::from_static(b"s0_a");
let e0 = dm.spawn(s0_a.clone());
expected_local.spawns.insert(e0, s0_a.clone());
check(&dm, &expected_local);

// Spawn e1
let s1_a = State::from_static(b"s1_a");
let e1 = dm.spawn(s1_a.clone());
assert_eq!(dm.get(e1).unwrap(), &s1_a);
expected_local.spawns.insert(e1, s1_a);
check(&dm, &expected_local);

// Update e0 unreliably
let s0_b = State::from_static(b"s0_b");
dm.update(e0, s0_b.clone()).unwrap();
expected_local
.mutations
.insert(e0, (StateMutation::Unreliable, s0_b));
check(&dm, &expected_local);

// Update e0 reliably
let s0_c = State::from_static(b"s0_c");
dm.update_reliable(e0, s0_c.clone()).unwrap();
// Unreliable overwritten with Reliable, along with state.
expected_local
.mutations
.insert(e0, (StateMutation::Reliable, s0_c));
check(&dm, &expected_local);

// Update e0 unreliably
let s0_d = State::from_static(b"s0_d");
dm.update(e0, s0_d.clone()).unwrap();
// Reliable is not overwritten, but the state is.
expected_local
.mutations
.insert(e0, (StateMutation::Reliable, s0_d.clone()));
check(&dm, &expected_local);

// Update e1 unreliably
let s1_b = State::from_static(b"s1_b");
dm.update(e1, s1_b.clone()).unwrap();
expected_local
.mutations
.insert(e1, (StateMutation::Unreliable, s1_b));
check(&dm, &expected_local);

// Despawn e0
dm.despawn(e0).unwrap();
// The mutations should dissapear...
let state_at_despawn = expected_local.mutations.remove(&e0).unwrap().1;
// ...but the spawns should remain.
assert_eq!(expected_local.spawns[&e0], s0_a);
expected_local.despawns.insert(e0, state_at_despawn);
check(&dm, &expected_local);
}
}

#[cfg(test)]
Expand Down
Loading