HTTP add stream support (#2479)

* feat: add stream support

* feat: add stream support

* Revert "feat: add stream support"

This reverts commit 5edea81680.

* feat: add stream support

* Discard changes to pnpm-lock.yaml

* Discard changes to plugins/http/package.json

* fix(stream): change IPC packet

* fix: update stream message guest-js

* fix: return early when aborted

* fix: use InvokeResponseBody as packet

* fix: remove serde_bytes

* fix: remove reqwest response

* fix: content conversion bug

* fix: remove ReqwestResponses along with its implementations

* formatting and update changelog

* build api-iife.js

---------

Co-authored-by: Fabian-Lars <github@fabianlars.de>
pull/2503/head
Adriel Jansen Siahaya 3 months ago committed by GitHub
parent d37bbdef8d
commit cb38f54f4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,6 @@
---
"http": minor
"http-js": minor
---
Add stream support for HTTP stream responses.

10
Cargo.lock generated

@ -1865,7 +1865,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]
@ -3373,7 +3373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-targets 0.48.5", "windows-targets 0.52.6",
] ]
[[package]] [[package]]
@ -4886,7 +4886,7 @@ dependencies = [
"once_cell", "once_cell",
"socket2", "socket2",
"tracing", "tracing",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]
@ -7142,7 +7142,7 @@ dependencies = [
"fastrand", "fastrand",
"once_cell", "once_cell",
"rustix", "rustix",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]
@ -8210,7 +8210,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]

