Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,532 changes: 419 additions & 1,113 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
resolver = "3"
members = [
"crates/byteport-transport",
"crates/byteport-dag",
"frontend/web/src-tauri",
]
11 changes: 11 additions & 0 deletions crates/byteport-dag/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "byteport-dag"
version = "0.1.0"
edition = "2021"
description = "DAG foundation for Phenotype compute/infra automation (epic F)"

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
thiserror = "2"
191 changes: 191 additions & 0 deletions crates/byteport-dag/src/dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//! Core DAG data structure.
//!
//! A generic directed acyclic graph backed by an adjacency list.

use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Error types for DAG operations.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum DagError {
#[error("node `{0:?}` already exists")]
NodeAlreadyExists(String),

#[error("node `{0:?}` not found")]
NodeNotFound(String),

#[error("edge from `{0:?}` to `{1:?}` would create a cycle")]
CycleDetected(String, String),
}

/// A generic directed acyclic graph over node identifiers of type `K`.
#[derive(Debug, Clone)]
pub struct Dag<K> {
/// All known nodes.
nodes: HashSet<K>,
/// Adjacency list: parent -> children (forward edges).
children: HashMap<K, Vec<K>>,
/// Reverse adjacency list: child -> parents (back edges).
parents: HashMap<K, Vec<K>>,
}

// ---------------------------------------------------------------------------
// Construction
// ---------------------------------------------------------------------------

impl<K> Default for Dag<K> {
fn default() -> Self {
Self {
nodes: HashSet::new(),
children: HashMap::new(),
parents: HashMap::new(),
}
}
}

