Handle text messages properly in web-sys websocket service (#1005)

* Handle text messages properly in web-sys websocket service

* Enable sudo mode to avoid chrome issues

* nudge

* nudge

* firefox

* nudge

* nudge
This commit is contained in:
Justin Starry 2020-03-08 22:58:40 +08:00 committed by GitHub
parent d700525959
commit 20351ce848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 165 additions and 37 deletions

View File

@ -8,7 +8,7 @@ dist: trusty
language: rust
sudo: false
addons:
chrome: stable
firefox: latest
cache: cargo
before_cache:
@ -32,12 +32,11 @@ install:
- cargo install cargo-update || true
- cargo install-update-config --version =0.2.59 wasm-bindgen-cli
- cargo install-update --allow-no-update wasm-bindgen-cli
- LATEST_CHROMEDRIVER_VERSION=`curl -s "https://chromedriver.storage.googleapis.com/LATEST_RELEASE"`
- curl --retry 5 -LO "https://chromedriver.storage.googleapis.com/${LATEST_CHROMEDRIVER_VERSION}/chromedriver_linux64.zip"
- unzip chromedriver_linux64.zip
- curl --retry 5 -LO https://github.com/mozilla/geckodriver/releases/download/v0.26.0/geckodriver-v0.26.0-linux64.tar.gz
- tar -xzf geckodriver-v0.26.0-linux64.tar.gz
- ./ci/install_cargo_web.sh
script:
- ./ci/run_checks.sh
- CHROMEDRIVER=$(pwd)/chromedriver ./ci/run_tests.sh
- CHROMEDRIVER=$(pwd)/chromedriver ./ci/check_examples.sh
- GECKODRIVER=$(pwd)/geckodriver ./ci/run_tests.sh
- ./ci/check_examples.sh

View File

@ -3,14 +3,15 @@ echo "$(rustup default)" | grep -q "1.39.0"
emscripten_supported=$?
set -euxo pipefail # https://vaneyckt.io/posts/safer_bash_scripts_with_set_euxo_pipefail/
cargo test --target wasm32-unknown-unknown --features wasm_test,std_web
cargo test --target wasm32-unknown-unknown --features wasm_test,web_sys
if [ "$emscripten_supported" == "0" ]; then
# TODO - Emscripten builds are broken on rustc > 1.39.0
cargo web test --target asmjs-unknown-emscripten --features std_web
cargo web test --target wasm32-unknown-emscripten --features std_web
fi
cargo test --target wasm32-unknown-unknown --features wasm_test,std_web
cargo test --target wasm32-unknown-unknown --features wasm_test,web_sys
cargo test --doc --features doc_test,wasm_test,yaml,msgpack,cbor,std_web
cargo test --doc --features doc_test,wasm_test,yaml,msgpack,cbor,web_sys

View File

@ -82,6 +82,10 @@ pub enum FormatError {
/// on a WebSocket that is using a binary serialization format, like Cbor.
#[error("received text for a binary format")]
ReceivedTextForBinary,
/// Received binary for a text format, e.g. someone sending binary
/// on a WebSocket that is using a text serialization format, like Json.
#[error("received binary for a text format")]
ReceivedBinaryForText,
/// Trying to encode a binary format as text", e.g., trying to
/// store a Cbor encoded value in a String.
#[error("trying to encode a binary format as Text")]

View File

@ -3,7 +3,7 @@
use super::Task;
use crate::callback::Callback;
use crate::format::{Binary, Text};
use crate::format::{Binary, FormatError, Text};
use cfg_if::cfg_if;
use cfg_match::cfg_match;
use std::fmt;
@ -21,7 +21,7 @@ cfg_if! {
}
/// A status of a websocket connection. Used for status notification.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum WebSocketStatus {
/// Fired when a websocket connection was opened.
Opened,
@ -121,7 +121,7 @@ impl WebSocketService {
feature = "std_web" => ({
let ws = self.connect_common(url, &notification)?.0;
ws.add_event_listener(move |event: SocketMessageEvent| {
did_process_binary(&event, &callback);
process_binary(&event, &callback);
});
Ok(WebSocketTask { ws, notification })
}),
@ -129,7 +129,7 @@ impl WebSocketService {
let ConnectCommon(ws, listeners) = self.connect_common(url, &notification)?;
let listener = EventListener::new(&ws, "message", move |event: &Event| {
let event = event.dyn_ref::<MessageEvent>().unwrap();
did_process_binary(&event, &callback);
process_binary(&event, &callback);
});
WebSocketTask::new(ws, notification, listener, listeners)
}),
@ -226,32 +226,35 @@ impl WebSocketService {
struct ConnectCommon(WebSocket, #[cfg(feature = "web_sys")] [EventListener; 3]);
fn did_process_binary<OUT: 'static>(
fn process_binary<OUT: 'static>(
#[cfg(feature = "std_web")] event: &SocketMessageEvent,
#[cfg(feature = "web_sys")] event: &MessageEvent,
callback: &Callback<OUT>,
) -> bool
where
) where
OUT: From<Binary>,
{
let bytes = cfg_match! {
feature = "std_web" => event.data().into_array_buffer(),
feature = "web_sys" => Some(event.data()),
#[cfg(feature = "std_web")]
let bytes = event.data().into_array_buffer();
#[cfg(feature = "web_sys")]
let bytes = if !event.data().is_string() {
Some(event.data())
} else {
None
};
match bytes {
None => false,
Some(bytes) => {
let bytes: Vec<u8> = cfg_match! {
feature = "std_web" => bytes.into(),
feature = "web_sys" => Uint8Array::new(&bytes).to_vec(),
};
let data = Ok(bytes);
let out = OUT::from(data);
callback.emit(out);
true
}
}
let data = if let Some(bytes) = bytes {
let bytes: Vec<u8> = cfg_match! {
feature = "std_web" => bytes.into(),
feature = "web_sys" => Uint8Array::new(&bytes).to_vec(),
};
Ok(bytes)
} else {
Err(FormatError::ReceivedTextForBinary.into())
};
let out = OUT::from(data);
callback.emit(out);
}
fn process_text<OUT: 'static>(
@ -266,11 +269,14 @@ fn process_text<OUT: 'static>(
feature = "web_sys" => event.data().as_string(),
};
if let Some(text) = text {
let data = Ok(text);
let out = OUT::from(data);
callback.emit(out);
}
let data = if let Some(text) = text {
Ok(text)
} else {
Err(FormatError::ReceivedBinaryForText.into())
};
let out = OUT::from(data);
callback.emit(out);
}
fn process_both<OUT: 'static>(
@ -280,8 +286,16 @@ fn process_both<OUT: 'static>(
) where
OUT: From<Text> + From<Binary>,
{
if !did_process_binary(event, callback) {
#[cfg(feature = "std_web")]
let is_text = event.data().into_text().is_some();
#[cfg(feature = "web_sys")]
let is_text = event.data().is_string();
if is_text {
process_text(event, callback);
} else {
process_binary(event, callback);
}
}
@ -340,3 +354,113 @@ impl Drop for WebSocketTask {
}
}
}
#[cfg(test)]
#[cfg(feature = "wasm_test")]
mod tests {
use super::*;
use crate::callback::{test_util::CallbackFuture, Callback};
use crate::format::{FormatError, Json};
use serde::{Deserialize, Serialize};
use wasm_bindgen_test::{wasm_bindgen_test as test, wasm_bindgen_test_configure};
wasm_bindgen_test_configure!(run_in_browser);
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Message {
test: String,
}
#[test]
async fn connect() {
let url = "wss://echo.websocket.org";
let cb_future = CallbackFuture::<Json<Result<Message, anyhow::Error>>>::default();
let callback: Callback<_> = cb_future.clone().into();
let status_future = CallbackFuture::<WebSocketStatus>::default();
let notification: Callback<_> = status_future.clone().into();
let mut ws = WebSocketService::new();
let mut task = ws.connect(url, callback, notification).unwrap();
assert_eq!(status_future.await, WebSocketStatus::Opened);
let msg = Message {
test: String::from("hello"),
};
task.send(Json(&msg));
match cb_future.clone().await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}
task.send_binary(Json(&msg));
match cb_future.await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}
}
#[test]
async fn connect_text() {
let url = "wss://echo.websocket.org";
let cb_future = CallbackFuture::<Json<Result<Message, anyhow::Error>>>::default();
let callback: Callback<_> = cb_future.clone().into();
let status_future = CallbackFuture::<WebSocketStatus>::default();
let notification: Callback<_> = status_future.clone().into();
let mut ws = WebSocketService::new();
let mut task = ws.connect_text(url, callback, notification).unwrap();
assert_eq!(status_future.await, WebSocketStatus::Opened);
let msg = Message {
test: String::from("hello"),
};
task.send(Json(&msg));
match cb_future.clone().await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}
task.send_binary(Json(&msg));
match cb_future.await {
Json(Ok(received)) => assert!(false, received),
Json(Err(err)) => assert_eq!(
err.to_string(),
FormatError::ReceivedBinaryForText.to_string()
),
}
}
#[test]
async fn connect_binary() {
let url = "wss://echo.websocket.org";
let cb_future = CallbackFuture::<Json<Result<Message, anyhow::Error>>>::default();
let callback: Callback<_> = cb_future.clone().into();
let status_future = CallbackFuture::<WebSocketStatus>::default();
let notification: Callback<_> = status_future.clone().into();
let mut ws = WebSocketService::new();
let mut task = ws.connect_binary(url, callback, notification).unwrap();
assert_eq!(status_future.await, WebSocketStatus::Opened);
let msg = Message {
test: String::from("hello"),
};
task.send_binary(Json(&msg));
match cb_future.clone().await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}
task.send(Json(&msg));
match cb_future.await {
Json(Ok(received)) => assert!(false, received),
Json(Err(err)) => assert_eq!(
err.to_string(),
FormatError::ReceivedTextForBinary.to_string()
),
}
}
}