@ -1 +1 @@
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}"function"==typeof SuppressedError&&SuppressedError;const r="Request canceled";return e.fetch=async function(e,n){const a=n?.signal;if(a?.aborted)throw new Error(r);const o=n?.maxRedirections,s=n?.connectTimeout,i=n?.proxy,d=n?.danger;n&&(delete n.maxRedirections,delete n.connectTimeout,delete n.proxy,delete n.danger);const c=n?.headers?n.headers instanceof Headers?n.headers:new Headers(n.headers):new Headers,u=new Request(e,n),f=await u.arrayBuffer(),_=0!==f.byteLength?Array.from(new Uint8Array(f)):null;for(const[e,t]of u.headers)c.get(e)||c.set(e,t);const h=(c instanceof Headers?Array.from(c.entries()):Array.isArray(c)?c:Object.entries(c)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(a?.aborted)throw new Error(r);const l=await t("plugin:http|fetch",{clientConfig:{method:u.method,url:u.url,headers:h,data:_,maxRedirections:o,connectTimeout:s,proxy:i,danger:d}}),p=()=>t("plugin:http|fetch_cancel",{rid:l});if(a?.aborted)throw p(),new Error(r);a?.addEventListener("abort",(()=>{p()}));const{status:w,statusText:y,url:g,headers:T,rid:A}=await t("plugin:http|fetch_send",{rid:l}),R=await t("plugin:http|fetch_read_body",{rid:A}),b=new Response(R instanceof ArrayBuffer&&0!==R.byteLength?R:R instanceof Array&&R.length>0?new Uint8Array(R):null,{status:w,statusText:y});return Object.defineProperty(b,"url",{value:g}),Object.defineProperty(b,"headers",{value:new Headers(T)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";function t(e,t,r,n){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot read private member from an object whose class did not declare it");return"m"===r?n:"a"===r?n.call(e):n?n.value:t.get(e)}function r(e,t,r,n,s){if("function"==typeof t||!t.has(e))throw new TypeError("Cannot write private member to an object whose class did not declare it");return t.set(e,r),r}var n,s,a;"function"==typeof SuppressedError&&SuppressedError;const i="__TAURI_TO_IPC_KEY__";class o{constructor(){this.__TAURI_CHANNEL_MARKER__=!0,n.set(this,(()=>{})),s.set(this,0),a.set(this,[]),this.id=function(e,t=!1){return window.__TAURI_INTERNALS__.transformCallback(e,t)}((({message:e,id:i})=>{if(i==t(this,s,"f"))for(t(this,n,"f").call(this,e),r(this,s,t(this,s,"f")+1);t(this,s,"f")in t(this,a,"f");){const e=t(this,a,"f")[t(this,s,"f")];t(this,n,"f").call(this,e),delete t(this,a,"f")[t(this,s,"f")],r(this,s,t(this,s,"f")+1)}else t(this,a,"f")[i]=e}))}set onmessage(e){r(this,n,e)}get onmessage(){return t(this,n,"f")}[(n=new WeakMap,s=new WeakMap,a=new WeakMap,i)](){return`__CHANNEL__:${this.id}`}toJSON(){return this[i]()}}async function c(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}const d="Request canceled";return e.fetch=async function(e,t){const r=t?.signal;if(r?.aborted)throw new Error(d);const n=t?.maxRedirections,s=t?.connectTimeout,a=t?.proxy,i=t?.danger;t&&(delete t.maxRedirections,delete t.connectTimeout,delete t.proxy,delete t.danger);const f=t?.headers?t.headers instanceof Headers?t.headers:new Headers(t.headers):new Headers,h=new Request(e,t),_=await h.arrayBuffer(),u=0!==_.byteLength?Array.from(new Uint8Array(_)):null;for(const[e,t]of h.headers)f.get(e)||f.set(e,t);const l=(f instanceof Headers?Array.from(f.entries()):Array.isArray(f)?f:Object.entries(f)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(r?.aborted)throw new Error(d);const w=new o,p=new ReadableStream({start:e=>{w.onmessage=t=>{if(r?.aborted)return e.error(d),void e.close();(t instanceof ArrayBuffer?0!=t.byteLength:0!=t.length)?e.enqueue(new Uint8Array(t)):e.close()}}}),m=await c("plugin:http|fetch",{clientConfig:{method:h.method,url:h.url,headers:l,data:u,maxRedirections:n,connectTimeout:s,proxy:a,danger:i},streamChannel:w}),y=()=>c("plugin:http|fetch_cancel",{rid:m});if(r?.aborted)throw y(),new Error(d);r?.addEventListener("abort",(()=>{y()}));const{status:T,statusText:g,url:A,headers:R}=await c("plugin:http|fetch_send",{rid:m}),b=new Response(p,{status:T,statusText:g});return Object.defineProperty(b,"url",{value:A}),Object.defineProperty(b,"headers",{value:new Headers(R)}),b},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}

@ -26,7 +26,7 @@
* @module * @module
*/ */
import { invoke } from '@tauri-apps/api/core' import { Channel, invoke } from '@tauri-apps/api/core'
/** /**
* Configuration of a proxy that a Client should pass requests to. * Configuration of a proxy that a Client should pass requests to.
@ -186,6 +186,35 @@ export async function fetch(
throw new Error(ERROR_REQUEST_CANCELLED) throw new Error(ERROR_REQUEST_CANCELLED)
} }
const streamChannel = new Channel<ArrayBuffer | number[]>()
const readableStreamBody = new ReadableStream({
start: (controller) => {
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
// close early if aborted
if (signal?.aborted) {
controller.error(ERROR_REQUEST_CANCELLED)
controller.close()
return
}
// close when the signal to close (an empty chunk)
// is sent from the IPC.
if (
res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0
) {
controller.close()
return
}
// the content conversion (like .text(), .json(), etc.) in Response
// must have Uint8Array as its content, else it will
// have untraceable error that's hard to debug.
controller.enqueue(new Uint8Array(res))
}
}
})
const rid = await invoke<number>('plugin:http|fetch', { const rid = await invoke<number>('plugin:http|fetch', {
clientConfig: { clientConfig: {
method: req.method, method: req.method,
@ -196,7 +225,8 @@ export async function fetch(
connectTimeout, connectTimeout,
proxy, proxy,
danger danger
} },
streamChannel
}) })
const abort = () => invoke('plugin:http|fetch_cancel', { rid }) const abort = () => invoke('plugin:http|fetch_cancel', { rid })
@ -223,30 +253,15 @@ export async function fetch(
status, status,
statusText, statusText,
url, url,
headers: responseHeaders, headers: responseHeaders
rid: responseRid
} = await invoke<FetchSendResponse>('plugin:http|fetch_send', { } = await invoke<FetchSendResponse>('plugin:http|fetch_send', {
rid rid
}) })
const body = await invoke<ArrayBuffer | number[]>( const res = new Response(readableStreamBody, {
'plugin:http|fetch_read_body', status,
{ statusText
rid: responseRid })
}
)
const res = new Response(
body instanceof ArrayBuffer && body.byteLength !== 0
? body
: body instanceof Array && body.length > 0
? new Uint8Array(body)
: null,
{
status,
statusText
}
)
// url and headers are read only properties // url and headers are read only properties
// but seems like we can set them like this // but seems like we can set them like this

@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tauri::{ use tauri::{
async_runtime::Mutex, async_runtime::Mutex,
command, command,
ipc::{CommandScope, GlobalScope}, ipc::{Channel, CommandScope, GlobalScope},
Manager, ResourceId, ResourceTable, Runtime, State, Webview, Manager, ResourceId, ResourceTable, Runtime, State, Webview,
}; };
use tokio::sync::oneshot::{channel, Receiver, Sender}; use tokio::sync::oneshot::{channel, Receiver, Sender};
@ -22,9 +22,6 @@ use crate::{
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
struct ReqwestResponse(reqwest::Response);
impl tauri::Resource for ReqwestResponse {}
type CancelableResponseResult = Result<reqwest::Response>; type CancelableResponseResult = Result<reqwest::Response>;
type CancelableResponseFuture = type CancelableResponseFuture =
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>; Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
@ -181,6 +178,7 @@ pub async fn fetch<R: Runtime>(
client_config: ClientConfig, client_config: ClientConfig,
command_scope: CommandScope<Entry>, command_scope: CommandScope<Entry>,
global_scope: GlobalScope<Entry>, global_scope: GlobalScope<Entry>,
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
) -> crate::Result<ResourceId> { ) -> crate::Result<ResourceId> {
let ClientConfig { let ClientConfig {
method, method,
@ -314,7 +312,21 @@ pub async fn fetch<R: Runtime>(
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
tracing::trace!("{:?}", request); tracing::trace!("{:?}", request);
let fut = async move { request.send().await.map_err(Into::into) }; let fut = async move {
let mut res = request.send().await?;
// send response through IPC channel
while let Some(chunk) = res.chunk().await? {
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
}
// send empty vector when done
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
// return that response
Ok(res)
};
let mut resources_table = webview.resources_table(); let mut resources_table = webview.resources_table();
let rid = resources_table.add_request(Box::pin(fut)); let rid = resources_table.add_request(Box::pin(fut));
@ -398,9 +410,6 @@ pub async fn fetch_send<R: Runtime>(
)); ));
} }
let mut resources_table = webview.resources_table();
let rid = resources_table.add(ReqwestResponse(res));
Ok(FetchResponse { Ok(FetchResponse {
status: status.as_u16(), status: status.as_u16(),
status_text: status.canonical_reason().unwrap_or_default().to_string(), status_text: status.canonical_reason().unwrap_or_default().to_string(),
@ -410,19 +419,6 @@ pub async fn fetch_send<R: Runtime>(
}) })
} }
#[tauri::command]
pub(crate) async fn fetch_read_body<R: Runtime>(
webview: Webview<R>,
rid: ResourceId,
) -> crate::Result<tauri::ipc::Response> {
let res = {
let mut resources_table = webview.resources_table();
resources_table.take::<ReqwestResponse>(rid)?
};
let res = Arc::into_inner(res).unwrap().0;
Ok(tauri::ipc::Response::new(res.bytes().await?.to_vec()))
}
// forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers // forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers
#[cfg(not(feature = "unsafe-headers"))] #[cfg(not(feature = "unsafe-headers"))]
fn is_unsafe_header(header: &HeaderName) -> bool { fn is_unsafe_header(header: &HeaderName) -> bool {

@ -36,8 +36,7 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
.invoke_handler(tauri::generate_handler![ .invoke_handler(tauri::generate_handler![
commands::fetch, commands::fetch,
commands::fetch_cancel, commands::fetch_cancel,
commands::fetch_send, commands::fetch_send
commands::fetch_read_body,
]) ])
.build() .build()
} }

Loading…
Cancel
Save