1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
// SPEC.rs
// by Lut99
//
// Created:
// 26 Aug 2022, 18:26:40
// Last edited:
// 31 Jan 2024, 11:36:19
// Auto updated?
// Yes
//
// Description:
//! Defines some non-Vm-trait structs and interfaces useful when using
//! this crate.
//
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::path::Path;
use std::sync::{Arc, RwLock};
use brane_ast::ast::SymTable;
use brane_ast::locations::Location;
use specifications::data::{AccessKind, DataName, PreprocessKind};
use specifications::package::Capability;
use specifications::profiling::ProfileScopeHandle;
use specifications::version::Version;
use crate::frame_stack::FrameStack;
use crate::pc::ProgramCounter;
use crate::value::FullValue;
/***** LIBRARY *****/
/// Defines whatever is needed for the custom global part of a RunState.
pub trait CustomGlobalState: 'static + Send + Sync {}
impl CustomGlobalState for () {}
/// Defines whatever is needed for the custom local part of a RunState.
pub trait CustomLocalState: 'static + Send + Sync + Clone {
/// Constructs a new CustomLocalState from the given global state.
///
/// # Arguments
/// - `global`: The global state to create this state from.
///
/// # Returns
/// A new instance of Self.
fn new(global: &Arc<RwLock<impl CustomGlobalState>>) -> Self;
}
impl CustomLocalState for () {
#[inline]
fn new(_global: &Arc<RwLock<impl CustomGlobalState>>) -> Self {}
}
/// A trait that implements various missing pieces in task execution. See the `brane-tsk` crate for implementations.
#[async_trait::async_trait]
pub trait VmPlugin: 'static + Send + Sync {
/// The type of the custom, App-wide, global state.
type GlobalState: CustomGlobalState;
/// The type of the custom, thread-local, local state.
type LocalState: CustomLocalState;
/// The error type of the preprocess function.
type PreprocessError: 'static + Send + Sync + Error;
/// The error type of the execute function.
type ExecuteError: 'static + Send + Sync + Error;
/// The error type of the stdout function.
type StdoutError: 'static + Send + Sync + Error;
/// The error type of the publicize and commit functions.
type CommitError: 'static + Send + Sync + Error;
/// A function that preprocesses a given dataset in the given way. Typically, this involves "transferring data" as a preprocessing step.
///
/// # Generic arguments
/// - `E`: The kind of error this function returns. Should, of course, implement `Error`.
///
/// # Arguments
/// - `global`: The custom global state for keeping track of your own things during execution.
/// - `local`: The custom local state for keeping track of your own things faster but only local to this (execution) thread.
/// - `pc`: A [`ProgramCounter`] that denotes for which task in the workflow we're preprocessing.
/// - `loc`: The location where this preprocessing should happen.
/// - `name`: The name of the intermediate result to make public. You'll typically only use this for debugging.
/// - `preprocess`: The PreprocessKind that determines what you must do to make the dataset available.
/// - `prof`: A ProfileScopeHandle that can be used to prove additional details about the timings of this function.
///
/// # Returns
/// This function should return an AccessKind which describes how to access the preprocessed data.
///
/// It is expected that the preprocessed data is available the moment the function returns.
///
/// # Errors
/// This function may error whenever it likes.
async fn preprocess(
global: Arc<RwLock<Self::GlobalState>>,
local: Self::LocalState,
pc: ProgramCounter,
loc: Location,
name: DataName,
preprocess: PreprocessKind,
prof: ProfileScopeHandle<'_>,
) -> Result<AccessKind, Self::PreprocessError>;
/// A function that executes the given task.
///
/// # Generic arguments
/// - `E`: The kind of error this function returns. Should, of course, implement `Error`.
///
/// # Arguments
/// - `global`: The custom global state for keeping track of your own things during execution.
/// - `local`: The custom local state for keeping track of your own things faster but only local to this (execution) thread.
/// - `info`: A `TaskInfo` that contains all the information about the to-be-executed task the VM provides you with. **Note**: You have to preprocess the arguments contained within. Be aware that the path describes by the IntermediateResults is relative to some directory you still have to prepend.
/// - `prof`: A ProfileScopeHandle that can be used to prove additional details about the timings of this function.
///
/// # Returns
/// This function should return either a FullValue, or None (where None is equivalent to `FullValue::Void`).
///
/// # Errors
/// This function may error whenever it likes.
async fn execute(
global: &Arc<RwLock<Self::GlobalState>>,
local: &Self::LocalState,
info: TaskInfo<'_>,
prof: ProfileScopeHandle<'_>,
) -> Result<Option<FullValue>, Self::ExecuteError>;
/// A function that prints a message to stdout - whatever that may be.
///
/// This function is called whenever BraneScript's `print` or `println` are called.
///
/// # Generic arguments
/// - `E`: The kind of error this function returns. Should, of course, implement `Error`.
///
/// # Arguments
/// - `global`: The custom global state for keeping track of your own things during execution.
/// - `local`: The custom local state for keeping track of your own things faster but only local to this (execution) thread.
/// - `text`: The text to write to your version of stdout.
/// - `newline`: Whether or not to print a closing newline after the text (i.e., whether to use `println` or `print`).
/// - `prof`: A ProfileScopeHandle that can be used to prove additional details about the timings of this function.
///
/// # Errors
/// This function may error whenever it likes.
async fn stdout(
global: &Arc<RwLock<Self::GlobalState>>,
local: &Self::LocalState,
text: &str,
newline: bool,
prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::StdoutError>;
/// A function that "publicizes" the given intermediate result.
///
/// This is not really commiting, as it is making the intermediate dataset available upon request. In a distributed/instance setting, this typically means making the registry aware of it.
///
/// # Generic arguments
/// - `E`: The kind of error this function returns. Should, of course, implement `Error`.
///
/// # Arguments
/// - `global`: The custom global state for keeping track of your own things during execution.
/// - `local`: The custom local state for keeping track of your own things faster but only local to this (execution) thread.
/// - `loc`: The location where the dataset currently lives.
/// - `name`: The name of the intermediate result to make public.
/// - `path`: The path where the intermediate result is available. You'll probably want to archive this before continuing. **Note**: Be aware that this path is relative to some directory you still have to prepend.
/// - `prof`: A ProfileScopeHandle that can be used to prove additional details about the timings of this function.
///
/// # Errors
/// This function may error whenever it likes.
async fn publicize(
global: &Arc<RwLock<Self::GlobalState>>,
local: &Self::LocalState,
loc: &Location,
name: &str,
path: &Path,
prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::CommitError>;
/// A function that commits the given intermediate result by promoting it a Data.
///
/// Typically, this involves saving the data somewhere outside of the results folder and then updating the registry on its existance.
///
/// # Generic arguments
/// - `E`: The kind of error this function returns. Should, of course, implement `Error`.
///
/// # Arguments
/// - `global`: The custom global state for keeping track of your own things during execution.
/// - `local`: The custom local state for keeping track of your own things faster but only local to this (execution) thread.
/// - `loc`: The location where the dataset currently lives.
/// - `name`: The name of the intermediate result to promoto (you'll typically use this for debugging only).
/// - `path`: The path where the intermediate result is available. You'll probably want to archive this somewhere else before continuing. **Note**: Be aware that this path is relative to some directory you still have to prepend.
/// - `data_name`: The identifier of the dataset once the intermediate result is promoted. If it already exists, you'll probably want to override the old value with the new one.
/// - `prof`: A ProfileScopeHandle that can be used to prove additional details about the timings of this function.
///
/// # Errors
/// This function may error whenever it likes.
async fn commit(
global: &Arc<RwLock<Self::GlobalState>>,
local: &Self::LocalState,
loc: &Location,
name: &str,
path: &Path,
data_name: &str,
prof: ProfileScopeHandle<'_>,
) -> Result<(), Self::CommitError>;
}
/// Defines whatever we need to remember w.r.t. runtime in between two submission of part of a workflow (i.e., repl-runs).
///
/// # Generic types
/// - `C`: The custom state with which to extend this RunState.
#[derive(Clone, Debug)]
pub struct RunState<G: CustomGlobalState> {
/// The Variable Register that contains previously defined variables.
pub fstack: FrameStack,
/// The custom part of the RunState that is global across all threads in a workflow.
pub global: Arc<RwLock<G>>,
}
impl<G: CustomGlobalState> RunState<G> {
/// Constructor for the RunState that initializes it as new.
///
/// # Arguments
/// - `table`: The initial SymTable that is the global symbol table.
/// - `global`: The (already initialized) custom thread-global part of the state.
///
/// # Returns
/// A new RunState instance.
#[inline]
pub fn new(table: Arc<SymTable>, global: Arc<RwLock<G>>) -> Self { Self { fstack: FrameStack::new(512, table), global } }
}
/// Defines that which the execute closure needs to know about a task.
#[derive(Clone, Debug)]
pub struct TaskInfo<'a> {
/// The program counter of the execution (may be used to identify the call to the task itself).
pub pc: ProgramCounter,
/// The identifier of the task definition itself.
pub def: usize,
/// The name of the task to execute.
pub name: &'a str,
/// The package name of the task to execute.
pub package_name: &'a str,
/// The package version of the task to execute.
pub package_version: &'a Version,
/// The requirements that the task has.
pub requirements: &'a HashSet<Capability>,
/// The arguments that are given for this Task. Note that data & intermediate results have to be resolved before passing this to the function.
pub args: HashMap<String, FullValue>,
/// The planned location for this task.
pub location: &'a Location,
/// The list of inputs to the workflow.
pub input: HashMap<DataName, AccessKind>,
/// If this task returns an intermediate result, then this specifies the name it should have.
pub result: &'a Option<String>,
}