зеркало из https://github.com/mozilla/gecko-dev.git
165 строки
4.9 KiB
Rust
165 строки
4.9 KiB
Rust
use futures::{Stream, StreamExt};
|
|
use std::collections::HashMap;
|
|
use std::sync::{
|
|
atomic::{AtomicUsize, Ordering},
|
|
Arc, Mutex,
|
|
};
|
|
use tokio::sync::mpsc;
|
|
use warp::{sse::ServerSentEvent, Filter};
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
pretty_env_logger::init();
|
|
|
|
// Keep track of all connected users, key is usize, value
|
|
// is an event stream sender.
|
|
let users = Arc::new(Mutex::new(HashMap::new()));
|
|
// Turn our "state" into a new Filter...
|
|
let users = warp::any().map(move || users.clone());
|
|
|
|
// POST /chat -> send message
|
|
let chat_send = warp::path("chat")
|
|
.and(warp::post())
|
|
.and(warp::path::param::<usize>())
|
|
.and(warp::body::content_length_limit(500))
|
|
.and(
|
|
warp::body::bytes().and_then(|body: bytes::Bytes| async move {
|
|
std::str::from_utf8(&body)
|
|
.map(String::from)
|
|
.map_err(|_e| warp::reject::custom(NotUtf8))
|
|
}),
|
|
)
|
|
.and(users.clone())
|
|
.map(|my_id, msg, users| {
|
|
user_message(my_id, msg, &users);
|
|
warp::reply()
|
|
});
|
|
|
|
// GET /chat -> messages stream
|
|
let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
|
|
// reply using server-sent events
|
|
let stream = user_connected(users);
|
|
warp::sse::reply(warp::sse::keep_alive().stream(stream))
|
|
});
|
|
|
|
// GET / -> index html
|
|
let index = warp::path::end().map(|| {
|
|
warp::http::Response::builder()
|
|
.header("content-type", "text/html; charset=utf-8")
|
|
.body(INDEX_HTML)
|
|
});
|
|
|
|
let routes = index.or(chat_recv).or(chat_send);
|
|
|
|
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
|
|
}
|
|
|
|
/// Our global unique user id counter.
|
|
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
|
|
|
|
/// Message variants.
|
|
#[derive(Debug)]
|
|
enum Message {
|
|
UserId(usize),
|
|
Reply(String),
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct NotUtf8;
|
|
impl warp::reject::Reject for NotUtf8 {}
|
|
|
|
/// Our state of currently connected users.
|
|
///
|
|
/// - Key is their id
|
|
/// - Value is a sender of `Message`
|
|
type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
|
|
|
|
fn user_connected(
|
|
users: Users,
|
|
) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, warp::Error>> + Send + 'static
|
|
{
|
|
// Use a counter to assign a new unique ID for this user.
|
|
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
|
|
|
|
eprintln!("new chat user: {}", my_id);
|
|
|
|
// Use an unbounded channel to handle buffering and flushing of messages
|
|
// to the event source...
|
|
let (tx, rx) = mpsc::unbounded_channel();
|
|
|
|
tx.send(Message::UserId(my_id))
|
|
// rx is right above, so this cannot fail
|
|
.unwrap();
|
|
|
|
// Save the sender in our list of connected users.
|
|
users.lock().unwrap().insert(my_id, tx);
|
|
|
|
// Convert messages into Server-Sent Events and return resulting stream.
|
|
rx.map(|msg| match msg {
|
|
Message::UserId(my_id) => Ok((warp::sse::event("user"), warp::sse::data(my_id)).into_a()),
|
|
Message::Reply(reply) => Ok(warp::sse::data(reply).into_b()),
|
|
})
|
|
}
|
|
|
|
fn user_message(my_id: usize, msg: String, users: &Users) {
|
|
let new_msg = format!("<User#{}>: {}", my_id, msg);
|
|
|
|
// New message from this user, send it to everyone else (except same uid)...
|
|
//
|
|
// We use `retain` instead of a for loop so that we can reap any user that
|
|
// appears to have disconnected.
|
|
users.lock().unwrap().retain(|uid, tx| {
|
|
if my_id == *uid {
|
|
// don't send to same user, but do retain
|
|
true
|
|
} else {
|
|
// If not `is_ok`, the SSE stream is gone, and so don't retain
|
|
tx.send(Message::Reply(new_msg.clone())).is_ok()
|
|
}
|
|
});
|
|
}
|
|
|
|
static INDEX_HTML: &str = r#"
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>Warp Chat</title>
|
|
</head>
|
|
<body>
|
|
<h1>warp chat</h1>
|
|
<div id="chat">
|
|
<p><em>Connecting...</em></p>
|
|
</div>
|
|
<input type="text" id="text" />
|
|
<button type="button" id="send">Send</button>
|
|
<script type="text/javascript">
|
|
var uri = 'http://' + location.host + '/chat';
|
|
var sse = new EventSource(uri);
|
|
function message(data) {
|
|
var line = document.createElement('p');
|
|
line.innerText = data;
|
|
chat.appendChild(line);
|
|
}
|
|
sse.onopen = function() {
|
|
chat.innerHTML = "<p><em>Connected!</em></p>";
|
|
}
|
|
var user_id;
|
|
sse.addEventListener("user", function(msg) {
|
|
user_id = msg.data;
|
|
});
|
|
sse.onmessage = function(msg) {
|
|
message(msg.data);
|
|
};
|
|
send.onclick = function() {
|
|
var msg = text.value;
|
|
var xhr = new XMLHttpRequest();
|
|
xhr.open("POST", uri + '/' + user_id, true);
|
|
xhr.send(msg);
|
|
text.value = '';
|
|
message('<You>: ' + msg);
|
|
};
|
|
</script>
|
|
</body>
|
|
</html>
|
|
"#;
|