use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use async_trait::async_trait;
use brane_ast::Workflow;
use brane_ast::func_id::FunctionId;
use brane_ast::locations::Location;
use brane_cfg::info::Info as _;
use brane_cfg::infra::InfraFile;
use brane_cfg::node::{CentralConfig, NodeConfig, NodeSpecificConfig};
use brane_exe::pc::ProgramCounter;
use brane_exe::spec::{TaskInfo, VmPlugin};
use brane_exe::{Error as VmError, FullValue, RunState, Vm};
use brane_prx::client::ProxyClient;
use brane_tsk::errors::{CommitError, ExecuteError, PreprocessError, StdoutError, StringError};
use brane_tsk::spec::{AppId, JobStatus};
use enum_debug::EnumDebug as _;
use log::{debug, info, warn};
use serde_json_any_key::MapIterToJson;
use specifications::address::Address;
use specifications::data::{AccessKind, DataName, PreprocessKind};
use specifications::profiling::ProfileScopeHandle;
use specifications::working::TransferRegistryTar;
use specifications::{driving as driving_grpc, working as working_grpc};
use tokio::sync::mpsc::Sender;
use tonic::{Response, Status, Streaming};
pub use crate::errors::RemoteVmError as Error;
use crate::planner::InstancePlanner;
use crate::spec::{GlobalState, LocalState};
macro_rules! mundane_status_update {
($state:ident, $status:expr) => {
if $status.progress_index() > $state.progress_index() {
$state = $status;
}
};
}
pub struct InstancePlugin;
#[async_trait]
impl VmPlugin for InstancePlugin {
type CommitError = CommitError;
type ExecuteError = ExecuteError;
type GlobalState = GlobalState;
type LocalState = LocalState;
type PreprocessError = PreprocessError;
type StdoutError = StdoutError;
async fn preprocess(
global: Arc<RwLock<Self::GlobalState>>,
_local: Self::LocalState,
pc: ProgramCounter,
loc: Location,
name: DataName,
preprocess: PreprocessKind,
prof: ProfileScopeHandle<'_>,
) -> Result<AccessKind, Self::PreprocessError> {
info!("Preprocessing {} '{}' on '{}' in a distributed environment...", name.variant(), name.name(), loc);
debug!("Preprocessing to be done: {:?}", preprocess);
let disk = prof.time("File loading");
let (proxy, delegate_address, workflow): (Arc<ProxyClient>, Address, String) = {
let state: RwLockReadGuard<GlobalState> = global.read().unwrap();
match state.infra.as_ref().unwrap().get(&loc) {
Some(info) => (
state.proxy.clone(),
info.delegate.clone(),
state.workflow.clone().unwrap_or_else(|| panic!("Workflow state not injected by the time the workflow is being executed")),
),
None => {
return Err(PreprocessError::UnknownLocationError { loc });
},
}
};
disk.stop();
let transfer: TransferRegistryTar = match preprocess {
PreprocessKind::TransferRegistryTar { location, dataname } => TransferRegistryTar { location, dataname: Some(dataname.into()) },
};
debug!("Sending preprocess request to job node '{}'...", delegate_address);
let job = prof.time(format!("on {delegate_address}"));
let message: working_grpc::PreprocessRequest = working_grpc::PreprocessRequest {
use_case: "central".into(),
kind: transfer,
workflow,
pc: Some(specifications::working::ProgramCounter {
func_id: if let FunctionId::Func(id) = pc.func_id { id as u64 } else { u64::MAX },
edge_idx: pc.edge_idx as u64,
}),
};
let mut client: working_grpc::JobServiceClient = match proxy.connect_to_job(delegate_address.to_string()).await {
Ok(result) => match result {
Ok(client) => client,
Err(err) => {
return Err(PreprocessError::GrpcConnectError { endpoint: delegate_address, err });
},
},
Err(err) => {
return Err(PreprocessError::ProxyError { err: Box::new(err) });
},
};
let response: Response<working_grpc::PreprocessReply> = match client.preprocess(message).await {
Ok(response) => response,
Err(err) => {
return Err(PreprocessError::GrpcRequestError { what: "PreprocessRequest", endpoint: delegate_address, err });
},
};
let result: working_grpc::PreprocessReply = response.into_inner();
job.stop();
let par = prof.time("Result parsing");
let access: AccessKind = match serde_json::from_str(&result.access) {
Ok(access) => access,
Err(err) => {
return Err(PreprocessError::AccessKindParseError { endpoint: delegate_address, raw: result.access, err });
},
};
par.stop();
Ok(access)
}
async fn execute(
global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
info: TaskInfo<'_>,
prof: ProfileScopeHandle<'_>,
) -> Result<Option<FullValue>, Self::ExecuteError> {
info!("Executing task '{}' at '{}' in a distributed environment...", info.name, info.location);
debug!("Package: '{}' v{}", info.package_name, info.package_version);
debug!("Input data: {:?}", info.input.keys().map(|k| format!("{k}")).collect::<Vec<String>>());
debug!("Result: {:?}", info.result);
debug!("Input arguments: {:#?}", info.args);
debug!("Requirements: {:?}", info.requirements);
let disk = prof.time("File loading");
let (proxy, delegate_address, workflow): (Arc<ProxyClient>, Address, String) = {
let state: RwLockReadGuard<GlobalState> = global.read().unwrap();
(
state.proxy.clone(),
match state.infra.as_ref().unwrap().get(info.location) {
Some(info) => info.delegate.clone(),
None => {
return Err(ExecuteError::UnknownLocationError { loc: info.location.clone() });
},
},
state.workflow.as_ref().unwrap().clone(),
)
};
disk.stop();
debug!("Sending execute request to job node '{}'...", delegate_address);
let job = prof.time(format!("on {delegate_address}"));
let message: working_grpc::ExecuteRequest = working_grpc::ExecuteRequest {
use_case: "central".into(),
workflow,
call_pc: specifications::working::ProgramCounter {
func_id: if let FunctionId::Func(id) = info.pc.func_id { id as u64 } else { u64::MAX },
edge_idx: info.pc.edge_idx as u64,
},
task_def: info.def as u64,
input: info.input.to_json_map().unwrap(),
result: info.result.clone(),
args: serde_json::to_string(&info.args).unwrap(),
};
let mut client: working_grpc::JobServiceClient = match proxy.connect_to_job(delegate_address.to_string()).await {
Ok(result) => match result {
Ok(client) => client,
Err(err) => {
return Err(ExecuteError::GrpcConnectError { endpoint: delegate_address, err });
},
},
Err(err) => {
return Err(ExecuteError::ProxyError { err: Box::new(err) });
},
};
let response: Response<Streaming<working_grpc::ExecuteReply>> = match client.execute(message).await {
Ok(response) => response,
Err(err) => {
return Err(ExecuteError::GrpcRequestError { what: "ExecuteRequest", endpoint: delegate_address, err });
},
};
let mut stream: Streaming<working_grpc::ExecuteReply> = response.into_inner();
let mut state: JobStatus = JobStatus::Unknown;
let mut result: Result<FullValue, String> = Err("No response".into());
#[allow(irrefutable_let_patterns)]
while let message = stream.message().await {
match message {
Ok(Some(reply)) => {
let status: JobStatus = match JobStatus::from_status(
match working_grpc::TaskStatus::try_from(reply.status) {
Ok(status) => status,
Err(_) => {
warn!("Unknown job status '{}' (skipping message)", reply.status);
continue;
},
},
reply.value,
) {
Ok(status) => status,
Err(err) => {
warn!("Incoming message does not have a parseable job status: {} (skipping message)", err);
continue;
},
};
debug!("Received status update: {:?}", working_grpc::TaskStatus::from(&status));
match &status {
JobStatus::Unknown => {
warn!("Received JobStatus::Unknown, which doesn't make a whole lot of sense");
},
JobStatus::Received => {
mundane_status_update!(state, status);
},
JobStatus::Authorized => {
mundane_status_update!(state, status);
},
JobStatus::Denied => {
result = Err("Permission denied".into());
state = status;
break;
},
JobStatus::AuthorizationFailed(err) => {
result = Err(err.clone());
state = status;
break;
},
JobStatus::Created => {
mundane_status_update!(state, status);
},
JobStatus::CreationFailed(err) => {
result = Err(err.clone());
state = status;
break;
},
JobStatus::Ready => {
mundane_status_update!(state, status);
},
JobStatus::Initialized => {
mundane_status_update!(state, status);
},
JobStatus::InitializationFailed(err) => {
result = Err(err.clone());
state = status;
break;
},
JobStatus::Started => {
mundane_status_update!(state, status);
},
JobStatus::StartingFailed(err) => {
result = Err(err.clone());
state = status;
break;
},
JobStatus::Heartbeat => {
mundane_status_update!(state, status);
},
JobStatus::Completed => {
mundane_status_update!(state, status);
},
JobStatus::CompletionFailed(err) => {
result = Err(err.clone());
state = status;
break;
},
JobStatus::Finished(value) => {
result = Ok(value.clone());
state = status;
break;
},
JobStatus::Stopped => {
result = Err("Job was stopped".into());
state = status;
break;
},
JobStatus::DecodingFailed(err) => {
result = Err(err.clone());
state = status;
break;
},
JobStatus::Failed(code, stdout, stderr) => {
result = Err(format!(
"Job failed with exit code {}\n\nstdout:\n{}\n{}\n{}\n\nstderr:\n{}\n{}\n{}\n",
code,
(0..80).map(|_| '-').collect::<String>(),
stdout,
(0..80).map(|_| '-').collect::<String>(),
(0..80).map(|_| '-').collect::<String>(),
stderr,
(0..80).map(|_| '-').collect::<String>()
));
state = status;
break;
},
}
},
Ok(None) => {
break;
},
Err(status) => {
result = Err(format!("Status error: {status}"));
break;
},
}
}
job.stop();
let result: FullValue = match result {
Ok(result) => result,
Err(err) => {
return Err(ExecuteError::ExecuteError {
endpoint: delegate_address,
name: info.name.into(),
status: state.into(),
err: StringError(err),
});
},
};
debug!("Task '{}' result: {:?}", info.name, result);
Ok(if let FullValue::Void = result { None } else { Some(result) })
}
async fn stdout(
global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
text: &str,
newline: bool,
_prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::StdoutError> {
info!("Writing '{}' to stdout in a distributed environment...", text);
debug!("Newline: {}", if newline { "yes" } else { "no" });
let tx: Arc<Sender<Result<driving_grpc::ExecuteReply, Status>>> = {
let state: RwLockReadGuard<GlobalState> = global.read().unwrap();
state.tx.as_ref().expect("Missing `tx` in GlobalState; did you forget to update it before this poll?").clone()
};
if let Err(err) = tx
.send(Ok(driving_grpc::ExecuteReply {
stdout: Some(format!("{}{}", text, if newline { "\n" } else { "" })),
stderr: None,
debug: None,
value: None,
close: false,
}))
.await
{
return Err(StdoutError::TxWriteError { err });
}
Ok(())
}
async fn publicize(
_global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
loc: &Location,
name: &str,
path: &Path,
_prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::CommitError> {
info!("Publicizing intermediate result '{}' living at '{}' in a distributed environment...", name, loc);
debug!("File: '{}'", path.display());
Ok(())
}
async fn commit(
global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
loc: &Location,
name: &str,
path: &Path,
data_name: &str,
prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::CommitError> {
info!("Committing intermediate result '{}' living at '{}' as '{}' in a distributed environment...", name, loc, data_name);
debug!("File: '{}'", path.display());
let disk = prof.time("File loading");
let (proxy, delegate_address): (Arc<ProxyClient>, Address) = {
let state: RwLockReadGuard<GlobalState> = global.read().unwrap();
match state.infra.as_ref().unwrap().get(loc) {
Some(info) => (state.proxy.clone(), info.delegate.clone()),
None => {
return Err(CommitError::UnknownLocationError { loc: loc.clone() });
},
}
};
disk.stop();
debug!("Sending commit request to job node '{}'...", delegate_address);
let job = prof.time(format!("on {delegate_address}"));
let message: working_grpc::CommitRequest = working_grpc::CommitRequest { result_name: name.into(), data_name: data_name.into() };
let mut client: working_grpc::JobServiceClient = match proxy.connect_to_job(delegate_address.to_string()).await {
Ok(result) => match result {
Ok(client) => client,
Err(err) => {
return Err(CommitError::GrpcConnectError { endpoint: delegate_address, err });
},
},
Err(err) => {
return Err(CommitError::ProxyError { err: Box::new(err) });
},
};
let response: Response<working_grpc::CommitReply> = match client.commit(message).await {
Ok(response) => response,
Err(err) => {
return Err(CommitError::GrpcRequestError { what: "CommitRequest", endpoint: delegate_address, err });
},
};
let _: working_grpc::CommitReply = response.into_inner();
job.stop();
Ok(())
}
}
#[derive(Clone)]
pub struct InstanceVm {
state: RunState<GlobalState>,
}
impl InstanceVm {
#[inline]
pub fn new(node_config_path: impl Into<PathBuf>, app_id: AppId, proxy: Arc<ProxyClient>) -> Self {
Self {
state: Self::new_state(GlobalState { node_config_path: node_config_path.into(), app_id, proxy, infra: None, workflow: None, tx: None }),
}
}
pub async fn exec(
self,
tx: Sender<Result<driving_grpc::ExecuteReply, Status>>,
id: AppId,
workflow: Workflow,
prof: ProfileScopeHandle<'_>,
) -> (Self, Result<FullValue, Error>) {
let plr_addr: Address = {
let mut global = self.state.global.write().unwrap();
debug!("Loading node config file '{}'...", global.node_config_path.display());
let central_cfg: CentralConfig = match NodeConfig::from_path(&global.node_config_path) {
Ok(cfg) => match cfg.node {
NodeSpecificConfig::Central(central) => central,
NodeSpecificConfig::Worker(_) | NodeSpecificConfig::Proxy(_) => {
let path: PathBuf = global.node_config_path.clone();
drop(global);
return (self, Err(Error::IllegalNodeConfig { path, got: cfg.node.variant().to_string() }));
},
},
Err(err) => {
let path: PathBuf = global.node_config_path.clone();
drop(global);
return (self, Err(Error::NodeConfigLoad { path, err }));
},
};
debug!("Loading infra file '{}'...", central_cfg.paths.infra.display());
let infra: InfraFile = match InfraFile::from_path(¢ral_cfg.paths.infra) {
Ok(infra) => infra,
Err(err) => {
let path: PathBuf = global.node_config_path.clone();
drop(global);
return (self, Err(Error::InfraFileLoad { path, err }));
},
};
global.infra = Some(infra);
central_cfg.services.plr.address
};
debug!("Planning workflow on Kafka planner...");
let plan: Workflow = match prof.nest_fut("planning (brane-drv)", |scope| InstancePlanner::plan(&plr_addr, id, workflow, scope)).await {
Ok(plan) => plan,
Err(err) => {
return (self, Err(Error::PlanError { err }));
},
};
{
let mut state: RwLockWriteGuard<GlobalState> = self.state.global.write().unwrap();
state.workflow = Some(serde_json::to_string(&plan).unwrap());
state.tx = Some(Arc::new(tx));
}
let this: Arc<RwLock<Self>> = Arc::new(RwLock::new(self));
let result: Result<FullValue, VmError> = prof.nest_fut("execution", |scope| Self::run::<InstancePlugin>(this.clone(), plan, scope)).await;
let this: Self = match Arc::try_unwrap(this) {
Ok(this) => this.into_inner().unwrap(),
Err(_) => {
panic!("Could not get self back");
},
};
let value: FullValue = match result {
Ok(value) => value,
Err(err) => {
return (this, Err(Error::ExecError { err }));
},
};
(this, Ok(value))
}
}
impl Vm for InstanceVm {
type GlobalState = GlobalState;
type LocalState = LocalState;
fn store_state(this: &Arc<RwLock<Self>>, state: RunState<Self::GlobalState>) -> Result<(), VmError> {
let mut lock: RwLockWriteGuard<Self> = this.write().unwrap();
lock.state = state;
Ok(())
}
fn load_state(this: &Arc<RwLock<Self>>) -> Result<RunState<Self::GlobalState>, VmError> {
let lock: RwLockReadGuard<Self> = this.read().unwrap();
Ok(lock.state.clone())
}
}