use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use brane_ast::Workflow;
use brane_ast::locations::Location;
use brane_exe::Vm;
use brane_exe::errors::VmError;
use brane_exe::pc::ProgramCounter;
use brane_exe::spec::{RunState, TaskInfo, VmPlugin};
use brane_exe::value::FullValue;
use brane_shr::formatters::BlockFormatter;
use brane_shr::fs::copy_dir_recursively_async;
use brane_tsk::docker::{self, DockerOptions, ExecuteInfo, ImageSource, Network};
use brane_tsk::errors::{CommitError, ExecuteError, PreprocessError, StdoutError};
use brane_tsk::spec::{LOCALHOST, Planner as _};
use brane_tsk::tools::decode_base64;
use chrono::Utc;
use log::{debug, info};
use parking_lot::Mutex;
use specifications::container::{Image, VolumeBind};
use specifications::data::{AccessKind, DataIndex, DataInfo, DataName, PreprocessKind};
use specifications::package::{PackageIndex, PackageInfo};
use specifications::profiling::ProfileScopeHandle;
use tokio::fs as tfs;
use tokio::io::AsyncWriteExt;
pub use crate::errors::OfflineVmError as Error;
use crate::planner::OfflinePlanner;
use crate::spec::{GlobalState, LocalState};
pub struct OfflinePlugin;
#[async_trait::async_trait]
impl VmPlugin for OfflinePlugin {
type CommitError = CommitError;
type ExecuteError = ExecuteError;
type GlobalState = GlobalState;
type LocalState = LocalState;
type PreprocessError = PreprocessError;
type StdoutError = StdoutError;
async fn preprocess(
_global: Arc<RwLock<Self::GlobalState>>,
_local: Self::LocalState,
pc: ProgramCounter,
_loc: Location,
name: DataName,
preprocess: PreprocessKind,
_prof: ProfileScopeHandle<'_>,
) -> Result<AccessKind, Self::PreprocessError> {
info!("Preprocessing data '{name}' for call at {pc} in an offline environment");
debug!("Method of preprocessing: {preprocess:?}");
match preprocess {
PreprocessKind::TransferRegistryTar { .. } => Err(PreprocessError::UnavailableData { name }),
}
}
async fn execute(
global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
info: TaskInfo<'_>,
prof: ProfileScopeHandle<'_>,
) -> Result<Option<FullValue>, Self::ExecuteError> {
let mut info = info;
info!("Calling task '{}' in an offline environment", info.name);
debug!("Package: '{}', version {}", info.package_name, info.package_version);
debug!(
"Task input (data-wise): {}",
info.input.iter().map(|(name, access)| format!("'{name}' ({access:?})")).collect::<Vec<String>>().join(", ")
);
debug!("Task generates result? {}", if info.result.is_some() { "yes" } else { "no" });
let get = prof.time("Information retrieval");
let (docker_opts, package_dir, results_dir, pindex, keep_container): (DockerOptions, PathBuf, PathBuf, Arc<PackageIndex>, bool) = {
let state: RwLockReadGuard<GlobalState> = global.read().unwrap();
(state.docker_opts.clone(), state.package_dir.clone(), state.results_dir.clone(), state.pindex.clone(), state.keep_containers)
};
let pinfo: &PackageInfo =
match pindex.get(info.package_name, if info.package_version.is_latest() { None } else { Some(info.package_version) }) {
Some(pinfo) => pinfo,
None => return Err(ExecuteError::UnknownPackage { name: info.package_name.into(), version: *info.package_version }),
};
get.stop();
let binds: Vec<VolumeBind> = prof
.time_fut("argument preprocessing", docker::preprocess_args(&mut info.args, &info.input, info.result, None::<String>, results_dir))
.await?;
let params: String = match serde_json::to_string(&info.args) {
Ok(params) => params,
Err(err) => {
return Err(ExecuteError::ArgsEncodeError { err });
},
};
let image: Image = Image::new(info.package_name, Some(info.package_version), Some(pinfo.digest.as_ref().unwrap()));
let einfo: ExecuteInfo = ExecuteInfo {
name: info.name.into(),
image: image.clone(),
image_source: ImageSource::Path(package_dir.join(info.package_name).join(info.package_version.to_string()).join("image.tar")),
command: vec![
"-d".into(),
"--application-id".into(),
"test".into(),
"--location-id".into(),
"localhost".into(),
"--job-id".into(),
"1".into(),
pinfo.kind.into(),
info.name.into(),
STANDARD.encode(params),
],
binds,
network: Network::None,
capabilities: info.requirements.clone(),
};
debug!("Executing task '{}'...", info.name);
let (code, stdout, stderr) = match prof.time_fut("execution", docker::run_and_wait(docker_opts, einfo, keep_container)).await {
Ok(res) => res,
Err(err) => {
return Err(ExecuteError::DockerError { name: info.name.into(), image: Box::new(image), err });
},
};
debug!("Container return code: {}", code);
debug!("Container stdout/stderr:\n\nstdout:\n{}\n\nstderr:\n{}\n", BlockFormatter::new(&stdout), BlockFormatter::new(&stderr));
if code != 0 {
return Err(ExecuteError::ExternalCallFailed { name: info.name.into(), image: Box::new(image), code, stdout, stderr });
}
let dec = prof.time("Decoding");
let output = stdout.lines().last().unwrap_or_default().to_string();
let raw: String = decode_base64(output)?;
let value: Option<FullValue> = match serde_json::from_str(&raw) {
Ok(value) => value,
Err(err) => {
return Err(ExecuteError::JsonDecodeError { raw, err });
},
};
dec.stop();
debug!("Task '{}' returned value: '{:?}'", info.name, value);
Ok(value)
}
async fn stdout(
_global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
text: &str,
newline: bool,
_prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::StdoutError> {
info!("Writing '{}' to stdout (newline: {}) in an offline environment...", text, if newline { "yes" } else { "no" });
if !newline {
print!("{text}");
} else {
println!("{text}");
}
Ok(())
}
async fn publicize(
_global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
_loc: &Location,
name: &str,
path: &Path,
_prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::CommitError> {
info!("Publicizing intermediate result '{}' in an offline environment...", name);
debug!("Physical file(s): {}", path.display());
Ok(())
}
async fn commit(
global: &Arc<RwLock<Self::GlobalState>>,
_local: &Self::LocalState,
_loc: &Location,
name: &str,
path: &Path,
data_name: &str,
prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::CommitError> {
info!("Committing intermediate result '{}' to '{}' in an offline environment...", name, data_name);
debug!("Physical file(s): {}", path.display());
let inf = prof.time("Information retrieval");
let (results_dir, dataset_dir, info): (PathBuf, PathBuf, Option<DataInfo>) = {
let state: RwLockReadGuard<GlobalState> = global.read().unwrap();
(state.results_dir.clone(), state.dataset_dir.clone(), state.dindex.get(data_name).cloned())
};
inf.stop();
let copy = prof.time("Copying");
if let Some(info) = info {
if let Some(access) = info.access.get(LOCALHOST) {
debug!("Dataset '{}' already exists; overwriting file...", data_name);
match access {
AccessKind::File { path: data_path } => {
if let Err(err) = copy_dir_recursively_async(results_dir.join(path), data_path).await {
return Err(CommitError::DataCopyError { err });
}
},
}
} else {
return Err(CommitError::UnavailableDataError { name: data_name.into(), locs: info.access.keys().cloned().collect() });
}
} else {
debug!("Dataset '{}' doesn't exist; creating new entry...", data_name);
let dir: PathBuf = dataset_dir.join(data_name);
if !dir.is_dir() {
if dir.exists() {
return Err(CommitError::DataDirNotADir { path: dir });
}
if let Err(err) = tfs::create_dir_all(&dir).await {
return Err(CommitError::DataDirCreateError { path: dir, err });
}
}
let info: DataInfo = DataInfo {
name: data_name.into(),
owners: None, description: None, created: Utc::now(),
access: HashMap::from([("localhost".into(), AccessKind::File { path: dir.join("data") })]),
};
let info_path: PathBuf = dir.join("data.yml");
let mut handle: tfs::File = match tfs::File::create(&info_path).await {
Ok(handle) => handle,
Err(err) => {
return Err(CommitError::DataInfoCreateError { path: info_path, err });
},
};
let sinfo: String = match serde_yaml::to_string(&info) {
Ok(sinfo) => sinfo,
Err(err) => {
return Err(CommitError::DataInfoSerializeError { err });
},
};
if let Err(err) = handle.write_all(sinfo.as_bytes()).await {
return Err(CommitError::DataInfoWriteError { path: info_path, err });
}
let source: PathBuf = results_dir.join(path);
let target: PathBuf = dir.join("data");
debug!("Copying '{}' to '{}'...", source.display(), target.display());
if let Err(err) = copy_dir_recursively_async(source, target).await {
return Err(CommitError::DataCopyError { err });
}
debug!("Dataset created successfully.");
}
copy.stop();
Ok(())
}
}
pub struct OfflineVm {
state: RunState<GlobalState>,
}
impl OfflineVm {
#[inline]
pub fn new(
docker_opts: DockerOptions,
keep_containers: bool,
package_dir: impl Into<PathBuf>,
dataset_dir: impl Into<PathBuf>,
results_dir: impl Into<PathBuf>,
package_index: Arc<PackageIndex>,
data_index: Arc<DataIndex>,
) -> Self {
Self {
state: Self::new_state(GlobalState {
docker_opts,
keep_containers,
package_dir: package_dir.into(),
dataset_dir: dataset_dir.into(),
results_dir: results_dir.into(),
pindex: package_index,
dindex: data_index,
results: Arc::new(Mutex::new(HashMap::new())),
}),
}
}
pub async fn exec(self, workflow: Workflow) -> (Self, Result<FullValue, Error>) {
let plan: Result<Workflow, Error> = {
let planner: OfflinePlanner = {
let state: RwLockReadGuard<GlobalState> = self.state.global.read().unwrap();
OfflinePlanner::new(state.dindex.clone(), state.results.clone())
};
match planner.plan(workflow).await {
Ok(plan) => Ok(plan),
Err(err) => Err(Error::PlanError { err }),
}
};
let plan: Workflow = match plan {
Ok(plan) => plan,
Err(err) => return (self, Err(err)),
};
let this: Arc<RwLock<Self>> = Arc::new(RwLock::new(self));
let result: Result<FullValue, VmError> = Self::run::<OfflinePlugin>(this.clone(), plan, ProfileScopeHandle::dummy()).await;
let this: Self = match Arc::try_unwrap(this) {
Ok(this) => this.into_inner().unwrap(),
Err(_) => {
panic!("Could not get self back");
},
};
let value: FullValue = match result {
Ok(value) => value,
Err(err) => {
return (this, Err(Error::ExecError { err }));
},
};
(this, Ok(value))
}
#[inline]
pub fn results_dir(&self) -> PathBuf { self.state.global.read().unwrap().results_dir.clone() }
}
impl Vm for OfflineVm {
type GlobalState = GlobalState;
type LocalState = LocalState;
fn store_state(this: &Arc<RwLock<Self>>, state: RunState<Self::GlobalState>) -> Result<(), brane_exe::Error> {
let mut lock: RwLockWriteGuard<Self> = this.write().unwrap();
lock.state = state;
Ok(())
}
fn load_state(this: &Arc<RwLock<Self>>) -> Result<RunState<Self::GlobalState>, brane_exe::Error> {
let lock: RwLockReadGuard<Self> = this.read().unwrap();
Ok(lock.state.clone())
}
}