initial commit

This commit is contained in:
AF 2024-03-19 11:46:45 +00:00
commit dded5403d1
4 changed files with 1196 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

1088
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

11
Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "protohacke-rs"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.30"
miette = { version = "5", features = ["fancy"] }
pin-project = "1"
ruchei = "0.0.72"
smol = "1.3.0"

96
src/main.rs Normal file
View File

@ -0,0 +1,96 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, AsyncRead, AsyncWrite, Sink, Stream, StreamExt};
use miette::IntoDiagnostic;
use ruchei::echo::bufferless::EchoBufferless;
async fn handle(stream: smol::io::Result<smol::net::TcpStream>) -> smol::io::Result<()> {
Forward {
stream: stream?,
write_buf: None,
}
.fuse()
.echo_bufferless()
.await
}
struct WriteBuf {
msg: Vec<u8>,
start: usize,
}
#[pin_project::pin_project]
struct Forward {
#[pin]
stream: smol::net::TcpStream,
write_buf: Option<WriteBuf>,
}
impl Stream for Forward {
type Item = smol::io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut buf = [0; 65536];
let read = ready!(this.stream.poll_read(cx, &mut buf))?;
if read == 0 {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(buf[..read].into())))
}
}
}
impl Sink<Vec<u8>> for Forward {
type Error = smol::io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
while let Some(write_buf) = this.write_buf {
let bytes = ready!(this
.stream
.as_mut()
.poll_write(cx, &write_buf.msg[write_buf.start..]))?;
write_buf.start += bytes;
if write_buf.start == write_buf.msg.len() || bytes == 0 {
*this.write_buf = None;
}
}
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, msg: Vec<u8>) -> Result<(), Self::Error> {
if !msg.is_empty() {
*self.project().write_buf = Some(WriteBuf { msg, start: 0 });
}
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_ready(cx))?;
self.project().stream.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().stream.poll_close(cx)
}
}
fn main() -> miette::Result<()> {
smol::block_on(async {
smol::net::TcpListener::bind("localhost:42042")
.await
.into_diagnostic()?
.incoming()
.for_each_concurrent(None, |stream| async {
if let Err(e) = handle(stream).await.into_diagnostic() {
eprintln!("{e}");
}
})
.await;
Ok(())
})
}