1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
//  DRIVING.rs
//    by Lut99
//
//  Created:
//    06 Jan 2023, 14:43:35
//  Last edited:
//    08 Feb 2024, 17:01:30
//  Auto updated?
//    Yes
//
//  Description:
//!   Defines the prost messages for interacting with the driver.
//

use std::error;
use std::fmt::{Display, Formatter, Result as FResult};
use std::sync::Arc;

pub use DriverServiceError as Error;
use async_trait::async_trait;
use futures::Stream;
use prost::Message;
use tonic::body::{BoxBody, empty_body};
use tonic::client::Grpc as GrpcClient;
use tonic::codec::{ProstCodec, Streaming};
use tonic::codegen::{Body, BoxFuture, Context, Poll, Service, StdError, http};
use tonic::server::{Grpc as GrpcServer, NamedService, ServerStreamingService, UnaryService};
use tonic::transport::{Channel, Endpoint};
use tonic::{Code, Request, Response, Status};


/***** ERRORS *****/
/// Defines the errors occuring in the DriverServiceClient or DriverServiceServer.
#[derive(Debug)]
pub enum DriverServiceError {
    /// Failed to create an endpoint with the given address.
    EndpointError { address: String, err: tonic::transport::Error },
    /// Failed to connect to the given address.
    ConnectError { address: String, err: tonic::transport::Error },
}
impl Display for DriverServiceError {
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> FResult {
        use DriverServiceError::*;
        match self {
            EndpointError { address, err } => write!(f, "Failed to create a new Endpoint from '{address}': {err}"),
            ConnectError { address, err } => write!(f, "Failed to connect to gRPC endpoint '{address}': {err}"),
        }
    }
}
impl error::Error for DriverServiceError {}





/***** MESSAGES *****/
/// Request for creating a new session.
#[derive(Clone, Message)]
pub struct CreateSessionRequest {}

/// The reply sent by the driver when a new session has been created.
#[derive(Clone, Message)]
pub struct CreateSessionReply {
    /// The resulting UUID of the session.
    #[prost(tag = "1", required, string)]
    pub uuid: String,
}



/// Request for checking the given workflow only.
#[derive(Clone, Message)]
pub struct CheckRequest {
    /// The workflow to check
    #[prost(tag = "1", required, string)]
    pub workflow: String,
}

/// Reply to the [`CheckRequest`].
#[derive(Clone, Message)]
pub struct CheckReply {
    /// If all checkers agreed with it across all questions.
    #[prost(tag = "1", required, bool)]
    pub verdict: bool,
    /// Which checker was the first to deny (if any).
    #[prost(tag = "2", optional, string)]
    pub who:     Option<String>,
    /// The reasons for the first checker to deny, if any (and the checker wants to share).
    #[prost(tag = "3", repeated, string)]
    pub reasons: Vec<String>,

    /// If any, contains profile results of the driver.
    #[prost(tag = "4", optional, string)]
    pub profile: Option<String>,
}



/// Request for executing the given workflow.
#[derive(Clone, Message)]
pub struct ExecuteRequest {
    /// The session in which to execute the workflow.
    #[prost(tag = "1", required, string)]
    pub uuid:  String,
    /// The input to the request, i.e., the workflow.
    #[prost(tag = "2", required, string)]
    pub input: String,
}

/// The reply sent by the driver when a workflow has been executed.
#[derive(Clone, Message)]
pub struct ExecuteReply {
    /// Whether to close the communication after this or nay.
    #[prost(tag = "1", required, bool)]
    pub close: bool,

    /// If given, then the driver has some debug information to show to the user.
    #[prost(tag = "2", optional, string)]
    pub debug:  Option<String>,
    /// If given, then the driver has stdout to write to the user.
    #[prost(tag = "3", optional, string)]
    pub stdout: Option<String>,
    /// If given, then the driver has stderr to write to the user.
    #[prost(tag = "4", optional, string)]
    pub stderr: Option<String>,
    /// If given, then the workflow has returned a value to use (FullValue encoded as JSON).
    #[prost(tag = "5", optional, string)]
    pub value:  Option<String>,
}





/***** SERVICES *****/
/// The DriverServiceClient can connect to a remote server implementing the DriverService protocol.
#[derive(Debug, Clone)]
pub struct DriverServiceClient {
    /// The client with which we actually do everything
    client: GrpcClient<Channel>,
}

impl DriverServiceClient {
    /// Attempts to connect to the remote endpoint.
    ///
    /// # Arguments
    /// - `address`: The address of the remote endpoint to connect to.
    ///
    /// # Returns
    /// A new DriverServiceClient instance that is connected to the remove endpoint.
    ///
    /// # Errors
    /// This function errors if the connection could not be established for whatever reason.
    pub async fn connect(address: impl Into<String>) -> Result<Self, Error> {
        let address: String = address.into();

        // Attempt to make the connection
        let conn: Channel = match Endpoint::new(address.clone()) {
            Ok(endpoint) => match endpoint.connect().await {
                Ok(conn) => conn,
                Err(err) => {
                    return Err(Error::ConnectError { address, err });
                },
            },
            Err(err) => {
                return Err(Error::EndpointError { address, err });
            },
        };

        // Store it internally
        Ok(Self { client: GrpcClient::new(conn) })
    }

