From ac9a25cc12ee2b325f00212ba74316da3369bde5 Mon Sep 17 00:00:00 2001 From: Amr Bashir Date: Mon, 8 Jul 2024 20:31:29 +0300 Subject: [PATCH] fix(http): use tokio oneshot channel for detecting abort (#1395) * fix(http): properly handle aborting closes #1376 * abort early in JS * avoid using unnecessary mutexes * fix lint * update bundle * simplify --- .changes/http-abort.md | 6 +++ Cargo.lock | 13 +++++ plugins/http/Cargo.toml | 1 + plugins/http/api-iife.js | 2 +- plugins/http/guest-js/index.ts | 32 +++++++++--- plugins/http/src/commands.rs | 89 ++++++++++++++++++++++++---------- 6 files changed, 109 insertions(+), 34 deletions(-) create mode 100644 .changes/http-abort.md diff --git a/.changes/http-abort.md b/.changes/http-abort.md new file mode 100644 index 00000000..f4c74d55 --- /dev/null +++ b/.changes/http-abort.md @@ -0,0 +1,6 @@ +--- +"http": "patch" +"http-js": "patch" +--- + +Fix cancelling requests using `AbortSignal`. diff --git a/Cargo.lock b/Cargo.lock index 23148035..8546be66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6570,6 +6570,7 @@ dependencies = [ "tauri-plugin", "tauri-plugin-fs", "thiserror", + "tokio", "url", "urlpattern", ] @@ -7114,10 +7115,22 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", + "tokio-macros", "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-macros" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.69", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" diff --git a/plugins/http/Cargo.toml b/plugins/http/Cargo.toml index 362404e8..e9dc7bdd 100644 --- a/plugins/http/Cargo.toml +++ b/plugins/http/Cargo.toml @@ -26,6 +26,7 @@ serde = { workspace = true } serde_json = { workspace = true } tauri = { workspace = true } thiserror = { workspace = true } +tokio = { version = "1", features = [ "sync", "macros" ] } tauri-plugin-fs = { path = "../fs", version = "2.0.0-beta.10" } urlpattern = "0.2" regex = "1" diff --git a/plugins/http/api-iife.js b/plugins/http/api-iife.js index 5f00ef5e..c1612050 100644 --- a/plugins/http/api-iife.js +++ b/plugins/http/api-iife.js @@ -1 +1 @@ -if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}return"function"==typeof SuppressedError&&SuppressedError,e.fetch=async function(e,r){const n=r?.maxRedirections,a=r?.connectTimeout,s=r?.proxy;r&&(delete r.maxRedirections,delete r.connectTimeout,delete r.proxy);const i=r?.signal,o=r?.headers?r.headers instanceof Headers?r.headers:new Headers(r.headers):new Headers,d=new Request(e,r),c=await d.arrayBuffer(),u=0!==c.byteLength?Array.from(new Uint8Array(c)):null;for(const[e,t]of d.headers)o.get(e)||o.set(e,t);const _=(o instanceof Headers?Array.from(o.entries()):Array.isArray(o)?o:Object.entries(o)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()])),f=await t("plugin:http|fetch",{clientConfig:{method:d.method,url:d.url,headers:_,data:u,maxRedirections:n,connectTimeout:a,proxy:s}});i?.addEventListener("abort",(()=>{t("plugin:http|fetch_cancel",{rid:f})}));const{status:h,statusText:p,url:l,headers:y,rid:w}=await t("plugin:http|fetch_send",{rid:f}),T=await t("plugin:http|fetch_read_body",{rid:w}),A=new Response(T instanceof ArrayBuffer&&0!==T.byteLength?T:T instanceof Array&&T.length>0?new Uint8Array(T):null,{headers:y,status:h,statusText:p});return Object.defineProperty(A,"url",{value:l}),Object.defineProperty(A,"headers",{value:new Headers(y)}),A},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} +if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}"function"==typeof SuppressedError&&SuppressedError;const r="Request canceled";return e.fetch=async function(e,n){const a=n?.signal;if(a?.aborted)throw new Error(r);const o=n?.maxRedirections,s=n?.connectTimeout,i=n?.proxy;n&&(delete n.maxRedirections,delete n.connectTimeout,delete n.proxy);const d=n?.headers?n.headers instanceof Headers?n.headers:new Headers(n.headers):new Headers,c=new Request(e,n),u=await c.arrayBuffer(),f=0!==u.byteLength?Array.from(new Uint8Array(u)):null;for(const[e,t]of c.headers)d.get(e)||d.set(e,t);const _=(d instanceof Headers?Array.from(d.entries()):Array.isArray(d)?d:Object.entries(d)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(a?.aborted)throw new Error(r);const h=await t("plugin:http|fetch",{clientConfig:{method:c.method,url:c.url,headers:_,data:f,maxRedirections:o,connectTimeout:s,proxy:i}}),l=()=>t("plugin:http|fetch_cancel",{rid:h});if(a?.aborted)throw l(),new Error(r);a?.addEventListener("abort",(()=>l));const{status:p,statusText:w,url:y,headers:T,rid:A}=await t("plugin:http|fetch_send",{rid:h}),g=await t("plugin:http|fetch_read_body",{rid:A}),R=new Response(g instanceof ArrayBuffer&&0!==g.byteLength?g:g instanceof Array&&g.length>0?new Uint8Array(g):null,{status:p,statusText:w});return Object.defineProperty(R,"url",{value:y}),Object.defineProperty(R,"headers",{value:new Headers(T)}),R},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} diff --git a/plugins/http/guest-js/index.ts b/plugins/http/guest-js/index.ts index f6382fa9..af2a16c2 100644 --- a/plugins/http/guest-js/index.ts +++ b/plugins/http/guest-js/index.ts @@ -86,6 +86,8 @@ export interface ClientOptions { proxy?: Proxy; } +const ERROR_REQUEST_CANCELLED = "Request canceled"; + /** * Fetch a resource from the network. It returns a `Promise` that resolves to the * `Response` to that `Request`, whether it is successful or not. @@ -104,6 +106,12 @@ export async function fetch( input: URL | Request | string, init?: RequestInit & ClientOptions, ): Promise { + // abort early here if needed + const signal = init?.signal; + if (signal?.aborted) { + throw new Error(ERROR_REQUEST_CANCELLED); + } + const maxRedirections = init?.maxRedirections; const connectTimeout = init?.connectTimeout; const proxy = init?.proxy; @@ -115,8 +123,6 @@ export async function fetch( delete init.proxy; } - const signal = init?.signal; - const headers = init?.headers ? init.headers instanceof Headers ? init.headers @@ -153,6 +159,11 @@ export async function fetch( ], ); + // abort early here if needed + if (signal?.aborted) { + throw new Error(ERROR_REQUEST_CANCELLED); + } + const rid = await invoke("plugin:http|fetch", { clientConfig: { method: req.method, @@ -165,11 +176,17 @@ export async function fetch( }, }); - signal?.addEventListener("abort", () => { - void invoke("plugin:http|fetch_cancel", { - rid, - }); - }); + const abort = () => invoke("plugin:http|fetch_cancel", { rid }); + + // abort early here if needed + if (signal?.aborted) { + // we don't care about the result of this proimse + // eslint-disable-next-line @typescript-eslint/no-floating-promises + abort(); + throw new Error(ERROR_REQUEST_CANCELLED); + } + + signal?.addEventListener("abort", () => abort); interface FetchSendResponse { status: number; @@ -203,7 +220,6 @@ export async function fetch( ? new Uint8Array(body) : null, { - headers: responseHeaders, status, statusText, }, diff --git a/plugins/http/src/commands.rs b/plugins/http/src/commands.rs index 935c3ae9..b9c1cae2 100644 --- a/plugins/http/src/commands.rs +++ b/plugins/http/src/commands.rs @@ -11,8 +11,9 @@ use tauri::{ async_runtime::Mutex, command, ipc::{CommandScope, GlobalScope}, - Manager, ResourceId, Runtime, State, Webview, + Manager, ResourceId, ResourceTable, Runtime, State, Webview, }; +use tokio::sync::oneshot::{channel, Receiver, Sender}; use crate::{ scope::{Entry, Scope}, @@ -22,20 +23,47 @@ use crate::{ const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); struct ReqwestResponse(reqwest::Response); +impl tauri::Resource for ReqwestResponse {} -type CancelableResponseResult = Result>; +type CancelableResponseResult = Result; type CancelableResponseFuture = Pin + Send + Sync>>; -struct FetchRequest(Mutex); -impl FetchRequest { - fn new(f: CancelableResponseFuture) -> Self { - Self(Mutex::new(f)) +struct FetchRequest { + fut: Mutex, + abort_tx_rid: ResourceId, + abort_rx_rid: ResourceId, +} +impl tauri::Resource for FetchRequest {} + +struct AbortSender(Sender<()>); +impl tauri::Resource for AbortRecveiver {} + +impl AbortSender { + fn abort(self) { + let _ = self.0.send(()); } } -impl tauri::Resource for FetchRequest {} -impl tauri::Resource for ReqwestResponse {} +struct AbortRecveiver(Receiver<()>); +impl tauri::Resource for AbortSender {} + +trait AddRequest { + fn add_request(&mut self, fut: CancelableResponseFuture) -> ResourceId; +} + +impl AddRequest for ResourceTable { + fn add_request(&mut self, fut: CancelableResponseFuture) -> ResourceId { + let (tx, rx) = channel::<()>(); + let (tx, rx) = (AbortSender(tx), AbortRecveiver(rx)); + let req = FetchRequest { + fut: Mutex::new(fut), + abort_tx_rid: self.add(tx), + abort_rx_rid: self.add(rx), + }; + self.add(req) + } +} #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -239,9 +267,9 @@ pub async fn fetch( request = request.body(data); } - let fut = async move { Ok(request.send().await.map_err(Into::into)) }; + let fut = async move { request.send().await.map_err(Into::into) }; let mut resources_table = webview.resources_table(); - let rid = resources_table.add(FetchRequest::new(Box::pin(fut))); + let rid = resources_table.add_request(Box::pin(fut)); Ok(rid) } else { @@ -260,9 +288,9 @@ pub async fn fetch( .header(header::CONTENT_TYPE, data_url.mime_type().to_string()) .body(reqwest::Body::from(body))?; - let fut = async move { Ok(Ok(reqwest::Response::from(response))) }; + let fut = async move { Ok(reqwest::Response::from(response)) }; let mut resources_table = webview.resources_table(); - let rid = resources_table.add(FetchRequest::new(Box::pin(fut))); + let rid = resources_table.add_request(Box::pin(fut)); Ok(rid) } _ => Err(Error::SchemeNotSupport(scheme.to_string())), @@ -270,14 +298,13 @@ pub async fn fetch( } #[command] -pub async fn fetch_cancel(webview: Webview, rid: ResourceId) -> crate::Result<()> { - let req = { - let resources_table = webview.resources_table(); - resources_table.get::(rid)? - }; - let mut req = req.0.lock().await; - *req = Box::pin(async { Err(Error::RequestCanceled) }); - +pub fn fetch_cancel(webview: Webview, rid: ResourceId) -> crate::Result<()> { + let mut resources_table = webview.resources_table(); + let req = resources_table.get::(rid)?; + let abort_tx = resources_table.take::(req.abort_tx_rid)?; + if let Some(abort_tx) = Arc::into_inner(abort_tx) { + abort_tx.abort(); + } Ok(()) } @@ -286,14 +313,26 @@ pub async fn fetch_send( webview: Webview, rid: ResourceId, ) -> crate::Result { - let req = { + let (req, abort_rx) = { let mut resources_table = webview.resources_table(); - resources_table.take::(rid)? + let req = resources_table.get::(rid)?; + let abort_rx = resources_table.take::(req.abort_rx_rid)?; + (req, abort_rx) }; - let res = match req.0.lock().await.as_mut().await { - Ok(Ok(res)) => res, - Ok(Err(e)) | Err(e) => return Err(e), + let Some(abort_rx) = Arc::into_inner(abort_rx) else { + return Err(Error::RequestCanceled); + }; + + let mut fut = req.fut.lock().await; + + let res = tokio::select! { + res = fut.as_mut() => res?, + _ = abort_rx.0 => { + let mut resources_table = webview.resources_table(); + resources_table.close(rid)?; + return Err(Error::RequestCanceled); + } }; let status = res.status();