impl<K> Dag<K>
where
K: Eq + Hash + Clone + std::fmt::Debug,
{
/// Create an empty DAG.
pub fn new() -> Self {
Self::default()
}

/// Insert a new node. Returns an error if the node already exists.
pub fn add_node(&mut self, node: K) -> Result<(), DagError> {
if self.nodes.contains(&node) {
return Err(DagError::NodeAlreadyExists(format!("{:?}", node)));
}
self.nodes.insert(node.clone());
self.children.entry(node.clone()).or_default();
self.parents.entry(node).or_default();
Ok(())
}

/// Insert a directed edge `from -> to`.
///
/// Both endpoints must already exist.
pub fn add_edge(&mut self, from: K, to: K) -> Result<(), DagError> {
if !self.nodes.contains(&from) {
return Err(DagError::NodeNotFound(format!("{:?}", from)));
}
if !self.nodes.contains(&to) {
return Err(DagError::NodeNotFound(format!("{:?}", to)));
}
self.children.entry(from.clone()).or_default().push(to.clone());
self.parents.entry(to).or_default().push(from);
Comment on lines +76 to +77

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Duplicate edges are appended without any deduplication check, so repeated add_edge(from, to) calls create parallel duplicates in both adjacency lists. This inflates edge_count and causes contract inconsistencies with serialization (from_dag deduplicates edges via BTreeSet), leading to lossy round-trips and unexpected graph behavior. Reject or ignore already-existing edges before pushing. [logic error]

Severity Level: Major ⚠️
- ⚠️ Duplicate edges inflate `Dag::edge_count` unexpectedly.
- ⚠️ Serialization round-trip drops duplicates, losing original structure.
Steps of Reproduction ✅
1. In a consumer or test, construct a DAG as in `crates/byteport-dag/src/dag.rs:168-173`:
create `let mut dag = Dag::new();`, add nodes `"a"` and `"b"` via
`dag.add_node("a").unwrap();` and `dag.add_node("b").unwrap();`, then call
`dag.add_edge("a", "b").unwrap();` twice; each call executes `add_edge` at `dag.rs:69-78`,
where lines 76-77 append `to` and `from` without any deduplication, resulting in
`children["a"] == ["b", "b"]` and `parents["b"] == ["a", "a"]`.

2. Calling `dag.edge_count()` at `crates/byteport-dag/src/dag.rs:95-98` on this DAG sums
the lengths of all `children` vectors, returning `2` for the single logical edge `"a" ->
"b"` because it is stored twice, inflating edge metrics compared to a unique-edge
interpretation.

3. Serializing this DAG with `DagSchema::from_dag(&dag, "1.0.0")` at
`crates/byteport-dag/src/serialize.rs:81-120` uses a `BTreeSet<(String, String)>`
(`edges_set` at lines 95-103) to collect edges; inserting `(node.clone(), child.clone())`
from `dag.iter_nodes()` and `dag.children_of(node)` deduplicates the two `"a" -> "b"`
entries, so `DagSchema.edges.len()` becomes `1` even though `dag.edge_count()` was `2`.

4. If a consumer then reconstructs a DAG via `schema.into_dag()` at
`serialize.rs:131-139`, it iterates over `self.edges` and calls
`dag.add_edge(edge.from.clone(), edge.to.clone())?;` once per schema edge, producing a new
DAG with `edge_count() == 1` despite the original DAG having `edge_count() == 2`,
demonstrating that duplicate adjacency entries are silently dropped by
`from_dag`/`into_dag` round-trip and leading to inconsistent edge counts and behavior
between the core `Dag` structure and the schema/serialization layer.

Fix in Cursor Fix in VSCode Claude

(Use Cmd/Ctrl + Click for best experience)

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** crates/byteport-dag/src/dag.rs
**Line:** 76:77
**Comment:**
	*Logic Error: Duplicate edges are appended without any deduplication check, so repeated `add_edge(from, to)` calls create parallel duplicates in both adjacency lists. This inflates `edge_count` and causes contract inconsistencies with serialization (`from_dag` deduplicates edges via `BTreeSet`), leading to lossy round-trips and unexpected graph behavior. Reject or ignore already-existing edges before pushing.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix
👍 | 👎

Comment on lines +76 to +77

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject cycle-forming edges

When a caller adds an edge that closes a path back to its source, such as a -> b -> c followed by c -> a, this method returns Ok(()) and mutates both adjacency lists, so the public Dag type can contain cycles despite being documented as acyclic and despite exposing CycleDetected. This also lets DagSchema::into_dag() accept cyclic schemas as valid Dags until a later scheduler/toposort call happens to catch them.

Useful? React with 👍 / 👎.

Ok(())
}

// -----------------------------------------------------------------------
// Accessors
// -----------------------------------------------------------------------

/// Return the set of all node identifiers.
pub fn nodes(&self) -> &HashSet<K> {
&self.nodes
}

/// Return the number of nodes.
pub fn node_count(&self) -> usize {
self.nodes.len()
}

/// Return the number of edges.
pub fn edge_count(&self) -> usize {
self.children.values().map(|v| v.len()).sum()
}

/// Return the children (direct successors) of `node`.
pub fn children_of(&self, node: &K) -> Option<&[K]> {
self.children.get(node).map(|v| v.as_slice())
}

/// Return the parents (direct predecessors) of `node`.
pub fn parents_of(&self, node: &K) -> Option<&[K]> {
self.parents.get(node).map(|v| v.as_slice())
}

/// Return `true` if the graph contains `node`.
pub fn contains(&self, node: &K) -> bool {
self.nodes.contains(node)
}

/// Iterate over all nodes.
pub fn iter_nodes(&self) -> impl Iterator<Item = &K> {
self.nodes.iter()
}
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn empty_dag() {
let dag: Dag<u32> = Dag::new();
assert_eq!(dag.node_count(), 0);
assert_eq!(dag.edge_count(), 0);
}

#[test]
fn add_and_query_nodes() {
let mut dag = Dag::new();
dag.add_node(1).unwrap();
dag.add_node(2).unwrap();
assert_eq!(dag.node_count(), 2);
assert!(dag.contains(&1));
assert!(dag.contains(&2));
}

#[test]
fn duplicate_node_error() {
let mut dag = Dag::new();
dag.add_node("a").unwrap();
assert!(dag.add_node("a").is_err());
}

#[test]
fn add_edge_missing_source() {
let mut dag = Dag::<&str>::new();
dag.add_node("b").unwrap();
assert!(dag.add_edge("a", "b").is_err());
}

#[test]
fn add_edge_missing_target() {
let mut dag = Dag::<&str>::new();
dag.add_node("a").unwrap();
assert!(dag.add_edge("a", "b").is_err());
}

#[test]
fn simple_edge() {
let mut dag = Dag::new();
dag.add_node("a").unwrap();
dag.add_node("b").unwrap();
dag.add_edge("a", "b").unwrap();
assert_eq!(dag.edge_count(), 1);
assert_eq!(dag.children_of(&"a"), Some(&["b"][..]));
assert_eq!(dag.parents_of(&"b"), Some(&["a"][..]));
}

#[test]
fn diamond_graph() {
let mut dag = Dag::new();
for n in ["a", "b", "c", "d"] {
dag.add_node(n).unwrap();
}
dag.add_edge("a", "b").unwrap();
dag.add_edge("a", "c").unwrap();
dag.add_edge("b", "d").unwrap();
dag.add_edge("c", "d").unwrap();
assert_eq!(dag.node_count(), 4);
assert_eq!(dag.edge_count(), 4);
}
}
38 changes: 38 additions & 0 deletions crates/byteport-dag/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! # byteport-dag
//!
//! DAG foundation for Phenotype compute/infra automation (epic F).
//!
//! ## Modules
//!
//! | Module | Description |
//! |---------------|------------------------------------------------------------|
//! | [`dag`] | Generic directed-acyclic-graph data structure |
//! | [`topo`] | Topological sort (Kahn's algorithm + DFS variant) |
//! | [`scheduler`] | Parallel-bucket scheduler built on topological order |
//! | [`schema`] | Enriched node/edge schema: prereqs, acceptance, audit hooks |
//! | [`serialize`] | YAML/JSON round-trip serialization for the enriched schema |
//!
//! ## Example
//!
//! ```rust
//! use byteport_dag::dag::Dag;
//! use byteport_dag::serialize::DagSchema;
//!
//! let mut dag: Dag<String> = Dag::new();
//! dag.add_node("build".into()).unwrap();
//! dag.add_node("test".into()).unwrap();
//! dag.add_node("deploy".into()).unwrap();
//! dag.add_edge("build".into(), "test".into()).unwrap();
//! dag.add_edge("test".into(), "deploy".into()).unwrap();
//!
//! let schema = DagSchema::from_dag(&dag, "2.0.0");
//! let yaml = schema.to_yaml().unwrap();
//! let round: DagSchema = DagSchema::from_yaml(&yaml).unwrap();
//! assert_eq!(schema, round);
//! ```

pub mod dag;
pub mod topo;
pub mod scheduler;
pub mod schema;
pub mod serialize;
Loading
Loading