channel on upload plugin

pull/354/head
Lucas Nogueira 2 years ago
parent 8f0f6b8ac5
commit 090fe353ad
No known key found for this signature in database
GPG Key ID: 7C32FCA95C8C95D7

@ -1,30 +1,38 @@
import { invoke } from "@tauri-apps/api/tauri"; import { invoke, transformCallback } from "@tauri-apps/api/tauri";
import { appWindow } from "tauri-plugin-window-api";
interface ProgressPayload { interface ProgressPayload {
id: number;
progress: number; progress: number;
total: number; total: number;
} }
type ProgressHandler = (progress: number, total: number) => void; type ProgressHandler = (progress: ProgressPayload) => void;
const handlers: Map<number, ProgressHandler> = new Map();
let listening = false;
async function listenToEventIfNeeded(event: string): Promise<void> { // TODO: use channel from @tauri-apps/api on v2
if (listening) { class Channel<T = unknown> {
return await Promise.resolve(); id: number;
} // @ts-expect-error field used by the IPC serializer
return await appWindow private readonly __TAURI_CHANNEL_MARKER__ = true;
.listen<ProgressPayload>(event, ({ payload }) => { #onmessage: (response: T) => void = () => {
const handler = handlers.get(payload.id); // no-op
if (handler != null) { };
handler(payload.progress, payload.total);
} constructor() {
}) this.id = transformCallback((response: T) => {
.then(() => { this.#onmessage(response);
listening = true;
}); });
}
set onmessage(handler: (response: T) => void) {
this.#onmessage = handler;
}
get onmessage(): (response: T) => void {
return this.#onmessage;
}
toJSON(): string {
return `__CHANNEL__:${this.id}`;
}
} }
async function upload( async function upload(
@ -37,17 +45,17 @@ async function upload(
window.crypto.getRandomValues(ids); window.crypto.getRandomValues(ids);
const id = ids[0]; const id = ids[0];
const onProgress = new Channel<ProgressPayload>();
if (progressHandler != null) { if (progressHandler != null) {
handlers.set(id, progressHandler); onProgress.onmessage = progressHandler;
} }
await listenToEventIfNeeded("upload://progress");
await invoke("plugin:upload|upload", { await invoke("plugin:upload|upload", {
id, id,
url, url,
filePath, filePath,
headers: headers ?? {}, headers: headers ?? {},
onProgress,
}); });
} }
@ -65,17 +73,17 @@ async function download(
window.crypto.getRandomValues(ids); window.crypto.getRandomValues(ids);
const id = ids[0]; const id = ids[0];
const onProgress = new Channel<ProgressPayload>();
if (progressHandler != null) { if (progressHandler != null) {
handlers.set(id, progressHandler); onProgress.onmessage = progressHandler;
} }
await listenToEventIfNeeded("download://progress");
await invoke("plugin:upload|download", { await invoke("plugin:upload|download", {
id, id,
url, url,
filePath, filePath,
headers: headers ?? {}, headers: headers ?? {},
onProgress,
}); });
} }

@ -5,16 +5,17 @@
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use serde::{ser::Serializer, Serialize}; use serde::{ser::Serializer, Serialize};
use tauri::{ use tauri::{
api::ipc::Channel,
command, command,
plugin::{Builder as PluginBuilder, TauriPlugin}, plugin::{Builder as PluginBuilder, TauriPlugin},
Runtime, Window, Runtime,
}; };
use tokio::{fs::File, io::AsyncWriteExt}; use tokio::{fs::File, io::AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
use read_progress_stream::ReadProgressStream; use read_progress_stream::ReadProgressStream;
use std::{collections::HashMap, sync::Mutex}; use std::collections::HashMap;
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
@ -39,19 +40,17 @@ impl Serialize for Error {
#[derive(Clone, Serialize)] #[derive(Clone, Serialize)]
struct ProgressPayload { struct ProgressPayload {
id: u32,
progress: u64, progress: u64,
total: u64, total: u64,
} }
#[command] #[command]
async fn download<R: Runtime>( async fn download<R: Runtime>(
window: Window<R>,
id: u32,
url: &str, url: &str,
file_path: &str, file_path: &str,
headers: HashMap<String, String>, headers: HashMap<String, String>,
) -> Result<u32> { on_progress: Channel<R>,
) -> Result<()> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut request = client.get(url); let mut request = client.get(url);
@ -69,33 +68,28 @@ async fn download<R: Runtime>(
while let Some(chunk) = stream.try_next().await? { while let Some(chunk) = stream.try_next().await? {
file.write_all(&chunk).await?; file.write_all(&chunk).await?;
let _ = window.emit( let _ = on_progress.send(&ProgressPayload {
"download://progress", progress: chunk.len() as u64,
ProgressPayload { total,
id, });
progress: chunk.len() as u64,
total,
},
);
} }
Ok(id) Ok(())
} }
#[command] #[command]
async fn upload<R: Runtime>( async fn upload<R: Runtime>(
window: Window<R>,
id: u32,
url: &str, url: &str,
file_path: &str, file_path: &str,
headers: HashMap<String, String>, headers: HashMap<String, String>,
on_progress: Channel<R>,
) -> Result<serde_json::Value> { ) -> Result<serde_json::Value> {
// Read the file // Read the file
let file = File::open(file_path).await?; let file = File::open(file_path).await?;
// Create the request and attach the file to the body // Create the request and attach the file to the body
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut request = client.post(url).body(file_to_body(id, window, file)); let mut request = client.post(url).body(file_to_body(on_progress, file));
// Loop trought the headers keys and values // Loop trought the headers keys and values
// and add them to the request object. // and add them to the request object.
@ -108,20 +102,13 @@ async fn upload<R: Runtime>(
response.json().await.map_err(Into::into) response.json().await.map_err(Into::into)
} }
fn file_to_body<R: Runtime>(id: u32, window: Window<R>, file: File) -> reqwest::Body { fn file_to_body<R: Runtime>(channel: Channel<R>, file: File) -> reqwest::Body {
let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|r| r.freeze()); let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|r| r.freeze());
let window = Mutex::new(window);
reqwest::Body::wrap_stream(ReadProgressStream::new( reqwest::Body::wrap_stream(ReadProgressStream::new(
stream, stream,
Box::new(move |progress, total| { Box::new(move |progress, total| {
let _ = window.lock().unwrap().emit( let _ = channel.send(&ProgressPayload { progress, total });
"upload://progress",
ProgressPayload {
id,
progress,
total,
},
);
}), }),
)) ))
} }

Loading…
Cancel
Save