fix(http): use tokio oneshot channel for detecting abort (#1395)

* fix(http): properly handle aborting

closes #1376

* abort early in JS

* avoid using unnecessary mutexes

* fix lint

* update bundle

* simplify
pull/1531/head
Amr Bashir 11 months ago committed by GitHub
parent b07c092cd2
commit ac9a25cc12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,6 @@
---
"http": "patch"
"http-js": "patch"
---
Fix cancelling requests using `AbortSignal`.

13
Cargo.lock generated

@ -6570,6 +6570,7 @@ dependencies = [
"tauri-plugin", "tauri-plugin",
"tauri-plugin-fs", "tauri-plugin-fs",
"thiserror", "thiserror",
"tokio",
"url", "url",
"urlpattern", "urlpattern",
] ]
@ -7114,10 +7115,22 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros",
"tracing", "tracing",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "tokio-macros"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.69",
]
[[package]] [[package]]
name = "tokio-native-tls" name = "tokio-native-tls"
version = "0.3.1" version = "0.3.1"

@ -26,6 +26,7 @@ serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tauri = { workspace = true } tauri = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
tokio = { version = "1", features = [ "sync", "macros" ] }
tauri-plugin-fs = { path = "../fs", version = "2.0.0-beta.10" } tauri-plugin-fs = { path = "../fs", version = "2.0.0-beta.10" }
urlpattern = "0.2" urlpattern = "0.2"
regex = "1" regex = "1"

@ -1 +1 @@
if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}return"function"==typeof SuppressedError&&SuppressedError,e.fetch=async function(e,r){const n=r?.maxRedirections,a=r?.connectTimeout,s=r?.proxy;r&&(delete r.maxRedirections,delete r.connectTimeout,delete r.proxy);const i=r?.signal,o=r?.headers?r.headers instanceof Headers?r.headers:new Headers(r.headers):new Headers,d=new Request(e,r),c=await d.arrayBuffer(),u=0!==c.byteLength?Array.from(new Uint8Array(c)):null;for(const[e,t]of d.headers)o.get(e)||o.set(e,t);const _=(o instanceof Headers?Array.from(o.entries()):Array.isArray(o)?o:Object.entries(o)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()])),f=await t("plugin:http|fetch",{clientConfig:{method:d.method,url:d.url,headers:_,data:u,maxRedirections:n,connectTimeout:a,proxy:s}});i?.addEventListener("abort",(()=>{t("plugin:http|fetch_cancel",{rid:f})}));const{status:h,statusText:p,url:l,headers:y,rid:w}=await t("plugin:http|fetch_send",{rid:f}),T=await t("plugin:http|fetch_read_body",{rid:w}),A=new Response(T instanceof ArrayBuffer&&0!==T.byteLength?T:T instanceof Array&&T.length>0?new Uint8Array(T):null,{headers:y,status:h,statusText:p});return Object.defineProperty(A,"url",{value:l}),Object.defineProperty(A,"headers",{value:new Headers(y)}),A},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})} if("__TAURI__"in window){var __TAURI_PLUGIN_HTTP__=function(e){"use strict";async function t(e,t={},r){return window.__TAURI_INTERNALS__.invoke(e,t,r)}"function"==typeof SuppressedError&&SuppressedError;const r="Request canceled";return e.fetch=async function(e,n){const a=n?.signal;if(a?.aborted)throw new Error(r);const o=n?.maxRedirections,s=n?.connectTimeout,i=n?.proxy;n&&(delete n.maxRedirections,delete n.connectTimeout,delete n.proxy);const d=n?.headers?n.headers instanceof Headers?n.headers:new Headers(n.headers):new Headers,c=new Request(e,n),u=await c.arrayBuffer(),f=0!==u.byteLength?Array.from(new Uint8Array(u)):null;for(const[e,t]of c.headers)d.get(e)||d.set(e,t);const _=(d instanceof Headers?Array.from(d.entries()):Array.isArray(d)?d:Object.entries(d)).map((([e,t])=>[e,"string"==typeof t?t:t.toString()]));if(a?.aborted)throw new Error(r);const h=await t("plugin:http|fetch",{clientConfig:{method:c.method,url:c.url,headers:_,data:f,maxRedirections:o,connectTimeout:s,proxy:i}}),l=()=>t("plugin:http|fetch_cancel",{rid:h});if(a?.aborted)throw l(),new Error(r);a?.addEventListener("abort",(()=>l));const{status:p,statusText:w,url:y,headers:T,rid:A}=await t("plugin:http|fetch_send",{rid:h}),g=await t("plugin:http|fetch_read_body",{rid:A}),R=new Response(g instanceof ArrayBuffer&&0!==g.byteLength?g:g instanceof Array&&g.length>0?new Uint8Array(g):null,{status:p,statusText:w});return Object.defineProperty(R,"url",{value:y}),Object.defineProperty(R,"headers",{value:new Headers(T)}),R},e}({});Object.defineProperty(window.__TAURI__,"http",{value:__TAURI_PLUGIN_HTTP__})}

