feat: add Airflow DAG for log file cleanup #421
Conversation
Introduce a new DAG for cleaning up Airflow log files older than a specified number of days. The implementation includes options for dry-run, removal of empty directories, and configurable base log folder. This enhancement aims to improve log management and maintain a cleaner environment.
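For orientation, here is a minimal sketch of the kind of cleanup logic the description implies. It is not the PR's actual code (which lives in dags/airflow_cleaner_etl.py); only the option names `max_age_days`, `dry_run`, `remove_empty_dirs`, and `base_log_folder` are taken from the PR, the rest is an illustrative assumption.

```python
# Sketch only: illustrates the described options, not the PR's implementation.
import logging
import os
import time
from pathlib import Path


def cleanup_logs(base_log_folder: str, max_age_days: int = 30,
                 dry_run: bool = False, remove_empty_dirs: bool = True) -> None:
    cutoff_ts = time.time() - max_age_days * 86400
    for file_path in Path(base_log_folder).rglob("*.log"):
        if file_path.stat().st_mtime < cutoff_ts:
            if dry_run:
                logging.info("Would delete %s", file_path)
            else:
                file_path.unlink(missing_ok=True)
    if remove_empty_dirs and not dry_run:
        # Walk bottom-up so parents emptied by the deletions above can be removed too.
        for root, _dirs, _files in os.walk(base_log_folder, topdown=False):
            if root != base_log_folder and not os.listdir(root):
                os.rmdir(root)
```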
Pre-merge checks and finishing touches: ✅ 3 checks passed
Actionable comments posted: 3
🧹 Nitpick comments (6)
dags/airflow_cleaner_etl.py (6)
103-105: Avoid blind except; log and re-raise to surface real failures. Aligns with your preference to let errors propagate in DAGs.
```diff
-    except Exception as err:
-        logging.warning("Unexpected error processing %s/%s: %s", root, file_name, err)
+    except Exception:
+        logging.exception("Unexpected error processing %s/%s", root, file_name)
+        raise
```
51-51: Optional: shorter raise message to satisfy TRY003 (Ruff). You can keep this as-is, but to appease Ruff TRY003, shorten the message or rely on the exception type plus context logging.
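One way to satisfy TRY003 is to move the long message into a dedicated exception class. The exception name below is hypothetical, since the actual raise at line 51 isn't shown in this review:

```python
# Hypothetical example: keep the long message inside the exception class (TRY003).
class BaseLogFolderNotConfiguredError(ValueError):
    def __init__(self) -> None:
        super().__init__("Base log folder is not configured and could not be resolved")


# At the raise site, no long inline message is needed:
# raise BaseLogFolderNotConfiguredError()
```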
69-85: Optional perf: prefer os.scandir in tight loops. Using os.scandir() yields faster directory-entry iteration than Path + stat() per file in large trees.
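A possible shape for such a scan, assuming the loop walks the log tree and filters on modification time (a sketch, not the PR's code):

```python
# Iterative walk with os.scandir; entry.stat() reuses the cached dirent where possible.
import os
from typing import Iterator


def iter_old_files(base: str, cutoff_ts: float) -> Iterator[str]:
    stack = [base]
    while stack:
        current = stack.pop()
        with os.scandir(current) as entries:
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):
                    stack.append(entry.path)
                elif (entry.is_file(follow_symlinks=False)
                      and entry.stat(follow_symlinks=False).st_mtime < cutoff_ts):
                    yield entry.path
```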
106-113: Optional UX: log cutoff date and human-readable bytes. Helps operators understand impact at a glance.
Example tweak:
```python
from datetime import datetime, timezone

cutoff_dt = datetime.fromtimestamp(cutoff_ts, tz=timezone.utc).isoformat()
logging.info(
    "Scan complete. files_scanned=%s candidates=%s deleted=%s bytes_freed=%s (~%0.2f MB) cutoff=%s",
    total_files_scanned,
    candidate_files,
    deleted_files,
    bytes_freed,
    bytes_freed / 1_048_576,
    cutoff_dt,
)
```
1-7: Nit: module name vs DAG name. The filename has “etl” but the DAG cleans logs. Consider renaming the module to reflect its function (e.g., airflow_logs_cleanup.py) for clarity.
13-26: Use a timezone-aware start_date and the modern `schedule` argument; keep `max_active_tasks` (Airflow 2.9.1 supports it). Replace the naive datetime with pendulum, switch `schedule_interval` → `schedule`, and add an import for pendulum. The Dockerfile pins apache-airflow==2.9.1, so `max_active_tasks` is supported.

File: dags/airflow_cleaner_etl.py

```diff
 with DAG(
     dag_id="airflow_logs_cleanup",
-    start_date=datetime(2025, 1, 1),
-    schedule_interval="@daily",
+    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+    schedule="@daily",
     catchup=False,
     max_active_runs=1,
     max_active_tasks=1,
     params={
         "max_age_days": 30,  # delete files older than N days
         "dry_run": False,  # when True, only report, do not delete
         "remove_empty_dirs": True,  # remove empty directories after deletion
         "base_log_folder": None,  # override Airflow BASE_LOG_FOLDER if needed
     },
 ) as dag:
```

Add near other imports:

```python
import pendulum
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
dags/airflow_cleaner_etl.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: amindadgar
PR: TogetherCrew/airflow-dags#186
File: dags/discord_analyzer_etl.py:0-0
Timestamp: 2024-10-09T08:57:20.288Z
Learning: amindadgar prefers not to use try-catch blocks in the airflow DAGs to allow errors to propagate to Apache Airflow for easier tracking.
🪛 Ruff (0.13.1)
dags/airflow_cleaner_etl.py
51-51: Avoid specifying long messages outside the exception class
(TRY003)
103-103: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci / test / Test
🔇 Additional comments (1)
dags/airflow_cleaner_etl.py (1)
28-38: Use TaskFlow context; kwargs won't auto-carry dag_run/params. Also avoid basicConfig in tasks. Decorated tasks don't receive runtime context via kwargs. Use get_current_context() and drop logging.basicConfig to avoid altering Airflow logging.
```diff
 @task(retries=0)
-def clean_airflow_logs(**kwargs) -> None:
+def clean_airflow_logs() -> None:
     """
     Delete Airflow log files older than the configured max_age_days.
     Respects Airflow's logging BASE_LOG_FOLDER by default. Supports dry-run.
     """
-    logging.basicConfig(level=logging.INFO)
-
-    run_conf = kwargs.get("dag_run").conf if "dag_run" in kwargs else {}
-    params = kwargs.get("params", {})
+    context = get_current_context()
+    run_conf = (context["dag_run"].conf or {})
+    params = (context.get("params") or {})
```

Also applies to: 34-35
⛔ Skipped due to learnings
Learnt from: amindadgar
PR: TogetherCrew/airflow-dags#390
File: dags/hivemind_discord_etl.py:25-31
Timestamp: 2025-08-13T09:55:31.135Z
Learning: In amindadgar's Airflow environment, dag_run is always available in kwargs within DAG task contexts, so defensive KeyError handling for kwargs["dag_run"] is unnecessary.
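For illustration only, this is the pattern that learning endorses: reading run config directly from kwargs without a KeyError guard. It is a sketch under the learning's assumption that dag_run is always injected in this environment, not the PR's code; `max_age_days` mirrors the DAG params shown above.

```python
# Sketch: direct kwargs access, relying on dag_run always being present per the learning.
from airflow.decorators import task


@task(retries=0)
def clean_airflow_logs(**kwargs) -> None:
    run_conf = kwargs["dag_run"].conf or {}  # no defensive KeyError handling
    params = kwargs.get("params", {})
    max_age_days = run_conf.get("max_age_days", params.get("max_age_days", 30))
```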
```python
from airflow import DAG
from airflow.decorators import task
from airflow.configuration import conf as airflow_conf
```
🛠️ Refactor suggestion
Import get_current_context for TaskFlow runtime context.
```diff
-from airflow.decorators import task
+from airflow.decorators import task, get_current_context
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 from airflow import DAG
-from airflow.decorators import task
+from airflow.decorators import task, get_current_context
 from airflow.configuration import conf as airflow_conf
```
🤖 Prompt for AI Agents
In dags/airflow_cleaner_etl.py around lines 8 to 11, the TaskFlow runtime helper
get_current_context is not imported; add the import (e.g., from
airflow.operators.python import get_current_context) to the existing imports so
tasks can access the runtime context via get_current_context().
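For reference, a usage sketch with the import path the AI-agent prompt suggests (`airflow.operators.python`, the documented location in Airflow 2.x). The `run_conf`/`params` names follow the diff above; the `max_age_days` default is illustrative.

```python
# Sketch: reading runtime context inside a TaskFlow task via get_current_context().
from airflow.decorators import task
from airflow.operators.python import get_current_context


@task(retries=0)
def clean_airflow_logs() -> None:
    context = get_current_context()
    run_conf = context["dag_run"].conf or {}
    params = context.get("params") or {}
    max_age_days = run_conf.get("max_age_days", params.get("max_age_days", 30))
```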
```python
                    bytes_freed += stat_result.st_size
                    file_path.unlink(missing_ok=True)
                    deleted_files += 1
                except PermissionError:
                    logging.warning("Permission denied deleting %s", file_path)
                except OSError as err:
                    logging.warning("Failed deleting %s: %s", file_path, err)
```
Count bytes_freed only after a successful deletion.
Currently bytes are added even if unlink fails.
```diff
-                    bytes_freed += stat_result.st_size
-                    file_path.unlink(missing_ok=True)
-                    deleted_files += 1
+                    file_path.unlink(missing_ok=True)
+                    deleted_files += 1
+                    bytes_freed += stat_result.st_size
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
                    file_path.unlink(missing_ok=True)
                    deleted_files += 1
                    bytes_freed += stat_result.st_size
                except PermissionError:
                    logging.warning("Permission denied deleting %s", file_path)
                except OSError as err:
                    logging.warning("Failed deleting %s: %s", file_path, err)
```
🤖 Prompt for AI Agents
In dags/airflow_cleaner_etl.py around lines 96–102, bytes_freed is incremented
before unlink may fail; change the flow so you compute or read the file size
(stat_result = file_path.stat()) before attempting deletion but only add
stat_result.st_size to bytes_freed and increment deleted_files after
file_path.unlink(...) completes successfully (i.e., inside the try block after
unlink), leaving the PermissionError and OSError handlers unchanged so failed
deletions do not increase the counters.
Summary by CodeRabbit

Introduce a new DAG for cleaning up Airflow log files older than a specified number of days. The implementation includes options for dry-run, removal of empty directories, and configurable base log folder. This enhancement aims to improve log management and maintain a cleaner environment.