Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
c83cc07
Create 3 replicas per reader node in migration
ygina Sep 15, 2018
3716541
Remove reader node replicas in apply recipe
ygina Sep 15, 2018
f421f9c
Hardcode number of expected nodes to pass tests
ygina Sep 15, 2018
fcfa832
Consider all replicas when returning a view
ygina Sep 15, 2018
7cb0b2d
Always allocate replicas in different domains
ygina Sep 15, 2018
ec22c1c
Remove replicas on different domains
ygina Sep 15, 2018
d9dd041
Cache readers in node
ygina Sep 15, 2018
cc73d86
Ability to obtain view to any replica
ygina Sep 20, 2018
70f6ccb
Assign readers a replica index
ygina Sep 25, 2018
119a140
Assign replica domains to different workers maybe
ygina Sep 25, 2018
f995181
Refactor local address assignment
ygina Sep 26, 2018
db1afbb
Refactor round robin worker assignment method
ygina Sep 27, 2018
1534991
Put the first reader in a separate domain as well
ygina Sep 27, 2018
a35748b
Assign replicas and non-replicas to workers separately
ygina Sep 27, 2018
04bd3f7
Clean up assignment code a bunch, though it still panics sometimes
ygina Sep 28, 2018
4e7b296
Merge branch 'master' into replication
ygina Sep 28, 2018
837b194
Fix already mutable borrowed panic
ygina Sep 28, 2018
1354a38
Merge branch 'master' into replication
ygina Sep 30, 2018
85c6278
Replica is not in distinct domain if there is only 1
ygina Oct 1, 2018
cb1cf8b
Logging and clarifying comments
ygina Oct 1, 2018
6e54552
Test to query any replica
ygina Oct 1, 2018
8853aee
View api returns view replicas in round robin
ygina Oct 3, 2018
2815d06
Merge branch 'master' into replication
ygina Oct 15, 2018
be0fbc6
Merge branch 'master' into replication
ygina Oct 15, 2018
77913d7
CLI option to update replica count on the fly
ygina Oct 15, 2018
576346f
Logging for view builders
ygina Oct 17, 2018
10c4885
Actually assign replicas to different workers
ygina Oct 18, 2018
2ef42fb
Shards should be on separate workers again
ygina Oct 18, 2018
b69c3c0
CLI option to set number of pool threads
ygina Oct 23, 2018
0c2bf3e
Revert unnecessary refactor to controller view
ygina Nov 2, 2018
57d8804
Set default replicas to 1 when not a test, otherwise 3
ygina Nov 2, 2018
692ce0a
Edit comments in existing tests to reflect counting replicas
ygina Nov 2, 2018
eeecc12
Propagate reader node name to view, integration test for replica writes
ygina Nov 2, 2018
b9b7325
Documented place_domain, some minor Rust and whitespace things
ygina Nov 11, 2018
f8749fd
Small string fix
ygina Nov 11, 2018
0ad9f62
Simplify commit(), no logical changes
ygina Nov 12, 2018
b2bef11
Name all readers with replica index in the suffix
ygina Nov 12, 2018
94c3225
Clarify language regarding the reader replication factor
ygina Nov 12, 2018
7a577c2
Naming, comments, and Rust things
ygina Nov 13, 2018
aa7435b
Store reader index instead of name in view
ygina Nov 13, 2018
232453f
Simplify query node removal with assumption that query node has exact…
ygina Nov 13, 2018
85998f2
More comments
ygina Nov 13, 2018
d7c93f5
It is ok to remove leaf nodes with no children
ygina Nov 21, 2018
8b0fba5
Fix localsoup
ygina Nov 22, 2018
f68ca92
Merge branch 'master' into replication
ygina Jan 8, 2019
5cb51d2
Stylistic changes for remove_leaf
ygina Jan 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion noria-benchmarks/vote/clients/localsoup/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Setup {
Graph {
vote: inputs["Vote"],
article: inputs["Article"],
end: outputs["ArticleWithVoteCount"],
end: outputs["ArticleWithVoteCount_0"],
stupid: self.stupid,
graph,
}
Expand Down
36 changes: 36 additions & 0 deletions noria-server/dataflow/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct Node {
taken: bool,

sharded_by: Sharding,
next_reader: usize,
readers: Vec<NodeIndex>,
}

// constructors
Expand All @@ -50,6 +52,8 @@ impl Node {
taken: false,

sharded_by: Sharding::None,
next_reader: 0,
readers: Vec::new(),
}
}

