use std::error;
use std::fmt::{Display, Formatter, Result as FResult};
use brane_ast::Workflow;
use brane_ast::ast::Edge;
use brane_ast::func_id::FunctionId;
use brane_cfg::infra::{InfraFile, InfraLocation};
use brane_exe::pc::ProgramCounter;
use brane_shr::formatters::BlockFormatter;
use enum_debug::EnumDebug as _;
use log::{debug, info};
use reqwest::{Client, Request, Response, StatusCode};
use serde_json::Value;
use specifications::address::Address;
use specifications::data::{AvailabilityKind, DataName, PreprocessKind};
use specifications::registering::{CheckTransferReply, CheckTransferRequest};
use specifications::working::{self, JobServiceClient};
use tokio::task::JoinHandle;
/// The outcome of a single checker request spawned by this module.
///
/// - `Ok(None)`: the checker replied with an allowing verdict.
/// - `Ok(Some((checker, reasons)))`: the checker replied with a denying verdict; `checker` is the
///   name of the consulted domain and `reasons` are the reasons it reported.
/// - `Err(_)`: the request could not be completed (see [`Error`]).
pub type RequestOutput = Result<Option<(String, Vec<String>)>, Error>;
/// Errors that can occur while spawning or executing checker requests for a workflow.
///
/// In every variant, `id` is the identifier of the workflow being checked, and `domain` / `addr`
/// name the domain that was contacted and the address that was used for it.
#[derive(Debug)]
pub enum Error {
/// An edge index was out-of-bounds for the function it points into (`max` is the number of edges in that function).
UnknownEdgeIdx { id: String, func_id: FunctionId, edge_idx: usize, max: usize },
/// The domain that is planned to execute a task is not known.
UnknownExecutor { id: String, node: ProgramCounter, domain: String },
/// A function ID does not occur in the workflow.
UnknownFuncId { id: String, func_id: FunctionId },
/// The domain that is planned to provide a dataset for a task is not known.
UnknownProvider { id: String, node: ProgramCounter, domain: String, dataname: DataName },
/// An input of a task has not been planned.
UnplannedInput { id: String, node: ProgramCounter, input: DataName },
/// A task node has not been planned (it has no execution location).
UnplannedNode { id: String, node: ProgramCounter },
/// Failed to serialize the workflow to JSON.
WorkflowSerialize { id: String, err: serde_json::Error },
/// Failed to build a request to a domain's registry.
RegistryRequest { domain: String, addr: Address, err: reqwest::Error },
/// Failed to send a request to a domain's registry.
RegistryRequestSend { domain: String, addr: Address, err: reqwest::Error },
/// Failed to download the body of a registry's response.
RegistryResponseDownload { domain: String, addr: Address, err: reqwest::Error },
/// The registry replied with a non-success status code; `response` is the body, if it could be retrieved.
RegistryResponseFailure { domain: String, addr: Address, code: StatusCode, response: Option<String> },
/// Failed to parse the registry's response (`raw` is the unparsed body).
RegistryResponseParse { domain: String, addr: Address, raw: String, err: serde_json::Error },
/// Failed to send a check request to a domain's worker.
WorkerCheck { domain: String, addr: Address, err: tonic::Status },
/// Failed to connect to a domain's worker.
WorkerConnect { domain: String, addr: Address, err: specifications::working::JobServiceError },
}
impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use Error::*;
match self {
UnknownEdgeIdx { id, func_id, edge_idx, max } => {
write!(f, "Edge index {edge_idx} is out-of-bounds for function {func_id} in workflow '{id}' that has {max} edges")
},
UnknownExecutor { id, node, domain } => write!(f, "Unknown domain '{domain}' that is planned to execute task {node} in workflow '{id}'"),
UnknownFuncId { id, func_id } => write!(f, "Unknown function ID {func_id} in workflow '{id}'"),
UnknownProvider { id, node, domain, dataname } => {
write!(f, "Unknown domain '{domain}' that is planned to provide dataset '{dataname}' for task {node} in workflow '{id}'")
},
UnplannedInput { id, node, input } => write!(f, "Input '{input}' to node {node} in workflow '{id}' is unplanned"),
UnplannedNode { id, node } => write!(f, "Node {node} in workflow '{id}' is unplanned"),
WorkflowSerialize { id, .. } => write!(f, "Failed to serialize workflow '{id}' to JSON"),
RegistryRequest { domain, addr, .. } => write!(f, "Failed to build a request to registry of '{domain}' at '{addr}'"),
RegistryRequestSend { domain, addr, .. } => write!(f, "Failed to send a request to registry of '{domain}' at '{addr}'"),
RegistryResponseDownload { domain, addr, .. } => write!(f, "Failed to download response of registry of '{domain}' at '{addr}'"),
RegistryResponseFailure { domain, addr, code, response } => write!(
f,
"Registry of '{}' at '{}' returned {} ({}){}",
domain,
addr,
code.as_u16(),
code.canonical_reason().unwrap_or("???"),
if let Some(res) = response { format!("\n\nResponse:\n{}\n", BlockFormatter::new(res)) } else { String::new() }
),
RegistryResponseParse { domain, addr, raw, .. } => {
write!(f, "Failed to parse response of registry of '{}' at '{}'\n\nResponse:\n{}\n", domain, addr, BlockFormatter::new(raw))
},
WorkerCheck { domain, addr, .. } => write!(f, "Failed to send CheckRequest to worker of '{domain}' at '{addr}'"),
WorkerConnect { domain, addr, .. } => write!(f, "Failed to connect to worker of '{domain}' at '{addr}'"),
}
}
}
impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
use Error::*;
match self {
UnknownEdgeIdx { .. } => None,
UnknownExecutor { .. } => None,
UnknownFuncId { .. } => None,
UnknownProvider { .. } => None,
UnplannedInput { .. } => None,
UnplannedNode { .. } => None,
WorkflowSerialize { err, .. } => Some(err),
RegistryRequest { err, .. } => Some(err),
RegistryRequestSend { err, .. } => Some(err),
RegistryResponseDownload { err, .. } => Some(err),
RegistryResponseFailure { .. } => None,
RegistryResponseParse { err, .. } => Some(err),
WorkerCheck { err, .. } => Some(err),
WorkerConnect { err, .. } => Some(err),
}
}
}
/// Asks the worker of a single domain whether it allows the workflow as a whole.
///
/// # Arguments
/// - `checker`: Name of the domain whose checker is consulted.
/// - `address`: Address of that domain's worker.
/// - `id`: Identifier of the workflow (used for logging only).
/// - `sworkflow`: The workflow, already serialized to a string.
///
/// # Returns
/// `Ok(None)` if the worker's verdict allows the workflow, or `Ok(Some((checker, reasons)))` with
/// the reported reasons if it does not.
///
/// # Errors
/// Fails with [`Error::WorkerConnect`] if no connection could be made, or [`Error::WorkerCheck`] if
/// the check request itself failed.
async fn request_workflow(checker: String, address: Address, id: String, sworkflow: String) -> RequestOutput {
    info!("Spawning workflow-validation request to validate workflow '{id}' with checker '{checker}'");

    // We own the serialized workflow, so it can be moved straight into the request
    let request = working::CheckWorkflowRequest { use_case: "central".into(), workflow: sworkflow };

    // Connect to the worker
    debug!("[workflow '{id}' -> '{checker}'] Connecting to worker '{address}'...");
    let mut worker: JobServiceClient = match JobServiceClient::connect(address.to_string()).await {
        Ok(worker) => worker,
        Err(err) => return Err(Error::WorkerConnect { domain: checker, addr: address, err }),
    };

    // Send the request
    debug!("[workflow '{id}' -> '{checker}'] Sending CheckRequest to worker '{address}'...");
    let reply: working::CheckReply = match worker.check_workflow(request).await {
        Ok(reply) => reply.into_inner(),
        Err(err) => return Err(Error::WorkerCheck { domain: checker, addr: address, err }),
    };

    // Translate the verdict
    debug!("[workflow '{id}' -> '{checker}'] Worker '{address}' replied with {}", if reply.verdict { "ALLOW" } else { "DENY" });
    if reply.verdict {
        return Ok(None);
    }
    Ok(Some((checker, reply.reasons)))
}
/// Asks the registry of a single domain whether it allows transferring a piece of data for a task.
///
/// # Arguments
/// - `checker`: Name of the domain whose registry is consulted.
/// - `address`: Address of that domain's registry.
/// - `id`: Identifier of the workflow (used for logging only).
/// - `vworkflow`: The workflow, already serialized to a JSON value.
/// - `task`: Program counter of the task that needs the data.
/// - `data`: Name of the dataset or intermediate result to transfer.
///
/// # Returns
/// `Ok(None)` if the registry's verdict allows the transfer, or `Ok(Some((checker, reasons)))` with
/// the reported reasons if it does not.
///
/// # Errors
/// Fails with one of the `Error::Registry*` variants if the request could not be built or sent, the
/// registry replied with a non-success status code, or its response could not be downloaded or
/// parsed.
async fn request_transfer(checker: String, address: Address, id: String, vworkflow: Value, task: ProgramCounter, data: DataName) -> RequestOutput {
    // NOTE: This message used to read "task-execute request", a copy of the one in `request_execute`,
    // which made the two kinds of requests indistinguishable in the logs.
    info!("Spawning data-transfer request to validate transfer of input '{data}' for task '{task}' in workflow '{id}' with checker '{checker}'");

    // Datasets and intermediate results are checked at different paths
    let url: String = format!("{address}/{}/check/{}", if data.is_data() { "data" } else { "results" }, data.name());
    let req: CheckTransferRequest = CheckTransferRequest {
        use_case: "central".into(),
        workflow: vworkflow,
        // The function ID is omitted when the task lives in the main function
        task: Some((if task.is_main() { None } else { Some(task.func_id.id() as u64) }, task.edge_idx as u64)),
    };

    // Build the request
    debug!("[task '{id}' -> '{checker}'] Connecting to worker '{address}'...");
    let client: Client = Client::new();
    let req: Request = match client.get(&url).json(&req).build() {
        Ok(req) => req,
        Err(err) => return Err(Error::RegistryRequest { domain: checker, addr: address, err }),
    };

    // Send it
    debug!("[task '{id}' -> '{checker}'] Sending data transfer request to worker '{address}'...");
    let res: Response = match client.execute(req).await {
        Ok(res) => res,
        Err(err) => return Err(Error::RegistryRequestSend { domain: checker, addr: address, err }),
    };
    if !res.status().is_success() {
        // The status is read before `text()` consumes the response; a body that fails to download is simply omitted
        return Err(Error::RegistryResponseFailure { domain: checker, addr: address, code: res.status(), response: res.text().await.ok() });
    }

    // Download and parse the reply
    let res: String = match res.text().await {
        Ok(res) => res,
        Err(err) => return Err(Error::RegistryResponseDownload { domain: checker, addr: address, err }),
    };
    let res: CheckTransferReply = match serde_json::from_str(&res) {
        Ok(res) => res,
        Err(err) => return Err(Error::RegistryResponseParse { domain: checker, addr: address, raw: res, err }),
    };

    // Translate the verdict
    debug!("[task '{id}' -> '{checker}'] Worker '{address}' replied with {}", if res.verdict { "ALLOW" } else { "DENY" });
    if res.verdict { Ok(None) } else { Ok(Some((checker, res.reasons))) }
}
/// Asks the worker of a single domain whether it allows executing one task of the workflow.
///
/// # Arguments
/// - `checker`: Name of the domain whose checker is consulted.
/// - `address`: Address of that domain's worker.
/// - `id`: Identifier of the workflow (used for logging only).
/// - `sworkflow`: The workflow, already serialized to a string.
/// - `task`: Program counter of the task to validate.
///
/// # Returns
/// `Ok(None)` if the worker's verdict allows the task, or `Ok(Some((checker, reasons)))` with the
/// reported reasons if it does not.
///
/// # Errors
/// Fails with [`Error::WorkerConnect`] if no connection could be made, or [`Error::WorkerCheck`] if
/// the check request itself failed.
///
/// # Panics
/// Panics if `task` cannot be serialized to JSON.
async fn request_execute(checker: String, address: Address, id: String, sworkflow: String, task: ProgramCounter) -> RequestOutput {
    info!("Spawning task-execute request to validate task '{task}' in workflow '{id}' with checker '{checker}'");

    // The task is identified to the worker by its JSON-serialized program counter
    let task_id: String = serde_json::to_string(&task).unwrap();
    let request = working::CheckTaskRequest { use_case: "central".into(), workflow: sworkflow, task_id };

    // Connect to the worker
    debug!("[task '{id}' -> '{checker}'] Connecting to worker '{address}'...");
    let mut worker: JobServiceClient = match JobServiceClient::connect(address.to_string()).await {
        Ok(worker) => worker,
        Err(err) => return Err(Error::WorkerConnect { domain: checker, addr: address, err }),
    };

    // Send the request
    debug!("[task '{id}' -> '{checker}'] Sending CheckTaskRequest to worker '{address}'...");
    let reply: working::CheckReply = match worker.check_task(request).await {
        Ok(reply) => reply.into_inner(),
        Err(err) => return Err(Error::WorkerCheck { domain: checker, addr: address, err }),
    };

    // Translate the verdict
    debug!("[task '{id}' -> '{checker}'] Worker '{address}' replied with {}", if reply.verdict { "ALLOW" } else { "DENY" });
    if reply.verdict {
        return Ok(None);
    }
    Ok(Some((checker, reply.reasons)))
}
/// Walks the workflow graph from `pc` and spawns a checker request for every task node it finds,
/// plus one for every input of such a task that still has to be transferred.
///
/// Branches, parallel branches and loops are explored by recursing into each sub-path with a
/// breakpoint, after which the walk resumes at the merge/next edge. Calls are *not* followed; the
/// walk simply continues after them.
///
/// # Arguments
/// - `infra`: Maps domain names to the addresses of their services.
/// - `workflow`: The workflow to traverse.
/// - `vworkflow`: The workflow serialized to a JSON value (sent along with transfer requests).
/// - `sworkflow`: The workflow serialized to a string (sent along with execute requests).
/// - `pc`: The edge to start traversing at.
/// - `breakpoint`: If given, the traversal stops (successfully) as soon as it reaches this edge.
/// - `handles`: Output list to which a `(checker name, join handle)` pair is pushed for every
///   spawned request.
///
/// # Errors
/// Fails if `pc` points outside of the workflow ([`Error::UnknownEdgeIdx`],
/// [`Error::UnknownFuncId`]), if a node or one of its inputs is unplanned
/// ([`Error::UnplannedNode`], [`Error::UnplannedInput`]), or if a planned domain does not occur in
/// `infra` ([`Error::UnknownExecutor`], [`Error::UnknownProvider`]).
///
/// # Panics
/// Requests are spawned with `tokio::spawn`, which panics when called outside of a Tokio runtime.
fn traverse_and_request(
    infra: &InfraFile,
    workflow: &Workflow,
    vworkflow: &Value,
    sworkflow: &String,
    mut pc: ProgramCounter,
    breakpoint: Option<ProgramCounter>,
    handles: &mut Vec<(String, JoinHandle<RequestOutput>)>,
) -> Result<(), Error> {
    loop {
        // Stop once the caller-provided breakpoint is reached
        if let Some(breakpoint) = breakpoint {
            if pc == breakpoint {
                return Ok(());
            }
        }

        // Resolve the program counter to an edge, either in the main graph or in a function body
        let edge: &Edge = if pc.is_main() {
            match workflow.graph.get(pc.edge_idx) {
                Some(edge) => edge,
                None => {
                    return Err(Error::UnknownEdgeIdx {
                        id: workflow.id.clone(),
                        func_id: pc.func_id,
                        edge_idx: pc.edge_idx,
                        max: workflow.graph.len(),
                    });
                },
            }
        } else {
            match workflow.funcs.get(&pc.func_id.id()) {
                Some(graph) => match graph.get(pc.edge_idx) {
                    Some(edge) => edge,
                    None => {
                        return Err(Error::UnknownEdgeIdx { id: workflow.id.clone(), func_id: pc.func_id, edge_idx: pc.edge_idx, max: graph.len() });
                    },
                },
                None => return Err(Error::UnknownFuncId { id: workflow.id.clone(), func_id: pc.func_id }),
            }
        };

        log::trace!("Spawning requests in {:?}", edge.variant());
        use Edge::*;
        match edge {
            Node { task: _, locs: _, at, input, result: _, metadata: _, next } => {
                // A task must have been planned before it can be checked
                let at: &String = match at {
                    Some(at) => at,
                    None => return Err(Error::UnplannedNode { id: workflow.id.clone(), node: pc }),
                };

                // Spawn a transfer request for every input that is not yet available
                for (dataname, from) in input {
                    match from {
                        Some(AvailabilityKind::Unavailable { how }) => match how {
                            PreprocessKind::TransferRegistryTar { location, dataname } => {
                                let info: &InfraLocation = match infra.get(location) {
                                    Some(info) => info,
                                    None => {
                                        // The domain that failed to resolve is the providing `location`, not the
                                        // executing domain `at`.
                                        return Err(Error::UnknownProvider {
                                            id: workflow.id.clone(),
                                            node: pc,
                                            domain: location.clone(),
                                            dataname: dataname.clone(),
                                        });
                                    },
                                };

                                // Label the handle with the domain that is actually consulted, like the other
                                // spawn sites do.
                                handles.push((
                                    location.clone(),
                                    tokio::spawn(request_transfer(
                                        location.clone(),
                                        info.registry.clone(),
                                        workflow.id.clone(),
                                        vworkflow.clone(),
                                        pc,
                                        dataname.clone(),
                                    )),
                                ));
                            },
                        },
                        // Nothing to transfer, so nothing to check
                        Some(AvailabilityKind::Available { .. }) => continue,
                        None => return Err(Error::UnplannedInput { id: workflow.id.clone(), node: pc, input: dataname.clone() }),
                    }
                }

                // Spawn the request for executing the task itself
                let info: &InfraLocation = match infra.get(at) {
                    Some(info) => info,
                    None => return Err(Error::UnknownExecutor { id: workflow.id.clone(), node: pc, domain: at.clone() }),
                };
                handles
                    .push((at.clone(), tokio::spawn(request_execute(at.clone(), info.delegate.clone(), workflow.id.clone(), sworkflow.clone(), pc))));

                pc = pc.jump(*next);
            },

            Linear { instrs: _, next } => {
                pc = pc.jump(*next);
            },

            Stop {} => return Ok(()),

            Branch { true_next, false_next, merge } => {
                // Explore both arms up to the merge point (if any), then continue from there
                traverse_and_request(infra, workflow, vworkflow, sworkflow, pc.jump(*true_next), merge.map(|m| pc.jump(m)), handles)?;
                if let Some(false_next) = false_next {
                    traverse_and_request(infra, workflow, vworkflow, sworkflow, pc.jump(*false_next), merge.map(|m| pc.jump(m)), handles)?;
                }

                if let Some(merge) = merge {
                    pc = pc.jump(*merge);
                } else {
                    // Without a merge point there is nothing after the branch
                    return Ok(());
                }
            },

            Parallel { branches, merge } => {
                // Explore every branch up to the merge point, then continue from there
                for b in branches {
                    traverse_and_request(infra, workflow, vworkflow, sworkflow, pc.jump(*b), Some(pc.jump(*merge)), handles)?;
                }
                pc = pc.jump(*merge);
            },

            Join { merge: _, next } => {
                pc = pc.jump(*next);
            },

            Loop { cond, body, next } => {
                // The condition is explored up to the edge just before the body.
                // NOTE(review): presumably that edge is the branch that closes the condition - confirm.
                traverse_and_request(infra, workflow, vworkflow, sworkflow, pc.jump(*cond), Some(pc.jump(*body - 1)), handles)?;
                // The body is explored until it would return to the condition
                traverse_and_request(infra, workflow, vworkflow, sworkflow, pc.jump(*body), Some(pc.jump(*cond)), handles)?;

                if let Some(next) = next {
                    pc = pc.jump(*next);
                } else {
                    // Without a next edge there is nothing after the loop
                    return Ok(());
                }
            },

            Call { input: _, result: _, next } => {
                // The called function is not followed from here
                pc = pc.jump(*next);
            },

            Return { result: _ } => return Ok(()),
        }
    }
}
pub fn spawn_requests(infra: &InfraFile, workflow: &Workflow) -> Result<Vec<(String, JoinHandle<RequestOutput>)>, Error> {
let vworkflow: Value = match serde_json::to_value(workflow) {
Ok(swf) => swf,
Err(err) => return Err(Error::WorkflowSerialize { id: workflow.id.clone(), err }),
};
let sworkflow: String = match serde_json::to_string(&vworkflow) {
Ok(swf) => swf,
Err(err) => return Err(Error::WorkflowSerialize { id: workflow.id.clone(), err }),
};
let mut handles: Vec<(String, JoinHandle<RequestOutput>)> = Vec::with_capacity(4 * infra.len());
for (name, info) in infra {
handles.push((name.clone(), tokio::spawn(request_workflow(name.clone(), info.delegate.clone(), workflow.id.clone(), sworkflow.clone()))));
}
traverse_and_request(infra, workflow, &vworkflow, &sworkflow, ProgramCounter::start(), None, &mut handles)?;
for id in workflow.funcs.keys() {
traverse_and_request(infra, workflow, &vworkflow, &sworkflow, ProgramCounter::start_of(*id), None, &mut handles)?;
}
Ok(vec![])
}