Separate Yew Platform (#2893)

* Remove pinned module.

* Create Yew Platform.

* Remove some additional dependencies.

* Fix clippy.

* Restore wasm-bindgen-futures.

* Fix docs.

* Migrate to prokio.

* Fix docs warnings.

* Fix dependencies.
This commit is contained in:
Kaede Hoshikawa 2022-10-21 06:56:15 +09:00 committed by GitHub
parent 71408e2e56
commit da09755c27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 86 additions and 1796 deletions

View File

@ -18,15 +18,3 @@ cargo clippy --release --no-default-features --features default,ssr -- --deny=wa
cargo clippy --release --no-default-features --features csr,default,ssr -- --deny=warnings
cargo clippy --release --no-default-features --features hydration,ssr -- --deny=warnings
cargo clippy --release --no-default-features --features default,hydration,ssr -- --deny=warnings
cargo clippy --release --no-default-features --features tokio -- --deny=warnings
cargo clippy --release --no-default-features --features csr,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features default,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features csr,default,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features hydration,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features default,hydration,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features ssr,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features csr,ssr,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features default,ssr,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features csr,default,ssr,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features hydration,ssr,tokio -- --deny=warnings
cargo clippy --release --no-default-features --features default,hydration,ssr,tokio -- --deny=warnings

View File

@ -18,15 +18,3 @@ cargo clippy --no-default-features --features default,ssr -- --deny=warnings
cargo clippy --no-default-features --features csr,default,ssr -- --deny=warnings
cargo clippy --no-default-features --features hydration,ssr -- --deny=warnings
cargo clippy --no-default-features --features default,hydration,ssr -- --deny=warnings
cargo clippy --no-default-features --features tokio -- --deny=warnings
cargo clippy --no-default-features --features csr,tokio -- --deny=warnings
cargo clippy --no-default-features --features default,tokio -- --deny=warnings
cargo clippy --no-default-features --features csr,default,tokio -- --deny=warnings
cargo clippy --no-default-features --features hydration,tokio -- --deny=warnings
cargo clippy --no-default-features --features default,hydration,tokio -- --deny=warnings
cargo clippy --no-default-features --features ssr,tokio -- --deny=warnings
cargo clippy --no-default-features --features csr,ssr,tokio -- --deny=warnings
cargo clippy --no-default-features --features default,ssr,tokio -- --deny=warnings
cargo clippy --no-default-features --features csr,default,ssr,tokio -- --deny=warnings
cargo clippy --no-default-features --features hydration,ssr,tokio -- --deny=warnings
cargo clippy --no-default-features --features default,hydration,ssr,tokio -- --deny=warnings

20
examples/Cargo.lock generated
View File

@ -1595,6 +1595,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pinned"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a829027bd95e54cfe13e3e258a1ae7b645960553fb82b75ff852c29688ee595b"
dependencies = [
"futures 0.3.24",
"rustversion",
"thiserror",
]
[[package]]
name = "pkg-config"
version = "0.3.25"
@ -2115,18 +2126,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
version = "1.0.34"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c1b05ca9d106ba7d2e31a9dab4a64e7be2cce415321966ea3132c49a656e252"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.34"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8f2591983642de85c921015f3f070c665a197ed69e417af436115e3a1407487"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",
@ -2744,6 +2755,7 @@ dependencies = [
"num_cpus",
"once_cell",
"pin-project",
"pinned",
"serde",
"slab",
"thiserror",

View File

@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0"
pulldown-cmark = { version = "0.9", default-features = false }
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
yew = { path = "../../packages/yew", features = ["tokio", "csr"] }
yew = { path = "../../packages/yew", features = ["csr"] }
gloo = "0.8"
[dependencies.web-sys]

View File

@ -7,7 +7,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
wasm-bindgen = "0.2"
yew = { path = "../../packages/yew", features = ["csr", "tokio"] }
yew = { path = "../../packages/yew", features = ["csr"] }
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
once_cell = "1"

View File

@ -32,4 +32,4 @@ clap = { version = "3.1.7", features = ["derive"] }
[features]
hydration = ["yew/hydration"]
ssr = ["yew/ssr", "yew/tokio"]
ssr = ["yew/ssr"]

View File

@ -34,5 +34,5 @@ hyper = { version = "0.14", features = ["server", "http1"] }
jemallocator = "0.5"
[features]
ssr = ["yew/ssr", "yew/tokio"]
ssr = ["yew/ssr"]
hydration = ["yew/hydration"]

View File

@ -25,16 +25,23 @@ slab = "0.4"
wasm-bindgen = "0.2"
yew-macro = { version = "^0.19.0", path = "../yew-macro" }
thiserror = "1.0"
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
futures = { version = "0.3", default-features = false, features = ["std"] }
html-escape = { version = "0.2.9", optional = true }
implicit-clone = { version = "0.3", features = ["map"] }
base64ct = { version = "1.5.0", features = ["std"], optional = true }
bincode = { version = "1.3.3", optional = true }
serde = { version = "1", features = ["derive"] }
tracing = "0.1.36"
pin-project = "1.0.11"
prokio = "0.1.0"
rustversion = "1"
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
# We still need tokio as we have docs linked to it.
tokio = { version = "1.19", features = ["rt"] }
[dependencies.web-sys]
version = "^0.3.59"
features = [
@ -70,16 +77,8 @@ features = [
"SubmitEvent"
]
[target.'cfg(target_arch = "wasm32")'.dependencies]
# we move it here so no promise-based spawn_local can present for
# non-wasm32 targets.
wasm-bindgen-futures = "0.4"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = { version = "1.13", optional = true }
once_cell = "1"
tokio = { version = "1.21.1", features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", features = ["time"], optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1.19", features = ["full"] }
[dev-dependencies]
wasm-bindgen-test = "0.3"
@ -95,15 +94,11 @@ features = [
]
[features]
tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-stream"]
ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"]
csr = []
hydration = ["csr", "dep:bincode"]
default = []
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1.19", features = ["full"] }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "documenting"]

View File

@ -1,6 +1,6 @@
[tasks.native-test]
command = "cargo"
args = ["test", "--features", "csr,ssr,hydration,tokio"]
args = ["test", "--features", "csr,ssr,hydration"]
[tasks.wasm-test]
command = "wasm-pack"

View File

@ -248,9 +248,9 @@ impl<COMP: BaseComponent> Scope<COMP> {
///
/// This method will not notify the component when the stream has been fully exhausted. If
/// you want this feature, you can add an EOF message variant for your component and use
/// [`StreamExt::chain`] and [`stream::once`] to chain an EOF message to the original stream.
/// If your stream is produced by another crate, you can use [`StreamExt::map`] to transform
/// the stream's item type to the component message type.
/// [`StreamExt::chain`] and [`stream::once`](futures::stream::once) to chain an EOF message to
/// the original stream. If your stream is produced by another crate, you can use
/// [`StreamExt::map`] to transform the stream's item type to the component message type.
pub fn send_stream<S, M>(&self, stream: S)
where
M: Into<COMP::Message>,

View File

@ -26,7 +26,6 @@
//! - `csr`: Enables Client-side Rendering support and [`Renderer`]. Only enable this feature if you
//! are making a Yew application (not a library).
//! - `ssr`: Enables Server-side Rendering support and [`ServerRenderer`].
//! - `tokio`: Enables future-based APIs on non-wasm32 targets with tokio runtime.
//! - `hydration`: Enables Hydration support.
//!
//! ## Example

View File

@ -0,0 +1,48 @@
//! Yew's compatibility between JavaScript Runtime and Native Runtimes.
//!
//! This module is also published under the name [prokio] on crates.io.
//!
//! # Rationale
//!
//! When designing components and libraries that works on both WebAssembly targets backed by
//! JavaScript Runtime and non-WebAssembly targets with Native Runtimes. Developers usually face
//! challenges that requires applying multiple feature flags throughout their application:
//!
//! 1. Select I/O and timers that works with the target runtime.
//! 2. Native Runtimes usually require `Send` futures and WebAssembly types are usually `!Send`.
//!
//! # Implementation
//!
//! To alleviate these issues, Yew implements a single-threaded runtime that executes `?Send`
//! (`Send` or `!Send`) futures.
//!
//! On platforms with multi-threading support, Yew spawns multiple independent runtimes
//! proportional to the CPU core number. When tasks are spawned with a runtime handle, it will
//! randomly select a worker thread from the internal pool. All tasks spawned with `spawn_local`
//! will run on the same thread as the thread the task was running. When the runtime runs in a
//! WebAssembly target, all tasks will be scheduled on the main thread.
//!
//! This runtime is designed in favour of IO-bounded workload with similar runtime cost.
//! When running I/O workloads, it would produce a slightly better performance as tasks are
//! never moved to another thread. However, If a worker thread is busy,
//! other threads will not be able to steal tasks scheduled on the busy thread.
//! When you have a CPU-bounded task where CPU time is significantly
//! more expensive, it should be spawned with a dedicated thread (or Web Worker) and communicates
//! with the application using channels.
//!
//! Yew platform provides the following components:
//!
//! 1. A Task Scheduler that is capable of running non-Send tasks.
//! 2. A Timer that is compatible with the scheduler backend.
//! 3. Task Synchronisation Mechanisms.
//!
//! # Runtime Backend
//!
//! The Yew runtime is implemented with different runtimes depending on the target platform and can
//! use all features (timers / IO / task synchronisation) from the selected native runtime:
//!
//! - `wasm-bindgen-futures` (WebAssembly targets)
//! - `tokio` (non-WebAssembly targets)
#[doc(inline)]
pub use prokio::*;

View File

@ -1,212 +0,0 @@
use std::cell::UnsafeCell;
use std::fmt::{self, Write};
use std::marker::PhantomData;
use std::rc::Rc;
use std::task::{Poll, Waker};
use futures::stream::{FusedStream, Stream};
static BUF_SIZE: usize = 1024;
enum BufStreamState {
Ready,
Pending(Waker),
Done,
}
struct Inner {
buf: String,
state: BufStreamState,
// This type is not send or sync.
_marker: PhantomData<Rc<()>>,
}
impl Inner {
#[inline]
const fn new() -> Self {
Self {
buf: String::new(),
state: BufStreamState::Ready,
_marker: PhantomData,
}
}
#[inline]
fn wake(&mut self) {
if let BufStreamState::Pending(ref waker) = self.state {
waker.wake_by_ref();
self.state = BufStreamState::Ready;
}
}
#[inline]
fn buf_reserve(&mut self) {
if self.buf.is_empty() {
self.buf.reserve(BUF_SIZE);
}
}
}
impl Write for Inner {
fn write_str(&mut self, s: &str) -> fmt::Result {
if s.is_empty() {
return Ok(());
}
self.wake();
if s.len() < BUF_SIZE {
self.buf_reserve();
}
self.buf.write_str(s)
}
fn write_char(&mut self, c: char) -> fmt::Result {
self.wake();
self.buf_reserve();
self.buf.write_char(c)
}
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
self.wake();
self.buf_reserve();
self.buf.write_fmt(args)
}
}
/// An asynchronous [`String`] writer.
///
/// This type implements [`fmt::Write`] and can be used with [`write!`] and [`writeln!`].
pub(crate) struct BufWriter {
inner: Rc<UnsafeCell<Inner>>,
}
impl Write for BufWriter {
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.write_str(s)
}
#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.write_char(c)
}
#[inline]
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.write_fmt(args)
}
}
impl Drop for BufWriter {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.wake();
inner.state = BufStreamState::Done;
}
}
/// An asynchronous [`String`] reader.
pub(crate) struct BufReader {
inner: Rc<UnsafeCell<Inner>>,
}
impl Stream for BufReader {
type Item = String;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
if !inner.buf.is_empty() {
let buf = std::mem::take(&mut inner.buf);
return Poll::Ready(Some(buf));
}
if let BufStreamState::Done = inner.state {
return Poll::Ready(None);
}
inner.state = BufStreamState::Pending(cx.waker().clone());
Poll::Pending
}
}
impl FusedStream for BufReader {
fn is_terminated(&self) -> bool {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &*self.inner.get() };
matches!(
(&inner.state, inner.buf.is_empty()),
(BufStreamState::Done, true)
)
}
}
/// Creates an asynchronous buffer that operates over String.
pub(crate) fn buffer() -> (BufWriter, BufReader) {
let inner = Rc::new(UnsafeCell::new(Inner::new()));
let w = {
let inner = inner.clone();
BufWriter { inner }
};
let r = BufReader { inner };
(w, r)
}