    /// Send a CreateSessionRequest to the connected endpoint.
    ///
    /// # Arguments
    /// - `request`: The CreateSessionRequest to send to the endpoint.
    ///
    /// # Returns
    /// The CreateSessionReply the endpoint returns.
    ///
    /// # Errors
    /// This function errors if either we failed to send the request or the endpoint itself failed to process it.
    pub async fn create_session(&mut self, request: impl tonic::IntoRequest<CreateSessionRequest>) -> Result<Response<CreateSessionReply>, Status> {
        // Assert the client is ready to get the party started
        if let Err(err) = self.client.ready().await {
            return Err(Status::new(Code::Unknown, format!("Service was not ready: {err}")));
        }

        // Set the default stuff
        let codec: ProstCodec<_, _> = ProstCodec::default();
        let path: http::uri::PathAndQuery = http::uri::PathAndQuery::from_static("/driver.DriverService/CreateSession");
        self.client.unary(request.into_request(), path, codec).await
    }

    /// Send a request to validate a workflow to the connected endpoint.
    ///
    /// # Arguments
    /// - `request`: The [`CheckRequest`] to send to the endpoint.
    ///
    /// # Returns
    /// A [`CheckReply`] the endpoint returns.
    ///
    /// # Errors
    /// This function errors if either we failed to send the request or the endpoint itself failed to process it.
    pub async fn check(&mut self, request: impl tonic::IntoRequest<CheckRequest>) -> Result<Response<CheckReply>, Status> {
        // Assert the client is ready to get the party started
        if let Err(err) = self.client.ready().await {
            return Err(Status::new(Code::Unknown, format!("Service was not ready: {err}")));
        }

        // Set the default stuff
        let codec: ProstCodec<_, _> = ProstCodec::default();
        let path: http::uri::PathAndQuery = http::uri::PathAndQuery::from_static("/driver.DriverService/Check");
        self.client.unary(request.into_request(), path, codec).await
    }

    /// Send an ExecuteRequest to the connected endpoint.
    ///
    /// # Arguments
    /// - `request`: The ExecuteRequest to send to the endpoint.
    ///
    /// # Returns
    /// The ExecuteReply the endpoint returns.
    ///
    /// # Errors
    /// This function errors if either we failed to send the request or the endpoint itself failed to process it.
    pub async fn execute(&mut self, request: impl tonic::IntoRequest<ExecuteRequest>) -> Result<Response<Streaming<ExecuteReply>>, Status> {
        // Assert the client is ready to get the party started
        if let Err(err) = self.client.ready().await {
            return Err(Status::new(Code::Unknown, format!("Service was not ready: {err}")));
        }

        // Set the default stuff
        let codec: ProstCodec<_, _> = ProstCodec::default();
        let path: http::uri::PathAndQuery = http::uri::PathAndQuery::from_static("/driver.DriverService/Execute");
        self.client.server_streaming(request.into_request(), path, codec).await
    }
}



/// The DriverService, which is a trait for easily writing a service for the driver communication protocol.
///
/// Implementation based on the auto-generated version from tonic.
#[async_trait]
pub trait DriverService: 'static + Send + Sync {
    /// The response type for stream returned by `DriverService::execute()`.
    type ExecuteStream: 'static + Send + Stream<Item = Result<ExecuteReply, Status>>;



    /// Handle for when a CreateSessionRequest comes in.
    ///
    /// # Arguments
    /// - `request`: The (`tonic::Request`-wrapped) CreateSessionRequest containing the relevant details.
    ///
    /// # Returns
    /// A CreateSessionReply for this request, wrapped in a `tonic::Response`.
    ///
    /// # Errors
    /// This function may error (i.e., send back a `tonic::Status`) whenever it fails.
    async fn create_session(&self, request: Request<CreateSessionRequest>) -> Result<Response<CreateSessionReply>, Status>;

    /// Handle for when a [`CheckRequest`] comes in.
    ///
    /// # Arguments
    /// - `request`: The ([`tonic::Request`]-wrapped) [`CheckRequest`] containing the relevant details.
    ///
    /// # Returns
    /// A [`CheckReply`] for this request, wrapped in a [`tonic::Response`].
    ///
    /// # Errors
    /// This function may error (i.e., send back a [`tonic::Status`]) whenever it fails.
    async fn check(&self, request: Request<CheckRequest>) -> Result<Response<CheckReply>, Status>;

    /// Handle for when an ExecuteRequest comes in.
    ///
    /// # Arguments
    /// - `request`: The (`tonic::Request`-wrapped) ExecuteRequest containing the relevant details.
    ///
    /// # Returns
    /// A stream of ExecuteReply messages, updating the client and eventually sending back the workflow result.
    ///
    /// # Errors
    /// This function may error (i.e., send back a `tonic::Status`) whenever it fails.
    async fn execute(&self, request: Request<ExecuteRequest>) -> Result<Response<Self::ExecuteStream>, Status>;
}

