use futures_core::{ready, Stream};
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
pin_project! {
/// The [`Future`] returned by the [`ServiceExt::call_all`] combinator.
#[derive(Debug)]
pub(crate) struct CallAll<Svc, S, Q> {
service: Option<Svc>,
#[pin]
stream: S,
queue: Q,
eof: bool,
}
}
pub(crate) trait Drive<F: Future> {
fn is_empty(&self) -> bool;
fn push(&mut self, future: F);
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}
impl<Svc, S, Q> CallAll<Svc, S, Q>
where
Svc: Service<S::Item>,
Svc::Error: Into<crate::BoxError>,
S: Stream,
Q: Drive<Svc::Future>,
{
pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
CallAll {
service: Some(service),
stream,
queue,
eof: false,
}
}
/// Extract the wrapped [`Service`].
pub(crate) fn into_inner(mut self) -> Svc {
self.service.take().expect("Service already taken")
}
/// Extract the wrapped [`Service`].
pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc {
self.project()
.service
.take()
.expect("Service already taken")
}
pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
assert!(self.queue.is_empty() && !self.eof);
super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
}
}
impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
where
Svc: Service<S::Item>,
Svc::Error: Into<crate::BoxError>,
S: Stream,
Q: Drive<Svc::Future>,
{
type Item = Result<Svc::Response, crate::BoxError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
// First, see if we have any responses to yield
if let Poll::Ready(r) = this.queue.poll(cx) {
if let Some(rsp) = r.transpose().map_err(Into::into)? {
return Poll::Ready(Some(Ok(rsp)));
}
}
// If there are no more requests coming, check if we're done
if *this.eof {
if this.queue.is_empty() {
return Poll::Ready(None);
} else {
return Poll::Pending;
}
}
// Then, see that the service is ready for another request
let svc = this
.service
.as_mut()
.expect("Using CallAll after extracing inner Service");
ready!(svc.poll_ready(cx)).map_err(Into::into)?;
// If it is, gather the next request (if there is one), or return `Pending` if the
// stream is not ready.
// TODO: We probably want to "release" the slot we reserved in Svc if the
// stream returns `Pending`. It may be a while until we get around to actually
// using it.
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(req) => {
this.queue.push(svc.call(req));
}
None => {
// We're all done once any outstanding requests have completed
*this.eof = true;
}
}
}
}
}