diff --git a/.changes/http-stream-support.md b/.changes/http-stream-support.md new file mode 100644 index 00000000..8c33f646 --- /dev/null +++ b/.changes/http-stream-support.md @@ -0,0 +1,6 @@ +--- +"http": patch +"http-js": patch +--- + +Fix `fetch` blocking until the whole response is read even if it was a streaming response. diff --git a/plugins/http/api-iife.js b/plugins/http/api-iife.js index bb2cb09b..07653dee 100644 --- a/plugins/http/api-iife.js +++ b/plugins/http/api-iife.js @@ -1 +1 @@ -if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const f=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),_=await h.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of h.headers)f.get(e)||f.set(e,t);const l=(f instanceof Headers?Array.from(f.entries()):Array.isArray(f)?f:Object.entries(f)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=new o,p=new ReadableStream({start:e=>{w.onmessage=t=>{if(r?.aborted)return e.error(d),void e.close();(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()}}}),m=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i},streamChannel:w}),y=()=>c("plugin:http|fetch_cancel",{rid:m});if(r?.aborted)throw y(),new Error(d);r?.addEventListener("abort",(()=>{y()}));const{status:T,statusText:g,url:A,headers:R}=await c("plugin:http|fetch_send",{rid:m}),b=new Response(p,{status:T,statusText:g});return Object.defineProperty(b,"url",{value:A}),Object.defineProperty(b,"headers",{value:new Headers(R)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} +if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request cancelled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const h=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,f=new Request(e,t),_=await f.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of f.headers)h.get(e)||h.set(e,t);const l=(h instanceof Headers?Array.from(h.entries()):Array.isArray(h)?h:Object.entries(h)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=await c("plugin:http|fetch",{clientConfig:{method:f.method,url:f.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i}}),p=()=>c("plugin:http|fetch_cancel",{rid:w});if(r?.aborted)throw p(),new Error(d);r?.addEventListener("abort",(()=>{p()}));const{status:y,statusText:m,url:T,headers:g,rid:A}=await c("plugin:http|fetch_send",{rid:w}),R=new ReadableStream({start:e=>{const t=new o;t.onmessage=t=>{r?.aborted?e.error(d):(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()},c("plugin:http|fetch_read_body",{rid:A,streamChannel:t}).catch((t=>{e.error(t)}))}}),b=new Response(R,{status:y,statusText:m});return Object.defineProperty(b,"url",{value:T}),Object.defineProperty(b,"headers",{value:new Headers(g)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} diff --git a/plugins/http/guest-js/index.ts b/plugins/http/guest-js/index.ts index 2d86e9d8..4c33f072 100644 --- a/plugins/http/guest-js/index.ts +++ b/plugins/http/guest-js/index.ts @@ -106,7 +106,7 @@ export interface DangerousSettings { acceptInvalidHostnames?: boolean } -const ERROR_REQUEST_CANCELLED = 'Request canceled' +const ERROR_REQUEST_CANCELLED = 'Request cancelled' /** * Fetch a resource from the network. It returns a `Promise` that resolves to the @@ -186,35 +186,6 @@ export async function fetch( throw new Error(ERROR_REQUEST_CANCELLED) } - const streamChannel = new Channel() - - const readableStreamBody = new ReadableStream({ - start: (controller) => { - streamChannel.onmessage = (res: ArrayBuffer | number[]) => { - // close early if aborted - if (signal?.aborted) { - controller.error(ERROR_REQUEST_CANCELLED) - controller.close() - return - } - - // close when the signal to close (an empty chunk) - // is sent from the IPC. - if ( - res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0 - ) { - controller.close() - return - } - - // the content conversion (like .text(), .json(), etc.) in Response - // must have Uint8Array as its content, else it will - // have untraceable error that's hard to debug. - controller.enqueue(new Uint8Array(res)) - } - } - }) - const rid = await invoke('plugin:http|fetch', { clientConfig: { method: req.method, @@ -225,8 +196,7 @@ export async function fetch( connectTimeout, proxy, danger - }, - streamChannel + } }) const abort = () => invoke('plugin:http|fetch_cancel', { rid }) @@ -253,11 +223,47 @@ export async function fetch( status, statusText, url, - headers: responseHeaders + headers: responseHeaders, + rid: responseRid } = await invoke('plugin:http|fetch_send', { rid }) + const readableStreamBody = new ReadableStream({ + start: (controller) => { + const streamChannel = new Channel() + streamChannel.onmessage = (res: ArrayBuffer | number[]) => { + // close early if aborted + if (signal?.aborted) { + controller.error(ERROR_REQUEST_CANCELLED) + return + } + + // close when the signal to close (an empty chunk) + // is sent from the IPC. + if ( + res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0 + ) { + controller.close() + return + } + + // the content conversion (like .text(), .json(), etc.) in Response + // must have Uint8Array as its content, else it will + // have untraceable error that's hard to debug. + controller.enqueue(new Uint8Array(res)) + } + + // run a non-blocking body stream fetch + invoke('plugin:http|fetch_read_body', { + rid: responseRid, + streamChannel + }).catch((e) => { + controller.error(e) + }) + } + }) + const res = new Response(readableStreamBody, { status, statusText diff --git a/plugins/http/src/commands.rs b/plugins/http/src/commands.rs index 17e514e2..18953121 100644 --- a/plugins/http/src/commands.rs +++ b/plugins/http/src/commands.rs @@ -22,6 +22,9 @@ use crate::{ const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); +struct ReqwestResponse(reqwest::Response); +impl tauri::Resource for ReqwestResponse {} + type CancelableResponseResult = Result; type CancelableResponseFuture = Pin + Send + Sync>>; @@ -178,7 +181,6 @@ pub async fn fetch( client_config: ClientConfig, command_scope: CommandScope, global_scope: GlobalScope, - stream_channel: Channel, ) -> crate::Result { let ClientConfig { method, @@ -312,20 +314,7 @@ pub async fn fetch( #[cfg(feature = "tracing")] tracing::trace!("{:?}", request); - let fut = async move { - let mut res = request.send().await?; - - // send response through IPC channel - while let Some(chunk) = res.chunk().await? { - stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?; - } - - // send empty vector when done - stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?; - - // return that response - Ok(res) - }; + let fut = async move { request.send().await.map_err(Into::into) }; let mut resources_table = webview.resources_table(); let rid = resources_table.add_request(Box::pin(fut)); @@ -370,7 +359,7 @@ pub fn fetch_cancel(webview: Webview, rid: ResourceId) -> crate:: Ok(()) } -#[tauri::command] +#[command] pub async fn fetch_send( webview: Webview, rid: ResourceId, @@ -410,6 +399,9 @@ pub async fn fetch_send( )); } + let mut resources_table = webview.resources_table(); + let rid = resources_table.add(ReqwestResponse(res)); + Ok(FetchResponse { status: status.as_u16(), status_text: status.canonical_reason().unwrap_or_default().to_string(), @@ -419,6 +411,30 @@ pub async fn fetch_send( }) } +#[command] +pub async fn fetch_read_body( + webview: Webview, + rid: ResourceId, + stream_channel: Channel, +) -> crate::Result<()> { + let res = { + let mut resources_table = webview.resources_table(); + resources_table.take::(rid)? + }; + + let mut res = Arc::into_inner(res).unwrap().0; + + // send response through IPC channel + while let Some(chunk) = res.chunk().await? { + stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?; + } + + // send empty vector when done + stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?; + + Ok(()) +} + // forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers #[cfg(not(feature = "unsafe-headers"))] fn is_unsafe_header(header: &HeaderName) -> bool { diff --git a/plugins/http/src/lib.rs b/plugins/http/src/lib.rs index 4e11e561..65829e09 100644 --- a/plugins/http/src/lib.rs +++ b/plugins/http/src/lib.rs @@ -36,7 +36,8 @@ pub fn init() -> TauriPlugin { .invoke_handler(tauri::generate_handler![ commands::fetch, commands::fetch_cancel, - commands::fetch_send + commands::fetch_send, + commands::fetch_read_body ]) .build() } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 88803d24..1e10af4f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4209,4 +4209,4 @@ snapshots: zod@3.24.2: {} - zwitch@1.0.5: {} + zwitch@1.0.5: {} \ No newline at end of file