/// The DriverServiceServer hosts the server part of the DriverService protocol.
#[derive(Debug)]
pub struct DriverServiceServer<T> {
    /// The service that we host.
    service: Arc<T>,
}

impl<T> DriverServiceServer<T> {
    /// Constructor for the DriverServiceServer.
    ///
    /// # Arguments
    /// - `service`: The Service to serve.
    ///
    /// # Returns
    /// A new DriverServiceServer instance.
    #[inline]
    pub fn new(service: T) -> Self { Self { service: Arc::new(service) } }
}

impl<T: DriverService, B> Service<http::Request<B>> for DriverServiceServer<T>
where
    T: DriverService,
    B: 'static + Send + Body,
    B::Error: 'static + Send + Into<StdError>,
{
    type Error = std::convert::Infallible;
    type Future = BoxFuture<Self::Response, Self::Error>;
    type Response = http::Response<BoxBody>;

    #[inline]
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) }

    fn call(&mut self, req: http::Request<B>) -> Self::Future {
        match req.uri().path() {
            // Incoming CreateSessionRequest
            "/driver.DriverService/CreateSession" => {
                /// Helper struct for the given DriverService that focusses specifically on this request.
                struct CreateSessionSvc<T>(Arc<T>);
                impl<T: DriverService> UnaryService<CreateSessionRequest> for CreateSessionSvc<T> {
                    type Future = BoxFuture<Response<Self::Response>, Status>;
                    type Response = CreateSessionReply;

                    fn call(&mut self, req: Request<CreateSessionRequest>) -> Self::Future {
                        // Return the service function as the future to run
                        let service = self.0.clone();
                        let fut = async move { (*service).create_session(req).await };
                        Box::pin(fut)
                    }
                }

                // Create a future that creates the service
                let service = self.service.clone();
                Box::pin(async move {
                    let method: CreateSessionSvc<T> = CreateSessionSvc(service);
                    let codec: ProstCodec<_, _> = ProstCodec::default();
                    let mut grpc: GrpcServer<ProstCodec<_, _>> = GrpcServer::new(codec);
                    Ok(grpc.unary(method, req).await)
                })
            },

            // Incoming CheckRequest
            "/driver.DriverService/Check" => {
                /// Helper struct for the given DriverService that focusses specifically on this request.
                struct CheckSvc<T>(Arc<T>);
                impl<T: DriverService> UnaryService<CheckRequest> for CheckSvc<T> {
                    type Future = BoxFuture<Response<Self::Response>, Status>;
                    type Response = CheckReply;

                    fn call(&mut self, req: Request<CheckRequest>) -> Self::Future {
                        // Return the service function as the future to run
                        let service = self.0.clone();
                        let fut = async move { (*service).check(req).await };
                        Box::pin(fut)
                    }
                }

                // Create a future that creates the service
                let service = self.service.clone();
                Box::pin(async move {
                    let method: CheckSvc<T> = CheckSvc(service);
                    let codec: ProstCodec<_, _> = ProstCodec::default();
                    let mut grpc: GrpcServer<ProstCodec<_, _>> = GrpcServer::new(codec);
                    Ok(grpc.unary(method, req).await)
                })
            },

            // Incoming ExecuteRequest
            "/driver.DriverService/Execute" => {
                /// Helper struct for the given DriverService that focusses specifically on this request.
                struct ExecuteSvc<T>(Arc<T>);
                impl<T: DriverService> ServerStreamingService<ExecuteRequest> for ExecuteSvc<T> {
                    type Future = BoxFuture<Response<Self::ResponseStream>, Status>;
                    type Response = ExecuteReply;
                    type ResponseStream = T::ExecuteStream;

                    fn call(&mut self, req: Request<ExecuteRequest>) -> Self::Future {
                        // Return the service function as the future to run
                        let service = self.0.clone();
                        let fut = async move { (*service).execute(req).await };
                        Box::pin(fut)
                    }
                }

                // Create a future that creates the service
                let service = self.service.clone();
                Box::pin(async move {
                    let method: ExecuteSvc<T> = ExecuteSvc(service);
                    let codec: ProstCodec<_, _> = ProstCodec::default();
                    let mut grpc: GrpcServer<ProstCodec<_, _>> = GrpcServer::new(codec);
                    Ok(grpc.server_streaming(method, req).await)
                })
            },

            // Other (boring) request types
            _ => {
                // Return a future that simply does ¯\_(ツ)_/¯
                Box::pin(async move {
                    Ok(http::Response::builder()
                        .status(200)
                        .header("grpc-status", "12")
                        .header("content-type", "application/grpc")
                        .body(empty_body())
                        .unwrap())
                })
            },
        }
    }
}

impl<T: Clone> Clone for DriverServiceServer<T> {
    #[inline]
    fn clone(&self) -> Self { Self { service: self.service.clone() } }
}
impl<T: DriverService> NamedService for DriverServiceServer<T> {
    const NAME: &'static str = "driver.DriverService";
}