View File

@ -1,70 +0,0 @@
//! Asynchronous utilities to work with `String`s.
use std::future::Future;
use futures::future::{self, MaybeDone};
use futures::stream::{FusedStream, Stream};
use futures::StreamExt;
use pin_project::pin_project;
mod buffer;
pub(crate) use buffer::{buffer, BufReader, BufWriter};
/// A buffered asynchronous [`String`] [`Stream`].
///
/// A BufStream combines a BufWriter - BufReader pair and a resolving future that writes to the
/// buffer and polls the future alongside the buffer.
#[pin_project]
pub(crate) struct BufStream<F>
where
F: Future<Output = ()>,
{
#[pin]
resolver: MaybeDone<F>,
inner: BufReader,
}
impl<F> BufStream<F>
where
F: Future<Output = ()>,
{
/// Creates a `BufStream`.
pub fn new<C>(f: C) -> Self
where
C: FnOnce(BufWriter) -> F,
{
let (w, r) = buffer();
let resolver = future::maybe_done(f(w));
BufStream { inner: r, resolver }
}
}
impl<F> Stream for BufStream<F>
where
F: Future<Output = ()>,
{
type Item = String;
#[inline]
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let _ = this.resolver.poll(cx);
this.inner.poll_next_unpin(cx)
}
}
impl<F> FusedStream for BufStream<F>
where
F: Future<Output = ()>,
{
#[inline]
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

View File

@ -1,190 +0,0 @@
//! Compatibility between JavaScript Runtime and Native Runtimes.
//!
//! When designing components and libraries that works on both WebAssembly targets backed by
//! JavaScript Runtime and non-WebAssembly targets with Native Runtimes. Developers usually face
//! challenges that requires applying multiple feature flags throughout their application:
//!
//! 1. Select I/O and timers that works with the target runtime.
//! 2. Native Runtimes usually require `Send` futures and WebAssembly usually use `!Send`
//! primitives for better performance during Client-side Rendering.
//!
//! To alleviate these issues, Yew implements a single-threaded runtime that executes `?Send`
//! (`Send` or `!Send`) futures. When your application starts with `yew::Renderer` or is rendered by
//! `yew::ServerRenderer`, it is executed within the Yew runtime. On systems with multi-threading
//! support, it spawns multiple independent runtimes in a worker pool proportional to the CPU
//! core number. The renderer will randomly select a worker thread from the internal pool. All tasks
//! spawned with `spawn_local` in the application will run on the same thread as the
//! rendering thread the renderer has selected. When the renderer runs in a WebAssembly target, all
//! tasks will be scheduled on the main thread.
//!
//! This runtime is designed in favour of IO-bounded workload with similar runtime cost. It produces
//! better performance by pinning tasks to a single worker thread. However, this means that if a
//! worker thread is back-logged, other threads will not be able to "help" by running tasks
//! scheduled on the busy thread. When you have a CPU-bounded task where CPU time is significantly
//! more expensive than rendering tasks, it should be spawned with a dedicated thread or
//! `yew-agent` and communicates with the application using channels or agent bridges.
//!
//! # Runtime Backend
//!
//! Yew runtime is implemented with different runtimes depending on the target platform and can use
//! all features (timers / IO / task synchronisation) from the selected native runtime:
//!
//! - `wasm-bindgen-futures` (WebAssembly targets)
//! - `tokio` (non-WebAssembly targets)
//!
//! # Compatibility with other async runtimes
//!
//! Yew's ServerRenderer can also be executed in applications using other async runtimes(e.g.:
//! `async-std`). Rendering tasks will enter Yew runtime and be executed with `tokio`. When the
//! rendering task finishes, the result is returned to the original runtime. This process is
//! transparent to the future that executes the renderer. The Yew application still needs to use
//! `tokio`'s timer, IO and task synchronisation primitives.
use std::future::Future;
use std::io::Result;
use std::marker::PhantomData;
#[cfg(feature = "ssr")]
pub(crate) mod fmt;
pub mod pinned;
pub mod time;
#[cfg(target_arch = "wasm32")]
#[path = "rt_wasm_bindgen/mod.rs"]
mod imp;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
#[path = "rt_tokio/mod.rs"]
mod imp;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "tokio")))]
#[path = "rt_none/mod.rs"]
mod imp;
/// Spawns a task on current thread.
///
/// # Panics
///
/// This function will panic when not being executed from within a Yew Application.
#[inline(always)]
pub fn spawn_local<F>(f: F)
where
F: Future<Output = ()> + 'static,
{
imp::spawn_local(f);
}
/// A Runtime Builder.
#[derive(Debug)]
pub struct RuntimeBuilder {
worker_threads: usize,
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self {
worker_threads: imp::get_default_runtime_size(),
}
}
}
impl RuntimeBuilder {
/// Creates a new Runtime Builder.
pub fn new() -> Self {
Self::default()
}
/// Sets the number of worker threads the Runtime will use.
///
/// # Default
///
/// The default number of worker threads is the number of available logical CPU cores.
///
/// # Note
///
/// This setting has no effect if current platform has no thread support (e.g.: WebAssembly).
pub fn worker_threads(&mut self, val: usize) -> &mut Self {
self.worker_threads = val;
self
}
/// Creates a Runtime.
pub fn build(&mut self) -> Result<Runtime> {
Ok(Runtime {
inner: imp::Runtime::new(self.worker_threads)?,
})
}
}
/// The Yew Runtime.
#[derive(Debug, Clone, Default)]
pub struct Runtime {
inner: imp::Runtime,
}
impl Runtime {
/// Creates a runtime Builder.
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
/// Spawns a task with it pinned to a worker thread.
///
/// This can be used to execute non-Send futures without blocking the current thread.
///
/// [`spawn_local`] is available with tasks executed with `spawn_pinned`.
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
self.inner.spawn_pinned(create_task);
}
}
/// A Local Runtime Handle.
///
/// This type can be used to acquire a runtime handle to spawn local tasks.
#[derive(Debug, Clone)]
pub struct LocalHandle {
inner: imp::LocalHandle,
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}
impl LocalHandle {
/// Creates a Handle to current Runtime worker.
///
/// # Panics
///
/// This method will panic if not called within Yew Runtime.
pub fn current() -> Self {
let inner = imp::LocalHandle::current();
Self {
inner,
_marker: PhantomData,
}
}
/// Creates a Handle to current Runtime worker.
///
/// This methods will return `None` if called from outside Yew Runtime.
pub fn try_current() -> Option<Self> {
let inner = imp::LocalHandle::try_current()?;
Some(Self {
inner,
_marker: PhantomData,
})
}
/// Spawns a Future with current Runtime worker.
pub fn spawn_local<F>(&self, f: F)
where
F: Future<Output = ()> + 'static,
{
self.inner.spawn_local(f);
}
}

