use std::collections::HashMap;
use brane_cfg::info::Info as _;
use brane_cfg::infra::InfraFile;
use brane_cfg::node::NodeConfig;
use brane_prx::spec::NewPathRequestTlsOptions;
use log::{debug, error};
use reqwest::StatusCode;
use specifications::data::{AssetInfo, DataInfo};
use warp::http::{HeaderValue, Response};
use warp::hyper::Body;
use warp::{Rejection, Reply};
pub use crate::errors::DataError as Error;
use crate::spec::Context;
/// Aborts the enclosing function by returning `Err(warp::reject::custom(Error::SecretError))`.
///
/// The macro takes no arguments and expands to a bare `return` expression, so it can only be
/// used inside a function whose return type accepts `Err(Rejection)`. Callers in this file log
/// the actual cause with `error!` before invoking it.
// NOTE(review): presumably `SecretError` is deliberately opaque so internal failure details are
// not sent to the client — confirm against the definition of `DataError`.
macro_rules! fail {
() => {
return Err(warp::reject::custom(Error::SecretError))
};
}
pub async fn list(context: Context) -> Result<impl Reply, Rejection> {
debug!("Handling GET on `/data/info` (i.e., list all datasets)...");
let node_config: NodeConfig = match NodeConfig::from_path(&context.node_config_path) {
Ok(config) => config,
Err(err) => {
error!("Failed to load NodeConfig file: {}", err);
return Err(warp::reject::custom(Error::SecretError));
},
};
if !node_config.node.is_central() {
error!("Provided node config file '{}' is not for a central node", context.node_config_path.display());
return Err(warp::reject::custom(Error::SecretError));
}
let infra: InfraFile = match InfraFile::from_path(&node_config.node.central().paths.infra) {
Ok(infra) => infra,
Err(err) => {
error!("{}", Error::InfrastructureOpenError { path: node_config.node.central().paths.infra.clone(), err });
return Err(warp::reject::custom(Error::SecretError));
},
};
let mut datasets: HashMap<String, DataInfo> = HashMap::new();
for (loc_name, loc) in infra {
let address: String = format!("{}/data/info", loc.registry);
let res: reqwest::Response =
match context.proxy.get(&address, Some(NewPathRequestTlsOptions { location: loc_name.clone(), use_client_auth: false })).await {
Ok(res) => match res {
Ok(res) => res,
Err(err) => {
error!("{} (skipping domain)", Error::RequestError { address, err });
continue;
},
},
Err(err) => {
error!("{} (skipping domain)", Error::ProxyError { err });
continue;
},
};
if res.status() == StatusCode::NOT_FOUND {
continue;
}
let body: String = match res.text().await {
Ok(body) => body,
Err(err) => {
error!("{} (skipping domain)", Error::ResponseBodyError { address, err });
continue;
},
};
let local_sets: HashMap<String, AssetInfo> = match serde_json::from_str(&body) {
Ok(body) => body,
Err(err) => {
debug!("Received body: \"\"\"{}\"\"\"", body);
error!("{} (skipping domain)", Error::ResponseParseError { address, err });
continue;
},
};
for (n, d) in local_sets {
if let Some(info) = datasets.get_mut(&n) {
info.access.insert(loc_name.clone(), d.access);
} else {
datasets.insert(n, d.into_data_info(loc_name.clone()));
}
}
}
let body: String = match serde_json::to_string(&datasets) {
Ok(body) => body,
Err(err) => {
error!("{}", Error::SerializeError { what: "list of all datasets", err });
fail!();
},
};
let body_len: usize = body.len();
let mut response = Response::new(Body::from(body));
response.headers_mut().insert("Content-Length", HeaderValue::from(body_len));
Ok(response)
}
pub async fn get(name: String, context: Context) -> Result<impl Reply, Rejection> {
debug!("Handling GET on `/data/info/{}` (i.e., get dataset info)...", name);
let node_config: NodeConfig = match NodeConfig::from_path(&context.node_config_path) {
Ok(config) => config,
Err(err) => {
error!("Failed to load NodeConfig file: {}", err);
return Err(warp::reject::custom(Error::SecretError));
},
};
if !node_config.node.is_central() {
error!("Provided node config file '{}' is not for a central node", context.node_config_path.display());
return Err(warp::reject::custom(Error::SecretError));
}
let infra: InfraFile = match InfraFile::from_path(&node_config.node.central().paths.infra) {
Ok(infra) => infra,
Err(err) => {
error!("{}", Error::InfrastructureOpenError { path: node_config.node.central().paths.infra.clone(), err });
return Err(warp::reject::custom(Error::SecretError));
},
};
let mut dataset: Option<DataInfo> = None;
for (loc_name, loc) in infra {
let address: String = format!("{}/data/info/{}", loc.registry, name);
let res: reqwest::Response =
match context.proxy.get(&address, Some(NewPathRequestTlsOptions { location: loc_name.clone(), use_client_auth: false })).await {
Ok(res) => match res {
Ok(res) => res,
Err(err) => {
error!("{} (skipping domain)", Error::RequestError { address, err });
continue;
},
},
Err(err) => {
error!("{} (skipping domain)", Error::ProxyError { err });
continue;
},
};
if res.status() == StatusCode::NOT_FOUND {
continue;
}
let body: String = match res.text().await {
Ok(body) => body,
Err(err) => {
error!("{} (skipping domain datasets)", Error::ResponseBodyError { address, err });
continue;
},
};
let local_set: AssetInfo = match serde_json::from_str(&body) {
Ok(body) => body,
Err(err) => {
debug!("Received body: \"\"\"{}\"\"\"", body);
error!("{} (skipping domain datasets)", Error::ResponseParseError { address, err });
continue;
},
};
if let Some(info) = &mut dataset {
info.access.insert(loc_name, local_set.access);
} else {
dataset = Some(local_set.into_data_info(loc_name));
}
}
if dataset.is_none() {
return Err(warp::reject::not_found());
}
let body: String = match serde_json::to_string(&dataset) {
Ok(body) => body,
Err(err) => {
error!("{}", Error::SerializeError { what: "dataset metadata", err });
fail!();
},
};
let body_len: usize = body.len();
let mut response = Response::new(Body::from(body));
response.headers_mut().insert("Content-Length", HeaderValue::from(body_len));
Ok(response)
}