Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.0.9"
version = "0.0.10"
authors = ["nicelgueta"]
repository = "https://github.com/nicelgueta/cdktr"

Expand Down
2 changes: 1 addition & 1 deletion crates/cdktr-workflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cdktr-workflow"
version = "0.1.1"
version = "0.1.2"
edition = "2024"

[dependencies]
Expand Down
52 changes: 49 additions & 3 deletions crates/cdktr-workflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ pub async fn get_yaml_map<T: FromYaml>(workflow_dir: &str) -> HashMap<String, T>
{
let workflow = match T::from_yaml(
path.to_str().expect("failed to get apth as str"),
) {
)
.await
{
Ok(workflow) => workflow,
Err(e) => {
warn!(
Expand Down Expand Up @@ -132,11 +134,13 @@ mod tests {
struct MockYamlContent {
name: String,
}
#[async_trait::async_trait]
impl FromYaml for MockYamlContent {
type Error = GenericError;
fn from_yaml(file_path: &str) -> Result<Self, Self::Error> {
async fn from_yaml(file_path: &str) -> Result<Self, Self::Error> {
let obj: MockYamlContent =
serde_norway::from_str(&fs::read_to_string(file_path).unwrap()).unwrap();
serde_norway::from_str(&tokio::fs::read_to_string(file_path).await.unwrap())
.unwrap();
Ok(obj)
}
}
Expand Down Expand Up @@ -206,4 +210,46 @@ mod tests {

assert_eq!(result, expected);
}

#[tokio::test]
async fn test_no_file_descriptor_leak_on_multiple_refreshes() {
// This test simulates the production scenario where workflows are refreshed
// every 60 seconds. We perform many rapid refreshes to ensure file descriptors
// are properly released with async tokio::fs instead of blocking std::fs.

let (wf_dir, _tmp_dir) = get_tmp_dir();

// Simulate 100 rapid refreshes (much more aggressive than production)
for i in 0..100 {
let result = get_yaml_map::<MockYamlContent>(wf_dir.to_str().unwrap()).await;

// Verify we still get correct results
assert_eq!(result.len(), 3, "Failed on iteration {}", i);
assert!(result.contains_key("workflow1"));
assert!(result.contains_key("sub1.workflow2"));
assert!(result.contains_key("sub1.sub2.workflow3"));
}

// If we've made it here without "too many open files" errors (errno 24),
// the file descriptors are being properly released
}

#[tokio::test]
async fn test_workflow_store_refresh_no_fd_leak() {
// Test the WorkflowStore refresh_workflows method specifically
// Use the real test_artifacts directory with actual workflow files
let test_dir = "./test_artifacts/workflows";

let mut store = WorkflowStore::from_dir(test_dir).await.unwrap();
let initial_count = store.count().await;

// Simulate multiple refresh cycles
for i in 0..50 {
store.refresh_workflows().await;
let count = store.count().await;
assert_eq!(count, initial_count, "Failed on refresh iteration {}", i);
}

// Success means no file descriptor leaks
}
}
19 changes: 12 additions & 7 deletions crates/cdktr-workflow/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use async_trait::async_trait;
use cdktr_core::exceptions::GenericError;
use cdktr_core::get_cdktr_setting;
use daggy::{self, Dag, NodeIndex, Walker};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::fs;
use std::path::{Path, PathBuf};
use tokio::fs;

use super::executors::ExecutableTask;

Expand Down Expand Up @@ -159,9 +160,10 @@ impl WorkFlowDAG {
}
}

#[async_trait::async_trait]
pub trait FromYaml: Sized {
type Error: Display;
fn from_yaml(file_path: &str) -> Result<Self, Self::Error>;
async fn from_yaml(file_path: &str) -> Result<Self, Self::Error>;
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand All @@ -174,11 +176,12 @@ pub struct Workflow {
cron: Option<String>,
start_time: Option<String>,
}
#[async_trait]
impl FromYaml for Workflow {
type Error = GenericError;
fn from_yaml(file_path: &str) -> Result<Self, GenericError> {
async fn from_yaml(file_path: &str) -> Result<Self, GenericError> {
let file = Path::new(file_path);
let contents = match fs::read_to_string(file) {
let contents = match fs::read_to_string(file).await {
Ok(s) => s,
Err(e) => {
return Err(GenericError::WorkflowError(format!(
Expand Down Expand Up @@ -417,11 +420,13 @@ tasks:
)
}

#[test]
fn test_get_dependents() {
#[tokio::test]
async fn test_get_dependents() {
let dir = env::current_dir().unwrap();
println!("{:?}", dir.to_string_lossy());
let wf = Workflow::from_yaml("./test_artifacts/workflows/multi-cmd.yml").unwrap();
let wf = Workflow::from_yaml("./test_artifacts/workflows/multi-cmd.yml")
.await
.unwrap();
let deps = wf.dag.get_dependents("task1").unwrap();
assert_eq!(deps.len(), 1);
assert_eq!(deps[0], "task2");
Expand Down
97 changes: 97 additions & 0 deletions crates/cdktr-workflow/tests/fd_leak_integration_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/// Integration test to verify file descriptor leak fix
///
/// This test simulates the production scenario where the workflow refresh loop
/// runs every 60 seconds. The test performs many rapid refreshes to ensure
/// file descriptors are properly released when using async tokio::fs operations.
///
/// Prior to the fix, using synchronous std::fs::read_to_string in an async context
/// would cause file descriptors to not be released quickly enough, eventually
/// leading to "No file descriptors available (os error 24)" errors.
use cdktr_workflow::{Workflow, WorkflowStore, get_yaml_map};

#[tokio::test]
async fn test_no_fd_leak_with_rapid_refreshes() {
// Use the existing test artifacts directory with valid workflow files
let workflow_dir = "./test_artifacts/workflows";

// Perform 200 rapid refreshes - far more aggressive than production (60s intervals)
// This would definitely trigger FD exhaustion with the old synchronous approach
for iteration in 0..200 {
let workflows = get_yaml_map::<Workflow>(workflow_dir).await;

assert!(
workflows.len() >= 1,
"Iteration {}: Expected at least 1 workflow, got {}",
iteration,
workflows.len()
);
}

// If we reach here without "too many open files" errors, the fix works!
println!("✓ Successfully completed 200 rapid refreshes without file descriptor leaks");
}

#[tokio::test]
async fn test_workflow_store_continuous_refresh_simulation() {
// Simulate continuous workflow store refreshes as done by the principal
let workflow_dir = "./test_artifacts/workflows";

let mut store = WorkflowStore::from_dir(workflow_dir).await.unwrap();
let initial_count = store.count().await;

assert!(
initial_count >= 1,
"Expected at least 1 workflow in test artifacts"
);

// Simulate 100 refresh cycles (like running for ~100 minutes with 60s intervals)
for iteration in 0..100 {
store.refresh_workflows().await;

let count = store.count().await;
assert_eq!(
count, initial_count,
"Iteration {}: Expected {} workflows, got {}",
iteration, initial_count, count
);
}

println!("✓ Successfully simulated 100 workflow refresh cycles without FD leaks");
}

#[tokio::test]
async fn test_concurrent_refreshes() {
// Test concurrent refreshes to ensure async operations are properly isolated
let workflow_dir = "./test_artifacts/workflows";

// First, get the expected count
let initial_workflows = get_yaml_map::<Workflow>(workflow_dir).await;
let expected_count = initial_workflows.len();

assert!(
expected_count >= 1,
"Expected at least 1 workflow in test artifacts"
);

// Spawn 10 concurrent refresh operations
let mut handles = vec![];
for _ in 0..10 {
let dir = workflow_dir.to_string();
let handle = tokio::spawn(async move {
for _ in 0..20 {
let workflows = get_yaml_map::<Workflow>(&dir).await;
assert!(workflows.len() >= 1, "Should have at least 1 workflow");
}
});
handles.push(handle);
}

// Wait for all concurrent operations to complete
for handle in handles {
handle.await.unwrap();
}

println!(
"✓ Successfully completed 200 concurrent refreshes (10 tasks × 20 iterations) without FD leaks"
);
}
Loading