Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ jobs:
- name: p2d-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
query_string: |
SELECT
1 AS number,
'\\x1234'::bytea AS my_bytes,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
'[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
destination:
ref: Dune
table_name: dune_sync_test_table
table_name: bh2smith.dune_sync_test
insertion_type: replace

- name: cow-solvers
source:
Expand Down
84 changes: 78 additions & 6 deletions poetry.lock

Large diffs are not rendered by default.

30 changes: 15 additions & 15 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ description = ""
authors = [
"Benjamin Smith <bh2smith@users.noreply.github.com>",
"mooster531 <mooster531@users.noreply.github.com>",
"Felix Leupold <masked@example.com>"
"Felix Leupold <masked@example.com>",
]

[tool.poetry.dependencies]
python = ">=3.12,<3.14"
dune-client = ">=1.7.7"
dune-client = ">=1.7.8"
pandas = "*"
sqlalchemy = "*"
python-dotenv = "*"
Expand All @@ -28,7 +28,7 @@ black = "*"
pylint = "*"
pre-commit = "*"
pytest = "*"
pytest-cov="*"
pytest-cov = "*"
pytest-asyncio = "*"
pandas-stubs = "*"
sqlalchemy-stubs = "*"
Expand Down Expand Up @@ -58,17 +58,17 @@ exclude = [

[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"N", # pep8-naming
"D", # pydocstring
"E", # pycodestyle errors
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"N", # pep8-naming
"D", # pydocstring
"T20", # flake8-print
"RET", # flake8-return
"PL", # pylint
"PL", # pylint
]

ignore = [
Expand All @@ -80,11 +80,11 @@ ignore = [

[tool.ruff.lint.per-file-ignores]
"tests/**/*" = [
"D", # Ignore all docstring rules in tests
"T20", # Allow print statements in tests
"D", # Ignore all docstring rules in tests
"T20", # Allow print statements in tests
"N802", # Allow function names like "test_someFunction_does_something"
"B018", # Allow using mock objects without explicit assert
"RET", # Allow multiple returns in test functions
"RET", # Allow multiple returns in test functions
"E731", # Allow lambda assignments in tests
]

Expand Down
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ def _build_destination(
api_key=dest.key,
table_name=dest_config["table_name"],
request_timeout=request_timeout,
insertion_type=dest_config.get("insertion_type", "append"),
)

case Database.POSTGRES:
Expand Down
1 change: 1 addition & 0 deletions src/destinations/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Common structures used in multiple destination implementations."""
91 changes: 77 additions & 14 deletions src/destinations/dune.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
"""Destination logic for Dune Analytics."""

import io
import sys
from typing import Literal

from dune_client.client import DuneClient
from dune_client.models import DuneError
from dune_client.models import DuneError, QueryFailed
from dune_client.query import QueryBase
from dune_client.types import QueryParameter
from pandas import DataFrame

from src.interfaces import Destination, TypedDataFrame
from src.logger import log

InsertionPolicy = Literal["append", "replace", "upload_csv"]


class DuneDestination(Destination[TypedDataFrame]):
"""A class representing Dune as a destination.
Expand All @@ -26,9 +35,19 @@ class DuneDestination(Destination[TypedDataFrame]):

"""

def __init__(self, api_key: str, table_name: str, request_timeout: int):
def __init__(
self,
api_key: str,
table_name: str,
request_timeout: int,
insertion_type: InsertionPolicy = "append",
):
self.client = DuneClient(api_key, request_timeout=request_timeout)
if "." not in table_name:
raise ValueError("Table name must be in the format namespace.table_name")
Comment on lines +46 to +47
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the end-to-end test is expected to fail because it reads a remote file from main. Once this PR lands, the file will be updated and the e2e test will pass again.


self.table_name: str = table_name
self.insertion_type: InsertionPolicy = insertion_type
super().__init__()

def validate(self) -> bool:
Expand All @@ -39,13 +58,13 @@ def validate(self) -> bool:
return True

def save(self, data: TypedDataFrame) -> int:
"""Upload a DataFrame to Dune as a CSV.
"""Upload a TypedDataFrame to Dune as a CSV.

Returns size of dataframe (i.e. number of "affected" rows).

Parameters
----------
data : DataFrame
data : TypedDataFrame
The data to be uploaded to Dune, which will be converted to CSV format.

Raises
Expand All @@ -58,15 +77,59 @@ def save(self, data: TypedDataFrame) -> int:
For any data processing issues prior to the upload.

"""
log.debug("Uploading DF to Dune...")
if self.insertion_type == "upload_csv":
if not self._upload_csv(data.dataframe):
raise RuntimeError("Dune Upload Failed")
else:
self._insert(data)
log.debug("Inserted DF to Dune, %s")

return len(data.dataframe)

def _insert(self, data: TypedDataFrame) -> None:
namespace, table_name = self._get_namespace_and_table_name()
try:
log.debug("Uploading DF to Dune...")
result = self.client.upload_csv(
self.table_name, data.dataframe.to_csv(index=False)
if self.insertion_type == "replace":
log.debug("Clearing table: %s", table_name)
clear_result = self.client.clear_data(namespace, table_name)
log.debug("Cleared: %s", clear_result)

log.info("Inserting to: %s", self.table_name)
self.client.insert_table(
namespace,
table_name,
data=io.BytesIO(data.dataframe.to_csv(index=False).encode()),
content_type="text/csv",
)
if not result:
raise RuntimeError("Dune Upload Failed")
except DuneError as dune_e:
log.error("Dune did not accept our upload: %s", dune_e)
except (ValueError, RuntimeError) as e:
log.error("Data processing error: %s", e)
return len(data)
except DuneError as err:
if "This table was not found" in str(err):
api_ref = "https://docs.dune.com/api-reference/tables/endpoint/create"
log.error(
"Table %s doesn't exist. See %s for table creation details.",
self.table_name,
api_ref,
)
raise err

def _upload_csv(self, data: DataFrame) -> bool:
    """Send ``data`` to Dune through the client's CSV upload call.

    The frame is serialised with ``to_csv(index=False)`` and handed to
    ``self.client.upload_csv`` together with ``self.table_name``. Whatever
    the client returns is passed back to the caller unchanged.

    Parameters
    ----------
    data : DataFrame
        The frame to serialise and upload.

    Returns
    -------
    bool
        The value returned by ``self.client.upload_csv``.

    """
    csv_payload = data.to_csv(index=False)
    return self.client.upload_csv(self.table_name, csv_payload)

def _table_exists(self) -> bool:
    """Run Dune query 4554525 for this table and report whether it has a result.

    The query receives ``self.table_name`` as its ``table_name`` text
    parameter.
    NOTE(review): presumably query 4554525 is a lookup that checks for the
    table's existence on Dune — confirm what the query actually selects.

    Returns
    -------
    bool
        ``True`` when the execution carries a non-``None`` ``result``;
        ``False`` when the result is ``None`` or the run raises
        ``QueryFailed``.

    """
    try:
        name_param = QueryParameter.text_type("table_name", self.table_name)
        lookup_query = QueryBase(4554525, params=[name_param])
        execution = self.client.run_query(lookup_query)
        return execution.result is not None
    except QueryFailed:
        return False

def _get_namespace_and_table_name(self) -> tuple[str, str]:
    """Split ``self.table_name`` into its namespace and table components.

    Returns
    -------
    tuple[str, str]
        ``(namespace, table_name)``, i.e. the text on either side of the dot.

    Raises
    ------
    ValueError
        If ``self.table_name`` does not split into exactly two parts on ``.``.

    """
    try:
        namespace, table_name = self.table_name.split(".")
    except ValueError as err:
        # Unpacking fails for zero or several dots; replace Python's generic
        # "values to unpack" text with a message that names the real problem.
        raise ValueError(
            "Table name must be in the format namespace.table_name, "
            f"got {self.table_name!r}"
        ) from err
    return namespace, table_name
3 changes: 3 additions & 0 deletions src/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def is_empty(self) -> bool:
return self.dataframe.empty


# TODO: maybe a good place to define schema transformations and other data manipulation?


class Named(Protocol):
"""Represents any class with name field."""

Expand Down
20 changes: 3 additions & 17 deletions src/sources/dune.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,21 @@
from dune_client.query import QueryBase
from dune_client.types import ParameterType, QueryParameter
from pandas import DataFrame
from sqlalchemy import BIGINT, BOOLEAN, DATE, TIMESTAMP, VARCHAR
from sqlalchemy import VARCHAR
from sqlalchemy.dialects.postgresql import (
BYTEA,
DOUBLE_PRECISION,
INTEGER,
JSONB,
NUMERIC,
)

from src.interfaces import Source, TypedDataFrame
from src.logger import log
from src.sources.type_maps import DUNE_TO_PG

DECIMAL_PATTERN = r"decimal\((\d+),\s*(\d+)\)"
VARCHAR_PATTERN = r"varchar\((\d+)\)"

DUNE_TO_PG: dict[str, type[Any] | NUMERIC] = {
"bigint": BIGINT,
"integer": INTEGER,
"varbinary": BYTEA,
"date": DATE,
"boolean": BOOLEAN,
"varchar": VARCHAR,
"double": DOUBLE_PRECISION,
"real": DOUBLE_PRECISION,
"timestamp with time zone": TIMESTAMP,
"uint256": NUMERIC,
}


# TODO - migrate type utilities to type_maps.
def _parse_varchar_type(type_str: str) -> int | None:
"""Extract the length from Dune's varchar type string like varchar(255).

Expand Down
19 changes: 13 additions & 6 deletions src/sources/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from src.interfaces import Source, TypedDataFrame
from src.logger import log
from src.sources.type_maps import PG_TO_DUNE


def _convert_dict_to_json(df: DataFrame) -> DataFrame:
Expand Down Expand Up @@ -137,27 +138,33 @@ async def fetch(self) -> TypedDataFrame:
# of SQLAlchemy's synchronous interface.
# The current solution using run_in_executor is a workaround
# that moves the blocking operation to a thread pool.
# First get the column types
with self.engine.connect() as conn:
result = conn.execute(text(self.query_string))
types = {
col.name: PG_TO_DUNE.get(col.type_code, "varchar")
for col in result.cursor.description
}
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
)

df = _convert_dict_to_json(df)
df = _convert_bytea_to_hex(df)
# TODO include types.
return TypedDataFrame(dataframe=df, types={})
return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types=types)

def is_empty(self, data: TypedDataFrame) -> bool:
"""Check if the provided DataFrame is empty.
"""Check if the provided TypedDataFrame is empty.

Parameters
----------
data : DataFrame
The DataFrame to check.
data : TypedDataFrame
The TypedDataFrame to check.

Returns
-------
bool
True if the DataFrame is empty, False otherwise.
True if the TypedDataFrame is empty, False otherwise.

"""
return data.is_empty()
Expand Down
51 changes: 51 additions & 0 deletions src/sources/type_maps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Data Type mappings."""

from __future__ import annotations

from typing import Any

from sqlalchemy import BIGINT, BOOLEAN, DATE, TIMESTAMP, VARCHAR
from sqlalchemy.dialects.postgresql import (
BYTEA,
DOUBLE_PRECISION,
INTEGER,
NUMERIC,
)

DECIMAL_PATTERN = r"decimal\((\d+),\s*(\d+)\)"
VARCHAR_PATTERN = r"varchar\((\d+)\)"

# Maps a Dune column type name to the SQLAlchemy type used for it.
# Both "double" and "real" map to DOUBLE_PRECISION.
DUNE_TO_PG: dict[str, type[Any] | NUMERIC] = {
"bigint": BIGINT,
"integer": INTEGER,
"varbinary": BYTEA,
"date": DATE,
"boolean": BOOLEAN,
"varchar": VARCHAR,
"double": DOUBLE_PRECISION,
"real": DOUBLE_PRECISION,
"timestamp with time zone": TIMESTAMP,
"uint256": NUMERIC,
}

# https://docs.dune.com/api-reference/tables/endpoint/create#body-schema-type
# This map is not a perfect inverse of the one above.
# 1. Notice `DOUBLE_PRECISION` has two pre-images: we chose double
# 2. timestamp with time zone not aligned with timestamp
# 3. Apparently no JSONB support here.
# Maps an integer PostgreSQL type code to a Dune column type name.
# NOTE(review): the per-entry PostgreSQL type names below follow the standard
# pg_type catalog OIDs — confirm against the catalog before relying on them.
PG_TO_DUNE: dict[int, str] = {
16: "boolean",  # bool
17: "varbinary",  # bytea
20: "bigint",  # int8 (bigint)
21: "bigint", # smallint (int2), widened to bigint
23: "integer",  # int4 (integer)
25: "varchar",  # text
# 26: "oid",
700: "double",  # float4 (real)
701: "double",  # float8 (double precision)
1042: "varchar",  # bpchar (char(n))
1043: "varchar",  # varchar
1082: "timestamp",  # date; NOTE(review): DUNE_TO_PG has a "date" key — confirm "timestamp" is intended
1114: "timestamp", # timestamp without time zone; DUNE_TO_PG only has "timestamp with time zone"
1700: "uint256",  # numeric; NOTE(review): numeric may be negative or fractional — confirm uint256 is intended
}
2 changes: 1 addition & 1 deletion tests/fixtures/config/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ jobs:
query_string: SELECT 1;
destination:
ref: dune
table_name: table_name
table_name: table.name
2 changes: 1 addition & 1 deletion tests/fixtures/config/sql_file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ jobs:
query_string: foo.sql
destination:
ref: dune
table_name: table_name
table_name: table.name
Loading