Expand Down Expand Up @@ -106,6 +110,11 @@ impl Node {
self.sharded_by = s;
}

/// Registers `ni` as the node index of a reader for this node.
///
/// The index is appended to the end of this node's list of readers.
pub fn add_reader(&mut self, ni: NodeIndex) {
    self.readers.push(ni);
}

pub fn on_commit(&mut self, remap: &HashMap<NodeIndex, IndexPair>) {
// this is *only* overwritten for these asserts.
assert!(!self.taken);
Expand Down Expand Up @@ -331,6 +340,33 @@ impl Node {
}
}

// reader replication
impl Node {
    /// Returns `true` if at least one reader is recorded for this node.
    pub fn has_readers(&self) -> bool {
        !self.readers.is_empty()
    }

    /// Returns the number of readers recorded for this node.
    pub fn num_readers(&self) -> usize {
        self.readers.len()
    }

    /// Returns the node indices of all readers recorded for this node, in the order in which
    /// they are stored.
    pub fn get_readers(&self) -> &[NodeIndex] {
        &self.readers
    }

    /// Returns reader replicas in round robin order each time the method is called,
    /// with the primary reader (reader index = 0) being returned last.
    ///
    /// Returns `None` if no readers are recorded for this node. With a single reader, that
    /// reader is returned on every call.
    pub fn next_reader(&mut self) -> Option<NodeIndex> {
        if self.readers.is_empty() {
            return None;
        }

        // Advance the cursor *before* reading it: the cursor starts at 0, so the first call
        // yields the reader at position 1 and position 0 comes up last in each cycle.
        self.next_reader = (self.next_reader + 1) % self.readers.len();
        Some(self.readers[self.next_reader])
    }
}

// is this or that?
impl Node {
pub fn is_source(&self) -> bool {
Expand Down
11 changes: 10 additions & 1 deletion noria-server/dataflow/src/node/special/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct Reader {
streamers: Vec<channel::StreamSender<Vec<StreamUpdate>>>,

for_node: NodeIndex,
/// The index of this Reader in the list of Readers for its ancestor
reader_index: usize,
Comment thread
jonhoo marked this conversation as resolved.
state: Option<Vec<usize>>,
}

Expand All @@ -46,17 +48,19 @@ impl Clone for Reader {
streamers: self.streamers.clone(),
state: self.state.clone(),
for_node: self.for_node,
reader_index: self.reader_index,
}
}
}

