From cb38f54f4a4ef30995283cd82166c62da17bac44 Mon Sep 17 00:00:00 2001 From: Adriel Jansen Siahaya <166703058+adrieljss@users.noreply.github.com> Date: Sun, 9 Mar 2025 02:45:05 +0800 Subject: [PATCH] HTTP add stream support (#2479) * feat: add stream support * feat: add stream support * Revert "feat: add stream support" This reverts commit 5edea816802fa8637982db96505832abbc7c450f. * feat: add stream support * Discard changes to pnpm-lock.yaml * Discard changes to plugins/http/package.json * fix(stream): change IPC packet * fix: update stream message guest-js * fix: return early when aborted * fix: use InvokeResponseBody as packet * fix: remove serde_bytes * fix: remove reqwest response * fix: content conversion bug * fix: remove ReqwestResponses along with its implementations * formatting and update changelog * build api-iife.js --------- Co-authored-by: Fabian-Lars --- .changes/http-stream-support.md | 6 ++++ Cargo.lock | 10 +++--- plugins/http/api-iife.js | 2 +- plugins/http/guest-js/index.ts | 59 +++++++++++++++++++++------------ plugins/http/src/commands.rs | 38 ++++++++++----------- plugins/http/src/lib.rs | 3 +- 6 files changed, 67 insertions(+), 51 deletions(-) create mode 100644 .changes/http-stream-support.md diff --git a/.changes/http-stream-support.md b/.changes/http-stream-support.md new file mode 100644 index 00000000..d9e38a6b --- /dev/null +++ b/.changes/http-stream-support.md @@ -0,0 +1,6 @@ +--- +"http": minor +"http-js": minor +--- + +Add stream support for HTTP stream responses. diff --git a/Cargo.lock b/Cargo.lock index 146fdf44..4fde9953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1865,7 +1865,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3373,7 +3373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4886,7 +4886,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7142,7 +7142,7 @@ dependencies = [ "fastrand", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8210,7 +8210,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/plugins/http/api-iife.js b/plugins/http/api-iife.js index 76b498ad..bb2cb09b 100644 --- a/plugins/http/api-iife.js +++ b/plugins/http/api-iife.js @@ -1 +1 @@ -if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}"function"==typeof SuppressedError&&SuppressedError;const r="Request canceled";return e.fetch=async function(e,n){const a=n?.signal;if(a?.aborted)throw new Error(r);const o=n?.maxRedirections,s=n?.connectTimeout,i=n?.proxy,d=n?.danger;n&&(delete n.maxRedirections,delete n.connectTimeout,delete n.proxy,delete n.danger);const c=n?.headers?n.headers instanceof Headers?n.headers:new Headers(n.headers):new Headers,u=new Request(e,n),f=await u.arrayBuffer(),_=0!==f.byteLength?Array.from(new Uint8Array(f)):null;for(const[e,t]of u.headers)c.get(e)||c.set(e,t);const h=(c instanceof Headers?Array.from(c.entries()):Array.isArray(c)?c:Object.entries(c)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(a?.aborted)throw new Error(r);const l=await t("plugin:http|fetch",{clientConfig:{method:u.method,url:u.url,headers:h,data:_,maxRedirections:o,connectTimeout:s,proxy:i,danger:d}}),p=()=>t("plugin:http|fetch_cancel",{rid:l});if(a?.aborted)throw p(),new Error(r);a?.addEventListener("abort",(()=>{p()}));const{status:w,statusText:y,url:g,headers:T,rid:A}=await t("plugin:http|fetch_send",{rid:l}),R=await t("plugin:http|fetch_read_body",{rid:A}),b=new Response(R instanceof ArrayBuffer&&0!==R.byteLength?R:R instanceof Array&&R.length>0?new Uint8Array(R):null,{status:w,statusText:y});return Object.defineProperty(b,"url",{value:g}),Object.defineProperty(b,"headers",{value:new Headers(T)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} +if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const f=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),_=await h.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of h.headers)f.get(e)||f.set(e,t);const l=(f instanceof Headers?Array.from(f.entries()):Array.isArray(f)?f:Object.entries(f)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=new o,p=new ReadableStream({start:e=>{w.onmessage=t=>{if(r?.aborted)return e.error(d),void e.close();(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()}}}),m=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i},streamChannel:w}),y=()=>c("plugin:http|fetch_cancel",{rid:m});if(r?.aborted)throw y(),new Error(d);r?.addEventListener("abort",(()=>{y()}));const{status:T,statusText:g,url:A,headers:R}=await c("plugin:http|fetch_send",{rid:m}),b=new Response(p,{status:T,statusText:g});return Object.defineProperty(b,"url",{value:A}),Object.defineProperty(b,"headers",{value:new Headers(R)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} diff --git a/plugins/http/guest-js/index.ts b/plugins/http/guest-js/index.ts index bea18e44..2d86e9d8 100644 --- a/plugins/http/guest-js/index.ts +++ b/plugins/http/guest-js/index.ts @@ -26,7 +26,7 @@ * @module */ -import { invoke } from '@tauri-apps/api/core' +import { Channel, invoke } from '@tauri-apps/api/core' /** * Configuration of a proxy that a Client should pass requests to. @@ -186,6 +186,35 @@ export async function fetch( throw new Error(ERROR_REQUEST_CANCELLED) } + const streamChannel = new Channel() + + const readableStreamBody = new ReadableStream({ + start: (controller) => { + streamChannel.onmessage = (res: ArrayBuffer | number[]) => { + // close early if aborted + if (signal?.aborted) { + controller.error(ERROR_REQUEST_CANCELLED) + controller.close() + return + } + + // close when the signal to close (an empty chunk) + // is sent from the IPC. + if ( + res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0 + ) { + controller.close() + return + } + + // the content conversion (like .text(), .json(), etc.) in Response + // must have Uint8Array as its content, else it will + // have untraceable error that's hard to debug. + controller.enqueue(new Uint8Array(res)) + } + } + }) + const rid = await invoke('plugin:http|fetch', { clientConfig: { method: req.method, @@ -196,7 +225,8 @@ export async function fetch( connectTimeout, proxy, danger - } + }, + streamChannel }) const abort = () => invoke('plugin:http|fetch_cancel', { rid }) @@ -223,30 +253,15 @@ export async function fetch( status, statusText, url, - headers: responseHeaders, - rid: responseRid + headers: responseHeaders } = await invoke('plugin:http|fetch_send', { rid }) - const body = await invoke( - 'plugin:http|fetch_read_body', - { - rid: responseRid - } - ) - - const res = new Response( - body instanceof ArrayBuffer && body.byteLength !== 0 - ? body - : body instanceof Array && body.length > 0 - ? new Uint8Array(body) - : null, - { - status, - statusText - } - ) + const res = new Response(readableStreamBody, { + status, + statusText + }) // url and headers are read only properties // but seems like we can set them like this diff --git a/plugins/http/src/commands.rs b/plugins/http/src/commands.rs index 3dc0297e..17e514e2 100644 --- a/plugins/http/src/commands.rs +++ b/plugins/http/src/commands.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use tauri::{ async_runtime::Mutex, command, - ipc::{CommandScope, GlobalScope}, + ipc::{Channel, CommandScope, GlobalScope}, Manager, ResourceId, ResourceTable, Runtime, State, Webview, }; use tokio::sync::oneshot::{channel, Receiver, Sender}; @@ -22,9 +22,6 @@ use crate::{ const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); -struct ReqwestResponse(reqwest::Response); -impl tauri::Resource for ReqwestResponse {} - type CancelableResponseResult = Result; type CancelableResponseFuture = Pin + Send + Sync>>; @@ -181,6 +178,7 @@ pub async fn fetch( client_config: ClientConfig, command_scope: CommandScope, global_scope: GlobalScope, + stream_channel: Channel, ) -> crate::Result { let ClientConfig { method, @@ -314,7 +312,21 @@ pub async fn fetch( #[cfg(feature = "tracing")] tracing::trace!("{:?}", request); - let fut = async move { request.send().await.map_err(Into::into) }; + let fut = async move { + let mut res = request.send().await?; + + // send response through IPC channel + while let Some(chunk) = res.chunk().await? { + stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?; + } + + // send empty vector when done + stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?; + + // return that response + Ok(res) + }; + let mut resources_table = webview.resources_table(); let rid = resources_table.add_request(Box::pin(fut)); @@ -398,9 +410,6 @@ pub async fn fetch_send( )); } - let mut resources_table = webview.resources_table(); - let rid = resources_table.add(ReqwestResponse(res)); - Ok(FetchResponse { status: status.as_u16(), status_text: status.canonical_reason().unwrap_or_default().to_string(), @@ -410,19 +419,6 @@ pub async fn fetch_send( }) } -#[tauri::command] -pub(crate) async fn fetch_read_body( - webview: Webview, - rid: ResourceId, -) -> crate::Result { - let res = { - let mut resources_table = webview.resources_table(); - resources_table.take::(rid)? - }; - let res = Arc::into_inner(res).unwrap().0; - Ok(tauri::ipc::Response::new(res.bytes().await?.to_vec())) -} - // forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers #[cfg(not(feature = "unsafe-headers"))] fn is_unsafe_header(header: &HeaderName) -> bool { diff --git a/plugins/http/src/lib.rs b/plugins/http/src/lib.rs index d775760c..4e11e561 100644 --- a/plugins/http/src/lib.rs +++ b/plugins/http/src/lib.rs @@ -36,8 +36,7 @@ pub fn init() -> TauriPlugin { .invoke_handler(tauri::generate_handler![ commands::fetch, commands::fetch_cancel, - commands::fetch_send, - commands::fetch_read_body, + commands::fetch_send ]) .build() }