brane_drv/
planner.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//  PLANNER.rs
//    by Lut99
//
//  Created:
//    25 Oct 2022, 11:35:00
//  Last edited:
//    08 Feb 2024, 17:27:11
//  Auto updated?
//    Yes
//
//  Description:
//!   Implements a planner for the instance use-case.
//


/***** IMPORTS *****/
use brane_ast::Workflow;
use brane_tsk::errors::PlanError;
use brane_tsk::spec::{AppId, TaskId};
use log::debug;
use reqwest::{Client, Request, Response, StatusCode};
use serde_json::Value;
use specifications::address::Address;
use specifications::planning::{PlanningDeniedReply, PlanningReply, PlanningRequest};
use specifications::profiling::ProfileScopeHandle;


/***** LIBRARY *****/
/// The planner is in charge of assigning locations to tasks in a workflow. This one defers planning to the remote `brane-plr` service.
///
/// It is a unit struct without any fields; all functionality is exposed through the associated [`InstancePlanner::plan()`] function.
pub struct InstancePlanner;
impl InstancePlanner {
    /// Asks the remote `brane-plr` service to plan the given workflow.
    ///
    /// The workflow is serialized to JSON, wrapped in a [`PlanningRequest`] and POSTed to `<plr>/plan`. The `plan` field of the reply
    /// is then deserialized back into a [`Workflow`]. The serialization, remote call and response processing are each wrapped in their
    /// own timing obtained from `prof`.
    ///
    /// # Arguments
    /// - `plr`: The address of the remote planner to connect to.
    /// - `app_id`: The session ID for this workflow.
    /// - `workflow`: The Workflow to plan.
    /// - `prof`: The ProfileScope that is used to record the timings of the planning (driver-side).
    ///
    /// # Returns
    /// The workflow that was parsed from the planner's reply.
    ///
    /// # Errors
    /// This function returns a [`PlanError`] if:
    /// - the workflow or the request could not be serialized;
    /// - the HTTP request could not be built or sent;
    /// - the planner replied with `401 Unauthorized` (reported as `CheckerDenied` if the body parses as a [`PlanningDeniedReply`], or
    ///   as `PlanningFailure` otherwise);
    /// - the planner replied with any other non-success status code; or
    /// - the response body could not be downloaded or parsed.
    pub async fn plan(plr: &Address, app_id: AppId, workflow: Workflow, prof: ProfileScopeHandle<'_>) -> Result<Workflow, PlanError> {
        // This identifier is only used to label the profile timings below
        let task_id: String = TaskId::generate().to_string();

        // Phase 1: turn the workflow into the JSON body of a planning request
        debug!("Serializing request...");
        let ser_timing = prof.time(format!("workflow {app_id}:{task_id} serialization"));
        let workflow_json: Value = match serde_json::to_value(&workflow) {
            Ok(value) => value,
            Err(err) => return Err(PlanError::WorkflowSerialize { id: workflow.id, err }),
        };
        let body: String = {
            // Scoped so the intermediate request struct is dropped as soon as it has been serialized
            let request = PlanningRequest { app_id: app_id.to_string(), workflow: workflow_json };
            match serde_json::to_string(&request) {
                Ok(body) => body,
                Err(err) => return Err(PlanError::PlanningRequestSerialize { id: workflow.id, err }),
            }
        };
        ser_timing.stop();

        // Phase 2: POST the serialized request to the planner's `/plan` endpoint
        debug!("Sending request...");
        let remote_timing = prof.time(format!("workflow '{task_id}' on brane-plr"));
        let url: String = format!("{plr}/plan");
        let client: Client = Client::new();
        let request: Request = match client.post(&url).body(body).build() {
            Ok(request) => request,
            Err(err) => return Err(PlanError::PlanningRequest { id: workflow.id, url, err }),
        };
        let response: Response = match client.execute(request).await {
            Ok(response) => response,
            Err(err) => return Err(PlanError::PlanningRequestSend { id: workflow.id, url, err }),
        };

        // Any non-success status code ends the planning here
        let status: StatusCode = response.status();
        if !status.is_success() {
            // Everything except a 401 is a generic failure; attach the body if it can be read
            if status != StatusCode::UNAUTHORIZED {
                return Err(PlanError::PlanningFailure { id: workflow.id, url, code: status, response: response.text().await.ok() });
            }

            // A 401 is expected to carry a denial reply; fall back to the generic failure if it cannot be read or parsed
            let raw: String = match response.text().await {
                Ok(raw) => raw,
                Err(_) => return Err(PlanError::PlanningFailure { id: workflow.id, url, code: status, response: None }),
            };
            return match serde_json::from_str::<PlanningDeniedReply>(&raw) {
                Ok(denied) => Err(PlanError::CheckerDenied { domain: denied.domain, reasons: denied.reasons }),
                Err(_) => Err(PlanError::PlanningFailure { id: workflow.id, url, code: status, response: Some(raw) }),
            };
        }
        remote_timing.stop();

        // Phase 3: download the reply and extract the planned workflow from it
        debug!("Receiving response...");
        let post_timing = prof.time(format!("workflow '{task_id}' response processing"));
        let raw: String = match response.text().await {
            Ok(raw) => raw,
            Err(err) => return Err(PlanError::PlanningResponseDownload { id: workflow.id, url, err }),
        };
        let reply: PlanningReply = match serde_json::from_str(&raw) {
            Ok(reply) => reply,
            Err(err) => return Err(PlanError::PlanningResponseParse { id: workflow.id, url, raw, err }),
        };
        // The plan is cloned so that the raw value is still available for the error if parsing it fails
        let plan: Workflow = match serde_json::from_value(reply.plan.clone()) {
            Ok(plan) => plan,
            Err(err) => return Err(PlanError::PlanningPlanParse { id: workflow.id, url, raw: reply.plan, err }),
        };
        post_timing.stop();

        // Done
        Ok(plan)
    }
}