diff --git a/faux-mgs/src/main.rs b/faux-mgs/src/main.rs index 06160de..3f7c346 100644 --- a/faux-mgs/src/main.rs +++ b/faux-mgs/src/main.rs @@ -36,10 +36,12 @@ use slog::warn; use slog::Drain; use slog::Level; use slog::Logger; +use slog_async::AsyncGuard; use std::collections::BTreeMap; use std::fs; use std::fs::File; use std::io; +use std::mem; use std::net::SocketAddrV6; use std::path::Path; use std::path::PathBuf; @@ -437,25 +439,23 @@ fn ignition_command_from_str(s: &str) -> Result { } } -fn build_logger(level: Level, path: Option<&Path>) -> Result { +fn build_logger( + level: Level, + path: Option<&Path>, +) -> Result<(Logger, AsyncGuard)> { fn make_drain( level: Level, decorator: D, - ) -> slog::Fuse { + ) -> (slog::Fuse, AsyncGuard) { let drain = slog_term::FullFormat::new(decorator) .build() .filter_level(level) .fuse(); - slog_async::Async::new(drain).build().fuse() + let (drain, guard) = slog_async::Async::new(drain).build_with_guard(); + (drain.fuse(), guard) } - let drain = if let Some(path) = path { - // Special case /dev/null - don't even bother with slog_async, just - // return a discarding logger. - if path == Path::new("/dev/null") { - return Ok(Logger::root(slog::Discard, o!())); - } - + let (drain, guard) = if let Some(path) = path { let file = File::create(path).with_context(|| { format!("failed to create logfile {}", path.display()) })?; @@ -464,7 +464,7 @@ fn build_logger(level: Level, path: Option<&Path>) -> Result { make_drain(level, slog_term::TermDecorator::new().build()) }; - Ok(Logger::root(drain, o!("component" => "faux-mgs"))) + Ok((Logger::root(drain, o!("component" => "faux-mgs")), guard)) } fn build_requested_interfaces(patterns: Vec) -> Result> { @@ -507,7 +507,8 @@ fn build_requested_interfaces(patterns: Vec) -> Result> { async fn main() -> Result<()> { let args = Args::parse(); - let log = build_logger(args.log_level, args.logfile.as_deref())?; + let (log, log_guard) = + build_logger(args.log_level, args.logfile.as_deref())?; let per_attempt_timeout = Duration::from_millis(args.per_attempt_timeout_millis); @@ -589,7 +590,17 @@ async fn main() -> Result<()> { .await?; // If usart::run() returns, the user detached; exit. - return Ok(()); + // + // We don't just `return Ok(())` here because we'll bump into + // https://github.com/tokio-rs/tokio/issues/2466: `usart::run()` + // reads from stdin, which means we end up with a task blocked in a + // system call, preventing tokio from shutting down the runtime + // created via `tokio::main`. We could create an explicit `Runtime` + // and call `shutdown_background`; instead, we explicitly exit to + // bypass tokio's shutdown. We first drop our `log_guard` to ensure + // any messages have been flushed. + mem::drop(log_guard); + std::process::exit(0); } Command::ServeHostPhase2 { directory } => { populate_phase2_images(&host_phase2_provider, &directory, &log) diff --git a/faux-mgs/src/usart.rs b/faux-mgs/src/usart.rs index 1c0d440..ad09d04 100644 --- a/faux-mgs/src/usart.rs +++ b/faux-mgs/src/usart.rs @@ -13,7 +13,10 @@ use gateway_messages::SERIAL_CONSOLE_IDLE_TIMEOUT; use gateway_sp_comms::error::CommunicationError; use gateway_sp_comms::AttachedSerialConsoleSend; use gateway_sp_comms::SingleSp; +use slog::error; +use slog::info; use slog::warn; +use std::collections::VecDeque; use std::fs::File; use std::io; use std::io::Write; @@ -24,7 +27,9 @@ use std::time::Duration; use termios::Termios; use tokio::io::AsyncReadExt; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::time; +use tokio::time::Interval; use tokio::time::MissedTickBehavior; use crate::picocom_map::RemapRules; @@ -42,8 +47,9 @@ pub(crate) async fn run( uart_logfile: Option, log: slog::Logger, ) -> Result<()> { - // Put terminal in raw mode, if requested, with a guard to restore it. - let _guard = + // Put terminal in raw mode, if requested, with a guard to restore it on + // drop or when we return. + let termios_guard = if raw { Some(UnrawTermiosGuard::make_stdout_raw()?) } else { None }; // Parse imap/omap strings. @@ -81,32 +87,33 @@ pub(crate) async fn run( .with_context(|| "failed to attach to serial console")?; let (console_tx, mut console_rx) = console.split(); + let (fatal_err_tx, mut fatal_err_rx) = oneshot::channel(); let (send_tx, send_rx) = mpsc::channel(8); - let tx_to_sp_handle = tokio::spawn(async move { - relay_data_to_sp(console_tx, send_rx, log).await.unwrap(); - }); - + let tx_to_sp_handle = tokio::spawn(relay_data_to_sp( + console_tx, + send_rx, + fatal_err_tx, + log.clone(), + )); + + let mut encountered_fatal_error = false; loop { tokio::select! { result = stdin.read_buf(&mut stdin_buf) => { let n = result.context("failed to read from stdin")?; if n == 0 { - mem::drop(send_tx); - tx_to_sp_handle.await.unwrap(); - return Ok(()); + break; } match out_buf.ingest(&mut stdin_buf) { IngestResult::Ok => (), IngestResult::Exit => { - mem::drop(send_tx); - tx_to_sp_handle.await.unwrap(); - return Ok(()); + break; } IngestResult::Break => { send_tx.send(SendTxData::Break) .await - .with_context(|| "failed to send data (task shutdown?)")?; + .context("failed to send data (task shutdown?)")?; println!("\n\r*** break sent ***\r"); } } @@ -114,8 +121,26 @@ pub(crate) async fn run( flush_delay.start_if_unstarted().await; } + fatal_err_result = &mut fatal_err_rx => { + // The sending half of `fatal_err_rx` is held by our + // `relay_data_to_sp` task; it should only exit if we tell it to + // (which we do _below_ this select loop if we break out due to + // the user exiting) or if it encounters a fatal error (in which + // case it first sends a message on this channel). + let fatal_err = fatal_err_result + .expect("tx_to_sp task panicked"); + error!( + log, "fatal communication error with SP"; + "err" => #%fatal_err, + ); + encountered_fatal_error = true; + break; + } + chunk = console_rx.recv() => { - let chunk = chunk.unwrap(); + // The sending half of `console_rx` is held by the task spawned + // when `sp` was created; it should not exit until we drop `sp`. + let chunk = chunk.expect("internal SP task panicked"); if let Some(uart_logfile) = uart_logfile.as_mut() { uart_logfile @@ -138,29 +163,87 @@ pub(crate) async fn run( } } } + + // Drop the sending half of this channel to signal our tx-to-sp task to + // exit. + mem::drop(send_tx); + let console_tx = tx_to_sp_handle.await.expect("tx_to_sp task panicked"); + + // If we encountered a fatal error, we will not attempt to detach from the + // SP. One possible fatal error is _someone else detached us_, and they + // might have attached themselves in the meantime. Other fatal errors + // indicate a serious problem communicating with the SP, and it's likely + // detaching will fail anyway. + if !encountered_fatal_error { + console_tx + .detach() + .await + .context("failed to detach from SP console")?; + } + + // Restore termios settings, if we put the terminal into raw mode. (This + // would happen automatically when the guard is dropped, but doing it + // explicitly lets us check for errors.) + if let Some(guard) = termios_guard { + guard.restore()?; + } + + Ok(()) } async fn relay_data_to_sp( mut console_tx: AttachedSerialConsoleSend, mut data_rx: mpsc::Receiver, + fatal_err_tx: oneshot::Sender, log: slog::Logger, -) -> Result<()> { +) -> AttachedSerialConsoleSend { let mut keepalive = time::interval(SERIAL_CONSOLE_IDLE_TIMEOUT / 4); keepalive.set_missed_tick_behavior(MissedTickBehavior::Delay); + // If we fail to send a message, we need to resend it; we'll keep a running + // ticker and check for messages-to-send every second. + let mut check_for_resend = time::interval(Duration::from_secs(1)); + check_for_resend.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut messages_to_send = VecDeque::new(); + + // If we warn the user that we've failed to communicate with the SP, we will + // also inform them if we later succeed. + let mut recently_warned = false; + loop { tokio::select! { maybe_data = data_rx.recv() => { match maybe_data { - Some(SendTxData::Buf(buf)) => { - console_tx.write(buf).await?; - keepalive.reset(); - } - Some(SendTxData::Break) => { - console_tx.send_break().await?; - keepalive.reset(); + Some(message) => { + messages_to_send.push_front(message); + if let Err(fatal_err) = drain_messages_to_send( + &mut messages_to_send, + &mut console_tx, + &mut keepalive, + &mut recently_warned, + &log, + ).await { + fatal_err_tx + .send(fatal_err) + .expect("parent task exited"); + return console_tx; + } } - None => break, + None => return console_tx, + } + } + + _ = check_for_resend.tick() => { + if let Err(fatal_err) = drain_messages_to_send( + &mut messages_to_send, + &mut console_tx, + &mut keepalive, + &mut recently_warned, + &log, + ).await { + fatal_err_tx.send(fatal_err).expect("parent task exited"); + return console_tx; } } @@ -172,10 +255,11 @@ async fn relay_data_to_sp( Err(CommunicationError::SpError(SpError::BadRequest( BadRequestReason::DeserializationError, ))) => { - warn!(log, concat!( - "This SP does not support console keepalives! ", - "Please update it at your earliest convenience.", - )); + warn!( + log, + "This SP does not support console keepalives! \ + Please update it at your earliest convenience.", + ); // Change our keepalive timer to only tick once ever 4 // hours (i.e., probably never, unless someone leaves // the console open.) @@ -184,18 +268,70 @@ async fn relay_data_to_sp( ); keepalive.reset(); } - Err(err) => return Err(err.into()), + Err(err) => { + warn!( + log, "failed to send console keepalive"; + "err" => #%err, + ); + } } } } } +} - console_tx.detach().await?; +async fn drain_messages_to_send( + messages: &mut VecDeque, + tx: &mut AttachedSerialConsoleSend, + keepalive: &mut Interval, + recently_warned: &mut bool, + log: &slog::Logger, +) -> Result<(), CommunicationError> { + while let Some(message) = messages.front().cloned() { + let result = match message { + SendTxData::Buf(data) => tx.write(data).await, + SendTxData::Break => tx.send_break().await, + }; + + match result { + Ok(()) => { + if *recently_warned { + info!(log, "communication with SP reestablished"); + *recently_warned = false; + } + messages.pop_front(); + keepalive.reset(); + } + // These error cases are fatal: if we get this response, we do not + // expect any future writes to succeed. + Err( + fatal_err @ (CommunicationError::SpError( + SpError::BadRequest(_) + | SpError::RequestUnsupportedForSp + | SpError::RequestUnsupportedForComponent + | SpError::SerialConsoleNotAttached + | SpError::SerialConsoleAlreadyAttached, + ) + | CommunicationError::BogusSerialConsoleState + | CommunicationError::VersionMismatch { .. }), + ) => { + return Err(fatal_err); + } + Err(non_fatal_err) => { + warn!( + log, "communication error with SP (will retry)"; + "err" => #%non_fatal_err, + ); + *recently_warned = true; + return Ok(()); + } + } + } Ok(()) } -#[derive(Debug)] +#[derive(Debug, Clone)] enum SendTxData { Buf(Vec), Break, @@ -204,11 +340,14 @@ enum SendTxData { struct UnrawTermiosGuard { stdout: i32, ios: Termios, + restored: bool, } impl Drop for UnrawTermiosGuard { fn drop(&mut self) { - termios::tcsetattr(self.stdout, termios::TCSAFLUSH, &self.ios).unwrap(); + if !self.restored { + _ = termios::tcsetattr(self.stdout, termios::TCSAFLUSH, &self.ios); + } } } @@ -223,7 +362,14 @@ impl UnrawTermiosGuard { .with_context(|| "failed to set TCSANOW on stdout")?; termios::tcflush(stdout, termios::TCIOFLUSH) .with_context(|| "failed to set TCIOFLUSH on stdout")?; - Ok(Self { stdout, ios: orig_ios }) + Ok(Self { stdout, ios: orig_ios, restored: false }) + } + + fn restore(mut self) -> Result<()> { + termios::tcsetattr(self.stdout, termios::TCSAFLUSH, &self.ios) + .context("failed to restore stdout termios settings")?; + self.restored = true; + Ok(()) } } @@ -255,12 +401,12 @@ impl FlushDelay { async fn start_if_unstarted(&mut self) { if !self.started { self.started = true; - self.tx.send(()).await.unwrap(); + self.tx.send(()).await.expect("inner task panicked"); } } async fn ready(&mut self) { - self.rx.recv().await.unwrap(); + self.rx.recv().await.expect("inner task panicked"); self.started = false; } }