Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,532 changes: 419 additions & 1,113 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
resolver = "3"
members = [
"crates/byteport-transport",
"crates/byteport-dag",
"frontend/web/src-tauri",
]
13 changes: 13 additions & 0 deletions crates/byteport-dag/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "byteport-dag"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
description = "DAG foundation: topological sort and parallel-bucket scheduler for Phenotype compute/infra"
publish = false

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
thiserror = "2.0"
192 changes: 192 additions & 0 deletions crates/byteport-dag/src/dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//! Core DAG data structure.
//!
//! A generic directed acyclic graph backed by an adjacency list.

use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Error types for DAG operations.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum DagError {
#[error("node `{0:?}` already exists")]
NodeAlreadyExists(String),

#[error("node `{0:?}` not found")]
NodeNotFound(String),

#[error("edge from `{0:?}` to `{1:?}` would create a cycle")]
CycleDetected(String, String),
}

/// A generic directed acyclic graph over node identifiers of type `K`.
#[derive(Debug, Clone)]
pub struct Dag<K> {
/// All known nodes.
nodes: HashSet<K>,
/// Adjacency list: parent -> children (forward edges).
children: HashMap<K, Vec<K>>,
/// Reverse adjacency list: child -> parents (back edges).
parents: HashMap<K, Vec<K>>,
}

// ---------------------------------------------------------------------------
// Construction
// ---------------------------------------------------------------------------

impl<K> Default for Dag<K> {
fn default() -> Self {
Self {
nodes: HashSet::new(),
children: HashMap::new(),
parents: HashMap::new(),
}
}
}

