Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions airflow_plugins/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ def _split_path(path):
def _get_ftp_path(self, path):
return self._split_path(path)[-1]

def _get_s3_path(self, path):
bucket, key = self._split_path(path)[1:]
bucket = bucket or 'storiesbi-datapipeline'
return (bucket, key)

def pre_execute(self, context):
params = context['params']
for param in ['local_path', 'remote_path']:
Expand Down
5 changes: 2 additions & 3 deletions airflow_plugins/operators/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ def execute(self, context):

elif self.conn and self.conn.conn_type == "s3":
hook = S3Hook(self.conn_id)
bucket, key = self._get_s3_path(self.remote_path)
fileobj = hook.get_bucket(bucket).get_key(key)
fileobj = hook.get_key(self.remote_path)
fileobj.get_contents_to_filename(self.local_path)

else:
Expand All @@ -61,7 +60,7 @@ def execute(self, context):

elif self.conn and self.conn.conn_type == "s3":
hook = S3Hook(self.conn_id)
bucket, key = self._get_s3_path(self.remote_path)
bucket, key = hook.parse_s3_url(self.remote_path)
hook.load_file(self.local_path, key, bucket, replace=True)

else:
Expand Down
18 changes: 15 additions & 3 deletions airflow_plugins/operators/sensors/file_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from datetime import datetime, timedelta

import pytz
from airflow.exceptions import (
AirflowException,
AirflowSensorTimeout,
Expand All @@ -12,6 +13,7 @@
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from pytz import timezone
from pytz.exceptions import UnknownTimeZoneError

from airflow_plugins.hooks import FTPHook
from airflow_plugins.operators import FileOperator
Expand Down Expand Up @@ -66,7 +68,14 @@ def _send_notification(self, context, success=False):
send_notification(ti.get_dagrun(), text, title, color)
return

runtime = datetime.now() - ti.start_date
try:
    # datetime has no `tm_zone`; derive the zone name from tzinfo instead.
    tz = timezone(str(ti.start_date.tzinfo))
except (AttributeError, UnknownTimeZoneError):  # naive start_date (tzinfo is None)
    runtime = datetime.now() - ti.start_date.replace(tzinfo=None)
else:
    runtime = datetime.now(tz) - ti.start_date

if runtime >= self.notify_after:
if (self.last_notification is None or
runtime >= self.last_notification + self.notify_delta):
Expand Down Expand Up @@ -182,8 +191,7 @@ def poke(self, context):

elif self.conn.conn_type == "s3":
hook = S3Hook(self.conn_id)
bucket, key = self._get_s3_path(self.path)
fileobj = hook.get_bucket(bucket).get_key(key)
fileobj = hook.get_key(self.path)

if not fileobj:
msg = 'The file does not exist'
Expand All @@ -197,6 +205,10 @@ def poke(self, context):

def get_last_modified(fileobj):
timestamp = fileobj.last_modified

if isinstance(timestamp, datetime):
return timestamp

tformat = '%a, %d %b %Y %H:%M:%S %Z'
dt = datetime.strptime(timestamp, tformat)
t = time.strptime(timestamp, tformat)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
requirements = [
"python-slugify>=1.1.4",
"psycopg2>=2.6.2",
"boto==2.45.0",
"boto3>=1.6.14",
"csvkit==1.0.2",
"slackclient==1.0.4",
"six==1.11.0",
Expand Down
56 changes: 34 additions & 22 deletions tests/operators/sensors/test_file_sensor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from datetime import datetime, timedelta
from time import sleep

import boto
import boto3
import pytest
from airflow.exceptions import AirflowException
from boto.s3.key import Key
# from boto3.s3.key import Key
from mock import Mock
from moto import mock_s3

Expand Down Expand Up @@ -46,13 +46,14 @@ def test_files_sensor_fail_on_unsupported_connection():

@mock_s3
def test_files_on_s3():
conn = boto.connect_s3()
bucket = conn.create_bucket('storiesbi-datapipeline')
conn = boto3.resource('s3')
conn.create_bucket(Bucket='storiesbi-datapipeline')

get_or_update_conn("s3.stories.bi", conn_type="s3")

file_sensor = FileSensor(
task_id="check_new_file",
path="foo",
path="s3://storiesbi-datapipeline/foo",
conn_id="s3.stories.bi",
modified="anytime"
)
Expand All @@ -61,27 +62,22 @@ def test_files_on_s3():

assert not file_sensor.poke(ctx)

k = Key(bucket)
k.key = "foo"
k.set_contents_from_string("bar")
conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")

assert file_sensor.poke(ctx)


@mock_s3
def test_files_on_s3_modified_after():
conn = boto.connect_s3()
bucket = conn.create_bucket('storiesbi-datapipeline')

k = Key(bucket)
k.key = "foo"
k.set_contents_from_string("bar")
conn = boto3.resource('s3')
conn.create_bucket(Bucket='storiesbi-datapipeline')
conn.Object('storiesbi-datapipeline', 'foo').put(Body="bar")

get_or_update_conn("s3.stories.bi", conn_type="s3")

file_sensor = FileSensor(
task_id="check_new_file",
path="foo",
path="s3://storiesbi-datapipeline/foo",
conn_id="s3.stories.bi",
modified=datetime.now()
)
Expand All @@ -92,19 +88,16 @@ def test_files_on_s3_modified_after():

# Hacky hacky!
sleep(1)
key = bucket.get_key("foo")
key.set_contents_from_string("baz")
conn.Object('storiesbi-datapipeline', 'foo').put(Body="baz")

assert file_sensor.poke(ctx)


@mock_s3
def test_files_on_s3_from_custom_bucket_defined_in_path():
conn = boto.connect_s3()
bucket = conn.create_bucket('testing')
k = Key(bucket)
k.key = "foo"
k.set_contents_from_string("baz")
conn = boto3.resource('s3')
conn.create_bucket(Bucket='testing')
conn.Object('testing', 'foo').put(Body="baz")

get_or_update_conn("s3.stories.bi", conn_type="s3")
yesterday = datetime.now() - timedelta(1)
Expand All @@ -119,3 +112,22 @@ def test_files_on_s3_from_custom_bucket_defined_in_path():
file_sensor.pre_execute(ctx)

assert file_sensor.poke(ctx)


@mock_s3
def test_operator_notification():
conn = boto3.resource('s3')
conn.create_bucket(Bucket='testing')
conn.Object('testing', 'foo').put(Body="baz")

get_or_update_conn("s3.stories.bi", conn_type="s3")
yesterday = datetime.now() - timedelta(1)

file_sensor = FileSensor(
task_id="check_new_file",
path="s3://testing/foo",
conn_id="s3.stories.bi",
modified=yesterday
)

file_sensor._send_notification(ctx, success=False)