@ -86,6 +86,8 @@ export interface ClientOptions {
proxy?: Proxy; proxy?: Proxy;
} }
const ERROR_REQUEST_CANCELLED = "Request canceled";
/** /**
* Fetch a resource from the network. It returns a `Promise` that resolves to the * Fetch a resource from the network. It returns a `Promise` that resolves to the
* `Response` to that `Request`, whether it is successful or not. * `Response` to that `Request`, whether it is successful or not.
@ -104,6 +106,12 @@ export async function fetch(
input: URL | Request | string, input: URL | Request | string,
init?: RequestInit & ClientOptions, init?: RequestInit & ClientOptions,
): Promise<Response> { ): Promise<Response> {
// abort early here if needed
const signal = init?.signal;
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED);
}
const maxRedirections = init?.maxRedirections; const maxRedirections = init?.maxRedirections;
const connectTimeout = init?.connectTimeout; const connectTimeout = init?.connectTimeout;
const proxy = init?.proxy; const proxy = init?.proxy;
@ -115,8 +123,6 @@ export async function fetch(
delete init.proxy; delete init.proxy;
} }
const signal = init?.signal;
const headers = init?.headers const headers = init?.headers
? init.headers instanceof Headers ? init.headers instanceof Headers
? init.headers ? init.headers
@ -153,6 +159,11 @@ export async function fetch(
], ],
); );
// abort early here if needed
if (signal?.aborted) {
throw new Error(ERROR_REQUEST_CANCELLED);
}
const rid = await invoke<number>("plugin:http|fetch", { const rid = await invoke<number>("plugin:http|fetch", {
clientConfig: { clientConfig: {
method: req.method, method: req.method,
@ -165,11 +176,17 @@ export async function fetch(
}, },
}); });
signal?.addEventListener("abort", () => { const abort = () => invoke("plugin:http|fetch_cancel", { rid });
void invoke("plugin:http|fetch_cancel", {
rid, // abort early here if needed
}); if (signal?.aborted) {
}); // we don't care about the result of this proimse
// eslint-disable-next-line @typescript-eslint/no-floating-promises
abort();
throw new Error(ERROR_REQUEST_CANCELLED);
}
signal?.addEventListener("abort", () => abort);
interface FetchSendResponse { interface FetchSendResponse {
status: number; status: number;
@ -203,7 +220,6 @@ export async function fetch(
? new Uint8Array(body) ? new Uint8Array(body)
: null, : null,
{ {
headers: responseHeaders,
status, status,
statusText, statusText,
}, },

@ -11,8 +11,9 @@ use tauri::{
async_runtime::Mutex, async_runtime::Mutex,
command, command,
ipc::{CommandScope, GlobalScope}, ipc::{CommandScope, GlobalScope},
Manager, ResourceId, Runtime, State, Webview, Manager, ResourceId, ResourceTable, Runtime, State, Webview,
}; };
use tokio::sync::oneshot::{channel, Receiver, Sender};
use crate::{ use crate::{
scope::{Entry, Scope}, scope::{Entry, Scope},
@ -22,20 +23,47 @@ use crate::{
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
struct ReqwestResponse(reqwest::Response); struct ReqwestResponse(reqwest::Response);
impl tauri::Resource for ReqwestResponse {}
type CancelableResponseResult = Result<Result<reqwest::Response>>; type CancelableResponseResult = Result<reqwest::Response>;
type CancelableResponseFuture = type CancelableResponseFuture =
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>; Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
struct FetchRequest(Mutex<CancelableResponseFuture>); struct FetchRequest {
impl FetchRequest { fut: Mutex<CancelableResponseFuture>,
fn new(f: CancelableResponseFuture) -> Self { abort_tx_rid: ResourceId,
Self(Mutex::new(f)) abort_rx_rid: ResourceId,
}
impl tauri::Resource for FetchRequest {}
struct AbortSender(Sender<()>);
impl tauri::Resource for AbortRecveiver {}
impl AbortSender {
fn abort(self) {
let _ = self.0.send(());
} }
} }
impl tauri::Resource for FetchRequest {} struct AbortRecveiver(Receiver<()>);
impl tauri::Resource for ReqwestResponse {} impl tauri::Resource for AbortSender {}
trait AddRequest {
fn add_request(&mut self, fut: CancelableResponseFuture) -> ResourceId;
}
impl AddRequest for ResourceTable {
fn add_request(&mut self, fut: CancelableResponseFuture) -> ResourceId {
let (tx, rx) = channel::<()>();
let (tx, rx) = (AbortSender(tx), AbortRecveiver(rx));
let req = FetchRequest {
fut: Mutex::new(fut),
abort_tx_rid: self.add(tx),
abort_rx_rid: self.add(rx),
};
self.add(req)
}
}
#[derive(Serialize)] #[derive(Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@ -239,9 +267,9 @@ pub async fn fetch<R: Runtime>(
request = request.body(data); request = request.body(data);
} }
let fut = async move { Ok(request.send().await.map_err(Into::into)) }; let fut = async move { request.send().await.map_err(Into::into) };
let mut resources_table = webview.resources_table(); let mut resources_table = webview.resources_table();
let rid = resources_table.add(FetchRequest::new(Box::pin(fut))); let rid = resources_table.add_request(Box::pin(fut));
Ok(rid) Ok(rid)
} else { } else {
@ -260,9 +288,9 @@ pub async fn fetch<R: Runtime>(
.header(header::CONTENT_TYPE, data_url.mime_type().to_string()) .header(header::CONTENT_TYPE, data_url.mime_type().to_string())
.body(reqwest::Body::from(body))?; .body(reqwest::Body::from(body))?;
let fut = async move { Ok(Ok(reqwest::Response::from(response))) }; let fut = async move { Ok(reqwest::Response::from(response)) };
let mut resources_table = webview.resources_table(); let mut resources_table = webview.resources_table();
let rid = resources_table.add(FetchRequest::new(Box::pin(fut))); let rid = resources_table.add_request(Box::pin(fut));
Ok(rid) Ok(rid)
} }
_ => Err(Error::SchemeNotSupport(scheme.to_string())), _ => Err(Error::SchemeNotSupport(scheme.to_string())),
@ -270,14 +298,13 @@ pub async fn fetch<R: Runtime>(
} }
#[command] #[command]
pub async fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::Result<()> { pub fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::Result<()> {
let req = { let mut resources_table = webview.resources_table();
let resources_table = webview.resources_table(); let req = resources_table.get::<FetchRequest>(rid)?;
resources_table.get::<FetchRequest>(rid)? let abort_tx = resources_table.take::<AbortSender>(req.abort_tx_rid)?;
}; if let Some(abort_tx) = Arc::into_inner(abort_tx) {
let mut req = req.0.lock().await; abort_tx.abort();
*req = Box::pin(async { Err(Error::RequestCanceled) }); }
Ok(()) Ok(())
} }
@ -286,14 +313,26 @@ pub async fn fetch_send<R: Runtime>(
webview: Webview<R>, webview: Webview<R>,
rid: ResourceId, rid: ResourceId,
) -> crate::Result<FetchResponse> { ) -> crate::Result<FetchResponse> {
let req = { let (req, abort_rx) = {
let mut resources_table = webview.resources_table(); let mut resources_table = webview.resources_table();
resources_table.take::<FetchRequest>(rid)? let req = resources_table.get::<FetchRequest>(rid)?;
let abort_rx = resources_table.take::<AbortRecveiver>(req.abort_rx_rid)?;
(req, abort_rx)
}; };
let res = match req.0.lock().await.as_mut().await { let Some(abort_rx) = Arc::into_inner(abort_rx) else {
Ok(Ok(res)) => res, return Err(Error::RequestCanceled);
Ok(Err(e)) | Err(e) => return Err(e), };
let mut fut = req.fut.lock().await;
let res = tokio::select! {
res = fut.as_mut() => res?,
_ = abort_rx.0 => {
let mut resources_table = webview.resources_table();
resources_table.close(rid)?;
return Err(Error::RequestCanceled);
}
}; };
let status = res.status(); let status = res.status();

Loading…
Cancel
Save