View File

@ -1,6 +0,0 @@
//! Task synchronisation primitives for pinned tasks.
//!
//! This module provides task synchronisation for `!Send` futures.
pub mod mpsc;
pub mod oneshot;

View File

@ -1,401 +0,0 @@
//! A multi-producer single-receiver channel.
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::rc::Rc;
use std::task::{Poll, Waker};
use futures::sink::Sink;
use futures::stream::{FusedStream, Stream};
use thiserror::Error;
/// Error returned by [`try_next`](UnboundedReceiver::try_next).
#[derive(Error, Debug)]
#[error("queue is empty")]
pub struct TryRecvError {
_marker: PhantomData<()>,
}
/// Error returned by [`send_now`](UnboundedSender::send_now).
#[derive(Error, Debug)]
#[error("failed to send")]
pub struct SendError<T> {
/// The send value.
pub inner: T,
}
/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink).
#[derive(Error, Debug)]
#[error("failed to send")]
pub struct TrySendError {
_marker: PhantomData<()>,
}
#[derive(Debug)]
struct Inner<T> {
rx_waker: Option<Waker>,
closed: bool,
sender_ctr: usize,
items: VecDeque<T>,
// This type is not send or sync.
_marker: PhantomData<Rc<()>>,
}
impl<T> Inner<T> {
fn close(&mut self) {
self.closed = true;
if let Some(ref m) = self.rx_waker {
m.wake_by_ref();
}
}
}
/// The receiver of an unbounded mpsc channel.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
inner: Rc<UnsafeCell<Inner<T>>>,
}
impl<T> UnboundedReceiver<T> {
/// Try to read the next value from the channel.
///
/// This function will return:
/// - `Ok(Some(T))` if a value is ready.
/// - `Ok(None)` if the channel has become closed.
/// - `Err(TryRecvError)` if the channel is not closed and the channel is empty.
pub fn try_next(&self) -> std::result::Result<Option<T>, TryRecvError> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
match (inner.items.pop_front(), inner.closed) {
(Some(m), _) => Ok(Some(m)),
(None, false) => Ok(None),
(None, true) => Err(TryRecvError {
_marker: PhantomData,
}),
}
}
}
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
match (inner.items.pop_front(), inner.closed) {
(Some(m), _) => Poll::Ready(Some(m)),
(None, false) => {
inner.rx_waker = Some(cx.waker().clone());
Poll::Pending
}
(None, true) => Poll::Ready(None),
}
}
}
impl<T> FusedStream for UnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &*self.inner.get() };
inner.items.is_empty() && inner.closed
}
}
impl<T> Drop for UnboundedReceiver<T> {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.close();
}
}
/// The sender of an unbounded mpsc channel.
#[derive(Debug)]
pub struct UnboundedSender<T> {
inner: Rc<UnsafeCell<Inner<T>>>,
}
impl<T> UnboundedSender<T> {
/// Sends a value to the unbounded receiver.
pub fn send_now(&self, item: T) -> Result<(), SendError<T>> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any function that have already acquired a mutable
// reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
if inner.closed {
return Err(SendError { inner: item });
}
inner.items.push_back(item);
if let Some(ref m) = inner.rx_waker {
m.wake_by_ref();
}
Ok(())
}
/// Closes the channel.
///
/// Every sender (dropped or not) is considered closed when this method is called.
pub fn close_now(&self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any function that have already acquired a mutable
// reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.close();
}
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
let self_ = Self {
inner: self.inner.clone(),
};
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.sender_ctr += 1;
self_
}
}
impl<T> Drop for UnboundedSender<T> {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
let sender_ctr = {
inner.sender_ctr -= 1;
inner.sender_ctr
};
if sender_ctr == 0 {
inner.close();
}
}
}
impl<T> Sink<T> for &'_ UnboundedSender<T> {
type Error = TrySendError;
fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.send_now(item).map_err(|_| TrySendError {
_marker: PhantomData,
})
}
fn poll_ready(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let inner = unsafe { &*self.inner.get() };
match inner.closed {
false => Poll::Ready(Ok(())),
true => Poll::Ready(Err(TrySendError {
_marker: PhantomData,
})),
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.close_now();
Poll::Ready(Ok(()))
}
}
/// Creates an unbounded channel.
///
/// # Note
///
/// This channel has an infinite buffer and can run out of memory if the channel is not actively
/// drained.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let inner = Rc::new(UnsafeCell::new(Inner {
rx_waker: None,
closed: false,
sender_ctr: 1,
items: VecDeque::new(),
_marker: PhantomData,
}));
(
UnboundedSender {
inner: inner.clone(),
},
UnboundedReceiver { inner },
)
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "tokio")]
#[cfg(test)]
mod tests {
use std::time::Duration;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use tokio::task::LocalSet;
use tokio::test;
use super::*;
use crate::platform::spawn_local;
use crate::platform::time::sleep;
#[test]
async fn mpsc_works() {
let local_set = LocalSet::new();
local_set
.run_until(async {
let (tx, mut rx) = unbounded::<usize>();
spawn_local(async move {
for i in 0..10 {
(&tx).send(i).await.expect("failed to send.");
sleep(Duration::from_millis(1)).await;
}
});
for i in 0..10 {
let received = rx.next().await.expect("failed to receive");
assert_eq!(i, received);
}
assert_eq!(rx.next().await, None);
})
.await;
}
#[test]
async fn mpsc_drops_receiver() {
let (tx, rx) = unbounded::<usize>();
drop(rx);
(&tx).send(0).await.expect_err("should fail to send.");
}
#[test]
async fn mpsc_multi_sender() {
let local_set = LocalSet::new();
local_set
.run_until(async {
let (tx, mut rx) = unbounded::<usize>();
spawn_local(async move {
let tx2 = tx.clone();
for i in 0..10 {
if i % 2 == 0 {
(&tx).send(i).await.expect("failed to send.");
} else {
(&tx2).send(i).await.expect("failed to send.");
}
sleep(Duration::from_millis(1)).await;
}
drop(tx2);
for i in 10..20 {
(&tx).send(i).await.expect("failed to send.");
sleep(Duration::from_millis(1)).await;
}
});
for i in 0..20 {
let received = rx.next().await.expect("failed to receive");
assert_eq!(i, received);
}
assert_eq!(rx.next().await, None);
})
.await;
}
#[test]
async fn mpsc_drops_sender() {
let (tx, mut rx) = unbounded::<usize>();
drop(tx);
assert_eq!(rx.next().await, None);
}
}

