Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[run]
omit =
src/main.py
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Install Requirements
run: POETRY_VIRTUALENVS_CREATE=false python -m poetry install
- name: Run tests with coverage
run: pytest --cov=src --cov-report=html --cov-fail-under=90
run: pytest --cov=src --cov-report=html --cov-fail-under=96
# Environment variables used by the `pg_client.py`
env:
DB_URL: postgresql://postgres:postgres@localhost:5432/postgres
Expand Down
65 changes: 25 additions & 40 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@

from __future__ import annotations

import asyncio
import os
from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from string import Template
from typing import Any, TextIO
from urllib.parse import urlsplit, urlunsplit
from urllib.parse import urlsplit

import requests
import yaml
from aiohttp import ClientError, ClientResponseError
from aiohttp.client import ClientSession
from dotenv import load_dotenv
from dune_client.query import QueryBase

Expand Down Expand Up @@ -153,51 +151,38 @@ def _is_url(cls, path: str) -> bool:
"""
try:
result = urlsplit(path)
urlunsplit(result)
if result.scheme and result.netloc:
return True
except (
ValueError,
TypeError,
): # raised when not enough parts were given to unsplit -> not a URL probably
return bool(result.scheme and result.netloc)
except (TypeError, AttributeError): # raised when path isn't str-like
return False

return False

@classmethod
def _load_config_file(cls, file_path: Path | str) -> Any:
    """Parse the YAML configuration stored at ``file_path``.

    Args:
        file_path: Location of the config file on the local filesystem

    Returns:
        Whatever ``read_yaml`` produces for the file's contents

    """
    # Explicit UTF-8 so decoding does not depend on the platform default.
    with open(file_path, encoding="utf-8") as config_stream:
        parsed = cls.read_yaml(config_stream)
    return parsed

@classmethod
async def _download_config(cls, url: str) -> str | None:
    """Fetch the raw configuration text from ``url``.

    Args:
        url: The URL to fetch the configuration from

    Returns:
        The response body as text, or ``None`` when the request fails
        (the failure is logged rather than raised)

    """
    try:
        async with ClientSession() as session, session.get(url) as response:
            try:
                response.raise_for_status()
            except ClientResponseError as status_error:
                # Error status from the server: log it with its own message.
                log.error(
                    "Error fetching config from URL: %s",
                    status_error,
                )
                return None

            return await response.text()

    except ClientError as client_error:
        # Any other aiohttp client failure raised while making the request.
        log.error("Request failed: %s", client_error)
        return None

@classmethod
def _load_config_url(cls, url: str) -> Any:
    """Load configuration from a URL.

    Args:
        url: The URL to fetch the configuration from

    Returns:
        The parsed YAML configuration

    Raises:
        SystemExit: If the configuration cannot be downloaded or the
            response body is empty

    """
    try:
        # Bounded timeout so an unresponsive host cannot block forever.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        config_data = response.text
    except requests.RequestException as e:
        log.error("Error fetching config from URL: %s", e)
        raise SystemExit("Could not download config") from e

    if not config_data:
        # Treat an empty body as a failed download instead of handing
        # nothing to the YAML parser.
        raise SystemExit("Could not download config")

    # read_yaml takes a text handle, so wrap the payload in a StringIO.
    return cls.read_yaml(StringIO(config_data))

@classmethod
def read_yaml(cls, file_handle: TextIO) -> Any:
Expand Down Expand Up @@ -323,7 +308,7 @@ def _build_destination(
match dest.type:
case Database.DUNE:
try:
request_timeout = dest_config["timeout"]
request_timeout = dest_config["request_timeout"]
request_timeout = int(request_timeout)
except KeyError:
log.debug("Dune request timeout not set: defaulting to 10")
Expand Down
11 changes: 6 additions & 5 deletions src/destinations/dune.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

from dune_client.client import DuneClient
from dune_client.models import DuneError
from pandas import DataFrame

from src.interfaces import Destination
from src.interfaces import Destination, TypedDataFrame
from src.logger import log


class DuneDestination(Destination[DataFrame]):
class DuneDestination(Destination[TypedDataFrame]):
"""A class representing Dune as a destination.

