From b4efa58d5d0c0642790529b14eb327d03896a0b6 Mon Sep 17 00:00:00 2001 From: Graham Held Date: Thu, 2 May 2024 06:00:03 -0700 Subject: [PATCH] feat(shell) raw-encoded pipe reader directly outputs buffer (no newline scan) (#1231) * Shell raw-encoded pipe reader directly outputs buffer (no newline scan) * Suggestions from code review and add .changes file * fmt --- .changes/enhance-shell-raw-out.md | 6 ++ plugins/shell/src/commands.rs | 5 +- plugins/shell/src/process/mod.rs | 124 ++++++++++++++++++++++-------- 3 files changed, 100 insertions(+), 35 deletions(-) create mode 100644 .changes/enhance-shell-raw-out.md diff --git a/.changes/enhance-shell-raw-out.md b/.changes/enhance-shell-raw-out.md new file mode 100644 index 00000000..0b9c7c3a --- /dev/null +++ b/.changes/enhance-shell-raw-out.md @@ -0,0 +1,6 @@ +--- +"shell": patch +--- + +When the "raw" encoding option is specified for a shell process, all bytes from the child's output streams are passed to the data handlers. +This makes it possible to read output from programs that write unencoded byte streams to stdout (like ffmpeg) \ No newline at end of file diff --git a/plugins/shell/src/commands.rs b/plugins/shell/src/commands.rs index 050713a3..3d860cc6 100644 --- a/plugins/shell/src/commands.rs +++ b/plugins/shell/src/commands.rs @@ -154,7 +154,10 @@ pub fn execute( let encoding = match options.encoding { Option::None => EncodingWrapper::Text(None), Some(encoding) => match encoding.as_str() { - "raw" => EncodingWrapper::Raw, + "raw" => { + command = command.set_raw_out(true); + EncodingWrapper::Raw + } _ => { if let Some(text_encoding) = Encoding::for_label(encoding.as_bytes()) { EncodingWrapper::Text(Some(text_encoding)) diff --git a/plugins/shell/src/process/mod.rs b/plugins/shell/src/process/mod.rs index fdb26897..44f037b0 100644 --- a/plugins/shell/src/process/mod.rs +++ b/plugins/shell/src/process/mod.rs @@ -4,7 +4,7 @@ use std::{ ffi::OsStr, - io::{BufReader, Write}, + io::{BufRead, BufReader, Write}, path::{Path, PathBuf}, process::{Command as StdCommand, Stdio}, sync::{Arc, RwLock}, @@ -41,11 +41,13 @@ pub struct TerminatedPayload { #[derive(Debug, Clone)] #[non_exhaustive] pub enum CommandEvent { - /// Stderr bytes until a newline (\n) or carriage return (\r) is found. + /// If configured for raw output, all bytes written to stderr. + /// Otherwise, bytes until a newline (\n) or carriage return (\r) is found. Stderr(Vec), - /// Stdout bytes until a newline (\n) or carriage return (\r) is found. + /// If configured for raw output, all bytes written to stdout. + /// Otherwise, bytes until a newline (\n) or carriage return (\r) is found. Stdout(Vec), - /// An error happened waiting for the command to finish or converting the stdout/stderr bytes to an UTF-8 string. + /// An error happened waiting for the command to finish or converting the stdout/stderr bytes to a UTF-8 string. Error(String), /// Command process terminated. Terminated(TerminatedPayload), @@ -53,7 +55,10 @@ pub enum CommandEvent { /// The type to spawn commands. #[derive(Debug)] -pub struct Command(StdCommand); +pub struct Command { + cmd: StdCommand, + raw_out: bool, +} /// Spawned child process. #[derive(Debug)] @@ -122,7 +127,7 @@ fn relative_command_path(command: &Path) -> crate::Result { impl From for StdCommand { fn from(cmd: Command) -> StdCommand { - cmd.0 + cmd.cmd } } @@ -136,7 +141,10 @@ impl Command { #[cfg(windows)] command.creation_flags(CREATE_NO_WINDOW); - Self(command) + Self { + cmd: command, + raw_out: false, + } } pub(crate) fn new_sidecar>(program: S) -> crate::Result { @@ -146,7 +154,7 @@ impl Command { /// Appends an argument to the command. #[must_use] pub fn arg>(mut self, arg: S) -> Self { - self.0.arg(arg); + self.cmd.arg(arg); self } @@ -157,14 +165,14 @@ impl Command { I: IntoIterator, S: AsRef, { - self.0.args(args); + self.cmd.args(args); self } /// Clears the entire environment map for the child process. #[must_use] pub fn env_clear(mut self) -> Self { - self.0.env_clear(); + self.cmd.env_clear(); self } @@ -175,7 +183,7 @@ impl Command { K: AsRef, V: AsRef, { - self.0.env(key, value); + self.cmd.env(key, value); self } @@ -187,14 +195,20 @@ impl Command { K: AsRef, V: AsRef, { - self.0.envs(envs); + self.cmd.envs(envs); self } /// Sets the working directory for the child process. #[must_use] pub fn current_dir>(mut self, current_dir: P) -> Self { - self.0.current_dir(current_dir); + self.cmd.current_dir(current_dir); + self + } + + /// Configures the reader to output bytes from the child process exactly as received + pub fn set_raw_out(mut self, raw_out: bool) -> Self { + self.raw_out = raw_out; self } @@ -229,6 +243,7 @@ impl Command { /// }); /// ``` pub fn spawn(self) -> crate::Result<(Receiver, CommandChild)> { + let raw = self.raw_out; let mut command: StdCommand = self.into(); let (stdout_reader, stdout_writer) = pipe()?; let (stderr_reader, stderr_writer) = pipe()?; @@ -249,12 +264,14 @@ impl Command { guard.clone(), stdout_reader, CommandEvent::Stdout, + raw, ); spawn_pipe_reader( tx.clone(), guard.clone(), stderr_reader, CommandEvent::Stderr, + raw, ); spawn(move || { @@ -359,35 +376,74 @@ impl Command { } } +fn read_raw_bytes) -> CommandEvent + Send + Copy + 'static>( + mut reader: BufReader, + tx: Sender, + wrapper: F, +) { + loop { + let result = reader.fill_buf(); + match result { + Ok(buf) => { + let length = buf.len(); + if length == 0 { + break; + } + let tx_ = tx.clone(); + let _ = block_on_task(async move { tx_.send(wrapper(buf.to_vec())).await }); + reader.consume(length); + } + Err(e) => { + let tx_ = tx.clone(); + let _ = block_on_task( + async move { tx_.send(CommandEvent::Error(e.to_string())).await }, + ); + } + } + } +} + +fn read_line) -> CommandEvent + Send + Copy + 'static>( + mut reader: BufReader, + tx: Sender, + wrapper: F, +) { + loop { + let mut buf = Vec::new(); + match tauri::utils::io::read_line(&mut reader, &mut buf) { + Ok(n) => { + if n == 0 { + break; + } + let tx_ = tx.clone(); + let _ = block_on_task(async move { tx_.send(wrapper(buf)).await }); + } + Err(e) => { + let tx_ = tx.clone(); + let _ = block_on_task( + async move { tx_.send(CommandEvent::Error(e.to_string())).await }, + ); + break; + } + } + } +} + fn spawn_pipe_reader) -> CommandEvent + Send + Copy + 'static>( tx: Sender, guard: Arc>, pipe_reader: PipeReader, wrapper: F, + raw_out: bool, ) { spawn(move || { let _lock = guard.read().unwrap(); - let mut reader = BufReader::new(pipe_reader); - - loop { - let mut buf = Vec::new(); - match tauri::utils::io::read_line(&mut reader, &mut buf) { - Ok(n) => { - if n == 0 { - break; - } - let tx_ = tx.clone(); - let _ = block_on_task(async move { tx_.send(wrapper(buf)).await }); - } - Err(e) => { - let tx_ = tx.clone(); - let _ = - block_on_task( - async move { tx_.send(CommandEvent::Error(e.to_string())).await }, - ); - break; - } - } + let reader = BufReader::new(pipe_reader); + + if raw_out { + read_raw_bytes(reader, tx, wrapper); + } else { + read_line(reader, tx, wrapper); } }); }