View File

@ -1,225 +0,0 @@
//! A one-time send - receive channel.
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomData;
use std::rc::Rc;
use std::task::{Poll, Waker};
use thiserror::Error;
/// Error returned by awaiting the [`Receiver`].
#[derive(Debug, Error)]
#[error("channel has been closed.")]
pub struct RecvError {
_marker: PhantomData<()>,
}
#[derive(Debug)]
struct Inner<T> {
rx_waker: Option<Waker>,
closed: bool,
item: Option<T>,
// This type is not send or sync.
_marker: PhantomData<Rc<()>>,
}
/// The receiver of a oneshot channel.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Rc<UnsafeCell<Inner<T>>>,
}
impl<T> Future for Receiver<T> {
type Output = Result<T, RecvError>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
// Implementation Note:
//
// It might be neater to use a match pattern here.
// However, this will slow down the polling process by 10%.
if let Some(m) = inner.item.take() {
return Poll::Ready(Ok(m));
}
if inner.closed {
return Poll::Ready(Err(RecvError {
_marker: PhantomData,
}));
}
inner.rx_waker = Some(cx.waker().clone());
Poll::Pending
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.closed = true;
}
}
/// The sender of a oneshot channel.
#[derive(Debug)]
pub struct Sender<T> {
inner: Rc<UnsafeCell<Inner<T>>>,
}
impl<T> Sender<T> {
/// Send an item to the other side of the channel, consumes the sender.
pub fn send(self, item: T) -> Result<(), T> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
if inner.closed {
return Err(item);
}
inner.item = Some(item);
if let Some(ref m) = inner.rx_waker {
m.wake_by_ref();
}
Ok(())
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions and hence uniquely owns the
// mutable reference.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };
inner.closed = true;
if inner.item.is_none() {
if let Some(ref m) = inner.rx_waker {
m.wake_by_ref();
}
}
}
}
/// Creates a oneshot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(UnsafeCell::new(Inner {
rx_waker: None,
closed: false,
item: None,
_marker: PhantomData,
}));
(
Sender {
inner: inner.clone(),
},
Receiver { inner },
)
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "tokio")]
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::task::LocalSet;
use tokio::test;
use super::*;
use crate::platform::spawn_local;
use crate::platform::time::sleep;
#[test]
async fn oneshot_works() {
let (tx, rx) = channel();
tx.send(0).expect("failed to send.");
assert_eq!(rx.await.expect("failed to receive."), 0);
}
#[test]
async fn oneshot_drops_sender() {
let local_set = LocalSet::new();
local_set
.run_until(async {
let (tx, rx) = channel::<usize>();
spawn_local(async move {
sleep(Duration::from_millis(1)).await;
drop(tx);
});
rx.await.expect_err("successful to receive.");
})
.await;
}
#[test]
async fn oneshot_drops_receiver() {
let local_set = LocalSet::new();
local_set
.run_until(async {
let (tx, rx) = channel::<usize>();
let bar = Arc::new(Barrier::new(2));
{
let bar = bar.clone();
spawn_local(async move {
sleep(Duration::from_millis(1)).await;
drop(rx);
bar.wait().await;
});
}
bar.wait().await;
tx.send(0).expect_err("successful to send.");
})
.await;
}
}

