// scylla/transport/speculative_execution.rs
use futures::{
future::FutureExt,
stream::{FuturesUnordered, StreamExt},
};
use std::{future::Future, sync::Arc, time::Duration};
use tracing::{trace_span, warn, Instrument};
use super::{errors::QueryError, metrics::Metrics};
/// Information handed to a [`SpeculativeExecutionPolicy`] when it is asked for its
/// parameters.
pub struct Context {
    /// Driver metrics. [`PercentileSpeculativeExecutionPolicy`] reads its latency
    /// percentile from here.
    pub metrics: Arc<Metrics>,
}
/// Decides how many speculative attempts a query may spawn in addition to the original
/// one, and how long to wait before starting each of them.
pub trait SpeculativeExecutionPolicy: std::fmt::Debug + Send + Sync {
    /// Maximum number of speculative attempts started in addition to the original query.
    fn max_retry_count(&self, context: &Context) -> usize;
    /// Delay before starting the next attempt (measured from the start of the previous one).
    fn retry_interval(&self, context: &Context) -> Duration;
}
/// A [`SpeculativeExecutionPolicy`] with a fixed number of extra attempts and a fixed
/// delay between them.
#[derive(Debug, Clone)]
pub struct SimpleSpeculativeExecutionPolicy {
    /// Maximum number of speculative attempts in addition to the original query.
    pub max_retry_count: usize,
    /// Fixed delay between starting consecutive attempts.
    pub retry_interval: Duration,
}
/// A [`SpeculativeExecutionPolicy`] with a fixed number of extra attempts whose delay is
/// obtained from `Metrics::get_latency_percentile_ms` (falling back to 100 ms when the
/// metrics cannot provide a value).
#[derive(Debug, Clone)]
pub struct PercentileSpeculativeExecutionPolicy {
    /// Maximum number of speculative attempts in addition to the original query.
    pub max_retry_count: usize,
    /// Percentile passed to `Metrics::get_latency_percentile_ms`.
    /// NOTE(review): presumably expected in `0.0..=100.0` — confirm against `Metrics`.
    pub percentile: f64,
}
// Both parameters are fixed at construction time; the context is never consulted.
impl SpeculativeExecutionPolicy for SimpleSpeculativeExecutionPolicy {
    /// Returns the configured number of extra attempts.
    fn max_retry_count(&self, _context: &Context) -> usize {
        let count = self.max_retry_count;
        count
    }

    /// Returns the configured delay between attempts.
    fn retry_interval(&self, _context: &Context) -> Duration {
        let interval = self.retry_interval;
        interval
    }
}
impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy {
    /// Returns the configured number of extra attempts; the context is not consulted.
    fn max_retry_count(&self, _context: &Context) -> usize {
        self.max_retry_count
    }

    /// Returns the latency at `self.percentile` as reported by the context's metrics.
    /// If the metrics report an error, a warning is logged and 100 ms is used instead.
    fn retry_interval(&self, context: &Context) -> Duration {
        let millis = context
            .metrics
            .get_latency_percentile_ms(self.percentile)
            .unwrap_or_else(|err| {
                warn!(
                    "Failed to get latency percentile ({}), defaulting to 100 ms",
                    err
                );
                100
            });
        Duration::from_millis(millis)
    }
}
/// Tells whether a finished attempt's outcome lets the speculative race keep going.
///
/// Only I/O errors and timeouts are ignorable; any other error, and every success, is
/// final and should be returned to the caller as-is.
fn can_be_ignored<ResT>(result: &Result<ResT, QueryError>) -> bool {
    matches!(
        result,
        Err(QueryError::IoError(_)) | Err(QueryError::TimeoutError)
    )
}
/// Error returned by [`execute`] when no attempt produced either a result or an ignorable
/// error to report (every attempt yielded `None`); per its message, a driver bug.
const EMPTY_PLAN_ERROR: QueryError = QueryError::ProtocolError("Empty query plan - driver bug!");
/// Runs a query with speculative execution.
///
/// The original attempt (`query_runner_generator(false)`) is started immediately.
/// Speculative attempts (`query_runner_generator(true)`) are started at most
/// `policy.max_retry_count(context)` times:
/// - The first one starts `policy.retry_interval(context)` after the original attempt.
/// - Each further one starts the same interval after the previous one.
/// - All in-flight attempts race against each other.
///
/// Each attempt resolves to one of:
/// - `Some(result)` where `result` is not ignorable (see [`can_be_ignored`]): that result
///   (success or error) is returned immediately.
/// - `Some(err)` with an ignorable error: the error is remembered and the race continues.
/// - `None`: the attempt produced nothing (presumably an empty query plan — see
///   [`EMPTY_PLAN_ERROR`]) and the race continues.
///
/// Once no attempt is in flight and no more may be started, the most recent ignorable
/// error is returned, or [`EMPTY_PLAN_ERROR`] if no attempt produced one.
pub(crate) async fn execute<QueryFut, ResT>(
    policy: &dyn SpeculativeExecutionPolicy,
    context: &Context,
    query_runner_generator: impl Fn(bool) -> QueryFut,
) -> Result<ResT, QueryError>
where
    QueryFut: Future<Output = Option<Result<ResT, QueryError>>>,
{
    let mut retries_remaining = policy.max_retry_count(context);
    let retry_interval = policy.retry_interval(context);

    let mut async_tasks = FuturesUnordered::new();
    async_tasks.push(
        query_runner_generator(false)
            .instrument(trace_span!("Speculative execution: original query")),
    );

    let sleep = tokio::time::sleep(retry_interval).fuse();
    tokio::pin!(sleep);

    let mut last_error = None;
    loop {
        futures::select! {
            _ = &mut sleep => {
                if retries_remaining > 0 {
                    async_tasks.push(
                        query_runner_generator(true).instrument(trace_span!(
                            "Speculative execution",
                            retries_remaining = retries_remaining
                        )),
                    );
                    retries_remaining -= 1;
                    // Re-arm the timer for the next speculative attempt. Once no retries
                    // remain, the fused timer stays terminated and `select!` skips it.
                    sleep.set(tokio::time::sleep(retry_interval).fuse());
                }
            }
            res = async_tasks.select_next_some() => {
                if let Some(r) = res {
                    if !can_be_ignored(&r) {
                        return r;
                    }
                    last_error = Some(r);
                }
                // This must run after *every* finished attempt, not only after `None`.
                // Suppose the last in-flight attempt ends with an ignorable error and no
                // retries remain. On the next iteration the timer has already fired and
                // the empty `FuturesUnordered` terminates. `select!` would then have no
                // live branch and panic, because there is no `complete` arm.
                if async_tasks.is_empty() && retries_remaining == 0 {
                    return last_error.unwrap_or(Err(EMPTY_PLAN_ERROR));
                }
            }
        }
    }
}