From 8481c58cd4ac2ed5957c52a32b084a0d3f08056a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Kut=C3=BD?= <6du1ro.n@gmail.com> Date: Thu, 26 Apr 2018 00:05:16 +0200 Subject: [PATCH 1/2] Upgrade to new airflow. --- airflow_plugins/operators/base.py | 5 ----- airflow_plugins/operators/files.py | 5 ++--- airflow_plugins/operators/sensors/file_sensor.py | 3 +-- setup.py | 2 +- 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/airflow_plugins/operators/base.py b/airflow_plugins/operators/base.py index 2d6229c..7428403 100644 --- a/airflow_plugins/operators/base.py +++ b/airflow_plugins/operators/base.py @@ -73,11 +73,6 @@ def _split_path(path): def _get_ftp_path(self, path): return self._split_path(path)[-1] - def _get_s3_path(self, path): - bucket, key = self._split_path(path)[1:] - bucket = bucket or 'storiesbi-datapipeline' - return (bucket, key) - def pre_execute(self, context): params = context['params'] for param in ['local_path', 'remote_path']: diff --git a/airflow_plugins/operators/files.py b/airflow_plugins/operators/files.py index 66ee0a7..a262717 100644 --- a/airflow_plugins/operators/files.py +++ b/airflow_plugins/operators/files.py @@ -34,8 +34,7 @@ def execute(self, context): elif self.conn and self.conn.conn_type == "s3": hook = S3Hook(self.conn_id) - bucket, key = self._get_s3_path(self.remote_path) - fileobj = hook.get_bucket(bucket).get_key(key) + fileobj = hook.get_key(self.remote_path) fileobj.get_contents_to_filename(self.local_path) else: @@ -61,7 +60,7 @@ def execute(self, context): elif self.conn and self.conn.conn_type == "s3": hook = S3Hook(self.conn_id) - bucket, key = self._get_s3_path(self.remote_path) + bucket, key = hook.parse_s3_url(self.remote_path) hook.load_file(self.local_path, key, bucket, replace=True) else: diff --git a/airflow_plugins/operators/sensors/file_sensor.py b/airflow_plugins/operators/sensors/file_sensor.py index 61c6ca1..98ea4f8 100644 --- a/airflow_plugins/operators/sensors/file_sensor.py +++ 
b/airflow_plugins/operators/sensors/file_sensor.py @@ -182,8 +182,7 @@ def poke(self, context): elif self.conn.conn_type == "s3": hook = S3Hook(self.conn_id) - bucket, key = self._get_s3_path(self.path) - fileobj = hook.get_bucket(bucket).get_key(key) + fileobj = hook.get_key(self.path) if not fileobj: msg = 'The file does not exist' diff --git a/setup.py b/setup.py index bea39e0..acafa91 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ requirements = [ "python-slugify>=1.1.4", "psycopg2>=2.6.2", - "boto==2.45.0", + "boto3>=1.6.14", "csvkit==1.0.2", "slackclient==1.0.4", "six==1.11.0", From 84ce7fd7e224ae249e4179cd5dfffbd04a49a9d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Kut=C3=BD?= <6du1ro.n@gmail.com> Date: Thu, 26 Apr 2018 09:34:50 +0200 Subject: [PATCH 2/2] Upgrade to new airflow. --- .../operators/sensors/file_sensor.py | 15 ++++- tests/operators/sensors/test_file_sensor.py | 56 +++++++++++-------- 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/airflow_plugins/operators/sensors/file_sensor.py b/airflow_plugins/operators/sensors/file_sensor.py index 98ea4f8..c75c2dc 100644 --- a/airflow_plugins/operators/sensors/file_sensor.py +++ b/airflow_plugins/operators/sensors/file_sensor.py @@ -3,6 +3,7 @@ import time from datetime import datetime, timedelta +import pytz from airflow.exceptions import ( AirflowException, AirflowSensorTimeout, @@ -12,6 +13,7 @@ from airflow.operators.sensors import BaseSensorOperator from airflow.utils.decorators import apply_defaults from pytz import timezone +from pytz.exceptions import UnknownTimeZoneError from airflow_plugins.hooks import FTPHook from airflow_plugins.operators import FileOperator @@ -66,7 +68,14 @@ def _send_notification(self, context, success=False): send_notification(ti.get_dagrun(), text, title, color) return - runtime = datetime.now() - ti.start_date + try: + tz = timezone(ti.start_date.tm_zone) + except (AttributeError, UnknownTimeZoneError): # tm_zone not set on t + runtime = 
datetime.now() - ti.start_date.replace(tzinfo=None) + else: + runtime = datetime.utcnow().replace( + tzinfo=tz) - ti.start_date + if runtime >= self.notify_after: if (self.last_notification is None or runtime >= self.last_notification + self.notify_delta): @@ -196,6 +205,10 @@ def poke(self, context): def get_last_modified(fileobj): timestamp = fileobj.last_modified + + if isinstance(timestamp, datetime): + return timestamp + tformat = '%a, %d %b %Y %H:%M:%S %Z' dt = datetime.strptime(timestamp, tformat) t = time.strptime(timestamp, tformat) diff --git a/tests/operators/sensors/test_file_sensor.py b/tests/operators/sensors/test_file_sensor.py index d252ecf..3d6899c 100644 --- a/tests/operators/sensors/test_file_sensor.py +++ b/tests/operators/sensors/test_file_sensor.py @@ -1,10 +1,10 @@ from datetime import datetime, timedelta from time import sleep -import boto +import boto3 import pytest from airflow.exceptions import AirflowException -from boto.s3.key import Key +# from boto3.s3.key import Key from mock import Mock from moto import mock_s3 @@ -46,13 +46,14 @@ def test_files_sensor_fail_on_unsupported_connection(): @mock_s3 def test_files_on_s3(): - conn = boto.connect_s3() - bucket = conn.create_bucket('storiesbi-datapipeline') + conn = boto3.resource('s3') + conn.create_bucket(Bucket='storiesbi-datapipeline') + get_or_update_conn("s3.stories.bi", conn_type="s3") file_sensor = FileSensor( task_id="check_new_file", - path="foo", + path="s3://storiesbi-datapipeline/foo", conn_id="s3.stories.bi", modified="anytime" ) @@ -61,27 +62,22 @@ def test_files_on_s3(): assert not file_sensor.poke(ctx) - k = Key(bucket) - k.key = "foo" - k.set_contents_from_string("bar") + conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar") assert file_sensor.poke(ctx) @mock_s3 def test_files_on_s3_modified_after(): - conn = boto.connect_s3() - bucket = conn.create_bucket('storiesbi-datapipeline') - - k = Key(bucket) - k.key = "foo" - k.set_contents_from_string("bar") + 
conn = boto3.resource('s3') + conn.create_bucket(Bucket='storiesbi-datapipeline') + conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar") get_or_update_conn("s3.stories.bi", conn_type="s3") file_sensor = FileSensor( task_id="check_new_file", - path="foo", + path="s3://storiesbi-datapipeline/foo", conn_id="s3.stories.bi", modified=datetime.now() ) @@ -92,19 +88,16 @@ def test_files_on_s3_modified_after(): # Hacky hacky! sleep(1) - key = bucket.get_key("foo") - key.set_contents_from_string("baz") + conn.Object('storiesbi-datapipeline', 'foo').put(Body="baz") assert file_sensor.poke(ctx) @mock_s3 def test_files_on_s3_from_custom_bucket_defined_in_path(): - conn = boto.connect_s3() - bucket = conn.create_bucket('testing') - k = Key(bucket) - k.key = "foo" - k.set_contents_from_string("baz") + conn = boto3.resource('s3') + conn.create_bucket(Bucket='testing') + conn.Object('testing', 'foo').put(Body="baz") get_or_update_conn("s3.stories.bi", conn_type="s3") yesterday = datetime.now() - timedelta(1) @@ -119,3 +112,22 @@ def test_files_on_s3_from_custom_bucket_defined_in_path(): file_sensor.pre_execute(ctx) assert file_sensor.poke(ctx) + + +@mock_s3 +def test_operator_notification(): + conn = boto3.resource('s3') + conn.create_bucket(Bucket='testing') + conn.Object('testing', 'foo').put(Body="baz") + + get_or_update_conn("s3.stories.bi", conn_type="s3") + yesterday = datetime.now() - timedelta(1) + + file_sensor = FileSensor( + task_id="check_new_file", + path="s3://testing/foo", + conn_id="s3.stories.bi", + modified=yesterday + ) + + file_sensor._send_notification(ctx, success=False)