View File

@ -1,72 +0,0 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;
pub(crate) mod time;
pub(crate) fn get_default_runtime_size() -> usize {
0
}
static NO_RUNTIME_NOTICE: &str = r#"No runtime configured for this platform, \
features that requires a runtime can't be used. \
Either compile with `target_arch = "wasm32", or enable the `tokio` feature."#;
fn panic_no_runtime() -> ! {
panic!("{}", NO_RUNTIME_NOTICE);
}
#[inline(always)]
pub(super) fn spawn_local<F>(_f: F)
where
F: Future<Output = ()> + 'static,
{
panic_no_runtime();
}
#[derive(Debug, Clone)]
pub(crate) struct Runtime {}
impl Default for Runtime {
fn default() -> Self {
panic_no_runtime();
}
}
impl Runtime {
pub fn new(_size: usize) -> io::Result<Self> {
panic_no_runtime();
}
pub fn spawn_pinned<F, Fut>(&self, _create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
panic_no_runtime();
}
}
#[derive(Debug, Clone)]
pub(crate) struct LocalHandle {
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}
impl LocalHandle {
pub fn try_current() -> Option<Self> {
panic_no_runtime();
}
pub fn current() -> Self {
panic_no_runtime();
}
pub fn spawn_local<F>(&self, _f: F)
where
F: Future<Output = ()> + 'static,
{
panic_no_runtime();
}
}

