diff --git a/rs/execution_environment/benches/scheduler.rs b/rs/execution_environment/benches/scheduler.rs
index 0def9d106db..872cfa3a40b 100644
--- a/rs/execution_environment/benches/scheduler.rs
+++ b/rs/execution_environment/benches/scheduler.rs
@@ -37,7 +37,6 @@ fn main() {
         let round_schedule = RoundSchedule::new(
             scheduler_cores,
             long_execution_cores,
-            0,
             ordered_new_execution_canister_ids,
             ordered_long_execution_canister_ids,
         );
diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs
index cb5be9e989e..c3e6a72cbac 100644
--- a/rs/execution_environment/src/scheduler.rs
+++ b/rs/execution_environment/src/scheduler.rs
@@ -271,8 +271,8 @@ impl SchedulerImpl {
         // `(compute_capacity - total_compute_allocation) * multiplier / number_of_canisters`
         // can be simplified to just
         // `(compute_capacity - total_compute_allocation) * scheduler_cores`
-        let free_capacity_per_canister = compute_capacity_percent
-            .saturating_sub(total_compute_allocation_percent)
+        let free_capacity_per_canister = (compute_capacity_percent
+            .saturating_sub(total_compute_allocation_percent))
             * scheduler_cores as i64;
 
         // Fully divide the free allocation across all canisters.
@@ -344,7 +344,6 @@ impl SchedulerImpl {
         let round_schedule = RoundSchedule::new(
             scheduler_cores,
             long_execution_cores,
-            total_compute_allocation_percent,
             round_states
                 .iter()
                 .skip(number_of_long_executions)
@@ -644,7 +643,7 @@ impl SchedulerImpl {
         scheduler_round_limits: &mut SchedulerRoundLimits,
         registry_settings: &RegistryExecutionSettings,
         idkg_subnet_public_keys: &BTreeMap<MasterPublicKeyId, MasterPublicKey>,
-    ) -> (ReplicatedState, BTreeSet<CanisterId>, BTreeSet<CanisterId>) {
+    ) -> (ReplicatedState, BTreeSet<CanisterId>) {
         let measurement_scope =
             MeasurementScope::nested(&self.metrics.round_inner, root_measurement_scope);
         let mut ingress_execution_results = Vec::new();
@@ -655,9 +654,6 @@ impl SchedulerImpl {
 
         let mut heartbeat_and_timer_canister_ids = BTreeSet::new();
         let mut round_executed_canister_ids = BTreeSet::new();
-        // The set of canisters marked as fully executed: have no messages to execute
-        // or were scheduled first on a core.
-        let mut round_fully_executed_canister_ids = BTreeSet::new();
 
         // Start iteration loop:
         //      - Execute subnet messages.
@@ -729,7 +725,6 @@ impl SchedulerImpl {
             let (
                 active_canisters,
                 executed_canister_ids,
-                fully_executed_canister_ids,
                 mut loop_ingress_execution_results,
                 heap_delta,
             ) = self.execute_canisters_in_inner_round(
@@ -744,10 +739,9 @@
             );
             let instructions_consumed = instructions_before - round_limits.instructions;
             drop(execution_timer);
+            round_executed_canister_ids.extend(executed_canister_ids);
 
             let finalization_timer = self.metrics.round_inner_iteration_fin.start_timer();
-            round_executed_canister_ids.extend(executed_canister_ids);
-            round_fully_executed_canister_ids.extend(fully_executed_canister_ids);
             total_heap_delta += heap_delta;
             state.metadata.heap_delta_estimate += heap_delta;
 
@@ -858,11 +852,7 @@ impl SchedulerImpl {
             .heap_delta_rate_limited_canisters_per_round
             .observe(round_filtered_canisters.rate_limited_canister_ids.len() as f64);
 
-        (
-            state,
-            round_filtered_canisters.active_canister_ids,
-            round_fully_executed_canister_ids,
-        )
+        (state, round_filtered_canisters.active_canister_ids)
     }
 
     /// Executes canisters in parallel using the thread pool.
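Note: the comment in the `@@ -271,8 +271,8 @@` hunk above leans on a small identity. Priorities are tracked in "multiplied percent" with `multiplier = scheduler_cores * number_of_canisters` (see the removed `finish_round` further down), so dividing the multiplied free capacity across all canisters cancels exactly. A minimal standalone check of that identity, with hypothetical capacity and allocation values:

    fn main() {
        let scheduler_cores: i64 = 4;
        let number_of_canisters: i64 = 50;
        // The scheduler's "multiplied percent" unit.
        let multiplier = scheduler_cores * number_of_canisters;

        // Hypothetical subnet numbers, in percent.
        let compute_capacity_percent: i64 = 400;
        let total_compute_allocation_percent: i64 = 150;

        // Long form: free capacity in multiplied percent, divided per canister.
        let long_form = compute_capacity_percent
            .saturating_sub(total_compute_allocation_percent)
            * multiplier
            / number_of_canisters;
        // Simplified form used by the code above.
        let simplified = (compute_capacity_percent
            .saturating_sub(total_compute_allocation_percent))
            * scheduler_cores;
        assert_eq!(long_form, simplified); // both are 1_000 here
    }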
@@ -889,7 +879,6 @@ impl SchedulerImpl {
     ) -> (
         Vec<CanisterState>,
         BTreeSet<CanisterId>,
-        Vec<CanisterId>,
         Vec<(MessageId, IngressStatus)>,
         NumBytes,
     ) {
@@ -903,7 +892,6 @@ impl SchedulerImpl {
                 canisters_by_thread.into_iter().flatten().collect(),
                 BTreeSet::new(),
                 vec![],
-                vec![],
                 NumBytes::from(0),
             );
         }
@@ -967,7 +955,6 @@ impl SchedulerImpl {
         // Aggregate `results_by_thread` to get the result of this function.
         let mut canisters = Vec::new();
         let mut executed_canister_ids = BTreeSet::new();
-        let mut fully_executed_canister_ids = vec![];
         let mut ingress_results = Vec::new();
         let mut total_instructions_executed = NumInstructions::from(0);
         let mut max_instructions_executed_per_thread = NumInstructions::from(0);
@@ -975,7 +962,6 @@
         for mut result in results_by_thread.into_iter() {
             canisters.append(&mut result.canisters);
             executed_canister_ids.extend(result.executed_canister_ids);
-            fully_executed_canister_ids.extend(result.fully_executed_canister_ids);
             ingress_results.append(&mut result.ingress_results);
             let instructions_executed = as_num_instructions(
                 round_limits_per_thread.instructions - result.round_limits.instructions,
@@ -1009,7 +995,6 @@
         (
             canisters,
             executed_canister_ids,
-            fully_executed_canister_ids,
             ingress_results,
             heap_delta,
         )
@@ -1722,7 +1707,7 @@ impl Scheduler for SchedulerImpl {
         };
 
         // Inner round.
-        let (mut state, active_canister_ids, fully_executed_canister_ids) = self.inner_round(
+        let (mut state, active_canister_ids) = self.inner_round(
             state,
             &mut csprng,
             &round_schedule,
@@ -1883,10 +1868,6 @@
                 .num_canister_snapshots
                 .set(final_state.canister_snapshots.count() as i64);
         }
-        round_schedule.finish_round(
-            &mut final_state.canister_states,
-            fully_executed_canister_ids,
-        );
         self.finish_round(&mut final_state, current_round_type);
         final_state
             .metadata
@@ -1962,7 +1943,6 @@
 struct ExecutionThreadResult {
     canisters: Vec<CanisterState>,
     executed_canister_ids: BTreeSet<CanisterId>,
-    fully_executed_canister_ids: Vec<CanisterId>,
     ingress_results: Vec<(MessageId, IngressStatus)>,
     slices_executed: NumSlices,
     messages_executed: NumMessages,
@@ -2000,7 +1980,6 @@ fn execute_canisters_on_thread(
     // These variables accumulate the results and will be returned at the end.
     let mut canisters = vec![];
     let mut executed_canister_ids = BTreeSet::new();
-    let mut fully_executed_canister_ids = vec![];
     let mut ingress_results = vec![];
     let mut total_slices_executed = NumSlices::from(0);
     let mut total_messages_executed = NumMessages::from(0);
@@ -2113,13 +2092,18 @@
         if let Some(es) = &mut canister.execution_state {
             es.last_executed_round = round_id;
         }
-        RoundSchedule::finish_canister_execution(
-            &mut canister,
-            &mut fully_executed_canister_ids,
-            round_id,
-            is_first_iteration,
-            rank,
-        );
+        let full_message_execution = match canister.next_execution() {
+            NextExecution::None => true,
+            NextExecution::StartNew => false,
+            // We just finished a full slice of executions.
+            NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true,
+        };
+        let scheduled_first = is_first_iteration && rank == 0;
+        if full_message_execution || scheduled_first {
+            // The very first canister is considered to have a full execution round for
+            // scheduling purposes even if it did not complete within the round.
+            canister.scheduler_state.last_full_execution_round = round_id;
+        }
         canister.system_state.canister_metrics.executed += 1;
         canisters.push(canister);
         round_limits.instructions -=
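Note: the `@@ -2113,13 +2092,18 @@` hunk above inlines the "counts as a full execution" rule that the removed `RoundSchedule::finish_canister_execution` below used to implement; one behavioral difference is that `ContinueInstallCode` now counts as a full execution rather than being excluded. A self-contained sketch of the restored rule, with a hypothetical `NextExecutionKind` standing in for the codebase's `NextExecution`:

    enum NextExecutionKind {
        None,
        StartNew,
        ContinueLong,
        ContinueInstallCode,
    }

    fn counts_as_full_execution(
        next: NextExecutionKind,
        is_first_iteration: bool,
        rank: usize,
    ) -> bool {
        let full_message_execution = match next {
            // Nothing left to run: the canister got through all of its messages.
            NextExecutionKind::None => true,
            // A new execution is still pending: the canister did not get a full turn.
            NextExecutionKind::StartNew => false,
            // A full slice of a long (or install-code) execution was just finished.
            NextExecutionKind::ContinueLong | NextExecutionKind::ContinueInstallCode => true,
        };
        // The canister scheduled first on a core in the first iteration is credited
        // with a full round even if it did not complete within the round.
        full_message_execution || (is_first_iteration && rank == 0)
    }

    fn main() {
        assert!(counts_as_full_execution(NextExecutionKind::StartNew, true, 0));
        assert!(!counts_as_full_execution(NextExecutionKind::StartNew, true, 1));
        assert!(counts_as_full_execution(NextExecutionKind::None, false, 3));
    }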
@@ -2129,7 +2113,6 @@
     ExecutionThreadResult {
         canisters,
         executed_canister_ids,
-        fully_executed_canister_ids,
         ingress_results,
         slices_executed: total_slices_executed,
         messages_executed: total_messages_executed,
diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs
index f8b30150a0d..13e21c48848 100644
--- a/rs/execution_environment/src/scheduler/round_schedule.rs
+++ b/rs/execution_environment/src/scheduler/round_schedule.rs
@@ -1,9 +1,9 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::{BTreeMap, HashMap};
 
 use ic_base_types::{CanisterId, NumBytes};
 use ic_config::flag_status::FlagStatus;
 use ic_replicated_state::{canister_state::NextExecution, CanisterState};
-use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode};
+use ic_types::{AccumulatedPriority, ComputeAllocation, LongExecutionMode};
 
 /// Round metrics required to prioritize a canister.
 #[derive(Clone, Debug)]
@@ -42,8 +42,6 @@ pub struct RoundSchedule {
     pub scheduler_cores: usize,
     /// Number of cores dedicated for long executions.
     pub long_execution_cores: usize,
-    // Sum of all canisters compute allocation in percent.
-    pub total_compute_allocation_percent: i64,
     /// Ordered Canister IDs with new executions.
     pub ordered_new_execution_canister_ids: Vec<CanisterId>,
     /// Ordered Canister IDs with long executions.
@@ -54,7 +52,6 @@ impl RoundSchedule {
     pub fn new(
         scheduler_cores: usize,
         long_execution_cores: usize,
-        total_compute_allocation_percent: i64,
         ordered_new_execution_canister_ids: Vec<CanisterId>,
         ordered_long_execution_canister_ids: Vec<CanisterId>,
     ) -> Self {
@@ -62,7 +59,6 @@
             scheduler_cores,
             long_execution_cores: long_execution_cores
                 .min(ordered_long_execution_canister_ids.len()),
-            total_compute_allocation_percent,
             ordered_new_execution_canister_ids,
             ordered_long_execution_canister_ids,
         }
@@ -178,7 +174,6 @@
             RoundSchedule::new(
                 self.scheduler_cores,
                 self.long_execution_cores,
-                self.total_compute_allocation_percent,
                 ordered_new_execution_canister_ids,
                 ordered_long_execution_canister_ids,
             ),
@@ -234,64 +229,4 @@
 
         (canisters_partitioned_by_cores, canisters)
     }
-
-    pub fn finish_canister_execution(
-        canister: &mut CanisterState,
-        fully_executed_canister_ids: &mut Vec<CanisterId>,
-        round_id: ExecutionRound,
-        is_first_iteration: bool,
-        rank: usize,
-    ) {
-        let full_message_execution = match canister.next_execution() {
-            NextExecution::None => true,
-            NextExecution::StartNew => false,
-            // We just finished a full slice of executions.
-            NextExecution::ContinueLong => true,
-            NextExecution::ContinueInstallCode => false,
-        };
-        let scheduled_first = is_first_iteration && rank == 0;
-
-        // The very first canister is considered to have a full execution round for
-        // scheduling purposes even if it did not complete within the round.
-        if full_message_execution || scheduled_first {
-            canister.scheduler_state.last_full_execution_round = round_id;
-
-            // We schedule canisters (as opposed to individual messages),
-            // and we charge for every full execution round.
-            fully_executed_canister_ids.push(canister.canister_id());
-        }
-    }
-
-    pub(crate) fn finish_round(
-        &self,
-        canister_states: &mut BTreeMap<CanisterId, CanisterState>,
-        fully_executed_canister_ids: BTreeSet<CanisterId>,
-    ) {
-        let scheduler_cores = self.scheduler_cores;
-        let number_of_canisters = canister_states.len();
-        let multiplier = (scheduler_cores * number_of_canisters).max(1) as i64;
-
-        // Charge canisters for full executions in this round.
-        let mut total_charged_priority = 0;
-        for canister_id in fully_executed_canister_ids {
-            if let Some(canister) = canister_states.get_mut(&canister_id) {
-                total_charged_priority += 100 * multiplier;
-                canister.scheduler_state.priority_credit += (100 * multiplier).into();
-            }
-        }
-
-        let total_allocated = self.total_compute_allocation_percent * multiplier;
-        // Free capacity per canister in multiplied percent.
-        let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated)
-            / number_of_canisters.max(1) as i64;
-        // Fully divide the free allocation across all canisters.
-        for canister in canister_states.values_mut() {
-            // De-facto compute allocation includes bonus allocation
-            let factual = canister.scheduler_state.compute_allocation.as_percent() as i64
-                * multiplier
-                + free_capacity_per_canister;
-            // Increase accumulated priority by de-facto compute allocation.
-            canister.scheduler_state.accumulated_priority += factual.into();
-        }
-    }
 }
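Note: the `finish_round` removed above kept the books zero-sum: every fully executed canister is charged `100 * multiplier` of priority credit, and exactly that total is handed back out as accumulated priority (de-facto compute allocation plus an even share of the free capacity). A self-contained numeric sketch of that accounting, using plain arrays instead of `CanisterState` and hypothetical values chosen so the integer division is exact:

    fn main() {
        let scheduler_cores: i64 = 2;
        // Hypothetical per-canister compute allocations, in percent.
        let compute_allocations: [i64; 4] = [10, 0, 5, 0];
        // Which canisters had a full execution this round.
        let fully_executed = [true, true, false, false];

        let number_of_canisters = compute_allocations.len() as i64;
        let multiplier = scheduler_cores * number_of_canisters;

        // Charge canisters for full executions in this round.
        let mut priority_credit = [0i64; 4];
        let mut total_charged_priority = 0;
        for (credit, &full) in priority_credit.iter_mut().zip(&fully_executed) {
            if full {
                *credit += 100 * multiplier;
                total_charged_priority += 100 * multiplier;
            }
        }

        // Redistribute: de-facto allocation plus an even share of free capacity.
        let total_allocated: i64 = compute_allocations.iter().sum::<i64>() * multiplier;
        let free_capacity_per_canister =
            (total_charged_priority - total_allocated) / number_of_canisters;
        let mut accumulated_priority = [0i64; 4];
        for (acc, &alloc) in accumulated_priority.iter_mut().zip(&compute_allocations) {
            *acc += alloc * multiplier + free_capacity_per_canister;
        }

        // The invariant the tests below assert: credits and accumulation cancel.
        let total_accumulated: i64 = accumulated_priority.iter().sum();
        let total_credit: i64 = priority_credit.iter().sum();
        assert_eq!(total_accumulated - total_credit, 0);
    }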
diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs
index b0328dbdaf7..3af9951d31b 100644
--- a/rs/execution_environment/src/scheduler/tests.rs
+++ b/rs/execution_environment/src/scheduler/tests.rs
@@ -5936,96 +5936,3 @@ fn inner_round_long_execution_is_a_full_execution() {
     // The accumulated priority invariant should be respected.
     assert_eq!(total_accumulated_priority - total_priority_credit, 0);
 }
-
-#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
-fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: usize) {
-    let instructions = 20;
-    let messages_per_round = 2;
-    let mut test = SchedulerTestBuilder::new()
-        .with_scheduler_config(SchedulerConfig {
-            scheduler_cores,
-            max_instructions_per_round: (instructions * messages_per_round).into(),
-            max_instructions_per_message: instructions.into(),
-            max_instructions_per_message_without_dts: instructions.into(),
-            max_instructions_per_slice: instructions.into(),
-            instruction_overhead_per_execution: 0.into(),
-            instruction_overhead_per_canister: 0.into(),
-            instruction_overhead_per_canister_for_finalization: 0.into(),
-            ..SchedulerConfig::application_subnet()
-        })
-        .build();
-
-    // Bump up the round number.
-    test.execute_round(ExecutionRoundType::OrdinaryRound);
-
-    // Create `messages_per_round * 2` canisters for each scheduler core.
-    let num_canisters = scheduler_cores as u64 * messages_per_round * 2;
-    let mut canister_ids = vec![];
-    for _ in 0..num_canisters {
-        let canister_id = test.create_canister();
-        // Send one messages per canister. Having `max_messages_per_round * 2` canisters,
-        // only half of them will finish in one round.
-        test.send_ingress(canister_id, ingress(instructions));
-        canister_ids.push(canister_id);
-    }
-
-    test.execute_round(ExecutionRoundType::OrdinaryRound);
-
-    let mut total_accumulated_priority = 0;
-    let mut total_priority_credit = 0;
-    for (i, canister) in test.state().canisters_iter().enumerate() {
-        if i < num_canisters as usize / 2 {
-            // The first half of the canisters should finish their messages.
-            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
-            prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
-            prop_assert_eq!(
-                canister.scheduler_state.last_full_execution_round,
-                test.last_round()
-            );
-        } else {
-            // The second half of the canisters should still have their messages.
-            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
-            prop_assert_eq!(canister.system_state.canister_metrics.executed, 0);
-            prop_assert_eq!(canister.scheduler_state.last_full_execution_round, 0.into());
-        }
-        total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
-        total_priority_credit += canister.scheduler_state.priority_credit.get();
-    }
-    prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
-
-    // Send one more message for first half of the canisters.
-    for (i, canister) in canister_ids.iter().enumerate() {
-        if i < num_canisters as usize / 2 {
-            test.send_ingress(*canister, ingress(instructions));
-        }
-    }
-
-    test.execute_round(ExecutionRoundType::OrdinaryRound);
-
-    let mut total_accumulated_priority = 0;
-    let mut total_priority_credit = 0;
-    for (i, canister) in test.state().canisters_iter().enumerate() {
-        // Now all the canisters should be executed once.
-        prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
-        if i < num_canisters as usize / 2 {
-            // The first half of the canisters should have messages.
-            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
-            // The first half of the canisters should be executed two rounds ago.
-            prop_assert_eq!(
-                canister.scheduler_state.last_full_execution_round.get(),
-                test.last_round().get() - 1
-            );
-        } else {
-            // The second half of the canisters should finish their messages.
-            prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
-            // The second half of the canisters should be executed in the last round.
-            prop_assert_eq!(
-                canister.scheduler_state.last_full_execution_round,
-                test.last_round()
-            );
-        }
-        total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
-        total_priority_credit += canister.scheduler_state.priority_credit.get();
-    }
-    prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
-}
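Note: the alternation asserted by the removed proptest (the two halves of the canisters taking turns across rounds, with `last_full_execution_round` trailing by one round for the half that waited) is a plain consequence of round-robin scheduling over twice the per-round capacity. A toy illustration, unrelated to the scheduler's real data structures:

    fn main() {
        let num_canisters = 8;
        // Capacity for only half of the canisters per round.
        let per_round = num_canisters / 2;
        let mut last_full_round = vec![0u64; num_canisters];
        let mut cursor = 0; // round-robin position

        for round in 1..=2u64 {
            for _ in 0..per_round {
                last_full_round[cursor] = round;
                cursor = (cursor + 1) % num_canisters;
            }
        }

        // First half ran in round 1, second half in round 2: off by exactly one.
        assert!(last_full_round[..per_round].iter().all(|&r| r == 1));
        assert!(last_full_round[per_round..].iter().all(|&r| r == 2));
    }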