English 中文(简体)
如何用Rust火箭_ws向同行传递信息
原标题:How to write a demo to forward message to peers by using Rust rocket_ws

使用Rust火箭的Im学习网,但我可以提到的参考代码很少。 火箭“ws”提供的例子代码只能向发送者传递信息。

#[get("/echo")]
fn echo(ws: ws::WebSocket) -> ws::Channel< static> {
    ws.channel(move |mut stream| Box::pin(async move {
        while let Some(message) = stream.next().await {
            let _ = stream.send(message?).await;
        }

        Ok(())
    }))
}

但我要向其他客户传达信息。 因此,你想写一下一点小mo,只是向与这一轮服务器有联系的所有同侪客户发送电 whi。

我试图利用哈希姆普,但失败。 我认为,这一解决办法根本不正确。

use futures::{SinkExt, StreamExt};
use rocket::{get, routes, State, data::IoStream};
use ws::stream::DuplexStream;
use tokio::sync::Mutex;
use std::{net::SocketAddr, sync::{Arc}, collections::HashMap};
type PeersMap = Arc<Mutex<HashMap<SocketAddr, i32>>>;

mod entity;

// static USER_MAP: Arc<Mutex<HashMap<i32, DuplexStream>>> = Arc::new(Mutex::new(HashMap::new()));

#[get("/<user_id>")]
async fn ws_test< a>(user_id: i32, socket: ws::WebSocket, user_map: & a State<Arc<Mutex<HashMap<i32, DuplexStream>>>>) -> ws::Channel< a> {

    socket.channel(move |stream| Box::pin(async move {
        user_map.lock().await.insert(user_id, stream);

        if let Some(stream) = user_map.lock().await.get_mut(&user_id) {
            while let Some(message) = stream.next().await {
                // let target = user_map.lock().unwrap().get(k);
                dbg!(&message);
                match message {
                    Ok(res) => {
                        if let ws::Message::Text(concrete) = res {
                            let received_message: entity::ReceivedMessage = serde_json::from_str(&concrete).unwrap();
                            let target_id = received_message.get_target_id().parse::<i32>().unwrap();
                            // let message = 
                            match user_map.lock().await.get_mut(&target_id) {
                                Some(target_stream) => {
                                    let _ = stream.send(ws::Message::Text(concrete.clone())).await;
                                    let _ = target_stream.send(ws::Message::Text(concrete)).await;
                                },
                                None => println!("only support text message"),
                            };
                        }
                    },
                    Err(err) => println!("{err}"),
                }
            }
        }
        Ok(())
    }))

}

#[rocket::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // let peers: PeerMap = Arc::new(Mutex::new(HashMap::new()));
    let user_map: Arc<Mutex<HashMap<i32, DuplexStream>>> = Arc::new(Mutex::new(HashMap::new()));

    rocket::build()
    .manage(user_map)
    .mount("/ws", routes![ws_test])
    .launch().await?;
    Ok(())
}
问题回答

我发现,“结构代码”可被混入思考和流中,我可以利用这些构件进行检索并妥善发送信息。 因此,我成功地完成了我的法典:

use futures::{SinkExt, StreamExt, stream::SplitSink};
use rocket::{get, routes, State};
use ws::{stream::DuplexStream, Message};
use tokio::sync::Mutex;
use std::{sync::Arc, collections::HashMap};

mod entity;

mod auth;

#[get("/<user_id>")]
async fn ws_test< a>(user_id: i32, socket: ws::WebSocket, user_map: & a State<Arc<Mutex<HashMap<i32, Arc<Mutex<SplitSink<DuplexStream, Message>>>>>>>) -> ws::Channel< a> {
    
    socket.channel(move |stream| Box::pin(async move {
        let (sender, mut receiever) = stream.split();
        let arc_sender = Arc::new(Mutex::new(sender));

        match user_map.try_lock() {
            Ok(mut res) => {
                res.insert(user_id, arc_sender.clone());
                println!("{user_id}接入成功")
            },
            Err(_) => {
                panic!("向user_map进行插入的时候无法获取互斥锁")
            },
        }
        while let Some(message) = receiever.next().await {
            match message {
                Ok(res) => {
                    if let ws::Message::Text(concrete) = res {
                        let received_message: entity::ReceivedMessage = serde_json::from_str(&concrete).unwrap();
                        let target_id = received_message.get_target_id().parse::<i32>().unwrap();
                        match user_map.try_lock() {
                            Ok(mut map) => {
                                match map.get_mut(&target_id) {
                                    Some(target_sender) => {
                                        let _ = target_sender.lock().await.send(ws::Message::Text(concrete)).await;
                                    },
                                    None => {
                                        let _ = arc_sender.clone().lock().await.send(ws::Message::Text("未找到目标".to_string())).await;
                                    },
                                };
                            },
                            Err(_) => panic!("user_map目前被锁定"),
                        }
                    }
                    else if let ws::Message::Close(_) = res {
                        println!("客户端{user_id}关闭了连接");
                        let _ = arc_sender.lock().await.close();
                        user_map.lock().await.remove(&user_id);
                    }
                },
                Err(_) => {
                    print!("处理关闭时仍在连接的客户端{user_id}...");
                    let _ = arc_sender.lock().await.close();
                    user_map.lock().await.remove(&user_id);
                    println!("完毕");
                },
            }
        };
        Ok(())
    }))

}

#[rocket::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let user_map: Arc<Mutex<HashMap<i32, Arc<Mutex<SplitSink<DuplexStream, Message>>>>>> = Arc::new(Mutex::new(HashMap::new()));

    rocket::build()
    .manage(user_map)
    .mount("/ws", routes![ws_test])
    .launch().await?;
    Ok(())
}




相关问题
correct socket.io implementation

I m trying out socket io for my project to show online friends, and I was wondering (actually, it looks kinda strange to me) that whenever I try to rerender the page (it doesn t matter if a user ...

缩略语

我可以把我们的网页连接起来,使用轮椅。

WebSocket stress testing

I would like to do some stress testing to a WebSocket based application. Anyone knows a tool that may help me in this task? Update: I forgot to mention, but I m favoring open source or free tools, ...

Web Sockets - server load

I m trying to learn new technology called Web Sockets. I ve got the setup (pywebsocket as Apache2 module) working and I m playing with examples. http://code.google.com/p/websocket-sample/wiki/samples?...

Web sockets server side processing model

To implement a server supporting clients using web sockets, do servers keep an open HTTP connection with each client? How can this scale? What are the "programming models" when implementing this ...

Will html5 websockets be crippled by firewalls?

I m extremely excited about html5 s websockets spec but I have a concern. These days everyone is operating off of some network, with routers (wired/wireless) that have built in firewalls, windows has ...

ColdFusion Socket Gateway

What is the performance like on a Socket Gateway for CF? EDIT : I meant to ask, given the way its built is it suitable for large scale applications or just demo purposes? I.e 2000+ users being ...

热门标签