feat(shell) raw-encoded pipe reader directly outputs buffer (no newline scan) (#1231)

* Shell raw-encoded pipe reader directly outputs buffer (no newline scan)

* Suggestions from code review and add .changes file

* fmt
pull/1283/head
Graham Held 1 year ago committed by GitHub
parent aea748ced7
commit b4efa58d5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,6 @@
---
"shell": patch
---
When the "raw" encoding option is specified for a shell process, all bytes from the child's output streams are passed to the data handlers.
This makes it possible to read output from programs that write unencoded byte streams to stdout (like ffmpeg)

@ -154,7 +154,10 @@ pub fn execute<R: Runtime>(
let encoding = match options.encoding {
Option::None => EncodingWrapper::Text(None),
Some(encoding) => match encoding.as_str() {
"raw" => EncodingWrapper::Raw,
"raw" => {
command = command.set_raw_out(true);
EncodingWrapper::Raw
}
_ => {
if let Some(text_encoding) = Encoding::for_label(encoding.as_bytes()) {
EncodingWrapper::Text(Some(text_encoding))

@ -4,7 +4,7 @@
use std::{
ffi::OsStr,
io::{BufReader, Write},
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
process::{Command as StdCommand, Stdio},
sync::{Arc, RwLock},
@ -41,11 +41,13 @@ pub struct TerminatedPayload {
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum CommandEvent {
/// Stderr bytes until a newline (\n) or carriage return (\r) is found.
/// If configured for raw output, all bytes written to stderr.
/// Otherwise, bytes until a newline (\n) or carriage return (\r) is found.
Stderr(Vec<u8>),
/// Stdout bytes until a newline (\n) or carriage return (\r) is found.
/// If configured for raw output, all bytes written to stdout.
/// Otherwise, bytes until a newline (\n) or carriage return (\r) is found.
Stdout(Vec<u8>),
/// An error happened waiting for the command to finish or converting the stdout/stderr bytes to an UTF-8 string.
/// An error happened waiting for the command to finish or converting the stdout/stderr bytes to a UTF-8 string.
Error(String),
/// Command process terminated.
Terminated(TerminatedPayload),
@ -53,7 +55,10 @@ pub enum CommandEvent {
/// The type to spawn commands.
#[derive(Debug)]
pub struct Command(StdCommand);
pub struct Command {
cmd: StdCommand,
raw_out: bool,
}
/// Spawned child process.
#[derive(Debug)]
@ -122,7 +127,7 @@ fn relative_command_path(command: &Path) -> crate::Result<PathBuf> {
impl From<Command> for StdCommand {
fn from(cmd: Command) -> StdCommand {
cmd.0
cmd.cmd
}
}
@ -136,7 +141,10 @@ impl Command {
#[cfg(windows)]
command.creation_flags(CREATE_NO_WINDOW);
Self(command)
Self {
cmd: command,
raw_out: false,
}
}
pub(crate) fn new_sidecar<S: AsRef<Path>>(program: S) -> crate::Result<Self> {
@ -146,7 +154,7 @@ impl Command {
/// Appends an argument to the command.
#[must_use]
pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
self.0.arg(arg);
self.cmd.arg(arg);
self
}
@ -157,14 +165,14 @@ impl Command {
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
self.0.args(args);
self.cmd.args(args);
self
}
/// Clears the entire environment map for the child process.
#[must_use]
pub fn env_clear(mut self) -> Self {
self.0.env_clear();
self.cmd.env_clear();
self
}
@ -175,7 +183,7 @@ impl Command {
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.0.env(key, value);
self.cmd.env(key, value);
self
}
@ -187,14 +195,20 @@ impl Command {
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.0.envs(envs);
self.cmd.envs(envs);
self
}
/// Sets the working directory for the child process.
#[must_use]
pub fn current_dir<P: AsRef<Path>>(mut self, current_dir: P) -> Self {
self.0.current_dir(current_dir);
self.cmd.current_dir(current_dir);
self
}
/// Configures the reader to output bytes from the child process exactly as received
pub fn set_raw_out(mut self, raw_out: bool) -> Self {
self.raw_out = raw_out;
self
}
@ -229,6 +243,7 @@ impl Command {
/// });
/// ```
pub fn spawn(self) -> crate::Result<(Receiver<CommandEvent>, CommandChild)> {
let raw = self.raw_out;
let mut command: StdCommand = self.into();
let (stdout_reader, stdout_writer) = pipe()?;
let (stderr_reader, stderr_writer) = pipe()?;
@ -249,12 +264,14 @@ impl Command {
guard.clone(),
stdout_reader,
CommandEvent::Stdout,
raw,
);
spawn_pipe_reader(
tx.clone(),
guard.clone(),
stderr_reader,
CommandEvent::Stderr,
raw,
);
spawn(move || {
@ -359,35 +376,74 @@ impl Command {
}
}
fn read_raw_bytes<F: Fn(Vec<u8>) -> CommandEvent + Send + Copy + 'static>(
mut reader: BufReader<PipeReader>,
tx: Sender<CommandEvent>,
wrapper: F,
) {
loop {
let result = reader.fill_buf();
match result {
Ok(buf) => {
let length = buf.len();
if length == 0 {
break;
}
let tx_ = tx.clone();
let _ = block_on_task(async move { tx_.send(wrapper(buf.to_vec())).await });
reader.consume(length);
}
Err(e) => {
let tx_ = tx.clone();
let _ = block_on_task(
async move { tx_.send(CommandEvent::Error(e.to_string())).await },
);
}
}
}
}
fn read_line<F: Fn(Vec<u8>) -> CommandEvent + Send + Copy + 'static>(
mut reader: BufReader<PipeReader>,
tx: Sender<CommandEvent>,
wrapper: F,
) {
loop {
let mut buf = Vec::new();
match tauri::utils::io::read_line(&mut reader, &mut buf) {
Ok(n) => {
if n == 0 {
break;
}
let tx_ = tx.clone();
let _ = block_on_task(async move { tx_.send(wrapper(buf)).await });
}
Err(e) => {
let tx_ = tx.clone();
let _ = block_on_task(
async move { tx_.send(CommandEvent::Error(e.to_string())).await },
);
break;
}
}
}
}
fn spawn_pipe_reader<F: Fn(Vec<u8>) -> CommandEvent + Send + Copy + 'static>(
tx: Sender<CommandEvent>,
guard: Arc<RwLock<()>>,
pipe_reader: PipeReader,
wrapper: F,
raw_out: bool,
) {
spawn(move || {
let _lock = guard.read().unwrap();
let mut reader = BufReader::new(pipe_reader);
loop {
let mut buf = Vec::new();
match tauri::utils::io::read_line(&mut reader, &mut buf) {
Ok(n) => {
if n == 0 {
break;
}
let tx_ = tx.clone();
let _ = block_on_task(async move { tx_.send(wrapper(buf)).await });
}
Err(e) => {
let tx_ = tx.clone();
let _ =
block_on_task(
async move { tx_.send(CommandEvent::Error(e.to_string())).await },
);
break;
}
}
let reader = BufReader::new(pipe_reader);
if raw_out {
read_raw_bytes(reader, tx, wrapper);
} else {
read_line(reader, tx, wrapper);
}
});
}

Loading…
Cancel
Save