fix(http): fixes blocking body fetch function

pull/2522/head
adrieljss 4 months ago
parent 194fdeed96
commit 86ad81fb5b
No known key found for this signature in database
GPG Key ID: 849F13CBD0B4AD05

@ -3,4 +3,4 @@
"http-js": minor "http-js": minor
--- ---
Add stream support for HTTP stream responses. Fix HTTP stream not streaming the body.

@ -1 +1 @@
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const f=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),_=await h.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of h.headers)f.get(e)||f.set(e,t);const l=(f instanceof Headers?Array.from(f.entries()):Array.isArray(f)?f:Object.entries(f)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=new o,p=new ReadableStream({start:e=>{w.onmessage=t=>{if(r?.aborted)return e.error(d),void e.close();(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()}}}),m=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i},streamChannel:w}),y=()=>c("plugin:http|fetch_cancel",{rid:m});if(r?.aborted)throw y(),new Error(d);r?.addEventListener("abort",(()=>{y()}));const{status:T,statusText:g,url:A,headers:R}=await c("plugin:http|fetch_send",{rid:m}),b=new Response(p,{status:T,statusText:g});return Object.defineProperty(b,"url",{value:A}),Object.defineProperty(b,"headers",{value:new Headers(R)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const o="__TAURI_TO_IPC_KEY__";class i{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:o})=>{if(o==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[o]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,o)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[o]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,o=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const h=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,f=new Request(e,t),_=await f.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of f.headers)h.get(e)||h.set(e,t);const l=(h instanceof Headers?Array.from(h.entries()):Array.isArray(h)?h:Object.entries(h)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=new ReadableStream({start:e=>{const t=new i;t.onmessage=t=>{if(r?.aborted)return e.error(d),void e.close();(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()},c("plugin:http|fetch_stream_body",{streamChannel:t}).catch((t=>{e.error(t),e.close()}))}}),p=await c("plugin:http|fetch",{clientConfig:{method:f.method,url:f.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:o}}),m=()=>c("plugin:http|fetch_cancel",{rid:p});if(r?.aborted)throw m(),new Error(d);r?.addEventListener("abort",(()=>{m()}));const{status:y,statusText:T,url:g,headers:A}=await c("plugin:http|fetch_send",{rid:p}),R=new Response(w,{status:y,statusText:T});return Object.defineProperty(R,"url",{value:g}),Object.defineProperty(R,"headers",{value:new Headers(A)}),R},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}

@ -186,10 +186,9 @@ export async function fetch(
throw new Error(ERROR_REQUEST_CANCELLED) throw new Error(ERROR_REQUEST_CANCELLED)
} }
const streamChannel = new Channel<ArrayBuffer | number[]>()
const readableStreamBody = new ReadableStream({ const readableStreamBody = new ReadableStream({
start: (controller) => { start: (controller) => {
const streamChannel = new Channel<ArrayBuffer | number[]>()
streamChannel.onmessage = (res: ArrayBuffer | number[]) => { streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
// close early if aborted // close early if aborted
if (signal?.aborted) { if (signal?.aborted) {
@ -212,6 +211,14 @@ export async function fetch(
// have untraceable error that's hard to debug. // have untraceable error that's hard to debug.
controller.enqueue(new Uint8Array(res)) controller.enqueue(new Uint8Array(res))
} }
// run a non-blocking body stream fetch
invoke('plugin:http|fetch_stream_body', {
streamChannel
}).catch((e) => {
controller.error(e)
controller.close()
})
} }
}) })
@ -225,8 +232,7 @@ export async function fetch(
connectTimeout, connectTimeout,
proxy, proxy,
danger danger
}, }
streamChannel
}) })
const abort = () => invoke('plugin:http|fetch_cancel', { rid }) const abort = () => invoke('plugin:http|fetch_cancel', { rid })

@ -24,6 +24,7 @@
"LICENSE" "LICENSE"
], ],
"dependencies": { "dependencies": {
"@tauri-apps/api": "^2.0.0" "@tauri-apps/api": "^2.0.0",
"@tauri-apps/plugin-http": "link:"
} }
} }