View File

@ -1,13 +0,0 @@
use std::time::Duration;
use futures::stream::LocalBoxStream;
use super::panic_no_runtime;
pub(crate) async fn sleep(_dur: Duration) {
panic_no_runtime();
}
pub(crate) fn interval(_dur: Duration) -> LocalBoxStream<'static, ()> {
panic_no_runtime();
}

View File

@ -1,205 +0,0 @@
//! We use a local worker implementation that does not produce a JoinHandle for spawn_pinned.
//! This avoids the cost to acquire a JoinHandle.
//!
//! See: [tokio-rs/tokio#4819](https://github.com/tokio-rs/tokio/issues/4819)
//!
//! We will not be able to produce a meaningful JoinHandle until WebAssembly targets support
//! unwinding.
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::{io, thread};
static DEFAULT_WORKER_NAME: &str = "yew-runtime-worker";
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::channel::mpsc::UnboundedSender;
use futures::stream::StreamExt;
use tokio::task::{spawn_local, LocalSet};
type SpawnTask = Box<dyn Send + FnOnce()>;
thread_local! {
static TASK_COUNT: RefCell<Option<Arc<AtomicUsize>>> = RefCell::new(None);
static LOCAL_SET: LocalSet = LocalSet::new();
}
pub(crate) struct LocalWorker {
task_count: Arc<AtomicUsize>,
tx: UnboundedSender<SpawnTask>,
}
impl LocalWorker {
pub fn new() -> io::Result<Self> {
let (tx, mut rx) = futures::channel::mpsc::unbounded::<SpawnTask>();
let task_count: Arc<AtomicUsize> = Arc::default();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
{
let task_count = task_count.clone();
thread::Builder::new()
.name(DEFAULT_WORKER_NAME.into())
.spawn(move || {
TASK_COUNT.with(move |m| {
*m.borrow_mut() = Some(task_count);
});
LOCAL_SET.with(|local_set| {
local_set.block_on(&rt, async move {
while let Some(m) = rx.next().await {
m();
}
});
});
})?;
}
Ok(Self { task_count, tx })
}
pub fn task_count(&self) -> usize {
self.task_count.load(Ordering::Acquire)
}
pub fn spawn_pinned<F, Fut>(&self, f: F)
where
F: 'static + Send + FnOnce() -> Fut,
Fut: 'static + Future<Output = ()>,
{
let guard = LocalJobCountGuard::new(self.task_count.clone());
// We ignore the result upon a failure, this can never happen unless the runtime is
// exiting which all instances of Runtime will be dropped at that time and hence cannot
// spawn pinned tasks.
let _ = self.tx.unbounded_send(Box::new(move || {
spawn_local(async move {
let _guard = guard;
f().await;
});
}));
}
}
pub struct LocalJobCountGuard(Arc<AtomicUsize>);
impl LocalJobCountGuard {
fn new(inner: Arc<AtomicUsize>) -> Self {
inner.fetch_add(1, Ordering::AcqRel);
LocalJobCountGuard(inner)
}
}
impl Drop for LocalJobCountGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, Ordering::AcqRel);
}
}
#[derive(Debug, Clone)]
pub(crate) struct LocalHandle {
// This type is not send or sync.
_marker: PhantomData<*const ()>,
task_count: Arc<AtomicUsize>,
}
impl LocalHandle {
pub fn try_current() -> Option<Self> {
// We cache the handle to prevent borrowing RefCell.
thread_local! {
static LOCAL_HANDLE: Option<LocalHandle> = TASK_COUNT
.with(|m| m.borrow().clone())
.map(|task_count| LocalHandle { task_count, _marker: PhantomData });
}
LOCAL_HANDLE.with(|m| m.clone())
}
pub fn current() -> Self {
Self::try_current().expect("outside of Yew runtime.")
}
pub fn spawn_local<F>(&self, f: F)
where
F: Future<Output = ()> + 'static,
{
let guard = LocalJobCountGuard::new(self.task_count.clone());
LOCAL_SET.with(move |local_set| {
local_set.spawn_local(async move {
let _guard = guard;
f.await;
})
});
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use futures::channel::oneshot;
use tokio::test;
use tokio::time::timeout;
use yew::platform::Runtime;
use super::*;
#[test]
async fn test_local_handle_exists() {
assert!(LocalHandle::try_current().is_none());
let runtime = Runtime::default();
let (tx, rx) = oneshot::channel();
runtime.spawn_pinned(move || async move {
tx.send(LocalHandle::try_current().is_some())
.expect("failed to send");
});
timeout(Duration::from_secs(5), rx)
.await
.expect("task timed out")
.expect("failed to receive");
}
#[test]
async fn test_local_handle_spawns_on_same_worker() {
assert!(LocalHandle::try_current().is_none());
let runtime = Runtime::default();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
runtime.spawn_pinned(move || async move {
let handle = LocalHandle::current();
tx1.send(std::thread::current().id())
.expect("failed to send");
handle.spawn_local(async move {
tx2.send(std::thread::current().id())
.expect("failed to send");
})
});
let result1 = timeout(Duration::from_secs(5), rx1)
.await
.expect("task timed out")
.expect("failed to receive");
let result2 = timeout(Duration::from_secs(5), rx2)
.await
.expect("task timed out")
.expect("failed to receive");
assert_eq!(result1, result2);
}
}

View File

@ -1,181 +0,0 @@
use std::future::Future;
use std::sync::Arc;
use std::{fmt, io};
use once_cell::sync::Lazy;
pub(crate) mod time;
mod local_worker;
pub(crate) use local_worker::LocalHandle;
use local_worker::LocalWorker;
pub(crate) fn get_default_runtime_size() -> usize {
// We use num_cpus as std::thread::available_parallelism() does not take
// system resource constraint (e.g.: cgroups) into consideration.
num_cpus::get()
}
#[inline(always)]
pub(super) fn spawn_local<F>(f: F)
where
F: Future<Output = ()> + 'static,
{
match LocalHandle::try_current() {
Some(m) => {
// If within a Yew runtime, use a local handle increases the local task count.
m.spawn_local(f);
}
None => {
tokio::task::spawn_local(f);
}
}
}
#[derive(Clone)]
pub(crate) struct Runtime {
workers: Arc<Vec<LocalWorker>>,
}
impl fmt::Debug for Runtime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Runtime")
.field("workers", &"Vec<LocalWorker>")
.finish()
}
}
impl Default for Runtime {
fn default() -> Self {
static DEFAULT_RT: Lazy<Runtime> = Lazy::new(|| {
Runtime::new(get_default_runtime_size()).expect("failed to create runtime.")
});
DEFAULT_RT.clone()
}
}
impl Runtime {
pub fn new(size: usize) -> io::Result<Self> {
assert!(size > 0, "must have more than 1 worker.");
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let worker = LocalWorker::new()?;
workers.push(worker);
}
Ok(Self {
workers: workers.into(),
})
}
fn find_least_busy_local_worker(&self) -> &LocalWorker {
let mut workers = self.workers.iter();
let mut worker = workers.next().expect("must have more than 1 worker.");
let mut task_count = worker.task_count();
for current_worker in workers {
if task_count == 0 {
// We don't have to search until the end.
break;
}
let current_worker_task_count = current_worker.task_count();
if current_worker_task_count < task_count {
task_count = current_worker_task_count;
worker = current_worker;
}
}
worker
}
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
let worker = self.find_least_busy_local_worker();
worker.spawn_pinned(create_task);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures::channel::oneshot;
use tokio::sync::Barrier;
use tokio::test;
use tokio::time::timeout;
use super::*;
#[test]
async fn test_spawn_pinned_least_busy() {
let runtime = Runtime::new(2).expect("failed to create runtime.");
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let bar = Arc::new(Barrier::new(2));
{
let bar = bar.clone();
runtime.spawn_pinned(move || async move {
bar.wait().await;
tx1.send(std::thread::current().id())
.expect("failed to send!");
});
}
runtime.spawn_pinned(move || async move {
bar.wait().await;
tx2.send(std::thread::current().id())
.expect("failed to send!");
});
let result1 = timeout(Duration::from_secs(5), rx1)
.await
.expect("task timed out")
.expect("failed to receive");
let result2 = timeout(Duration::from_secs(5), rx2)
.await
.expect("task timed out")
.expect("failed to receive");
// first task and second task are not on the same thread.
assert_ne!(result1, result2);
}
#[test]
async fn test_spawn_local_within_send() {
let runtime = Runtime::default();
let (tx, rx) = oneshot::channel();
runtime.spawn_pinned(move || async move {
tokio::task::spawn(async move {
// tokio::task::spawn_local cannot spawn tasks outside of a local context.
//
// yew::platform::spawn_local can spawn tasks within a Send task as long as running
// under a Yew Runtime.
spawn_local(async move {
tx.send(()).expect("failed to send!");
})
});
});
timeout(Duration::from_secs(5), rx)
.await
.expect("task timed out")
.expect("failed to receive");
}
}

