use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Display, Formatter, Result as FResult};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::{Arc, MutexGuard};
use log::{debug, error, info};
use specifications::address::Address;
use tokio::net::{TcpListener, TcpStream};
use warp::http::StatusCode;
use warp::hyper::body::Bytes;
use warp::hyper::{Body, Response};
use warp::{Rejection, Reply};
use crate::errors::RedirectError;
use crate::ports::PortAllocator;
use crate::redirect::path_server_factory;
use crate::spec::{Context, NewPathRequest, NewPathRequestTlsOptions};
/// Builds an empty-bodied `Response<Body>` carrying the given status code.
///
/// Invoke as `response!(StatusCode::BAD_REQUEST)`; the identifier after `StatusCode::` must name
/// one of the associated `StatusCode` constants.
macro_rules! response {
    (StatusCode:: $status:ident) => {{
        // Start from a default response and overwrite only its status.
        let mut res = Response::new(Body::empty());
        *res.status_mut() = StatusCode::$status;
        res
    }};
}
/// Produces a custom warp rejection that wraps a generic `InternalError`.
///
/// The `$msg` expression is accepted by the matcher but is not used in the expansion: the
/// resulting error always displays the fixed text "An internal error has occurred.".
macro_rules! reject {
    ($msg:expr) => {{
        // The error type is declared inside the expansion, so it is local to each call site.
        #[derive(Debug)]
        struct InternalError;

        impl Error for InternalError {}
        impl warp::reject::Reject for InternalError {}
        impl Display for InternalError {
            fn fmt(&self, f: &mut Formatter<'_>) -> FResult { f.write_str("An internal error has occurred.") }
        }

        warp::reject::custom(InternalError)
    }};
}
pub async fn new_outgoing_path(body: Bytes, context: Arc<Context>) -> Result<impl Reply, Rejection> {
info!("Handling POST on '/outgoing/new' (i.e., create new outgoing proxy path)...");
debug!("Parsing incoming body...");
let body: NewPathRequest = match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => {
error!("Failed to parse incoming request body as JSON: {}", err);
return Ok(response!(StatusCode::BAD_REQUEST));
},
};
{
let opened: MutexGuard<HashMap<(String, Option<NewPathRequestTlsOptions>), u16>> = context.opened.lock().unwrap();
if let Some(port) = opened.get(&(body.address.clone(), body.tls.clone())) {
debug!("A path to '{}' with the same TLS options already exists", body.address);
debug!("OK, returning port {} to client", port);
return Ok(Response::new(Body::from(port.to_string())));
}
}
debug!("Finding available port...");
let port: u16 = {
let mut lock: MutexGuard<PortAllocator> = context.ports.lock().unwrap();
match lock.allocate() {
Some(port) => port,
None => {
error!("No more ports left in range");
return Ok(response!(StatusCode::INSUFFICIENT_STORAGE));
},
}
};
debug!("Allocating on: {}", port);
debug!("Launching service...");
let address: SocketAddr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port).into();
let server = match path_server_factory(&context, address, body.address.clone(), body.tls.clone()).await {
Ok(server) => server,
Err(err) => {
error!("Failed to create the path server: {}", err);
return Err(reject!("An internal server error has occurred."));
},
};
tokio::spawn(server);
{
let mut opened: MutexGuard<HashMap<(String, Option<NewPathRequestTlsOptions>), u16>> = context.opened.lock().unwrap();
opened.insert((body.address, body.tls), port);
}
debug!("OK, returning port {} to client", port);
Ok(Response::new(Body::from(port.to_string())))
}
pub async fn new_incoming_path(port: u16, address: Address, context: Arc<Context>) -> Result<(), RedirectError> {
debug!("Creating new incoming path on port {} to '{}'...", port, address);
if context.proxy.outgoing_range.contains(&port) {
return Err(RedirectError::PortInOutgoingRange { port, range: context.proxy.outgoing_range.clone() });
}
let socket_addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port).into();
debug!("Creating listener on '{}'", socket_addr);
let listener: TcpListener = match TcpListener::bind(socket_addr).await {
Ok(listener) => listener,
Err(err) => {
return Err(RedirectError::ListenerCreateError { address: socket_addr, err });
},
};
tokio::spawn(async move {
info!("Initialized inbound listener '>{}' to '{}'", port, address);
loop {
debug!(">{}->{}: Ready for new connection", port, address);
let (mut iconn, client_addr): (TcpStream, SocketAddr) = match listener.accept().await {
Ok(res) => res,
Err(err) => {
error!(">{}->{}: Failed to accept incoming connection: {}", port, address, err);
continue;
},
};
debug!(">{}->{}: Got new connection from '{}'", port, address, client_addr);
let addr: String = format!("{}:{}", address.domain(), address.port());
debug!("Connecting to '{}'...", addr);
let mut oconn: TcpStream = match TcpStream::connect(&addr).await {
Ok(oconn) => oconn,
Err(err) => {
error!(">{}->{}: Failed to connect to internal '{}': {}", port, address, addr, err);
continue;
},
};
debug!(">{}->{}: Bidirectional link started", port, address);
if let Err(err) = tokio::io::copy_bidirectional(&mut iconn, &mut oconn).await {
error!(">{}->{}: Bidirectional link failed: {}", port, address, err);
continue;
}
debug!(">{}->{}: Bidirectional link completed", port, address);
}
});
Ok(())
}