1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//!
//! A custom reimplementation of [tokio::io::copy](https://docs.rs/tokio/0.2.6/tokio/io/fn.copy.html)
//!

use futures::ready;
use log::{info, warn};
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::str;
use std::task::{Context, Poll};
use tokio::io;

/// Asynchronously copies the entire contents of a reader into a writer.
///
/// *difference it echos information being passed from reader to writer*
///
/// This function returns a future that will continuously read data from
/// `reader` and then write it into `writer` in a streaming fashion until
/// `reader` returns EOF.
///
/// On success, the total number of bytes that were copied from `reader` to
/// `writer` is returned.
///
/// This is an asynchronous version of [`std::io::copy`][std].
///
/// # Errors
///
/// The returned future will finish with an error will return an error
/// immediately if any call to `poll_read` or `poll_write` returns an error.
///
/// # Examples
///
/// ```
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut reader: &[u8] = b"hello";
/// let mut writer: Vec<u8> = vec![];
///
/// copy(&mut reader, &mut writer).await?;
///
/// assert_eq!(&b"hello"[..], &writer[..]);
/// # Ok(())
/// # }
/// ```
///
/// [std]: https://doc.rust-lang.org/std/io/fn.copy.html
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyOver<'a, R: ?Sized, W: ?Sized> {
    reader: &'a mut R,
    read_done: bool,
    writer: &'a mut W,
    pos: usize,
    cap: usize,
    amt: u64,
    buf: Box<[u8]>,
    to: &'a SocketAddr,
    from: &'a SocketAddr,
}

/// do the actual async copy from read to write
pub fn copy<'a, R, W>(
    reader: &'a mut R,
    writer: &'a mut W,
    from: &'a SocketAddr,
    to: &'a SocketAddr,
) -> CopyOver<'a, R, W>
where
    R: io::AsyncRead + Unpin + ?Sized,
    W: io::AsyncWrite + Unpin + ?Sized,
{
    CopyOver {
        reader,
        read_done: false,
        writer,
        amt: 0,
        pos: 0,
        cap: 0,
        buf: Box::new([0; 2048]),
        to: to,
        from: from,
    }
}

impl<R, W> Future for CopyOver<'_, R, W>
where
    R: io::AsyncRead + Unpin + ?Sized,
    W: io::AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<u64>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        loop {
            // If our buffer is empty, then we need to read some data to
            // continue.
            if self.pos == self.cap && !self.read_done {
                let me = &mut *self;
                let n = ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?;
                if n == 0 {
                    self.read_done = true;
                } else {
                    self.pos = 0;
                    self.cap = n;
                }
            }

            {
                let me = &mut *self;
                let _ = str::from_utf8(&me.buf[me.pos..me.cap - 2])
                    .map(|string| info!("{}->{}: {} ", me.from, me.to, string))
                    .map_err(|e| warn!("{}", e));
            }
            // If our buffer has some data, let's write it out!
            while self.pos < self.cap {
                let me = &mut *self;
                let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, &me.buf[me.pos..me.cap]))?;
                if i == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte into writer",
                    )));
                } else {
                    self.pos += i;
                    self.amt += i as u64;
                }
            }

            // If we've written al the data and we've seen EOF, flush out the
            // data and finish the transfer.
            // done with the entire transfer.
            if self.pos == self.cap && self.read_done {
                let me = &mut *self;
                ready!(Pin::new(&mut *me.writer).poll_flush(cx))?;
                return Poll::Ready(Ok(self.amt));
            }
        }
    }
}