impl Reader {
pub fn new(for_node: NodeIndex) -> Self {
pub fn new(for_node: NodeIndex, reader_index: usize) -> Self {
Reader {
writer: None,
streamers: Vec::new(),
state: None,
for_node,
reader_index,
}
}

Expand All @@ -66,6 +70,10 @@ impl Reader {
self.for_node
}

/// Returns the `reader_index` this `Reader` was constructed with.
///
/// Per the field's own documentation, this is the index of this `Reader` in the list of
/// readers for its ancestor.
pub fn reader_index(&self) -> usize {
self.reader_index
}

#[allow(dead_code)]
pub(crate) fn writer(&self) -> Option<&backlog::WriteHandle> {
self.writer.as_ref()
Expand All @@ -82,6 +90,7 @@ impl Reader {
streamers: mem::replace(&mut self.streamers, Vec::new()),
state: self.state.clone(),
for_node: self.for_node,
reader_index: self.reader_index,
}
}

Expand Down
5 changes: 5 additions & 0 deletions noria-server/src/controller/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ impl ControllerBuilder {
self.config.sharding = shards;
}

/// Set the reader replication factor.
///
/// The value is stored in this builder's configuration as `replication_factor`.
/// NOTE(review): presumably this is the number of reader replicas created per view;
/// confirm against the code that consumes `config.replication_factor`.
pub fn set_replication_factor(&mut self, replication_factor: usize) {
self.config.replication_factor = replication_factor;
}

/// Set how many workers the controller should wait for before starting. More workers can join
/// later, but they won't be assigned any of the initial domains.
pub fn set_quorum(&mut self, quorum: usize) {
Expand Down
165 changes: 74 additions & 91 deletions noria-server/src/controller/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct ControllerInner {
pub(super) source: NodeIndex,
pub(super) ndomains: usize,
pub(super) sharding: Option<usize>,
pub(super) replication_factor: usize,

pub(super) domain_config: DomainConfig,

Expand Down Expand Up @@ -440,6 +441,7 @@ impl ControllerInner {

materializations,
sharding: state.config.sharding,
replication_factor: state.config.replication_factor,
domain_config: state.config.domain_config,
persistence: state.config.persistence,
heartbeat_every: state.config.heartbeat_every,
Expand Down Expand Up @@ -500,9 +502,15 @@ impl ControllerInner {
self.persistence = params;
}

// Assigns nodes to this domain, and shards the domain across multiple workers.
//
// Each worker identifier in `identifiers` corresponds to a single shard in `num_shards`,
// thus the logic of which worker gets which shard is determined by the code that calls
// this method. That code must also ensure the workers are healthy.
pub(crate) fn place_domain(
&mut self,
idx: DomainIndex,
identifiers: Vec<WorkerIdentifier>,
num_shards: Option<usize>,
log: &Logger,
nodes: Vec<(NodeIndex, bool)>,
Expand All @@ -520,9 +528,6 @@ impl ControllerInner {
.collect(),
);

// TODO(malte): simple round-robin placement for the moment
let mut wi = self.workers.iter_mut();

// Send `AssignDomain` to each shard of the given domain
for i in 0..num_shards.unwrap_or(1) {
let nodes = if i == num_shards.unwrap_or(1) - 1 {
Expand All @@ -540,17 +545,10 @@ impl ControllerInner {
persistence_parameters: self.persistence.clone(),
};

let (identifier, w) = loop {
if let Some((i, w)) = wi.next() {
if w.healthy {
break (*i, w);
Comment thread
jonhoo marked this conversation as resolved.
}
} else {
wi = self.workers.iter_mut();
}
};

// send domain to worker
let identifier = identifiers.get(i)
.expect("number of identifiers should match number of shards");
Comment thread
jonhoo marked this conversation as resolved.
let w = self.workers.get_mut(&identifier).unwrap();
info!(
log,
"sending domain {}.{} to worker {:?}",
Expand Down Expand Up @@ -625,7 +623,7 @@ impl ControllerInner {
.enumerate()
.map(|(i, worker)| {
let tx = txs.remove(&i).unwrap();
DomainShardHandle { worker, tx }
DomainShardHandle { worker: *worker, tx }
})
.collect();

Expand Down Expand Up @@ -728,52 +726,50 @@ impl ControllerInner {
.collect()
}

/// Finds the reader node that serves `node`, if any.
///
/// Returns the first node encountered in a breadth-first walk starting at `node` that is a
/// reader whose `is_for()` equals `node`, or `None` if the walk ends without finding one.
fn find_view_for(&self, node: NodeIndex) -> Option<NodeIndex> {
    // The reader is a descendant of `node`, but because of sharding it is not necessarily an
    // *immediate* child. Below depth 1 we could also run into readers that belong to some
    // *unrelated* node, so being a reader is not enough: each reader records which node it is
    // "for", and that is what we match on. The reader is expected to be close by, hence BFS.
    let mut walker = Bfs::new(&self.ingredients, node);
    while let Some(candidate) = walker.next(&self.ingredients) {
        let serves_node = self.ingredients[candidate]
            .with_reader(|r| r.is_for() == node)
            .unwrap_or(false);
        if serves_node {
            return Some(candidate);
        }
    }

    None
}

/// Obtain a `ViewBuilder` that can be sent to a client and then used to query a given
/// (already maintained) reader node called `name`.
pub fn view_builder(&self, name: &str) -> Option<ViewBuilder> {
/// (already maintained) reader node. If there are multiple readers for any given query,
/// a `ViewBuilder` is returned for each reader in round robin order.
///
/// `name` is the name of the view the reader is for.
pub fn view_builder(&mut self, name: &str) -> Option<ViewBuilder> {
// first try to resolve the node via the recipe, which handles aliasing between identical
// queries.
let node = match self.recipe.node_addr_for(name) {
Ok(ni) => ni,
Err(_) => {
// if the recipe doesn't know about this query, traverse the graph.
// we need this to deal with manually constructed graphs (e.g., in tests).
*self.outputs().get(name)?
if let Some(ni) = self.outputs().get(name) {
*ni
} else {
// depending on how the graph was constructed, the outputs may be suffixed
// with the reader index.
let reader_name = format!("{}_0", name);
*self.outputs().get(&reader_name)?
Comment thread
jonhoo marked this conversation as resolved.
}
}
};

self.find_view_for(node).map(|r| {
let domain = self.ingredients[r].domain();
let columns = self.ingredients[r].fields().to_vec();
let schema = self.view_schema(r);
self.ingredients[node].next_reader().map(|ni| {
let domain = self.ingredients[ni].domain();
let columns = self.ingredients[ni].fields().to_vec();
let schema = self.view_schema(ni);
let shards = (0..self.domains[&domain].shards())
.map(|i| self.read_addrs[&self.domains[&domain].assignment(i)].clone())
.collect();
let reader_index = self.ingredients[ni].with_reader(|r| r.reader_index()).unwrap();
info!(
self.log,
"creating view builder";
"name" => &name,
"node_index" => ni.index(),
"reader_index" => reader_index,
);

ViewBuilder {
reader_index,
local_ports: vec![],
node: r,
node: ni,
columns,
schema: schema,
shards,
Expand Down Expand Up @@ -1135,7 +1131,7 @@ impl ControllerInner {
graphviz(&self.ingredients, detailed, &self.materializations)
}

fn remove_leaf(&mut self, mut leaf: NodeIndex) -> Result<(), String> {
fn remove_leaf(&mut self, leaf: NodeIndex) -> Result<(), String> {
let mut removals = vec![];
let start = leaf;
assert!(!self.ingredients[leaf].is_source());
Expand All @@ -1146,60 +1142,47 @@ impl ControllerInner {
leaf.index()
);

if self
// We're looking for a single egress node that connects the query node to readers in
// other domains.
let mut nodes = vec![];
let num_children = self
.ingredients
.neighbors_directed(leaf, petgraph::EdgeDirection::Outgoing)
.count()
> 0
{
// This query leaf node has children -- typically, these are readers, but they can also
// include egress nodes or other, dependent queries.
let mut has_non_reader_children = false;
let readers: Vec<_> = self
.ingredients
.count();
if num_children == 1 {
let child = self.ingredients
.neighbors_directed(leaf, petgraph::EdgeDirection::Outgoing)
.filter(|ni| {
if self.ingredients[*ni].is_reader() {
true
} else {
has_non_reader_children = true;
false
}
})
.collect();
if has_non_reader_children {
// should never happen, since we remove nodes in reverse topological order
crit!(
self.log,
"not removing node {} yet, as it still has non-reader children",
leaf.index()
);
unreachable!();
.next()
.unwrap();
assert!(self.ingredients[child].is_egress());

// Remove the egress node and its children
let mut bfs = Bfs::new(&self.ingredients, child);
while let Some(child) = bfs.next(&self.ingredients) {
if self.ingredients
.neighbors_directed(child, petgraph::EdgeDirection::Outgoing)
.count() == 0
{
removals.push(child);
nodes.push(child);
}
}
// nodes can have only one reader attached
assert!(readers.len() <= 1);
debug!(
self.log,
"Removing query leaf \"{}\"", self.ingredients[leaf].name();
"node" => leaf.index(),
);
if !readers.is_empty() {
removals.push(readers[0]);
leaf = readers[0];
} else {
unreachable!();
}
self.log, "Removing egress node and its children";
"node" => child.index(),
"leaf" => leaf.index(),
);
} else if num_children > 1 {
// should not happen, since we remove nodes in reverse topological order
crit!(
self.log,
"not removing node {} yet, as it still has non-reader-related children",
leaf.index();
"num_children" => num_children,
);
unreachable!();
}

// `node` now does not have any children any more
assert_eq!(
self.ingredients
.neighbors_directed(leaf, petgraph::EdgeDirection::Outgoing)
.count(),
0
);

let mut nodes = vec![leaf];
while let Some(node) = nodes.pop() {
let mut parents = self
.ingredients
Expand Down
Loading