use std::collections::HashSet;
use std::error::Error;
use std::fmt::{Display, Formatter, Result as FResult, Write};
use std::path::PathBuf;
use bollard::ClientVersion;
use brane_ast::func_id::FunctionId;
use brane_ast::locations::{Location, Locations};
use brane_exe::pc::ProgramCounter;
use brane_shr::formatters::{BlockFormatter, Capitalizeable};
use enum_debug::EnumDebug as _;
use reqwest::StatusCode;
use serde_json::Value;
use specifications::address::Address;
use specifications::container::Image;
use specifications::data::DataName;
use specifications::driving::ExecuteReply;
use specifications::package::Capability;
use specifications::version::Version;
use specifications::working::{ExecuteReply as TaskReply, TaskStatus};
use tonic::Status;
/// Minimal error type that carries nothing but a message.
///
/// The wrapped string is printed verbatim by the [`Display`] implementation, and the error never reports a source.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StringError(pub String);
impl Display for StringError {
    /// Writes the wrapped message as-is; width/padding flags set on `f` are not applied.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult { f.write_str(&self.0) }
}
impl Error for StringError {}
/// Top-level error that wraps the failure of either planning or executing a workflow.
///
/// Both variants only carry the underlying error, which is returned by `Error::source()`.
#[derive(Debug)]
pub enum TaskError {
/// Planning the workflow failed; `err` is the underlying [`PlanError`].
PlanError { err: PlanError },
/// Executing the workflow failed; `err` is the underlying `VmError`.
ExecError { err: brane_exe::errors::VmError },
}
impl Display for TaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use TaskError::*;
match self {
PlanError { .. } => write!(f, "Failed to plan workflow"),
ExecError { .. } => write!(f, "Failed to execute workflow"),
}
}
}
impl Error for TaskError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use TaskError::*;
match self {
PlanError { err } => Some(err),
ExecError { err } => Some(err),
}
}
}
/// Errors that can occur while planning a workflow.
///
/// Variants with an `err` field expose it through `Error::source()`; the `Display` text only describes the failure at this level.
#[derive(Debug)]
pub enum PlanError {
/// The infrastructure file could not be loaded.
InfraFileLoadError { err: brane_cfg::infra::Error },
/// The task `name` has more than one possible location (`locs`) where exactly one is required.
AmbigiousLocationError { name: String, locs: Locations },
/// Sending a GET-request to `address` failed.
RequestError { address: String, err: reqwest::Error },
/// A GET-request to `address` came back with status `code`; `err` optionally holds the response text shown in the message.
RequestFailure { address: String, code: reqwest::StatusCode, err: Option<String> },
/// The body of the response from `address` could not be obtained as UTF-8 text.
RequestBodyError { address: String, err: reqwest::Error },
/// The response `raw` from `address` could not be parsed as JSON.
RequestParseError { address: String, raw: String, err: serde_json::Error },
/// Location `loc` supports capabilities `got`, whereas task `task` requires `expected`.
UnsupportedCapabilities { task: String, loc: String, expected: HashSet<Capability>, got: HashSet<Capability> },
/// The dataset `name` is unknown.
UnknownDataset { name: String },
/// The intermediate result `name` is unknown.
UnknownIntermediateResult { name: String },
/// Planning a dataset failed.
DataPlanError { err: specifications::data::RuntimeDataIndexError },
/// The dataset `name` is unavailable; a non-empty `locs` is listed in the message as alternative locations.
DatasetUnavailable { name: String, locs: Vec<String> },
/// The intermediate result `name` is unavailable; a non-empty `locs` is listed in the message as alternative locations.
IntermediateResultUnavailable { name: String, locs: Vec<String> },
/// The workflow `id` could not be serialized.
WorkflowSerialize { id: String, err: serde_json::Error },
/// The planning request for workflow `id` could not be serialized.
PlanningRequestSerialize { id: String, err: serde_json::Error },
/// The request to plan workflow `id` for `url` could not be created.
PlanningRequest { id: String, url: String, err: reqwest::Error },
/// The request to plan workflow `id` could not be sent to `url`.
PlanningRequestSend { id: String, url: String, err: reqwest::Error },
/// The server at `url` answered the planning request for workflow `id` with status `code`; `response` optionally holds the response text.
PlanningFailure { id: String, url: String, code: StatusCode, response: Option<String> },
/// The response from `url` for workflow `id` could not be downloaded.
PlanningResponseDownload { id: String, url: String, err: reqwest::Error },
/// The response `raw` from `url` to planning workflow `id` could not be parsed.
PlanningResponseParse { id: String, url: String, raw: String, err: serde_json::Error },
/// The plan `raw` returned by `url` for workflow `id` could not be parsed.
PlanningPlanParse { id: String, url: String, raw: Value, err: serde_json::Error },
/// No gRPC connection could be created to the `brane-job` service at `endpoint`.
GrpcConnectError { endpoint: Address, err: specifications::working::JobServiceError },
/// Using the `brane-prx` service failed.
ProxyError { err: Box<dyn 'static + Send + Error> },
/// Sending `what` over the gRPC connection to the `brane-job` service at `endpoint` failed.
GrpcRequestError { what: &'static str, endpoint: Address, err: tonic::Status },
/// The checker of `domain` denied the plan; `reasons` (possibly empty) are listed in the message.
CheckerDenied { domain: Location, reasons: Vec<String> },
}
impl Display for PlanError {
    /// Writes a human-readable description of this planning failure.
    ///
    /// Only the failure at this level is described; any wrapped `err` is left to `Error::source()`.
    /// Variants that carry a raw server response or plan append it as a block formatted by `BlockFormatter`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use PlanError::*;
        match self {
            InfraFileLoadError { .. } => write!(f, "Failed to load infrastructure file"),
            AmbigiousLocationError { name, locs } => write!(
                f,
                "Ambigious location for task '{}': {}",
                name,
                if let Locations::Restricted(locs) = locs {
                    format!("possible locations are {}, but you need to reduce that to only 1 (use On-structs for that)", locs.join(", "))
                } else {
                    "all locations are possible, but you need to reduce that to only 1 (use On-structs for that)".into()
                }
            ),

            RequestError { address, .. } => write!(f, "Failed to send GET-request to '{address}'"),
            RequestFailure { address, code, err } => write!(
                f,
                "GET-request to '{}' failed with {} ({}){}",
                address,
                // Print the numeric code only: `StatusCode`'s own `Display` already appends the canonical reason, which would
                // otherwise show up twice (e.g. "404 Not Found (Not Found)"). This matches `PlanningFailure` below.
                code.as_u16(),
                code.canonical_reason().unwrap_or("???"),
                if let Some(err) = err { format!("\n\nResponse:\n{}\n", BlockFormatter::new(err)) } else { String::new() }
            ),
            RequestBodyError { address, .. } => write!(f, "Failed to get the body of response from '{address}' as UTF-8 text"),
            RequestParseError { address, raw, .. } => write!(f, "Failed to parse response '{raw}' from '{address}' as valid JSON"),

            UnsupportedCapabilities { task, loc, expected, got } => {
                write!(f, "Location '{loc}' only supports capabilities {got:?}, whereas task '{task}' requires capabilities {expected:?}")
            },
            UnknownDataset { name } => write!(f, "Unknown dataset '{name}'"),
            UnknownIntermediateResult { name } => write!(f, "Unknown intermediate result '{name}'"),
            DataPlanError { .. } => write!(f, "Failed to plan dataset"),
            // The hint about alternative locations is only added when there is at least one of them.
            DatasetUnavailable { name, locs } => write!(
                f,
                "Dataset '{}' is unavailable{}",
                name,
                if !locs.is_empty() {
                    format!(
                        "; however, locations {} do (try to get download permission to those datasets)",
                        locs.iter().map(|l| format!("'{l}'")).collect::<Vec<String>>().join(", ")
                    )
                } else {
                    String::new()
                }
            ),
            IntermediateResultUnavailable { name, locs } => write!(
                f,
                "Intermediate result '{}' is unavailable{}",
                name,
                if !locs.is_empty() {
                    format!(
                        "; however, locations {} do (try to get download permission to those datasets)",
                        locs.iter().map(|l| format!("'{l}'")).collect::<Vec<String>>().join(", ")
                    )
                } else {
                    String::new()
                }
            ),

            WorkflowSerialize { id, .. } => write!(f, "Failed to serialize workflow '{id}'"),
            PlanningRequestSerialize { id, .. } => write!(f, "Failed to serialize planning request for workflow '{id}'"),
            PlanningRequest { id, url, .. } => write!(f, "Failed to create request to plan workflow '{id}' for '{url}'"),
            PlanningRequestSend { id, url, .. } => write!(f, "Failed to send request to plan workflow '{id}' to '{url}'"),
            PlanningFailure { id, url, code, response } => write!(
                f,
                "Planner failed to plan workflow '{}' (server at '{url}' returned {} ({})){}",
                id,
                code.as_u16(),
                code.canonical_reason().unwrap_or("???"),
                if let Some(res) = response { format!("\n\nResponse:\n{}\n", BlockFormatter::new(res)) } else { String::new() }
            ),
            PlanningResponseDownload { id, url, .. } => write!(f, "Failed to download response from '{url}' for workflow '{id}'"),
            PlanningResponseParse { id, url, raw, .. } => {
                write!(f, "Failed to parse response from '{}' to planning workflow '{}'\n\nResponse:\n{}\n", url, id, BlockFormatter::new(raw))
            },
            PlanningPlanParse { id, url, raw, .. } => write!(
                f,
                "Failed to parse plan returned by '{}' to plan workflow '{}'\n\nPlan:\n{}\n",
                url,
                id,
                BlockFormatter::new(format!("{:?}", raw))
            ),

            GrpcConnectError { endpoint, .. } => write!(f, "Failed to create gRPC connection to `brane-job` service at '{endpoint}'"),
            ProxyError { .. } => write!(f, "Failed to use `brane-prx` service"),
            GrpcRequestError { what, endpoint, .. } => write!(f, "Failed to send {what} over gRPC connection to `brane-job` service at '{endpoint}'"),
            CheckerDenied { domain, reasons } => write!(
                f,
                "Checker of domain '{domain}' denied plan{}",
                if !reasons.is_empty() {
                    format!(
                        "\n\nReasons:\n{}",
                        // One bullet per reason, each on its own line. Writing into a `String` cannot fail, so the result is ignored.
                        reasons.iter().fold(String::new(), |mut output, r| {
                            let _ = writeln!(output, " - {r}");
                            output
                        })
                    )
                } else {
                    String::new()
                }
            ),
        }
    }
}
impl Error for PlanError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use PlanError::*;
match self {
InfraFileLoadError { err } => Some(err),
AmbigiousLocationError { .. } => None,
RequestError { err, .. } => Some(err),
RequestFailure { .. } => None,
RequestBodyError { err, .. } => Some(err),
RequestParseError { err, .. } => Some(err),
UnsupportedCapabilities { .. } => None,
UnknownDataset { .. } => None,
UnknownIntermediateResult { .. } => None,
DataPlanError { err } => Some(err),
DatasetUnavailable { .. } => None,
IntermediateResultUnavailable { .. } => None,
WorkflowSerialize { err, .. } => Some(err),
PlanningRequestSerialize { err, .. } => Some(err),
PlanningRequest { err, .. } => Some(err),
PlanningRequestSend { err, .. } => Some(err),
PlanningFailure { .. } => None,
PlanningResponseDownload { err, .. } => Some(err),
PlanningResponseParse { err, .. } => Some(err),
PlanningPlanParse { err, .. } => Some(err),
GrpcConnectError { err, .. } => Some(err),
ProxyError { err } => Some(&**err),
GrpcRequestError { err, .. } => Some(err),
CheckerDenied { .. } => None,
}
}
}
/// Errors for the preprocessing step; the variants cover unavailable data, config loading, proxy/gRPC communication, downloads and tarball handling.
///
/// Variants with an `err` field expose it through `Error::source()`.
#[derive(Debug)]
pub enum PreprocessError {
/// The data item `name` is not available locally.
UnavailableData { name: DataName },
/// The node config file at `path` could not be loaded.
NodeConfigReadError { path: PathBuf, err: brane_cfg::info::YamlError },
/// The infrastructure file at `path` could not be loaded.
InfraReadError { path: PathBuf, err: brane_cfg::infra::Error },
/// The location `loc` is unknown.
UnknownLocationError { loc: Location },
/// Preparing the proxy service failed.
ProxyError { err: Box<dyn 'static + Send + Sync + Error> },
/// A gRPC connection with the delegate node at `endpoint` could not be started.
GrpcConnectError { endpoint: Address, err: specifications::working::Error },
/// Sending a `what` request to the delegate node at `endpoint` failed.
GrpcRequestError { what: &'static str, endpoint: Address, err: tonic::Status },
/// The access kind `raw` sent by the remote delegate at `endpoint` could not be parsed.
AccessKindParseError { endpoint: Address, raw: String, err: serde_json::Error },
/// The `what` file at `path` could not be read.
FileReadError { what: &'static str, path: PathBuf, err: std::io::Error },
/// The identity file at `path` could not be parsed.
IdentityFileError { path: PathBuf, err: reqwest::Error },
/// The certificate at `path` could not be parsed.
CertificateError { path: PathBuf, err: reqwest::Error },
/// The location ID `id` could not be resolved to a local registry address.
LocationResolve { id: String, err: crate::caches::DomainRegistryCacheError },
/// The `what` directory at `path` is not a directory.
DirNotADirError { what: &'static str, path: PathBuf },
/// The `what` directory at `path` does not exist.
DirNotExistsError { what: &'static str, path: PathBuf },
/// The `what` directory at `path` could not be removed.
DirRemoveError { what: &'static str, path: PathBuf, err: std::io::Error },
/// The `what` directory at `path` could not be created.
DirCreateError { what: &'static str, path: PathBuf, err: std::io::Error },
/// A proxy to `address` could not be created.
ProxyCreateError { address: Address, err: reqwest::Error },
/// The HTTP client could not be created.
ClientCreateError { err: reqwest::Error },
/// Sending the GET download request to `address` failed.
DownloadRequestError { address: String, err: reqwest::Error },
/// The GET download request to `address` came back with status `code`; `message` optionally holds extra text appended to the message.
DownloadRequestFailure { address: String, code: StatusCode, message: Option<String> },
/// Getting the next chunk of the download stream from `address` failed.
DownloadStreamError { address: String, err: reqwest::Error },
/// The tarball file at `path` could not be created.
TarCreateError { path: PathBuf, err: std::io::Error },
/// The tarball file at `path` could not be re-opened.
TarOpenError { path: PathBuf, err: std::io::Error },
/// Writing to the tarball file at `path` failed.
TarWriteError { path: PathBuf, err: std::io::Error },
/// Extracting the dataset failed.
DataExtractError { err: brane_shr::fs::Error },
/// The given AccessKind could not be serialized.
AccessKindSerializeError { err: serde_json::Error },
/// The backend file could not be loaded.
BackendFileError { err: brane_cfg::backend::Error },
/// The backend type `what` is not (yet) supported.
UnsupportedBackend { what: &'static str },
}
impl Display for PreprocessError {
    /// Writes a human-readable description of this preprocessing failure.
    ///
    /// Only the failure at this level is described; any wrapped `err` is left to `Error::source()`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use self::PreprocessError::*;
        match self {
            UnavailableData { name } => write!(f, "{} '{}' is not available locally", name.variant(), name.name()),
            NodeConfigReadError { path, .. } => write!(f, "Failed to load node config file '{}'", path.display()),
            InfraReadError { path, .. } => write!(f, "Failed to load infrastructure file '{}'", path.display()),
            UnknownLocationError { loc } => write!(f, "Unknown location '{loc}'"),
            ProxyError { .. } => write!(f, "Failed to prepare proxy service"),
            GrpcConnectError { endpoint, .. } => write!(f, "Failed to start gRPC connection with delegate node '{endpoint}'"),
            GrpcRequestError { what, endpoint, .. } => write!(f, "Failed to send {what} request to delegate node '{endpoint}'"),
            AccessKindParseError { endpoint, raw, .. } => {
                write!(f, "Failed to parse access kind '{raw}' sent by remote delegate '{endpoint}'")
            },

            FileReadError { what, path, .. } => write!(f, "Failed to read {} file '{}'", what, path.display()),
            IdentityFileError { path, .. } => write!(f, "Failed to parse identity file '{}'", path.display()),
            CertificateError { path, .. } => write!(f, "Failed to parse certificate '{}'", path.display()),
            LocationResolve { id, .. } => write!(f, "Failed to resolve location ID '{id}' to a local registry address"),
            // `what` opens the sentence in these two messages, hence the capitalization.
            DirNotADirError { what, path } => write!(f, "{} directory '{}' is not a directory", what.capitalize(), path.display()),
            DirNotExistsError { what, path } => write!(f, "{} directory '{}' doesn't exist", what.capitalize(), path.display()),
            DirRemoveError { what, path, .. } => write!(f, "Failed to remove {} directory '{}'", what, path.display()),
            DirCreateError { what, path, .. } => write!(f, "Failed to create {} directory '{}'", what, path.display()),
            ProxyCreateError { address, .. } => write!(f, "Failed to create proxy to '{address}'"),
            ClientCreateError { .. } => write!(f, "Failed to create HTTP-client"),
            DownloadRequestError { address, .. } => write!(f, "Failed to send GET download request to '{address}'"),
            DownloadRequestFailure { address, code, message } => write!(
                f,
                "GET download request to '{}' failed with status code {} ({}){}",
                address,
                // Print the numeric code only: `StatusCode`'s own `Display` already appends the canonical reason, which would
                // otherwise show up twice (e.g. "status code 404 Not Found (Not Found)").
                code.as_u16(),
                code.canonical_reason().unwrap_or("???"),
                if let Some(message) = message { format!(": {message}") } else { String::new() }
            ),
            DownloadStreamError { address, .. } => write!(f, "Failed to get next chunk in download stream from '{address}'"),
            TarCreateError { path, .. } => write!(f, "Failed to create tarball file '{}'", path.display()),
            TarOpenError { path, .. } => write!(f, "Failed to re-open tarball file '{}'", path.display()),
            TarWriteError { path, .. } => write!(f, "Failed to write to tarball file '{}'", path.display()),
            DataExtractError { .. } => write!(f, "Failed to extract dataset"),
            AccessKindSerializeError { .. } => write!(f, "Failed to serialize the given AccessKind"),

            BackendFileError { .. } => write!(f, "Failed to load backend file"),
            UnsupportedBackend { what } => write!(f, "Backend type '{what}' is not (yet) supported"),
        }
    }
}
impl Error for PreprocessError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use PreprocessError::*;
match self {
UnavailableData { .. } => None,
NodeConfigReadError { err, .. } => Some(err),
InfraReadError { err, .. } => Some(err),
UnknownLocationError { .. } => None,
ProxyError { err } => Some(&**err),
GrpcConnectError { err, .. } => Some(err),
GrpcRequestError { err, .. } => Some(err),
AccessKindParseError { err, .. } => Some(err),
FileReadError { err, .. } => Some(err),
IdentityFileError { err, .. } => Some(err),
CertificateError { err, .. } => Some(err),
LocationResolve { err, .. } => Some(err),
DirNotADirError { .. } => None,
DirNotExistsError { .. } => None,
DirRemoveError { err, .. } => Some(err),
DirCreateError { err, .. } => Some(err),
ProxyCreateError { err, .. } => Some(err),
ClientCreateError { err } => Some(err),
DownloadRequestError { err, .. } => Some(err),
DownloadRequestFailure { .. } => None,
DownloadStreamError { err, .. } => Some(err),
TarCreateError { err, .. } => Some(err),
TarOpenError { err, .. } => Some(err),
TarWriteError { err, .. } => Some(err),
DataExtractError { err } => Some(err),
AccessKindSerializeError { err } => Some(err),
BackendFileError { err } => Some(err),
UnsupportedBackend { .. } => None,
}
}
}
/// Errors for executing a task; the variants cover task output decoding, Docker usage, status updates, delegate communication, image downloads/caching and authorization.
///
/// Variants with an `err` field expose it through `Error::source()`.
#[derive(Debug)]
pub enum ExecuteError {
/// The package `name` is unknown, or it has no version `version`.
UnknownPackage { name: String, version: Version },
/// The data item `name` is unknown.
UnknownData { name: DataName },
/// The input arguments could not be serialized.
ArgsEncodeError { err: serde_json::Error },
/// Task `name` (image `image`) failed with exit code `code`; `stdout` and `stderr` are included in the message.
ExternalCallFailed { name: String, image: Box<Image>, code: i32, stdout: String, stderr: String },
/// The task output `raw` could not be decoded as Base64.
Base64DecodeError { raw: String, err: base64::DecodeError },
/// The task output `raw` could not be decoded as UTF-8.
Utf8DecodeError { raw: String, err: std::string::FromUtf8Error },
/// The task output `raw` could not be decoded as JSON.
JsonDecodeError { raw: String, err: serde_json::Error },
/// A VolumeBind could not be created.
VolumeBindError { err: specifications::container::VolumeBindError },
/// The result directory at `path` exists but is not a directory.
ResultDirNotADir { path: PathBuf },
/// The existing result directory at `path` could not be removed.
ResultDirRemoveError { path: PathBuf, err: std::io::Error },
/// The result directory at `path` could not be created.
ResultDirCreateError { path: PathBuf, err: std::io::Error },
/// Task `name` (image `image`) could not be executed as a Docker container.
DockerError { name: String, image: Box<Image>, err: DockerError },
/// The incoming status update `status` is missing its mandatory `value` field.
StatusEmptyStringError { status: TaskStatus },
/// The value `raw` in the incoming status update `status` could not be parsed as a FullValue.
StatusValueParseError { status: TaskStatus, raw: String, err: serde_json::Error },
/// The value `raw` in the incoming status update `status` could not be parsed as a return code/stdout/stderr triplet.
StatusTripletParseError { status: TaskStatus, raw: String, err: serde_json::Error },
/// The client could not be updated of status `status`.
ClientUpdateError { status: TaskStatus, err: tokio::sync::mpsc::error::SendError<Result<TaskReply, Status>> },
/// The node config file at `path` could not be loaded.
NodeConfigReadError { path: PathBuf, err: brane_cfg::info::YamlError },
/// The infrastructure file at `path` could not be loaded.
InfraReadError { path: PathBuf, err: brane_cfg::infra::Error },
/// The location `loc` is unknown.
UnknownLocationError { loc: Location },
/// Preparing the proxy service failed.
ProxyError { err: Box<dyn 'static + Send + Sync + Error> },
/// A gRPC connection with the delegate node at `endpoint` could not be started.
GrpcConnectError { endpoint: Address, err: specifications::working::Error },
/// Sending a `what` request to the delegate node at `endpoint` failed.
GrpcRequestError { what: &'static str, endpoint: Address, err: tonic::Status },
/// The remote delegate at `endpoint` returned `status` while executing task `name`; `err` carries the accompanying message.
ExecuteError { endpoint: Address, name: String, status: TaskStatus, err: StringError },
/// The cached digest at `path` could not be read.
DigestReadError { path: PathBuf, err: std::io::Error },
/// The digest of the image at `path` could not be read.
DigestError { path: PathBuf, err: DockerError },
/// A proxy to `address` could not be created.
ProxyCreateError { address: Address, err: reqwest::Error },
/// The HTTP client could not be created.
ClientCreateError { err: reqwest::Error },
/// Sending the GET download request to `address` failed.
DownloadRequestError { address: String, err: reqwest::Error },
/// The GET download request to `address` came back with status `code`; `message` optionally holds extra text appended to the message.
DownloadRequestFailure { address: String, code: StatusCode, message: Option<String> },
/// Getting the next chunk of the download stream from `address` failed.
DownloadStreamError { address: String, err: reqwest::Error },
/// The tarball file at `path` could not be created.
ImageCreateError { path: PathBuf, err: std::io::Error },
/// Writing to the tarball file at `path` failed.
ImageWriteError { path: PathBuf, err: std::io::Error },
/// The image ID could not be written to the file at `path`.
IdWriteError { path: PathBuf, err: std::io::Error },
/// Reading from the file at `path` failed (presumably the image ID written by the `IdWriteError` counterpart; the message only says "image").
IdReadError { path: PathBuf, err: std::io::Error },
/// Hashing the image failed.
HashError { err: DockerError },
/// The image hash could not be written to the file at `path`.
HashWriteError { path: PathBuf, err: std::io::Error },
/// The image hash could not be read from the file at `path`.
HashReadError { path: PathBuf, err: std::io::Error },
/// The checker rejected the workflow; `checker` is not part of the printed message.
AuthorizationFailure { checker: Address },
/// The checker failed to authorize the workflow; `checker` is not part of the printed message.
AuthorizationError { checker: Address, err: AuthorizeError },
/// The PackageIndex could not be retrieved from `endpoint`.
PackageIndexError { endpoint: String, err: ApiError },
/// The backend file at `path` could not be loaded.
BackendFileError { path: PathBuf, err: brane_cfg::backend::Error },
}
impl Display for ExecuteError {
    /// Writes a human-readable description of this execution failure.
    ///
    /// Only the failure at this level is described; any wrapped `err` is left to `Error::source()`.
    /// Variants that carry raw task output print it as blocks formatted by `BlockFormatter`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use self::ExecuteError::*;
        match self {
            UnknownPackage { name, version } => write!(f, "Unknown package '{name}' (or it does not have version {version})"),
            UnknownData { name } => write!(f, "Unknown {} '{}'", name.variant(), name.name()),
            ArgsEncodeError { .. } => write!(f, "Failed to serialize input arguments"),
            ExternalCallFailed { name, image, code, stdout, stderr } => write!(
                f,
                "Task '{}' (image '{}') failed with exit code {}\n\n{}\n\n{}\n\n",
                name,
                image,
                code,
                BlockFormatter::new(stdout),
                BlockFormatter::new(stderr)
            ),
            Base64DecodeError { raw, .. } => {
                write!(f, "Failed to decode the following task output as valid Base64:\n{}\n\n", BlockFormatter::new(raw))
            },
            Utf8DecodeError { raw, .. } => {
                write!(f, "Failed to decode the following task output as valid UTF-8:\n{}\n\n", BlockFormatter::new(raw))
            },
            JsonDecodeError { raw, .. } => {
                write!(f, "Failed to decode the following task output as valid JSON:\n{}\n\n", BlockFormatter::new(raw))
            },

            VolumeBindError { .. } => write!(f, "Failed to create VolumeBind"),
            ResultDirNotADir { path } => write!(f, "Result directory '{}' exists but is not a directory", path.display()),
            ResultDirRemoveError { path, .. } => write!(f, "Failed to remove existing result directory '{}'", path.display()),
            ResultDirCreateError { path, .. } => write!(f, "Failed to create result directory '{}'", path.display()),
            DockerError { name, image, .. } => write!(f, "Failed to execute task '{name}' (image '{image}') as a Docker container"),

            StatusEmptyStringError { status } => write!(f, "Incoming status update {status:?} is missing mandatory `value` field"),
            StatusValueParseError { status, raw, .. } => {
                write!(f, "Failed to parse '{raw}' as a FullValue in incoming status update {status:?}")
            },
            StatusTripletParseError { status, raw, .. } => {
                write!(f, "Failed to parse '{raw}' as a return code/stdout/stderr triplet in incoming status update {status:?}")
            },
            ClientUpdateError { status, .. } => write!(f, "Failed to update client of status {status:?}"),
            NodeConfigReadError { path, .. } => write!(f, "Failed to load node config file '{}'", path.display()),
            InfraReadError { path, .. } => write!(f, "Failed to load infrastructure file '{}'", path.display()),
            UnknownLocationError { loc } => write!(f, "Unknown location '{loc}'"),
            ProxyError { .. } => write!(f, "Failed to prepare proxy service"),
            GrpcConnectError { endpoint, .. } => write!(f, "Failed to start gRPC connection with delegate node '{endpoint}'"),
            GrpcRequestError { what, endpoint, .. } => write!(f, "Failed to send {what} request to delegate node '{endpoint}'"),
            ExecuteError { endpoint, name, status, .. } => {
                write!(f, "Remote delegate '{endpoint}' returned status '{status:?}' while executing task '{name}'")
            },

            DigestReadError { path, .. } => write!(f, "Failed to read cached digest in '{}'", path.display()),
            DigestError { path, .. } => write!(f, "Failed to read digest of image '{}'", path.display()),
            ProxyCreateError { address, .. } => write!(f, "Failed to create proxy to '{address}'"),
            ClientCreateError { .. } => write!(f, "Failed to create HTTP-client"),
            DownloadRequestError { address, .. } => write!(f, "Failed to send GET download request to '{address}'"),
            DownloadRequestFailure { address, code, message } => write!(
                f,
                "GET download request to '{}' failed with status code {} ({}){}",
                address,
                // Print the numeric code only: `StatusCode`'s own `Display` already appends the canonical reason, which would
                // otherwise show up twice (e.g. "status code 404 Not Found (Not Found)").
                code.as_u16(),
                code.canonical_reason().unwrap_or("???"),
                if let Some(message) = message { format!(": {message}") } else { String::new() }
            ),
            DownloadStreamError { address, .. } => write!(f, "Failed to get next chunk in download stream from '{address}'"),
            ImageCreateError { path, .. } => write!(f, "Failed to create tarball file '{}'", path.display()),
            ImageWriteError { path, .. } => write!(f, "Failed to write to tarball file '{}'", path.display()),
            IdWriteError { path, .. } => write!(f, "Failed to write image ID to file '{}'", path.display()),
            IdReadError { path, .. } => write!(f, "Failed to read image from file '{}'", path.display()),
            HashError { .. } => write!(f, "Failed to hash image"),
            HashWriteError { path, .. } => write!(f, "Failed to write image hash to file '{}'", path.display()),
            HashReadError { path, .. } => write!(f, "Failed to read image hash from file '{}'", path.display()),

            // The checker's address is carried by these variants but is not part of the message.
            AuthorizationFailure { checker: _ } => write!(f, "Checker rejected workflow"),
            AuthorizationError { checker: _, .. } => write!(f, "Checker failed to authorize workflow"),
            PackageIndexError { endpoint, .. } => write!(f, "Failed to get PackageIndex from '{endpoint}'"),
            BackendFileError { path, .. } => write!(f, "Failed to load backend file '{}'", path.display()),
        }
    }
}
impl Error for ExecuteError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use self::ExecuteError::*;
match self {
UnknownPackage { .. } => None,
UnknownData { .. } => None,
ArgsEncodeError { err } => Some(err),
ExternalCallFailed { .. } => None,
Base64DecodeError { err, .. } => Some(err),
Utf8DecodeError { err, .. } => Some(err),
JsonDecodeError { err, .. } => Some(err),
VolumeBindError { err } => Some(err),
ResultDirNotADir { .. } => None,
ResultDirRemoveError { err, .. } => Some(err),
ResultDirCreateError { err, .. } => Some(err),
DockerError { err, .. } => Some(err),
StatusEmptyStringError { .. } => None,
StatusValueParseError { err, .. } => Some(err),
StatusTripletParseError { err, .. } => Some(err),
ClientUpdateError { err, .. } => Some(err),
NodeConfigReadError { err, .. } => Some(err),
InfraReadError { err, .. } => Some(err),
UnknownLocationError { .. } => None,
ProxyError { err } => Some(&**err),
GrpcConnectError { err, .. } => Some(err),
GrpcRequestError { err, .. } => Some(err),
DigestReadError { err, .. } => Some(err),
DigestError { err, .. } => Some(err),
ProxyCreateError { err, .. } => Some(err),
ClientCreateError { err } => Some(err),
DownloadRequestError { err, .. } => Some(err),
DownloadRequestFailure { .. } => None,
DownloadStreamError { err, .. } => Some(err),
ImageCreateError { err, .. } => Some(err),
ImageWriteError { err, .. } => Some(err),
IdWriteError { err, .. } => Some(err),
IdReadError { err, .. } => Some(err),
HashError { err } => Some(err),
HashWriteError { err, .. } => Some(err),
HashReadError { err, .. } => Some(err),
AuthorizationFailure { .. } => None,
AuthorizationError { err, .. } => Some(err),
PackageIndexError { err, .. } => Some(err),
BackendFileError { err, .. } => Some(err),
ExecuteError { err, .. } => Some(err),
}
}
}
/// Errors for authorizing a workflow with a checker; the variants cover token generation, the request/response exchange and mismatches found in the workflow.
///
/// Variants with an `err` field expose it through `Error::source()`.
#[derive(Debug)]
pub enum AuthorizeError {
/// A new JWT could not be generated using the secret at `secret`.
TokenGenerate { secret: PathBuf, err: specifications::policy::Error },
/// The HTTP client could not be built.
ClientBuild { err: reqwest::Error },
/// An ExecuteRequest destined for the checker at `addr` could not be built.
ExecuteRequestBuild { addr: String, err: reqwest::Error },
/// The ExecuteRequest could not be sent to the checker at `addr`.
ExecuteRequestSend { addr: String, err: reqwest::Error },
/// The ExecuteRequest to the checker at `addr` came back with status `code`; `err` optionally holds the response text shown in the message.
ExecuteRequestFailure { addr: String, code: StatusCode, err: Option<String> },
/// The response body from `addr` could not be downloaded.
ExecuteBodyDownload { addr: String, err: reqwest::Error },
/// The response body `raw` received from `addr` could not be deserialized as JSON.
ExecuteBodyDeserialize { addr: String, raw: String, err: serde_json::Error },
/// The dataset `data_name` is not an input to the task at `pc`.
AuthorizationDataMismatch { pc: ProgramCounter, data_name: DataName },
/// The authorized user `authenticated` does not match the `who` user in `workflow`.
AuthorizationUserMismatch { who: String, authenticated: String, workflow: String },
/// The edge at `pc` is not an `Edge::Node` but an `Edge::<got>`.
AuthorizationWrongEdge { pc: ProgramCounter, got: String },
/// The edge index `got` is out-of-bounds for function `func`, which has `max` edges.
IllegalEdgeIdx { func: FunctionId, got: usize, max: usize },
/// The function `got` does not exist in the given workflow.
IllegalFuncId { got: FunctionId },
/// The node call at `pc` has no location planned.
MissingLocation { pc: ProgramCounter },
/// The given `workflow` has no end user specified.
NoWorkflowUser { workflow: String },
}
impl Display for AuthorizeError {
    /// Writes a human-readable description of this authorization failure.
    ///
    /// Only the failure at this level is described; any wrapped `err` is left to `Error::source()`.
    /// Variants that carry a raw response or workflow append it to the message.
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use AuthorizeError::*;
        match self {
            TokenGenerate { secret, .. } => write!(f, "Failed to generate new JWT using secret '{}'", secret.display()),
            ClientBuild { .. } => write!(f, "Failed to build HTTP client"),
            ExecuteRequestBuild { addr, .. } => write!(f, "Failed to build an ExecuteRequest destined for the checker at '{addr}'"),
            ExecuteRequestSend { addr, .. } => write!(f, "Failed to send ExecuteRequest to checker '{addr}'"),
            ExecuteRequestFailure { addr, code, err } => write!(
                f,
                "ExecuteRequest to checker '{}' failed with status code {} ({}){}",
                addr,
                // Print the numeric code only: `StatusCode`'s own `Display` already appends the canonical reason, which would
                // otherwise show up twice (e.g. "status code 403 Forbidden (Forbidden)").
                code.as_u16(),
                code.canonical_reason().unwrap_or("???"),
                if let Some(err) = err {
                    // The response is framed by two 80-dash rules; build the rule once and reuse it.
                    let rule = "-".repeat(80);
                    format!("\n\nResponse:\n{rule}\n{err}\n{rule}\n")
                } else {
                    String::new()
                }
            ),
            ExecuteBodyDownload { addr, .. } => write!(f, "Failed to download response body from '{addr}'"),
            ExecuteBodyDeserialize { addr, raw, .. } => {
                write!(f, "Failed to deserialize response body received from '{}' as valid JSON\n\nResponse:\n{}\n", addr, BlockFormatter::new(raw))
            },

            AuthorizationDataMismatch { pc, data_name } => write!(f, "Dataset '{data_name}' is not an input to task {pc}"),
            AuthorizationUserMismatch { who, authenticated, workflow } => {
                write!(
                    f,
                    "Authorized user '{}' does not match {} user in workflow\n\nWorkflow:\n{}\n",
                    authenticated,
                    who,
                    BlockFormatter::new(workflow)
                )
            },
            AuthorizationWrongEdge { pc, got } => write!(f, "Edge {pc} in workflow is not an Edge::Node but an Edge::{got}"),
            IllegalEdgeIdx { func, got, max } => write!(f, "Edge index {got} is out-of-bounds for function {func} with {max} edges"),
            IllegalFuncId { got } => write!(f, "Function {got} does not exist in given workflow"),
            MissingLocation { pc } => write!(f, "Node call at {pc} has no location planned"),
            NoWorkflowUser { workflow } => write!(f, "Given workflow has no end user specified\n\nWorkflow:\n{}\n", BlockFormatter::new(workflow)),
        }
    }
}
impl Error for AuthorizeError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use AuthorizeError::*;
match self {
TokenGenerate { err, .. } => Some(err),
ClientBuild { err, .. } => Some(err),
ExecuteRequestBuild { err, .. } => Some(err),
ExecuteRequestSend { err, .. } => Some(err),
ExecuteRequestFailure { .. } => None,
ExecuteBodyDownload { err, .. } => Some(err),
ExecuteBodyDeserialize { err, .. } => Some(err),
AuthorizationDataMismatch { .. } => None,
AuthorizationUserMismatch { .. } => None,
AuthorizationWrongEdge { .. } => None,
IllegalEdgeIdx { .. } => None,
IllegalFuncId { .. } => None,
MissingLocation { .. } => None,
NoWorkflowUser { .. } => None,
}
}
}
/// Error for writing back to a client over a gRPC channel.
#[derive(Debug)]
pub enum StdoutError {
/// Sending on the channel back to the client failed; `err` is the channel's send error.
TxWriteError { err: tokio::sync::mpsc::error::SendError<Result<ExecuteReply, Status>> },
}
impl Display for StdoutError {
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use StdoutError::*;
match self {
TxWriteError { .. } => write!(f, "Failed to write on gRPC channel back to client"),
}
}
}
impl Error for StdoutError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use StdoutError::*;
match self {
TxWriteError { err } => Some(err),
}
}
}
/// Errors for committing data; the variants cover dataset directories, data/asset info files, copying, delegate communication and cleanup of paths.
///
/// Variants with an `err` field expose it through `Error::source()`.
#[derive(Debug)]
pub enum CommitError {
/// The dataset `name` is unavailable; a non-empty `locs` is listed in the message as alternative locations.
UnavailableDataError { name: String, locs: Vec<String> },
/// The dataset directory at `path` exists but is not a directory.
DataDirNotADir { path: PathBuf },
/// The dataset directory at `path` could not be created.
DataDirCreateError { path: PathBuf, err: std::io::Error },
/// A new data info file could not be created at `path`.
DataInfoCreateError { path: PathBuf, err: std::io::Error },
/// The DataInfo struct could not be serialized.
DataInfoSerializeError { err: serde_yaml::Error },
/// The DataInfo could not be written to `path`.
DataInfoWriteError { path: PathBuf, err: std::io::Error },
/// The directory at `path` could not be read.
DirReadError { path: PathBuf, err: std::io::Error },
/// Entry `i` of the directory at `path` could not be read.
DirEntryReadError { path: PathBuf, i: usize, err: std::io::Error },
/// Copying the data directory failed.
DataCopyError { err: brane_shr::fs::Error },
/// The node config file at `path` could not be loaded.
NodeConfigReadError { path: PathBuf, err: brane_cfg::info::YamlError },
/// The infrastructure file at `path` could not be loaded.
InfraReadError { path: PathBuf, err: brane_cfg::infra::Error },
/// The location `loc` is unknown.
UnknownLocationError { loc: Location },
/// Preparing the proxy service failed.
ProxyError { err: Box<dyn 'static + Send + Sync + Error> },
/// A gRPC connection with the delegate node at `endpoint` could not be started.
GrpcConnectError { endpoint: Address, err: specifications::working::Error },
/// Sending a `what` request to the delegate node at `endpoint` failed.
GrpcRequestError { what: &'static str, endpoint: Address, err: tonic::Status },
/// The asset info file at `path` could not be loaded.
AssetInfoReadError { path: PathBuf, err: specifications::data::AssetInfoError },
/// The file at `path` could not be removed.
FileRemoveError { path: PathBuf, err: std::io::Error },
/// The directory at `path` could not be removed.
DirRemoveError { path: PathBuf, err: std::io::Error },
/// The given `path` points to neither a file nor a directory.
PathNotFileNotDir { path: PathBuf },
}
impl Display for CommitError {
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use self::CommitError::*;
match self {
UnavailableDataError { name, locs } => write!(
f,
"Dataset '{}' is unavailable{}",
name,
if !locs.is_empty() {
format!(
"; however, locations {} do (try to get download permission to those datasets)",
locs.iter().map(|l| format!("'{l}'")).collect::<Vec<String>>().join(", ")
)
} else {
String::new()
}
),
DataDirNotADir { path } => write!(f, "Dataset directory '{}' exists but is not a directory", path.display()),
DataDirCreateError { path, .. } => write!(f, "Failed to create dataset directory '{}'", path.display()),
DataInfoCreateError { path, .. } => write!(f, "Failed to create new data info file '{}'", path.display()),
DataInfoSerializeError { .. } => write!(f, "Failed to serialize DataInfo struct"),
DataInfoWriteError { path, .. } => write!(f, "Failed to write DataInfo to '{}'", path.display()),
DirReadError { path, .. } => write!(f, "Failed to read directory '{}'", path.display()),
DirEntryReadError { path, i, .. } => write!(f, "Failed to read entry {} in directory '{}'", i, path.display()),
DataCopyError { .. } => write!(f, "Failed to copy data directory"),
NodeConfigReadError { path, .. } => write!(f, "Failed to load node config file '{}'", path.display()),
InfraReadError { path, .. } => write!(f, "Failed to load infrastructure file '{}'", path.display()),
UnknownLocationError { loc } => write!(f, "Unknown location '{loc}'"),
ProxyError { .. } => write!(f, "Failed to prepare proxy service"),
GrpcConnectError { endpoint, .. } => write!(f, "Failed to start gRPC connection with delegate node '{endpoint}'"),
GrpcRequestError { what, endpoint, .. } => write!(f, "Failed to send {what} request to delegate node '{endpoint}'"),
AssetInfoReadError { path, .. } => write!(f, "Failed to load asset info file '{}'", path.display()),
FileRemoveError { path, .. } => write!(f, "Failed to remove file '{}'", path.display()),
DirRemoveError { path, .. } => write!(f, "Failed to remove directory '{}'", path.display()),
PathNotFileNotDir { path } => write!(f, "Given path '{}' neither points to a file nor a directory", path.display()),
}
}
}
impl Error for CommitError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use CommitError::*;
match self {
UnavailableDataError { .. } => None,
DataDirNotADir { .. } => None,
DataDirCreateError { err, .. } => Some(err),
DataInfoCreateError { err, .. } => Some(err),
DataInfoSerializeError { err } => Some(err),
DataInfoWriteError { err, .. } => Some(err),
DirReadError { err, .. } => Some(err),
DirEntryReadError { err, .. } => Some(err),
DataCopyError { err } => Some(err),
NodeConfigReadError { err, .. } => Some(err),
InfraReadError { err, .. } => Some(err),
UnknownLocationError { .. } => None,
ProxyError { err } => Some(&**err),
GrpcConnectError { err, .. } => Some(err),
GrpcRequestError { err, .. } => Some(err),
AssetInfoReadError { err, .. } => Some(err),
FileRemoveError { err, .. } => Some(err),
DirRemoveError { err, .. } => Some(err),
PathNotFileNotDir { .. } => None,
}
}
}
/// Error for parsing identifiers.
#[derive(Debug)]
pub enum IdError {
/// The text `raw` could not be parsed as a `what`; `err` is the underlying `uuid::Error`.
ParseError { what: &'static str, raw: String, err: uuid::Error },
}
impl Display for IdError {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use IdError::*;
match self {
ParseError { what, raw, .. } => write!(f, "Failed to parse {what} from '{raw}'"),
}
}
}
impl Error for IdError {
#[inline]
fn source(&self) -> Option<&(dyn Error + 'static)> {
use IdError::*;
match self {
ParseError { err, .. } => Some(err),
}
}
}
/// Errors that can occur when talking to the local Docker daemon or when handling Docker image files.
#[derive(Debug)]
pub enum DockerError {
/// Failed to connect to the local Docker daemon through the socket at `path` with client version `version`.
ConnectionError { path: PathBuf, version: ClientVersion, err: bollard::errors::Error },
/// Failed to wait for the container called `name`.
WaitError { name: String, err: bollard::errors::Error },
/// Failed to get the logs of the container called `name`.
LogsError { name: String, err: bollard::errors::Error },
/// Failed to inspect the container called `name`.
InspectContainerError { name: String, err: bollard::errors::Error },
/// The container called `name` is not connected to any networks.
ContainerNoNetwork { name: String },
/// Could not create the container called `name` from `image`.
CreateContainerError { name: String, image: Box<Image>, err: bollard::errors::Error },
/// Could not start the container called `name` (created from `image`).
StartError { name: String, image: Box<Image>, err: bollard::errors::Error },
/// The container called `name` has no execution state.
ContainerNoState { name: String },
/// The container called `name` has no return code.
ContainerNoExitCode { name: String },
/// Failed to remove the container called `name`.
ContainerRemoveError { name: String, err: bollard::errors::Error },
/// Failed to open the image file at `path`.
ImageFileOpenError { path: PathBuf, err: std::io::Error },
/// Failed to import the image file at `path` into the Docker engine.
ImageImportError { path: PathBuf, err: bollard::errors::Error },
/// Failed to create the image file at `path`.
ImageFileCreateError { path: PathBuf, err: std::io::Error },
/// Failed to export the image called `name`.
ImageExportError { name: String, err: bollard::errors::Error },
/// Failed to write to the image file at `path`.
ImageFileWriteError { path: PathBuf, err: std::io::Error },
/// Failed to shut the image file at `path` down.
ImageFileShutdownError { path: PathBuf, err: std::io::Error },
/// Failed to pull the image `source` into the Docker engine.
ImagePullError { source: String, err: bollard::errors::Error },
/// Failed to tag the pulled image `source` as `image`.
ImageTagError { image: Box<Image>, source: String, err: bollard::errors::Error },
/// Failed to inspect `image`.
ImageInspectError { image: Box<Image>, err: bollard::errors::Error },
/// Failed to remove `image` (with identifier `id`) from the Docker engine.
ImageRemoveError { image: Box<Image>, id: String, err: bollard::errors::Error },
/// Could not open the Docker image file at `path`.
ImageTarOpenError { path: PathBuf, err: std::io::Error },
/// Could not read the Docker image file at `path`.
ImageTarReadError { path: PathBuf, err: std::io::Error },
/// Could not get the file entries in the Docker image file at `path`.
ImageTarEntriesError { path: PathBuf, err: std::io::Error },
/// Could not get a file entry from the Docker image file at `path`.
ImageTarEntryError { path: PathBuf, err: std::io::Error },
/// The Docker image file at `path` contains an illegal path entry.
ImageTarIllegalPath { path: PathBuf, err: std::io::Error },
/// Failed to read `entry` in the Docker image file at `path`.
ImageTarManifestReadError { path: PathBuf, entry: PathBuf, err: std::io::Error },
/// Could not parse `entry` in the Docker image file at `path`.
ImageTarManifestParseError { path: PathBuf, entry: PathBuf, err: serde_json::Error },
/// `entry` in the Docker image file at `path` has `got` entries where exactly 1 was expected.
ImageTarIllegalManifestNum { path: PathBuf, entry: PathBuf, got: usize },
/// The image `digest` found in `entry` of the Docker image file at `path` does not start with the expected prefix.
ImageTarIllegalDigest { path: PathBuf, entry: PathBuf, digest: String },
/// Could not find a manifest.json in the Docker image file at `path`.
ImageTarNoManifest { path: PathBuf },
}
impl Display for DockerError {
    /// Writes a one-line, human-readable description of the error.
    ///
    /// Nested errors are deliberately not printed here; they are exposed through [`Error::source()`] instead.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use DockerError::*;
        match self {
            ConnectionError { path, version, .. } => {
                write!(f, "Failed to connect to the local Docker daemon through socket '{}' and with client version {}", path.display(), version)
            },

            // Container-related errors
            WaitError { name, .. } => write!(f, "Failed to wait for Docker container with name '{name}'"),
            LogsError { name, .. } => write!(f, "Failed to get logs of Docker container with name '{name}'"),
            InspectContainerError { name, .. } => write!(f, "Failed to inspect Docker container with name '{name}'"),
            ContainerNoNetwork { name } => write!(f, "Docker container with name '{name}' is not connected to any networks"),
            CreateContainerError { name, image, .. } => write!(f, "Could not create Docker container with name '{name}' (image: {image})"),
            StartError { name, image, .. } => write!(f, "Could not start Docker container with name '{name}' (image: {image})"),
            ContainerNoState { name } => write!(f, "Docker container with name '{name}' has no execution state (has it been started?)"),
            ContainerNoExitCode { name } => write!(f, "Docker container with name '{name}' has no return code (did you wait before completing?)"),
            // Fixed: this message used to start with the misspelling "Fialed".
            ContainerRemoveError { name, .. } => write!(f, "Failed to remove Docker container with name '{name}'"),

            // Image import/export/pull errors
            ImageFileOpenError { path, .. } => write!(f, "Failed to open image file '{}'", path.display()),
            ImageImportError { path, .. } => write!(f, "Failed to import image file '{}' into Docker engine", path.display()),
            ImageFileCreateError { path, .. } => write!(f, "Failed to create image file '{}'", path.display()),
            ImageExportError { name, .. } => write!(f, "Failed to export image '{name}'"),
            ImageFileWriteError { path, .. } => write!(f, "Failed to write to image file '{}'", path.display()),
            ImageFileShutdownError { path, .. } => write!(f, "Failed to shut image file '{}' down", path.display()),
            ImagePullError { source, .. } => write!(f, "Failed to pull image '{source}' into Docker engine"),
            ImageTagError { image, source, .. } => write!(f, "Failed to tag pulled image '{source}' as '{image}'"),
            ImageInspectError { image, .. } => {
                // Write the optional digest suffix straight to the formatter instead of allocating a temporary `String`.
                write!(f, "Failed to inspect image '{}'", image.name())?;
                if let Some(digest) = image.digest() {
                    write!(f, " ({digest})")?;
                }
                Ok(())
            },
            ImageRemoveError { image, id, .. } => write!(f, "Failed to remove image '{}' (id: {}) from Docker engine", image.name(), id),

            // Image tarball errors
            ImageTarOpenError { path, .. } => write!(f, "Could not open given Docker image file '{}'", path.display()),
            ImageTarReadError { path, .. } => write!(f, "Could not read given Docker image file '{}'", path.display()),
            ImageTarEntriesError { path, .. } => write!(f, "Could not get file entries in Docker image file '{}'", path.display()),
            ImageTarEntryError { path, .. } => write!(f, "Could not get file entry from Docker image file '{}'", path.display()),
            ImageTarNoManifest { path } => write!(f, "Could not find manifest.json in given Docker image file '{}'", path.display()),
            ImageTarManifestReadError { path, entry, .. } => {
                write!(f, "Failed to read '{}' in Docker image file '{}'", entry.display(), path.display())
            },
            ImageTarManifestParseError { path, entry, .. } => {
                write!(f, "Could not parse '{}' in Docker image file '{}'", entry.display(), path.display())
            },
            ImageTarIllegalManifestNum { path, entry, got } => write!(
                f,
                "Got incorrect number of entries in '{}' in Docker image file '{}': got {}, expected 1",
                entry.display(),
                path.display(),
                got
            ),
            ImageTarIllegalDigest { path, entry, digest } => write!(
                f,
                "Found image digest '{}' in '{}' in Docker image file '{}' is illegal: does not start with '{}'",
                digest,
                entry.display(),
                path.display(),
                crate::docker::MANIFEST_CONFIG_PREFIX
            ),
            ImageTarIllegalPath { path, .. } => write!(f, "Given Docker image file '{}' contains illegal path entry", path.display()),
        }
    }
}
impl Error for DockerError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use DockerError::*;
match self {
ConnectionError { err, .. } => Some(err),
WaitError { err, .. } => Some(err),
LogsError { err, .. } => Some(err),
InspectContainerError { err, .. } => Some(err),
ContainerNoNetwork { .. } => None,
CreateContainerError { err, .. } => Some(err),
StartError { err, .. } => Some(err),
ContainerNoState { .. } => None,
ContainerNoExitCode { .. } => None,
ContainerRemoveError { err, .. } => Some(err),
ImageFileOpenError { err, .. } => Some(err),
ImageImportError { err, .. } => Some(err),
ImageFileCreateError { err, .. } => Some(err),
ImageExportError { err, .. } => Some(err),
ImageFileWriteError { err, .. } => Some(err),
ImageFileShutdownError { err, .. } => Some(err),
ImagePullError { err, .. } => Some(err),
ImageTagError { err, .. } => Some(err),
ImageInspectError { err, .. } => Some(err),
ImageRemoveError { err, .. } => Some(err),
ImageTarOpenError { err, .. } => Some(err),
ImageTarReadError { err, .. } => Some(err),
ImageTarEntriesError { err, .. } => Some(err),
ImageTarEntryError { err, .. } => Some(err),
ImageTarIllegalPath { err, .. } => Some(err),
ImageTarManifestReadError { err, .. } => Some(err),
ImageTarManifestParseError { err, .. } => Some(err),
ImageTarIllegalManifestNum { .. } => None,
ImageTarIllegalDigest { .. } => None,
ImageTarNoManifest { .. } => None,
}
}
}
/// Errors that can occur when reading the local package and dataset directories.
#[derive(Debug)]
pub enum LocalError {
/// Could not read the package directory at `path`.
PackageDirReadError { path: PathBuf, err: std::io::Error },
/// Could not get the version directory from `path`.
UnreadableVersionEntry { path: PathBuf },
/// The entry `version` for `package` is not a valid version.
IllegalVersionEntry { package: String, version: String, err: specifications::version::ParseError },
/// The given `package` does not have any registered versions.
NoVersions { package: String },
/// Could not read from the Brane packages directory at `path`.
PackagesDirReadError { path: PathBuf, err: std::io::Error },
/// Could not read the file at `path` for `package`.
InvalidPackageYml { package: String, path: PathBuf, err: specifications::package::PackageInfoError },
/// Could not create a package index.
PackageIndexError { err: specifications::package::PackageIndexError },
/// Failed to read the datasets folder at `path`.
DatasetsReadError { path: PathBuf, err: std::io::Error },
/// Failed to open the data info file at `path`.
DataInfoOpenError { path: PathBuf, err: std::io::Error },
/// Failed to read or parse the data info file at `path`.
DataInfoReadError { path: PathBuf, err: serde_yaml::Error },
/// Failed to create a data index from the local datasets.
DataIndexError { err: specifications::data::DataIndexError },
}
impl Display for LocalError {
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use LocalError::*;
match self {
PackageDirReadError { path, .. } => write!(f, "Could not read package directory '{}'", path.display()),
UnreadableVersionEntry { path } => write!(f, "Could not get the version directory from '{}'", path.display()),
IllegalVersionEntry { package, version, .. } => write!(f, "Entry '{version}' for package '{package}' is not a valid version"),
NoVersions { package } => write!(f, "Package '{package}' does not have any registered versions"),
PackagesDirReadError { path, .. } => write!(f, "Could not read from Brane packages directory '{}'", path.display()),
InvalidPackageYml { package, path, .. } => write!(f, "Could not read '{}' for package '{}'", path.display(), package),
PackageIndexError { .. } => write!(f, "Could not create PackageIndex"),
DatasetsReadError { path, .. } => write!(f, "Failed to read datasets folder '{}'", path.display()),
DataInfoOpenError { path, .. } => write!(f, "Failed to open data info file '{}'", path.display()),
DataInfoReadError { path, .. } => write!(f, "Failed to read/parse data info file '{}'", path.display()),
DataIndexError { .. } => write!(f, "Failed to create data index from local datasets"),
}
}
}
impl Error for LocalError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use LocalError::*;
match self {
PackageDirReadError { err, .. } => Some(err),
UnreadableVersionEntry { .. } => None,
IllegalVersionEntry { err, .. } => Some(err),
NoVersions { .. } => None,
PackagesDirReadError { err, .. } => Some(err),
InvalidPackageYml { err, .. } => Some(err),
PackageIndexError { err } => Some(err),
DatasetsReadError { err, .. } => Some(err),
DataInfoOpenError { err, .. } => Some(err),
DataInfoReadError { err, .. } => Some(err),
DataIndexError { err } => Some(err),
}
}
}
/// Errors that can occur when requesting package or data information from a remote `address`.
#[derive(Debug)]
pub enum ApiError {
/// Failed to post a request to `address`.
RequestError { address: String, err: reqwest::Error },
/// Failed to get the body from the response sent by `address`.
ResponseBodyError { address: String, err: reqwest::Error },
/// Failed to parse the response `raw` sent by `address` as JSON.
ResponseJsonParseError { address: String, raw: String, err: serde_json::Error },
/// The given `address` responded without a body.
NoResponse { address: String },
/// Failed to parse `raw` as a package kind in package number `index` returned by `address`.
PackageKindParseError { address: String, index: usize, raw: String, err: specifications::package::PackageKindError },
/// Failed to parse `raw` as a version in package number `index` returned by `address`.
VersionParseError { address: String, index: usize, raw: String, err: specifications::version::ParseError },
/// Failed to create a package index from the package infos given by `address`.
PackageIndexError { address: String, err: specifications::package::PackageIndexError },
/// Failed to create a data index from the data infos given by `address`.
DataIndexError { address: String, err: specifications::data::DataIndexError },
}
impl Display for ApiError {
fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
use ApiError::*;
match self {
RequestError { address, .. } => write!(f, "Failed to post request to '{address}'"),
ResponseBodyError { address, .. } => write!(f, "Failed to get body from response from '{address}'"),
ResponseJsonParseError { address, raw, .. } => write!(f, "Failed to parse response \"\"\"{raw}\"\"\" from '{address}' as JSON"),
NoResponse { address } => write!(f, "'{address}' responded without a body (not even that no packages are available)"),
PackageKindParseError { address, index, raw, .. } => {
write!(f, "Failed to parse '{raw}' as package kind in package {index} returned by '{address}'")
},
VersionParseError { address, index, raw, .. } => {
write!(f, "Failed to parse '{raw}' as version in package {index} returned by '{address}'")
},
PackageIndexError { address, .. } => write!(f, "Failed to create a package index from the package infos given by '{address}'"),
DataIndexError { address, .. } => write!(f, "Failed to create a data index from the data infos given by '{address}'"),
}
}
}
impl Error for ApiError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
use ApiError::*;
match self {
RequestError { err, .. } => Some(err),
ResponseBodyError { err, .. } => Some(err),
ResponseJsonParseError { err, .. } => Some(err),
NoResponse { .. } => None,
PackageKindParseError { err, .. } => Some(err),
VersionParseError { err, .. } => Some(err),
PackageIndexError { err, .. } => Some(err),
DataIndexError { err, .. } => Some(err),
}
}
}
/// Errors that can occur when parsing a Docker client version number (a major and a minor number separated by a '.').
#[derive(Debug)]
pub enum ClientVersionParseError {
    /// The raw string `raw` does not contain a '.'.
    MissingDot { raw: String },
    /// The string `raw` could not be parsed as the major number.
    IllegalMajorNumber { raw: String, err: std::num::ParseIntError },
    /// The string `raw` could not be parsed as the minor number.
    IllegalMinorNumber { raw: String, err: std::num::ParseIntError },
}
impl Display for ClientVersionParseError {
    /// Writes a one-line, human-readable description of the error; nested errors are not included.
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use ClientVersionParseError::*;
        // Fixed: these messages used to say "Docket" instead of "Docker".
        match self {
            MissingDot { raw } => write!(f, "Missing '.' in Docker client version number '{raw}'"),
            IllegalMajorNumber { raw, .. } => write!(f, "'{raw}' is not a valid Docker client version major number"),
            IllegalMinorNumber { raw, .. } => write!(f, "'{raw}' is not a valid Docker client version minor number"),
        }
    }
}
impl Error for ClientVersionParseError {
    /// Returns the integer parse error for the number variants, or `None` when the dot was missing.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        use ClientVersionParseError::*;
        match self {
            MissingDot { .. } => None,
            IllegalMajorNumber { err, .. } => Some(err),
            IllegalMinorNumber { err, .. } => Some(err),
        }
    }
}