impl<K> Dag<K>
where
K: Eq + Hash + Clone + std::fmt::Debug,
{
/// Create an empty DAG.
pub fn new() -> Self {
Self::default()
}

/// Insert a new node. Returns an error if the node already exists.
pub fn add_node(&mut self, node: K) -> Result<(), DagError> {
if self.nodes.contains(&node) {
return Err(DagError::NodeAlreadyExists(format!("{:?}", node)));
}
self.nodes.insert(node.clone());
self.children.entry(node.clone()).or_default();
self.parents.entry(node).or_default();
Ok(())
}

/// Insert a directed edge `from -> to`.
///
/// Both endpoints must already exist. This method does **not** check
/// for cycles — call [`check_cycle`] or [`topological_sort`] separately.
pub fn add_edge(&mut self, from: K, to: K) -> Result<(), DagError> {
if !self.nodes.contains(&from) {
return Err(DagError::NodeNotFound(format!("{:?}", from)));
}
if !self.nodes.contains(&to) {
return Err(DagError::NodeNotFound(format!("{:?}", to)));
}
self.children.entry(from.clone()).or_default().push(to.clone());
self.parents.entry(to).or_default().push(from);
Comment on lines +77 to +78

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Duplicate edges are inserted without checks, which duplicates entries in children/parents and inflates edge_count. This also breaks serialization round-trip fidelity because DagSchema::from_dag deduplicates edges via BTreeSet, so repeated edges are silently lost; reject duplicates on insert (or switch adjacency storage to sets). [logic error]

Severity Level: Major ⚠️
- ❌ Serialization drops duplicate edges, changing graph semantics.
- ⚠️ Edge counts inconsistent between DAG and serialized schema.
Steps of Reproduction ✅
1. Construct a DAG similarly to `sample_dag()` in
`crates/byteport-dag/src/serialize.rs:198-206`, but call `dag.add_edge("build".into(),
"test".into())` twice using `Dag::add_edge()` implemented at
`crates/byteport-dag/src/dag.rs:70-79`.

2. Call `dag.edge_count()` (`crates/byteport-dag/src/dag.rs:96-99`) and observe that it
returns a higher count than the logical number of distinct edges because the duplicate
`"build" → "test"` edge has been appended twice to both `children` and `parents` via the
lines at `dag.rs:77-78`.

3. Serialize this DAG using `DagSchema::from_dag(&dag, "1.0.0")`
(`crates/byteport-dag/src/serialize.rs:93-132`); the edge collection uses a
`BTreeSet<(String, String)>` at `serialize.rs:108-117`, so only one `(\"build\",
\"test\")` pair is retained in `schema.edges`.

4. Round-trip via `let dag2 = schema.into_dag().unwrap()`
(`crates/byteport-dag/src/serialize.rs:144-152`) and compare `dag.edge_count()` vs
`dag2.edge_count()`: the reconstructed DAG has fewer edges, confirming that duplicate
edges are silently dropped across serialization and that the in-memory graph and its
serialized representation diverge.

Fix in Cursor Fix in VSCode Claude

(Use Cmd/Ctrl + Click for best experience)

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** crates/byteport-dag/src/dag.rs
**Line:** 77:78
**Comment:**
	*Logic Error: Duplicate edges are inserted without checks, which duplicates entries in `children`/`parents` and inflates `edge_count`. This also breaks serialization round-trip fidelity because `DagSchema::from_dag` deduplicates edges via `BTreeSet`, so repeated edges are silently lost; reject duplicates on insert (or switch adjacency storage to sets).

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix
👍 | 👎

Ok(())
}

// -----------------------------------------------------------------------
// Accessors
// -----------------------------------------------------------------------

/// Return the set of all node identifiers.
pub fn nodes(&self) -> &HashSet<K> {
&self.nodes
}

/// Return the number of nodes.
pub fn node_count(&self) -> usize {
self.nodes.len()
}

/// Return the number of edges.
pub fn edge_count(&self) -> usize {
self.children.values().map(|v| v.len()).sum()
}

/// Return the children (direct successors) of `node`.
pub fn children_of(&self, node: &K) -> Option<&[K]> {
self.children.get(node).map(|v| v.as_slice())
}

/// Return the parents (direct predecessors) of `node`.
pub fn parents_of(&self, node: &K) -> Option<&[K]> {
self.parents.get(node).map(|v| v.as_slice())
}

/// Return `true` if the graph contains `node`.
pub fn contains(&self, node: &K) -> bool {
self.nodes.contains(node)
}

/// Iterate over all nodes.
pub fn iter_nodes(&self) -> impl Iterator<Item = &K> {
self.nodes.iter()
}
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn empty_dag() {
let dag: Dag<u32> = Dag::new();
assert_eq!(dag.node_count(), 0);
assert_eq!(dag.edge_count(), 0);
}

#[test]
fn add_and_query_nodes() {
let mut dag = Dag::new();
dag.add_node(1).unwrap();
dag.add_node(2).unwrap();
assert_eq!(dag.node_count(), 2);
assert!(dag.contains(&1));
assert!(dag.contains(&2));
}

#[test]
fn duplicate_node_error() {
let mut dag = Dag::new();
dag.add_node("a").unwrap();
assert!(dag.add_node("a").is_err());
}

#[test]
fn add_edge_missing_source() {
let mut dag = Dag::<&str>::new();
dag.add_node("b").unwrap();
assert!(dag.add_edge("a", "b").is_err());
}

#[test]
fn add_edge_missing_target() {
let mut dag = Dag::<&str>::new();
dag.add_node("a").unwrap();
assert!(dag.add_edge("a", "b").is_err());
}

#[test]
fn simple_edge() {
let mut dag = Dag::new();
dag.add_node("a").unwrap();
dag.add_node("b").unwrap();
dag.add_edge("a", "b").unwrap();
assert_eq!(dag.edge_count(), 1);
assert_eq!(dag.children_of(&"a"), Some(&["b"][..]));
assert_eq!(dag.parents_of(&"b"), Some(&["a"][..]));
}

#[test]
fn diamond_graph() {
let mut dag = Dag::new();
for n in ["a", "b", "c", "d"] {
dag.add_node(n).unwrap();
}
dag.add_edge("a", "b").unwrap();
dag.add_edge("a", "c").unwrap();
dag.add_edge("b", "d").unwrap();
dag.add_edge("c", "d").unwrap();
assert_eq!(dag.node_count(), 4);
assert_eq!(dag.edge_count(), 4);
}
}
34 changes: 34 additions & 0 deletions crates/byteport-dag/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! # byteport-dag
//!
//! DAG foundation for Phenotype compute/infra automation (epic F).
//!
//! ## Modules
//!
//! | Module | Description |
//! |---------------|-----------------------------------------------------|
//! | [`dag`] | Generic directed-acyclic-graph data structure |
//! | [`topo`] | Topological sort (Kahn's algorithm + DFS variant) |
//! | [`scheduler`] | Parallel-bucket scheduler built on topological order |
//! | [`serialize`] | YAML/JSON round-trip serialization (DagSchema) |
//!
//! ## Example
//!
//! ```rust
//! use byteport_dag::dag::Dag;
//! use byteport_dag::scheduler;
//!
//! let mut dag = Dag::new();
//! dag.add_node("build").unwrap();
//! dag.add_node("test").unwrap();
//! dag.add_node("deploy").unwrap();
//! dag.add_edge("build", "test").unwrap();
//! dag.add_edge("test", "deploy").unwrap();
//!
//! let sched = scheduler::schedule(&dag).unwrap();
//! assert_eq!(sched.buckets.len(), 3);
//! ```

pub mod dag;
pub mod topo;
pub mod scheduler;
pub mod serialize;
Loading
Loading