use std::sync::Arc;
use brane_ast::Workflow;
use brane_ast::ast::Edge;
use brane_ast::func_id::FunctionId;
use brane_cfg::info::Info as _;
use brane_cfg::node::{NodeConfig, NodeSpecificConfig, WorkerConfig};
use brane_exe::pc::ProgramCounter;
use brane_shr::formatters::BlockFormatter;
use enum_debug::EnumDebug as _;
use error_trace::trace;
use log::{debug, error, info};
use specifications::data::DataName;
use specifications::profiling::ProfileReport;
use specifications::registering::{CheckTransferReply, CheckTransferRequest};
use warp::hyper::StatusCode;
use warp::reject::Rejection;
use warp::reply::{self, Reply, Response};
use crate::data::assert_asset_permission;
use crate::spec::Context;
async fn check_data_or_result(name: DataName, body: CheckTransferRequest, context: Arc<Context>) -> Result<impl Reply, Rejection> {
let CheckTransferRequest { use_case, workflow, task } = body;
debug!("Loading node.yml file '{}'...", context.node_config_path.display());
let node_config: NodeConfig = match NodeConfig::from_path(&context.node_config_path) {
Ok(config) => config,
Err(err) => {
error!("{}", trace!(("Failed to load NodeConfig file"), err));
return Err(warp::reject::reject());
},
};
let worker_config: WorkerConfig = if let NodeSpecificConfig::Worker(worker) = node_config.node {
worker
} else {
error!("Given NodeConfig file '{}' does not have properties for a worker node.", context.node_config_path.display());
return Err(warp::reject::reject());
};
let report = ProfileReport::auto_reporting_file(
format!("brane-reg /check/{}/{}", if name.is_data() { "data" } else { "result" }, name.name()),
format!("brane-reg_{}_check-{}-{}", worker_config.name, if name.is_data() { "data" } else { "result" }, name.name()),
);
debug!("Parsing workflow in request body...\n\nWorkflow:\n{}\n", BlockFormatter::new(serde_json::to_string_pretty(&workflow).unwrap()));
let prep = report.time("Request parsing");
let workflow: Workflow = match serde_json::from_value(workflow) {
Ok(wf) => wf,
Err(err) => {
debug!("{}", trace!(("Given request has an invalid workflow"), err));
return Ok(warp::reply::with_status(Response::new("Invalid workflow".to_string().into()), StatusCode::BAD_REQUEST));
},
};
let task: Option<ProgramCounter> =
task.map(|t| ProgramCounter::new(if let Some(id) = t.0 { FunctionId::Func(id as usize) } else { FunctionId::Main }, t.1 as usize));
let target: String = if let Some(task) = task {
let edge: &Edge = if task.is_main() {
match workflow.graph.get(task.edge_idx) {
Some(edge) => edge,
None => {
let msg: String = format!(
"Given request has an invalid workflow: edge index {} is out-of-bounds for '<main>' of {} edges",
task.edge_idx,
workflow.graph.len()
);
debug!("{}", msg);
return Ok(warp::reply::with_status(Response::new(msg.into()), StatusCode::BAD_REQUEST));
},
}
} else {
match workflow.funcs.get(&task.func_id.id()) {
Some(graph) => match graph.get(task.edge_idx) {
Some(edge) => edge,
None => {
let msg: String = format!(
"Given request has an invalid workflow: edge index {} is out-of-bounds for '{}' of {} edges",
task.func_id,
task.edge_idx,
graph.len()
);
debug!("{}", msg);
return Ok(warp::reply::with_status(Response::new(msg.into()), StatusCode::BAD_REQUEST));
},
},
None => {
let msg: String = format!("Given request has an invalid workflow: unknown function ID '{}'", task.func_id);
debug!("{}", msg);
return Ok(warp::reply::with_status(Response::new(msg.into()), StatusCode::BAD_REQUEST));
},
}
};
if let Edge::Node { task: _, locs: _, at, input, result: _, metadata: _, next: _ } = edge {
if !input.contains_key(&name) {
let msg: String = format!("Bad request: requested dataset '{}' not part of input to node '{}'", name, task);
debug!("{}", msg);
return Ok(warp::reply::with_status(Response::new(msg.into()), StatusCode::BAD_REQUEST));
}
match at {
Some(at) => at.clone(),
None => {
let msg: String = format!("Given request has an invalid workflow: encountered unplanned Node '{}'", task);
debug!("{}", msg);
return Ok(warp::reply::with_status(Response::new(msg.into()), StatusCode::BAD_REQUEST));
},
}
} else {
let msg: String = format!("Given task ID '{}' is invalid: expected Node, got {}", task, edge.variant());
debug!("{}", msg);
return Ok(warp::reply::with_status(Response::new(msg.into()), StatusCode::BAD_REQUEST));
}
} else {
match &*workflow.user {
Some(user) => user.clone(),
None => {
debug!("Given request has an invalid workflow: no task given and no final result receiver defined");
return Ok(warp::reply::with_status(
Response::new("No task ID specified and no result receiver in workflow".to_string().into()),
StatusCode::BAD_REQUEST,
));
},
}
};
prep.stop();
match report.time_fut("Checker", assert_asset_permission(&worker_config, &use_case, &workflow, &target, name.clone(), task)).await {
Ok(None) => {
info!("Checker authorized transfer of dataset '{}' to '{}'", name, target);
let res: String = match serde_json::to_string(&CheckTransferReply { verdict: true, reasons: vec![] }) {
Ok(res) => res,
Err(err) => {
error!("{}", trace!(("Failed to serialize ChecKTransferReply"), err));
return Ok(warp::reply::with_status(
Response::new("Internal server error".to_string().into()),
StatusCode::INTERNAL_SERVER_ERROR,
));
},
};
Ok(reply::with_status(Response::new(res.into()), StatusCode::OK))
},
Ok(Some(reasons)) => {
info!("Checker denied transfer of dataset '{}' to '{}'", name, target);
if !reasons.is_empty() {
debug!("Reasons:\n{}\n", reasons.iter().map(|r| format!(" - {r}")).collect::<Vec<String>>().join("\n"));
}
let res: String = match serde_json::to_string(&CheckTransferReply { verdict: false, reasons }) {
Ok(res) => res,
Err(err) => {
error!("{}", trace!(("Failed to serialize ChecKTransferReply"), err));
return Ok(warp::reply::with_status(
Response::new("Internal server error".to_string().into()),
StatusCode::INTERNAL_SERVER_ERROR,
));
},
};
Ok(reply::with_status(Response::new(res.into()), StatusCode::OK))
},
Err(err) => {
error!("{}", trace!(("Failed to consult the checker"), err));
Err(warp::reject::reject())
},
}
}
pub async fn check_data(name: String, body: CheckTransferRequest, context: Arc<Context>) -> Result<impl Reply, Rejection> {
info!("Handling GET on `/data/check/{name}` (i.e., check transfer permission)...");
check_data_or_result(DataName::Data(name), body, context).await
}
pub async fn check_result(name: String, body: CheckTransferRequest, context: Arc<Context>) -> Result<impl Reply, Rejection> {
info!("Handling GET on `/results/check/{name}` (i.e., check transfer permission)...");
check_data_or_result(DataName::IntermediateResult(name), body, context).await
}