@ -22,6 +22,9 @@ use crate::{
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
struct ReqwestResponse(reqwest::Response);
impl tauri::Resource for ReqwestResponse {}
type CancelableResponseResult = Result<reqwest::Response>; type CancelableResponseResult = Result<reqwest::Response>;
type CancelableResponseFuture = type CancelableResponseFuture =
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>; Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
@ -178,7 +181,6 @@ pub async fn fetch<R: Runtime>(
client_config: ClientConfig, client_config: ClientConfig,
command_scope: CommandScope<Entry>, command_scope: CommandScope<Entry>,
global_scope: GlobalScope<Entry>, global_scope: GlobalScope<Entry>,
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
) -> crate::Result<ResourceId> { ) -> crate::Result<ResourceId> {
let ClientConfig { let ClientConfig {
method, method,
@ -312,20 +314,7 @@ pub async fn fetch<R: Runtime>(
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
tracing::trace!("{:?}", request); tracing::trace!("{:?}", request);
let fut = async move { let fut = async move { request.send().await.map_err(Into::into) };
let mut res = request.send().await?;
// send response through IPC channel
while let Some(chunk) = res.chunk().await? {
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
}
// send empty vector when done
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
// return that response
Ok(res)
};
let mut resources_table = webview.resources_table(); let mut resources_table = webview.resources_table();
let rid = resources_table.add_request(Box::pin(fut)); let rid = resources_table.add_request(Box::pin(fut));
@ -370,7 +359,31 @@ pub fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::
Ok(()) Ok(())
} }
#[tauri::command] #[command]
pub async fn fetch_stream_body<R: Runtime>(
webview: Webview<R>,
rid: ResourceId,
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
) -> crate::Result<()> {
let res = {
let mut resources_table = webview.resources_table();
resources_table.take::<ReqwestResponse>(rid)?
};
let mut res = Arc::into_inner(res).unwrap().0;
// send response through IPC channel
while let Some(chunk) = res.chunk().await? {
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
}
// send empty vector when done
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
Ok(())
}
#[command]
pub async fn fetch_send<R: Runtime>( pub async fn fetch_send<R: Runtime>(
webview: Webview<R>, webview: Webview<R>,
rid: ResourceId, rid: ResourceId,

@ -36,7 +36,8 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
.invoke_handler(tauri::generate_handler![ .invoke_handler(tauri::generate_handler![
commands::fetch, commands::fetch,
commands::fetch_cancel, commands::fetch_cancel,
commands::fetch_send commands::fetch_send,
commands::fetch_stream_body,
]) ])
.build() .build()
} }

@ -229,6 +229,9 @@ importers:
'@tauri-apps/api': '@tauri-apps/api':
specifier: ^2.0.0 specifier: ^2.0.0
version: 2.3.0 version: 2.3.0
'@tauri-apps/plugin-http':
specifier: 'link:'
version: 'link:'
plugins/log: plugins/log:
dependencies: dependencies:
@ -2283,9 +2286,9 @@ snapshots:
- encoding - encoding
- mocha - mocha
'@covector/assemble@0.12.0': '@covector/assemble@0.12.0(mocha@10.8.2)':
dependencies: dependencies:
'@covector/command': 0.8.0 '@covector/command': 0.8.0(mocha@10.8.2)
'@covector/files': 0.8.0 '@covector/files': 0.8.0
effection: 2.0.8(mocha@10.8.2) effection: 2.0.8(mocha@10.8.2)
js-yaml: 4.1.0 js-yaml: 4.1.0
@ -2296,9 +2299,10 @@ snapshots:
unified: 9.2.2 unified: 9.2.2
transitivePeerDependencies: transitivePeerDependencies:
- encoding - encoding
- mocha
- supports-color - supports-color
'@covector/changelog@0.12.0': '@covector/changelog@0.12.0(mocha@10.8.2)':
dependencies: dependencies:
'@covector/files': 0.8.0 '@covector/files': 0.8.0
effection: 2.0.8(mocha@10.8.2) effection: 2.0.8(mocha@10.8.2)
@ -2308,14 +2312,16 @@ snapshots:
unified: 9.2.2 unified: 9.2.2
transitivePeerDependencies: transitivePeerDependencies:
- encoding - encoding
- mocha
- supports-color - supports-color
'@covector/command@0.8.0': '@covector/command@0.8.0(mocha@10.8.2)':
dependencies: dependencies:
'@effection/process': 2.1.4 '@effection/process': 2.1.4(mocha@10.8.2)
effection: 2.0.8(mocha@10.8.2) effection: 2.0.8(mocha@10.8.2)
transitivePeerDependencies: transitivePeerDependencies:
- encoding - encoding
- mocha
'@covector/files@0.8.0': '@covector/files@0.8.0':
dependencies: dependencies:
@ -2362,10 +2368,8 @@ snapshots:
dependencies: dependencies:
effection: 2.0.8(mocha@10.8.2) effection: 2.0.8(mocha@10.8.2)
mocha: 10.8.2 mocha: 10.8.2
transitivePeerDependencies:
- encoding
'@effection/process@2.1.4': '@effection/process@2.1.4(mocha@10.8.2)':
dependencies: dependencies:
cross-spawn: 7.0.6 cross-spawn: 7.0.6
ctrlc-windows: 2.2.0 ctrlc-windows: 2.2.0
@ -2373,6 +2377,7 @@ snapshots:
shellwords: 0.1.1 shellwords: 0.1.1
transitivePeerDependencies: transitivePeerDependencies:
- encoding - encoding
- mocha
'@effection/stream@2.0.6': '@effection/stream@2.0.6':
dependencies: dependencies:
@ -3162,9 +3167,9 @@ snapshots:
dependencies: dependencies:
'@clack/prompts': 0.7.0 '@clack/prompts': 0.7.0
'@covector/apply': 0.10.0(mocha@10.8.2) '@covector/apply': 0.10.0(mocha@10.8.2)
'@covector/assemble': 0.12.0 '@covector/assemble': 0.12.0(mocha@10.8.2)
'@covector/changelog': 0.12.0 '@covector/changelog': 0.12.0(mocha@10.8.2)
'@covector/command': 0.8.0 '@covector/command': 0.8.0(mocha@10.8.2)
'@covector/files': 0.8.0 '@covector/files': 0.8.0
effection: 2.0.8(mocha@10.8.2) effection: 2.0.8(mocha@10.8.2)
globby: 11.1.0 globby: 11.1.0

Loading…
Cancel
Save