1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//! Definitions and declarations of data structures relating to comms

use crate::prelude::{Rx, Tx};
use crate::states::ConnStates;
use db::utils::UID;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::TcpStream;
use tokio::stream::Stream;
use tokio::sync::{mpsc, Mutex};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};

/// The kinds of message a [`Client`] stream can yield.
#[derive(Debug)]
pub enum Message {
    /// A line of text received from the connected client's socket.
    FromClient(String),

    /// A message received on the client's `Rx` channel half.
    OnRx(String),
}

/// Wrapper around a connected socket. This is non-persistent data and is only
/// valid within the main `process`.
#[derive(Debug)]
pub struct Client {
    /// Unique id of the client.
    pub uid: UID,

    /// Current state of the connected client.
    pub state: ConnStates,

    /// The TCP stream framed with a `LinesCodec`, used to encode and decode
    /// the data exchanged with the client.
    pub lines: Framed<TcpStream, LinesCodec>,

    /// Socket address of the connected client.
    pub addr: SocketAddr,
    // Receiving half of the unbounded channel created in `Client::new`; the
    // matching sending half is stored in the server's `clients` map.
    rx: Rx,
}

impl Client {
    /// Asynchronously registers a newly connected peer and builds its `Client`.
    ///
    /// An unbounded channel is created for the peer. The sending half is
    /// stored, together with the peer address, in `server.clients` under
    /// `uid`; the receiving half is kept inside the returned `Client`.
    /// The new client starts in `ConnStates::AwaitingName`.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error returned by `stream.peer_addr()`.
    pub async fn new(
        uid: UID,
        server: Arc<Mutex<Server>>,
        stream: TcpStream,
    ) -> tokio::io::Result<Self> {
        let addr = stream.peer_addr()?;
        let (tx, rx) = mpsc::unbounded_channel();

        // Keep the server lock only for the duration of the registration.
        {
            let mut shared = server.lock().await;
            shared.clients.insert(uid.clone(), Comms(addr, tx));
        }

        let lines = Framed::new(stream, LinesCodec::new());
        Ok(Self {
            uid,
            state: ConnStates::AwaitingName,
            lines,
            addr,
            rx,
        })
    }
}

impl Stream for Client {
    type Item = Result<Message, LinesCodecError>;

    /// Polls both message sources of this client, the channel first.
    ///
    /// A value that is ready on the internal channel is yielded as
    /// `Message::OnRx`. Otherwise the framed socket is polled: a decoded line
    /// is yielded as `Message::FromClient`, a codec error is passed through
    /// unchanged, and the end of the socket stream ends this stream too.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // A pending or exhausted channel is not final: fall through to the socket.
        match Pin::new(&mut self.rx).poll_next(cx) {
            Poll::Ready(Some(text)) => return Poll::Ready(Some(Ok(Message::OnRx(text)))),
            Poll::Ready(None) | Poll::Pending => {}
        }

        match Pin::new(&mut self.lines).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(frame) => Poll::Ready(frame.map(|line| line.map(Message::FromClient))),
        }
    }
}

/// Server-owned structure that holds a client's `SocketAddr` and `Tx` channel.
///
/// Field `0` is the client's socket address and field `1` is the sending half
/// of the channel whose receiving half lives in that client's [`Client`].
#[derive(Debug)]
pub struct Comms(pub SocketAddr, pub Tx);

/// State shared between all connected clients (handed to each client as
/// `Arc<Mutex<Server>>`).
#[derive(Debug)]
pub struct Server {
    /// Per-client communication handles, keyed by the client's unique id.
    pub clients: HashMap<UID, Comms>,
}

impl Server {
    /// creates shared struct between clients and actual server
    pub fn new() -> Self {
        Self {
            clients: HashMap::new(),
        }
    }

    /// helper function that broadcasts data to all connected clients
    pub async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
        for (_uid, comms) in self.clients.iter_mut() {
            if comms.0 != sender {
                let _ = comms.1.send(message.into());
            } else {
                let msg = format!("You broadcasted, {}", message);
                let _ = comms.1.send(msg.into());
            }
        }
    }
}