use brane_ast::Workflow;
use brane_tsk::errors::PlanError;
use brane_tsk::spec::{AppId, TaskId};
use log::debug;
use reqwest::{Client, Request, Response, StatusCode};
use serde_json::Value;
use specifications::address::Address;
use specifications::planning::{PlanningDeniedReply, PlanningReply, PlanningRequest};
use specifications::profiling::ProfileScopeHandle;
/// Stateless planner front-end.
///
/// It carries no data; planning is exposed as an associated function that
/// forwards a workflow to a remote planner service over HTTP and returns the
/// plan that service replies with.
pub struct InstancePlanner;
impl InstancePlanner {
pub async fn plan(plr: &Address, app_id: AppId, workflow: Workflow, prof: ProfileScopeHandle<'_>) -> Result<Workflow, PlanError> {
let task_id: String = format!("{}", TaskId::generate());
debug!("Serializing request...");
let ser = prof.time(format!("workflow {app_id}:{task_id} serialization"));
let vwf: Value = match serde_json::to_value(&workflow) {
Ok(vwf) => vwf,
Err(err) => {
return Err(PlanError::WorkflowSerialize { id: workflow.id, err });
},
};
let sreq: String = match serde_json::to_string(&PlanningRequest { app_id: app_id.to_string(), workflow: vwf }) {
Ok(sreq) => sreq,
Err(err) => {
return Err(PlanError::PlanningRequestSerialize { id: workflow.id, err });
},
};
ser.stop();
debug!("Sending request...");
let remote = prof.time(format!("workflow '{task_id}' on brane-plr"));
let url: String = format!("{plr}/plan");
let client: Client = Client::new();
let req: Request = match client.post(&url).body(sreq).build() {
Ok(req) => req,
Err(err) => return Err(PlanError::PlanningRequest { id: workflow.id, url, err }),
};
let res: Response = match client.execute(req).await {
Ok(res) => res,
Err(err) => return Err(PlanError::PlanningRequestSend { id: workflow.id, url, err }),
};
let status: StatusCode = res.status();
if status == StatusCode::UNAUTHORIZED {
let res: String = match res.text().await {
Ok(res) => res,
Err(_) => return Err(PlanError::PlanningFailure { id: workflow.id, url, code: status, response: None }),
};
let res: PlanningDeniedReply = match serde_json::from_str(&res) {
Ok(res) => res,
Err(_) => return Err(PlanError::PlanningFailure { id: workflow.id, url, code: status, response: Some(res) }),
};
return Err(PlanError::CheckerDenied { domain: res.domain, reasons: res.reasons });
} else if !status.is_success() {
return Err(PlanError::PlanningFailure { id: workflow.id, url, code: status, response: res.text().await.ok() });
}
remote.stop();
debug!("Receiving response...");
let post = prof.time(format!("workflow '{task_id}' response processing"));
let res: String = match res.text().await {
Ok(res) => res,
Err(err) => return Err(PlanError::PlanningResponseDownload { id: workflow.id, url, err }),
};
let res: PlanningReply = match serde_json::from_str(&res) {
Ok(res) => res,
Err(err) => return Err(PlanError::PlanningResponseParse { id: workflow.id, url, raw: res, err }),
};
let plan: Workflow = match serde_json::from_value(res.plan.clone()) {
Ok(res) => res,
Err(err) => return Err(PlanError::PlanningPlanParse { id: workflow.id, url, raw: res.plan, err }),
};
post.stop();
Ok(plan)
}
}