// scylla/transport/locator/precomputed_replicas.rs
use super::replication_info::ReplicationInfo;
use super::TokenRing;
use crate::routing::Token;
use crate::transport::node::Node;
use crate::transport::topology::Strategy;
use std::cmp;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::sync::Arc;
/// A list of replica nodes, each held through a shared `Arc` handle.
type Replicas = Vec<Arc<Node>>;
/// Replica lists computed ahead of time by `PrecomputedReplicas::compute`,
/// so that lookups only need to fetch a stored list and take a prefix of it.
#[derive(Clone, Debug)]
pub(crate) struct PrecomputedReplicas {
    /// Ring consulted by `get_precomputed_simple_strategy_replicas`; it is
    /// not tied to any particular datacenter.
    global_replicas: PrecomputedReplicasRing,
    /// Per-datacenter precomputed rings, keyed by datacenter name; consulted
    /// by `get_precomputed_network_strategy_replicas`.
    datacenter_replicas: HashMap<String, DatacenterPrecomputedReplicas>,
}
/// A token ring of precomputed replica lists together with the replication
/// factor those lists were computed for.
#[derive(Debug, Clone)]
struct PrecomputedReplicasRing {
    /// Ring whose elements are replica lists.
    replicas_for_token: TokenRing<Replicas>,
    /// Replication factor used when the lists were computed. Lookups treat
    /// this as the largest factor the ring can answer and truncate the stored
    /// list for smaller requests.
    max_rep_factor: usize,
}
/// Precomputed replica rings for a single datacenter.
#[derive(Clone, Debug)]
struct DatacenterPrecomputedReplicas {
    /// Ring built for the largest requested replication factor that does not
    /// exceed the datacenter's rack count, if any such factor was requested.
    /// It also answers every smaller factor (see `get_replica_ring_for_rf`).
    compressed_replica_ring: Option<PrecomputedReplicasRing>,
    /// One ring per requested replication factor greater than the rack count,
    /// keyed by that exact factor.
    above_rack_count_replica_rings: HashMap<usize, TokenRing<Replicas>>,
}
impl DatacenterPrecomputedReplicas {
    /// Selects the precomputed ring able to answer `replication_factor`.
    ///
    /// The compressed ring is preferred whenever it exists and was computed
    /// for a factor at least as large as the requested one. Otherwise the
    /// ring stored under exactly `replication_factor` is returned, or `None`
    /// when no such ring was precomputed.
    fn get_replica_ring_for_rf(&self, replication_factor: usize) -> Option<&TokenRing<Replicas>> {
        match &self.compressed_replica_ring {
            Some(ring) if replication_factor <= ring.max_rep_factor => {
                Some(&ring.replicas_for_token)
            }
            _ => self.above_rack_count_replica_rings.get(&replication_factor),
        }
    }
}
impl PrecomputedReplicas {
    /// Precomputes replica lists for the given keyspace strategies.
    ///
    /// * `SimpleStrategy`: a single global ring is built using the largest
    ///   replication factor seen among the strategies (never less than 1).
    /// * `NetworkTopologyStrategy`: for every datacenter that is present in
    ///   `replication_data`, one ring is built for the largest requested
    ///   factor not exceeding the datacenter's rack count, plus one ring for
    ///   each requested factor above the rack count. Datacenters named by a
    ///   strategy but missing from `replication_data` are skipped.
    /// * `LocalStrategy` and `Other`: nothing is precomputed.
    pub(crate) fn compute<'a>(
        replication_data: &ReplicationInfo,
        keyspace_strategies: impl Iterator<Item = &'a Strategy>,
    ) -> PrecomputedReplicas {
        // The global ring is always precomputed for at least this factor,
        // even when no keyspace uses SimpleStrategy.
        let min_precomputed_rep_factor: usize = 1;

        let mut global_rf: usize = min_precomputed_rep_factor;
        // Every replication factor requested for each datacenter, sorted.
        let mut rfs_by_dc: HashMap<&'a str, BTreeSet<usize>> = HashMap::new();

        for strategy in keyspace_strategies {
            match strategy {
                Strategy::SimpleStrategy { replication_factor } => {
                    global_rf = cmp::max(global_rf, *replication_factor);
                }
                Strategy::NetworkTopologyStrategy {
                    datacenter_repfactors,
                } => {
                    for (dc_name, dc_repfactor) in datacenter_repfactors {
                        rfs_by_dc.entry(dc_name).or_default().insert(*dc_repfactor);
                    }
                }
                // These strategies contribute nothing to the precomputed data.
                Strategy::LocalStrategy | Strategy::Other { .. } => {}
            }
        }

        let global_replicas = PrecomputedReplicasRing {
            replicas_for_token: TokenRing::new(replication_data.get_global_ring().iter().map(
                |(token, _)| {
                    let replicas: Replicas = replication_data
                        .simple_strategy_replicas(*token, global_rf)
                        .cloned()
                        .collect();
                    (*token, replicas)
                },
            )),
            max_rep_factor: global_rf,
        };

        let datacenter_replicas: HashMap<String, DatacenterPrecomputedReplicas> = rfs_by_dc
            .into_iter()
            .filter_map(|(dc_name, requested_rfs)| {
                // A datacenter unknown to the replication data gets no entry.
                let dc_data = replication_data.get_datacenters().get(dc_name)?;
                let rack_count = dc_data.get_rack_count();

                // Builds the ring of replica lists for this datacenter and one factor.
                let build_ring = |rf| {
                    let entries = dc_data.get_dc_ring().iter().map(|(token, _)| {
                        let replicas: Replicas = replication_data
                            .nts_replicas_in_datacenter(*token, dc_name, rf)
                            .cloned()
                            .collect();
                        (*token, replicas)
                    });
                    TokenRing::new(entries)
                };

                // Largest requested factor that does not exceed the rack count.
                let compressed_replica_ring =
                    requested_rfs
                        .range(..=rack_count)
                        .next_back()
                        .map(|rf| PrecomputedReplicasRing {
                            replicas_for_token: build_ring(*rf),
                            max_rep_factor: *rf,
                        });

                // Each requested factor above the rack count gets its own ring.
                let above_rack_count_replica_rings = requested_rfs
                    .range((rack_count + 1)..)
                    .map(|rf| (*rf, build_ring(*rf)))
                    .collect();

                Some((
                    dc_name.to_string(),
                    DatacenterPrecomputedReplicas {
                        compressed_replica_ring,
                        above_rack_count_replica_rings,
                    },
                ))
            })
            .collect();

        Self {
            global_replicas,
            datacenter_replicas,
        }
    }

    /// Looks up precomputed `SimpleStrategy` replicas for `token`.
    ///
    /// Returns `None` when `replication_factor` is larger than the factor the
    /// global ring was computed for, or when the ring lookup yields nothing.
    /// Otherwise returns the first `replication_factor` entries of the stored
    /// list (or the whole list if it is shorter).
    pub(crate) fn get_precomputed_simple_strategy_replicas(
        &self,
        token: Token,
        replication_factor: usize,
    ) -> Option<&[Arc<Node>]> {
        let ring = &self.global_replicas;
        if ring.max_rep_factor < replication_factor {
            return None;
        }

        ring.replicas_for_token
            .get_elem_for_token(token)
            .map(|replicas| {
                let end: usize = cmp::min(replicas.len(), replication_factor);
                &replicas[..end]
            })
    }

    /// Looks up precomputed `NetworkTopologyStrategy` replicas for `token`
    /// within the datacenter `dc_name`.
    ///
    /// Returns `None` when nothing was precomputed for the datacenter, when
    /// no precomputed ring can answer `dc_replication_factor`, or when the
    /// ring lookup yields nothing. Otherwise returns the first
    /// `dc_replication_factor` entries of the stored list (or the whole list
    /// if it is shorter).
    pub(crate) fn get_precomputed_network_strategy_replicas(
        &self,
        token: Token,
        dc_name: &str,
        dc_replication_factor: usize,
    ) -> Option<&[Arc<Node>]> {
        let dc_replicas = self.datacenter_replicas.get(dc_name)?;
        let ring = dc_replicas.get_replica_ring_for_rf(dc_replication_factor)?;
        let replicas = ring.get_elem_for_token(token)?;

        let end: usize = cmp::min(replicas.len(), dc_replication_factor);
        Some(&replicas[..end])
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::{
        routing::Token,
        transport::{
            locator::test::{create_ring, mock_metadata_for_token_aware_tests, A, C, D, E, F, G},
            topology::{Keyspace, Strategy},
        },
    };

    use super::{PrecomputedReplicas, ReplicationInfo};

    /// Checks lookups against the global ring when the only keyspace uses
    /// `SimpleStrategy` with a replication factor of 2.
    #[tokio::test]
    async fn test_simple_stategy() {
        let mut metadata = mock_metadata_for_token_aware_tests();

        // Replace the mock keyspaces with a single SimpleStrategy keyspace.
        let keyspace = Keyspace {
            strategy: Strategy::SimpleStrategy {
                replication_factor: 2,
            },
            tables: HashMap::new(),
            views: HashMap::new(),
            user_defined_types: HashMap::new(),
        };
        metadata.keyspaces = std::iter::once(("SimpleStrategy{rf=2}".into(), keyspace)).collect();

        let replication_info = ReplicationInfo::new(create_ring(&metadata));
        let strategies = metadata
            .keyspaces
            .values()
            .map(|keyspace| &keyspace.strategy);
        let precomputed_replicas = PrecomputedReplicas::compute(&replication_info, strategies);

        // Compares the ports of the returned replicas with the expected ids.
        let assert_replicas = |token, replication_factor, expected_node_ids| {
            let ids: Vec<u16> = precomputed_replicas
                .get_precomputed_simple_strategy_replicas(
                    Token { value: token },
                    replication_factor,
                )
                .unwrap()
                .iter()
                .map(|node| node.address.port())
                .collect();
            assert_eq!(ids, expected_node_ids);
        };

        assert_replicas(160, 0, vec![]);
        assert_replicas(160, 1, vec![F]);
        assert_replicas(160, 2, vec![F, A]);

        // A factor above the largest one used by any keyspace is not precomputed.
        let beyond_precomputed =
            precomputed_replicas.get_precomputed_simple_strategy_replicas(Token { value: 160 }, 3);
        assert_eq!(beyond_precomputed, None);

        assert_replicas(200, 1, vec![F]);
        assert_replicas(200, 2, vec![F, A]);

        assert_replicas(701, 1, vec![E]);
        assert_replicas(701, 2, vec![E, G]);
    }

    /// Checks per-datacenter lookups using the keyspaces that come with the
    /// mock metadata.
    #[tokio::test]
    async fn test_network_topology_strategy() {
        let metadata = mock_metadata_for_token_aware_tests();

        let replication_info = ReplicationInfo::new(create_ring(&metadata));
        let strategies = metadata
            .keyspaces
            .values()
            .map(|keyspace| &keyspace.strategy);
        let precomputed_replicas = PrecomputedReplicas::compute(&replication_info, strategies);

        // Compares the ports of the returned replicas with the expected ids.
        let assert_replicas = |token, dc, replication_factor, expected_node_ids| {
            let ids: Vec<u16> = precomputed_replicas
                .get_precomputed_network_strategy_replicas(
                    Token { value: token },
                    dc,
                    replication_factor,
                )
                .unwrap()
                .iter()
                .map(|node| node.address.port())
                .collect();
            assert_eq!(ids, expected_node_ids);
        };

        assert_replicas(160, "eu", 0, vec![]);
        assert_replicas(160, "eu", 1, vec![A]);
        assert_replicas(160, "eu", 2, vec![A, G]);
        assert_replicas(160, "eu", 3, vec![A, C, G]);

        let eu_beyond_precomputed = precomputed_replicas
            .get_precomputed_network_strategy_replicas(Token { value: 160 }, "eu", 4);
        assert_eq!(eu_beyond_precomputed, None);

        assert_replicas(160, "us", 0, vec![]);
        assert_replicas(160, "us", 1, vec![F]);
        assert_replicas(160, "us", 2, vec![F, D]);
        assert_replicas(160, "us", 3, vec![F, D, E]);

        let us_beyond_precomputed = precomputed_replicas
            .get_precomputed_network_strategy_replicas(Token { value: 160 }, "us", 4);
        assert_eq!(us_beyond_precomputed, None);
    }
}