Uses the Dune API to upload CSV data to a table.
Expand Down Expand Up @@ -39,7 +38,7 @@ def validate(self) -> bool:
"""
return True

def save(self, data: DataFrame) -> int:
def save(self, data: TypedDataFrame) -> int:
"""Upload a DataFrame to Dune as a CSV.

Returns size of dataframe (i.e. number of "affected" rows).
Expand All @@ -61,7 +60,9 @@ def save(self, data: DataFrame) -> int:
"""
try:
log.debug("Uploading DF to Dune...")
result = self.client.upload_csv(self.table_name, data.to_csv(index=False))
result = self.client.upload_csv(
self.table_name, data.dataframe.to_csv(index=False)
)
if not result:
raise RuntimeError("Dune Upload Failed")
except DuneError as dune_e:
Expand Down
22 changes: 22 additions & 0 deletions tests/fixtures/config/invalid_request_timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
data_sources:
- name: dune
type: dune
key: fake-key
- name: postgres
type: postgres
key: postgresql://postgres:postgres@localhost:5432/postgres


jobs:
- name: Download simple test query to local postgres
source:
ref: dune
query_id: 4238114
query_engine: medium
poll_frequency: 5
destination:
ref: dune
table_name: test_table
if_exists: replace
request_timeout: word
128 changes: 80 additions & 48 deletions tests/unit/config_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import unittest
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
from io import StringIO
from unittest.mock import MagicMock, patch

import pytest
from aiohttp import ClientError, ClientResponseError
import requests
from dune_client.types import QueryParameter

from src.config import Env, RuntimeConfig
Expand Down Expand Up @@ -64,6 +65,32 @@ def setUpClass(cls):
def tearDownClass(cls):
cls.env_patcher.stop()

def test_is_url(self):
    """_is_url is True only for inputs that have both a scheme and a netloc."""
    # Valid URLs
    accepted = (
        "https://example.com",
        "http://localhost:8080",
        "ftp://files.example.com",
        "https://api.github.com/path?query=123",
        "sftp://user:pass@server.com:22",
    )
    for candidate in accepted:
        assert RuntimeConfig._is_url(candidate) is True

    rejected = (
        # Invalid URLs
        "not-a-url",
        "",
        "file.txt",
        "/path/to/file",
        "C:\\Windows\\Path",
        "://missing-scheme.com",
        "http://",  # Missing netloc
        # Edge cases
        None,
        "http:/example.com",  # Missing slash
        "https:example.com",  # Missing slashes
        # Non-string inputs: _is_url must return False rather than let the
        # parsing error (TypeError/AttributeError, per its handler) escape.
        [1, 2, 3],
        123,
    )
    for candidate in rejected:
        assert RuntimeConfig._is_url(candidate) is False  # type: ignore

def test_load_basic_conf(self):
config_file = config_root / "basic.yaml"
conf = RuntimeConfig.load(config_file.absolute())
Expand Down Expand Up @@ -124,58 +151,63 @@ def test_load_buggy_conf(self):
with self.assertRaises(SystemExit):
RuntimeConfig.load(config_root / "no_data_sources.yaml")

@pytest.mark.asyncio
async def test_successful_download(self):
    """A successful download returns the mocked response body."""
    fake_response = AsyncMock(name="Mock GET Response")
    fake_response.text = AsyncMock(return_value="test_config_content")
    fake_response.raise_for_status.return_value = True
    # _download_config uses session.get(...) as an async context manager,
    # so the patched return value must yield the fake from __aenter__.
    request_ctx = AsyncMock()
    request_ctx.__aenter__.return_value = fake_response

    with patch("src.config.ClientSession.get", return_value=request_ctx):
        downloaded = await RuntimeConfig._download_config("http://test.xyz")

    self.assertEqual("test_config_content", downloaded)
    fake_response.raise_for_status.assert_called_once()
    fake_response.text.assert_called_once()

@pytest.mark.asyncio
async def test_http_error_response(self):
error_response = ClientResponseError(
request_info=None, history=None, status=404, message="Not Found"
)
mock_response = AsyncMock(name="Mock GET Response")
mock_response.raise_for_status = MagicMock(
side_effect=error_response, name="mock raise for status"
)
mock_get = AsyncMock()
mock_get.__aenter__.return_value = mock_response

with self.assertRaises(ValueError):
RuntimeConfig.load(config_root / "invalid_request_timeout.yaml")

def test_load_config_url(self):
# Mock response for successful case
mock_yaml_content = """
data_sources:
- name: test
type: dune
key: test_key
jobs:
- name: job1
source:
ref: test
"""

# Test successful download
with (
patch("src.config.log") as mock_logger,
patch("src.config.ClientSession.get", return_value=mock_get),
patch("requests.get") as mock_get,
patch("src.config.RuntimeConfig.read_yaml") as mock_read_yaml,
):
result = await RuntimeConfig._download_config(
"http://test.thistldbetternotexist"
)
# Setup mock response
mock_response = MagicMock()
mock_response.text = mock_yaml_content
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response

self.assertIsNone(result)
mock_logger.error.assert_called_once_with(
"Error fetching config from URL: %s", error_response
)
mock_read_yaml.return_value = {"test": "data"}

@pytest.mark.asyncio
async def test_client_connection_error(self):
with (
patch("aiohttp.ClientSession", side_effect=ClientError("Connection error")),
patch("src.config.log") as mock_logger,
):
result = await RuntimeConfig._download_config(
"http://test.thistldbetternotexist"
result = RuntimeConfig._load_config_url("https://example.com/config.yaml")

# Verify the URL was called with timeout
mock_get.assert_called_once_with(
"https://example.com/config.yaml", timeout=10
)
# Verify read_yaml was called with StringIO containing our mock content
mock_read_yaml.assert_called_once()
assert isinstance(mock_read_yaml.call_args[0][0], StringIO)
assert result == {"test": "data"}

# Test HTTP error
with patch("requests.get") as mock_get:
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = requests.HTTPError(
"404 Not Found"
)
mock_get.return_value = mock_response

with pytest.raises(SystemExit, match="Could not download config"):
RuntimeConfig._load_config_url("https://example.com/config.yaml")

# Test network error
with patch("requests.get") as mock_get:
mock_get.side_effect = requests.RequestException("Network error")

assert result is None
mock_logger.error.assert_called_once()
with pytest.raises(SystemExit, match="Could not download config"):
RuntimeConfig._load_config_url("https://example.com/config.yaml")


class TestParseQueryParameters(unittest.TestCase):
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/destinations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_ensure_index_disabled_when_uploading(self, mock_to_csv, *_):
table_name="foo",
request_timeout=10,
)
destination.save(dummy_df)
destination.save(TypedDataFrame(dummy_df, {}))
mock_to_csv.assert_called_once_with(index=False)

@patch("pandas.core.generic.NDFrame.to_csv", name="Fake csv writer")
Expand All @@ -55,7 +55,7 @@ def test_duneclient_sets_timeout(self, mock_to_csv, *_):
@patch("dune_client.api.table.TableAPI.upload_csv", name="Fake CSV uploader")
def test_dune_error_handling(self, mock_dune_upload_csv):
dest = DuneDestination(api_key="f00b4r", table_name="foo", request_timeout=10)
df = pd.DataFrame([{"foo": "bar"}])
df = TypedDataFrame(pd.DataFrame([{"foo": "bar"}]), {})

dune_err = DuneError(
data={"error": "bad stuff"},
Expand Down
Loading