View File

@ -1,14 +0,0 @@
use std::future::Future;
use std::time::Duration;
use futures::stream::{Stream, StreamExt};
use tokio_stream::wrappers::IntervalStream;
#[inline(always)]
pub(crate) fn sleep(dur: Duration) -> impl Future<Output = ()> {
tokio::time::sleep(dur)
}
pub(crate) fn interval(dur: Duration) -> impl Stream<Item = ()> {
IntervalStream::new(tokio::time::interval(dur)).then(|_| async {})
}

View File

@ -1,56 +0,0 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;
pub(crate) mod time;
pub(crate) fn get_default_runtime_size() -> usize {
0
}
pub(super) use wasm_bindgen_futures::spawn_local;
#[derive(Debug, Clone, Default)]
pub(crate) struct Runtime {}
impl Runtime {
pub fn new(_size: usize) -> io::Result<Self> {
Ok(Self {})
}
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
spawn_local(create_task())
}
}
#[derive(Debug, Clone)]
pub(crate) struct LocalHandle {
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}
impl LocalHandle {
pub fn try_current() -> Option<Self> {
Some(Self {
_marker: PhantomData,
})
}
pub fn current() -> Self {
Self {
_marker: PhantomData,
}
}
pub fn spawn_local<F>(&self, f: F)
where
F: Future<Output = ()> + 'static,
{
spawn_local(f);
}
}

