Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ repos:
hooks:
# Run the linter.
- id: ruff-check
files: (elt-common|warehouses)
files: (elt-common|elt-pipelines|warehouses)
# Run the formatter.
- id: ruff-format
files: (elt-common|warehouses)
files: (elt-common|elt-pipelines|warehouses)
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: v0.11.0.1
hooks:
Expand Down
7 changes: 7 additions & 0 deletions elt-pipelines/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
**/build/
**/*.egg-info/
34 changes: 34 additions & 0 deletions elt-pipelines/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# elt-pipelines

Pipelines for ingesting data from various sources into Iceberg catalogs using elt-common.

**Under construction** - this project is being developed as a replacement for the existing DLT pipelines.
[See here](https://github.com/ISISNeutronMuon/analytics-data-platform/issues/321) for details.

## Development setup

Development requires the following tools:

- [uv](https://docs.astral.uv/uv/): Used to manage both Python installations and dependencies

### Setting up a Python virtual environment

Once `uv` is installed, create an environment, activate it, and install dependencies with:

```bash
> uv venv
> source .venv/bin/activate
> uv sync
```
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Pipelines can declare optional dependencies in `pyproject.toml` - for example, `statusdisplay` uses `requests` for
fetching data. To install any additional dependencies for that specific pipeline, use:

```bash
> uv sync --extra statusdisplay
```

## Running a pipeline

Pipelines are run using the `elt` CLI tool. As an example, with the package as current working directory,
`elt run pipelines statusdisplay` will run the statusdisplay pipeline. See `elt -h` for full usage.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import datetime
import io
import json
import pyarrow.json
import requests

from elt_common.extract import BaseExtract, ResourceProperties, ResourceWriteProperties

CYCLES_URL = "https://status.isis.stfc.ac.uk/api/cycles"


class Extract(BaseExtract):
def extract_resource_properties(self):
yield (
"elt_cycles",
ResourceProperties(
extractor=extract_cycles,
write_properties=ResourceWriteProperties(write_mode="replace"),
watermark_column=None,
),
)


def extract_cycles(_):
data = clean(fetch())
newline_delimited = "\n".join(json.dumps(row) for row in data)

with io.BytesIO(newline_delimited.encode()) as f:
yield pyarrow.json.read_json(f)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def fetch():
try:
response = requests.get(CYCLES_URL, timeout=20)
except requests.Timeout as ex:
raise RuntimeError("Timed out when fetching cycles") from ex

if not response.ok:
raise RuntimeError(f"Failed to fetch cycles - {response.reason}")

return response.json()
Comment thread
WHTaylor marked this conversation as resolved.


def reformat(date_string):
"""Convert a date from ISO format into one that pyarrow will convert into a timestamp"""
return datetime.fromisoformat(date_string).strftime("%Y-%m-%d %H:%M:%S")


def clean(data):
for cycle in data:
for phase in cycle["phases"]:
if "start" in phase:
phase["start"] = reformat(phase["start"])
if "end" in phase:
phase["end"] = reformat(phase["end"])

return data
24 changes: 24 additions & 0 deletions elt-pipelines/pyproject.toml

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original thought here would be that the child directories of elt-pipelines would be named after the lakekeeper warehouse that the transformed models end up in, i.e. the cycles tables are associated with facility operationsso end up in thefacility_ops` warehouse.

For the FASE data our thinking was to have a separate warehouse given there are more access controls required for, e.g. who can access what. In the faciity_ops case the data can all be simply read only. It would also then be feasible to have separate repositories for each set of pipelines targeting a given warehouse.

What do you think about having:

elt-pipelines/
|-- facility_ops/
|    |-- ingest/
|    |    |-- accelerator/
|    |-- transform/
|    |    |-- # not here yet but would be...
|    |-- pyproject.toml
|    |-- .gitignore
|    |-- ...
|-- other_warehouse

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

child directories of elt-pipelines would be named after the lakekeeper warehouse that the transformed models end up in

For the FASE data our thinking was to have a separate warehouse

I think this makes sense, and it might be possible to also use the directories for configuring pyiceberg to control the destination warehouse (either using the directory name instead of getting the default catalog here, or putting some amount of the config into the directories).

It would also then be feasible to have separate repositories for each set of pipelines targeting a given warehouse

This feels like it'd fragment the project, especially given the use of the relative path for the elt-common dependency; what benefits do you see it having?

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[project]
name = "elt-pipelines"
version = "0.1.0"
description = "Pipelines for ingesting data into Iceberg catalogs"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"elt-common",
"pydantic-settings>=2.14.2",
]

[project.optional-dependencies]
statusdisplay = [
"pyarrow>=24.0.0",
"requests>=2.34.2",
]

[tool.uv.sources]
elt-common = { path = "../elt-common", editable = true }

[dependency-groups]
dev = [
"prek>=0.4.5",
]
Loading