Skip to content

Commit 8df4352

Browse files
committed
count_lines param
1 parent 986b418 commit 8df4352

File tree

4 files changed

+24
-12
lines changed

4 files changed

+24
-12
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ CsvToPostgresOperator(
9696
| `has_header` | Include CSV header row | `True` |
9797
| `compression` | Compression format (`"gzip"` or `None`) | `None` |
9898
| `timeout` | Query timeout in minutes | `60` |
99+
| `count_lines` | Count and log the number of lines in the CSV after writing | `True` |
99100

100101
### CsvToPostgresOperator
101102

@@ -112,6 +113,7 @@ CsvToPostgresOperator(
112113
| `quote_char` | CSV quote character | `'"'` |
113114
| `null_string` | String representing NULL | `""` |
114115
| `timeout` | Query timeout in minutes | `60` |
116+
| `count_lines` | Count and log the number of lines in the CSV before loading | `True` |
115117

116118
## Development
117119

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "airflow-postgres-csv"
7-
version = "0.3.1"
7+
version = "0.3.2"
88
description = "Airflow operators for PostgreSQL <-> CSV file transfers using COPY"
99
readme = "README.md"
1010
license = "MIT"

src/airflow_postgres_csv/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
from airflow_postgres_csv.operators import CsvToPostgresOperator, PostgresToCsvOperator
44

55
__all__ = ["PostgresToCsvOperator", "CsvToPostgresOperator"]
6-
__version__ = "0.3.1"
6+
__version__ = "0.3.2"

src/airflow_postgres_csv/operators.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class PostgresToCsvOperator(BaseOperator):
3434
:param compression: Compression format. Supports ``"gzip"``.
3535
Defaults to ``None`` (no compression).
3636
:param timeout: Query timeout in minutes. Defaults to ``60``.
37+
:param count_lines: Count and log the number of lines in the CSV file after writing.
38+
Disable for very large files to avoid the extra read. Defaults to ``True``.
3739
"""
3840

3941
template_fields: Sequence[str] = (
@@ -51,6 +53,7 @@ def __init__(
5153
has_header: bool = True,
5254
compression: str | None = None,
5355
timeout: int = 60,
56+
count_lines: bool = True,
5457
**kwargs,
5558
):
5659
super().__init__(**kwargs)
@@ -61,6 +64,7 @@ def __init__(
6164
self.has_header = has_header
6265
self.compression = compression
6366
self.timeout = timeout
67+
self.count_lines = count_lines
6468

6569
def execute(self, context):
6670
sql = self.sql
@@ -89,15 +93,16 @@ def execute(self, context):
8993
cursor.copy_expert(copy_command, csv_file)
9094
rows = cursor.rowcount
9195

92-
open_func = self._get_open_func()
93-
with open_func(self.csv_file_path, "rt", encoding="utf-8") as f:
94-
line_count = sum(1 for _ in f)
96+
if self.count_lines:
97+
open_func = self._get_open_func()
98+
with open_func(self.csv_file_path, "rt", encoding="utf-8") as f:
99+
line_count = sum(1 for _ in f)
95100

96101
self.log.info(
97-
"CSV saved: %s (%s rows, %s lines, %s)",
102+
"CSV saved: %s (%s rows%s, %s)",
98103
self.csv_file_path,
99104
rows if rows >= 0 else "unknown",
100-
line_count,
105+
f", {line_count} lines" if self.count_lines else "",
101106
"with header" if self.has_header else "no header",
102107
)
103108
return self.csv_file_path
@@ -126,6 +131,8 @@ class CsvToPostgresOperator(BaseOperator):
126131
:param compression: Compression format. Supports ``"gzip"``.
127132
Defaults to ``None`` (no compression).
128133
:param timeout: Query timeout in minutes. Defaults to ``60``.
134+
:param count_lines: Count and log the number of lines in the CSV file before loading.
135+
Disable for very large files to avoid the extra read. Defaults to ``True``.
129136
"""
130137

131138
template_fields: Sequence[str] = ("csv_file_path", "table_name")
@@ -143,6 +150,7 @@ def __init__(
143150
truncate: bool = False,
144151
compression: str | None = None,
145152
timeout: int = 60,
153+
count_lines: bool = True,
146154
**kwargs,
147155
):
148156
super().__init__(**kwargs)
@@ -157,21 +165,23 @@ def __init__(
157165
self.truncate = truncate
158166
self.compression = compression
159167
self.timeout = timeout
168+
self.count_lines = count_lines
160169

161170
def execute(self, context):
162171
if not os.path.exists(self.csv_file_path):
163172
raise AirflowException(f"CSV file not found: {self.csv_file_path}")
164173

165174
pg_hook = PostgresHook(postgres_conn_id=self.conn_id)
166175

167-
open_func = self._get_open_func()
168-
with open_func(self.csv_file_path, "rt", encoding="utf-8") as f:
169-
line_count = sum(1 for _ in f)
176+
if self.count_lines:
177+
open_func = self._get_open_func()
178+
with open_func(self.csv_file_path, "rt", encoding="utf-8") as f:
179+
line_count = sum(1 for _ in f)
170180

171181
self.log.info(
172-
"Loading %s (%s lines, %s) into %s",
182+
"Loading %s (%s%s) into %s",
173183
self.csv_file_path,
174-
line_count,
184+
f"{line_count} lines, " if self.count_lines else "",
175185
"with header" if self.has_header else "no header",
176186
self.table_name,
177187
)

0 commit comments

Comments
 (0)