View File

@ -1,75 +0,0 @@
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::stream;
use futures::stream::Stream;
use gloo::timers::callback::Timeout;
#[inline(always)]
pub(crate) fn sleep(dur: Duration) -> impl Future<Output = ()> {
pub struct Sleep {
inner: Option<Timeout>,
dur_left: Option<u128>,
timeout_registered: Rc<Cell<bool>>,
}
impl Future for Sleep {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
static I32_MAX_U128: u128 = 2_147_483_647;
static I32_MAX_U32: u32 = 2_147_483_647;
// If polling before the registered timeout is reached, return Pending.
if self.timeout_registered.get() {
return Poll::Pending;
}
// set_timeout can only accept maximum of i32, so we wrap around if it gets longer.
let next_timeout = match self.dur_left.map(|m| (m, u32::try_from(m))) {
Some((m_u128, Err(_))) => {
self.dur_left = Some(m_u128 - I32_MAX_U128);
I32_MAX_U32
}
Some((m_u128, _)) if m_u128 > I32_MAX_U128 => {
self.dur_left = Some(m_u128 - I32_MAX_U128);
I32_MAX_U32
}
Some((_, Ok(m_u32))) => {
self.dur_left = None;
m_u32
}
None => return Poll::Ready(()),
};
let waker = cx.waker().clone();
self.timeout_registered.set(true);
let timeout_registered = self.timeout_registered.clone();
self.inner = Some(Timeout::new(next_timeout, move || {
timeout_registered.set(false);
waker.wake();
}));
Poll::Pending
}
}
Sleep {
inner: None,
dur_left: Some(dur.as_millis()),
timeout_registered: Cell::new(false).into(),
}
}
pub(crate) fn interval(dur: Duration) -> impl Stream<Item = ()> {
stream::unfold((), move |_: ()| async move {
sleep(dur).await;
Some(((), ()))
})
}

View File

@ -1,20 +0,0 @@
//! Utilities for bridging time and tasks.
use std::future::Future;
use std::time::Duration;
use futures::stream::Stream;
use crate::platform::imp::time as imp;
/// Waits until duration has elapsed.
#[inline(always)]
pub fn sleep(dur: Duration) -> impl Future<Output = ()> {
imp::sleep(dur)
}
/// Creates a Stream that yields an item after every period has elapsed.
#[inline(always)]
pub fn interval(period: Duration) -> impl Stream<Item = ()> {
imp::interval(period)
}

View File

@ -4,7 +4,7 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
/// Alias for Rc<RefCell<T>>
/// Alias for `Rc<RefCell<T>>`
pub type Shared<T> = Rc<RefCell<T>>;
/// A routine which could be run.

View File

@ -4,7 +4,7 @@ use std::marker::PhantomData;
use yew::html::ChildrenRenderer;
/// Map IntoIterator<Item=Into<T>> to Iterator<Item=T>
/// Map `IntoIterator<Item = Into<T>>` to `Iterator<Item = T>`
pub fn into_node_iter<IT, T, R>(it: IT) -> impl Iterator<Item = R>
where
IT: IntoIterator<Item = T>,

View File

@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
yew = { path = "../../packages/yew", features = ["tokio", "ssr"] }
yew = { path = "../../packages/yew", features = ["ssr"] }
function_router = { path = "../../examples/function_router" }
tokio = { version = "1.19", features = ["full"] }
jemallocator = "0.5.0"