From 63329b6b996ee6a80ee947c8e6869f5c6adda01b Mon Sep 17 00:00:00 2001 From: Jorge Rivera Date: Thu, 18 Jun 2026 17:12:14 +0200 Subject: [PATCH 1/2] feat: add download_cpa reader for Country Programmable Aid Adds download_cpa() sourcing OECD CPA (DSD_CPA@DF_CRS_CPA) from the SDMX API, reusing the CRS filter, schema, and translation (via a cpa->crs alias in read_schema_translation). Defaults to project-level microdata (MD_DIM=DD). get_available_filters("cpa") is supported. Per-year bulk (download_cpa_file) is intentionally omitted: the OECD bulk .txt files are malformed upstream (32-69% of rows in 2020-2023 are structurally invalid), tracked in #39 to add once OECD fixes them. Co-authored-by: Claude --- CHANGELOG.md | 5 ++ README.md | 52 +++++++++++++ src/oda_reader/__init__.py | 2 + src/oda_reader/cpa.py | 78 +++++++++++++++++++ src/oda_reader/download/download_tools.py | 4 + src/oda_reader/schemas/schema_tools.py | 7 +- src/oda_reader/tools.py | 4 +- .../datasets/cpa/integration/test_cpa_e2e.py | 47 +++++++++++ tests/datasets/cpa/unit/test_cpa_dispatch.py | 59 ++++++++++++++ 9 files changed, 255 insertions(+), 3 deletions(-) create mode 100644 src/oda_reader/cpa.py create mode 100644 tests/datasets/cpa/integration/test_cpa_e2e.py create mode 100644 tests/datasets/cpa/unit/test_cpa_dispatch.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b737cf..ed79740 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +- Adds `download_cpa()` for the OECD Country Programmable Aid (CPA) dataset + (`DSD_CPA@DF_CRS_CPA`), sourced directly from the OECD SDMX API. CPA reuses the CRS filter + and `.stat` schema; `get_available_filters("cpa")` is supported. Per-year bulk download is + deferred because the OECD bulk files are currently malformed (tracked in + [#39](https://github.com/ONEcampaign/oda_reader/issues/39)). - Project maintenance: adopted the [`bblocks-projects`](https://github.com/ONEcampaign/bblocks-projects) template standard so the repo is now managed (`bblocks-projects update` / `doctor` work via `.copier-answers.yml`). Adds the `ty` type checker (enforced in CI and pre-commit) and full diff --git a/README.md b/README.md index 724541e..27c83c3 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ ODA Reader is a project created and maintained by The ONE Campaign. 1. [DAC1](#downloading-dac1-data) 1. [DAC2a](#downloading-dac2a-data) 1. [CRS](#downloading-crs-data) +1. [CPA](#downloading-cpa-data) 1. [Multisystem](#downloading-multisystem-data) 1. [Using filters](#using-filters) 1. [Rate limiting](#rate-limiting) @@ -419,6 +420,57 @@ from oda_reader import download_crs_file crs_data = download_crs_file(year=2017) ``` +### Downloading CPA Data + +**Country Programmable Aid (CPA)** is the share of aid that donors programme at country level. ODA Reader +downloads CPA directly from the OECD SDMX API (dataflow `DSD_CPA@DF_CRS_CPA`). CPA is activity-level +data and shares the same schema and filter set as the CRS. + +The `download_cpa()` function accepts the following arguments: + +- `start_year`: An integer like `2018`, specifying the starting year for the data. + This parameter is optional - if not provided, the starting date for the dataset is used. +- `end_year`: An integer like `2022`, specifying the end year for the data. + This parameter is optional - if not provided, the returned data goes up to the most recent year. +- `filters`: An optional dictionary containing additional filters to include in the API call. + See the [Using filters](#using-filters) section for more details. +- `pre_process`: A boolean to specify if light cleaning of the data should be performed. + If true, columns will be renamed to unique, machine readable names, and empty columns will be removed. +- `dotstat_codes`: A boolean to specify if the API response should be translated to the dotstat schema. + For this to work, `pre_process` must be true. +- `dataflow_version`: The specific schema / dataflow version to be used in the API call. + This is an advanced parameter and should be used only if necessary to override the default. + +**Note** `download_cpa` defaults to microdata (`microdata=True`, i.e. `MD_DIM=DD`), returning +project-level CPA records — the same default as `download_crs`. + +This basic example will get CPA data for 2022: + +```python +from oda_reader import download_cpa + +cpa_data = download_cpa(start_year=2022, end_year=2022) +``` + +You can also use filters to, for example, only get data for a specific donor: + +```python +from oda_reader import download_cpa + +cpa_data = download_cpa(start_year=2022, end_year=2022, filters={"donor": "USA"}) +``` + +The available filters for CPA are the same as for CRS and can be retrieved with +`get_available_filters("cpa")`. + +**Note on per-year bulk files:** OECD per-year bulk `.txt` files for CPA are currently malformed +upstream (32–69 % of rows in 2020–2023 are corrupted), so per-year bulk download (a +`download_cpa_file` function) has not been added yet — it is not importable from `oda_reader`. +For a full year of CPA data, use a single-year API call such as +`download_cpa(start_year=2022, end_year=2022)`. Per-year bulk support is tracked in +[issue #39](https://github.com/ONEcampaign/oda_reader/issues/39) and will be added once OECD +fixes the upstream files. + ### Downloading Multisystem Data The `download_multisystem()` function allows you to download _Members total use of the diff --git a/src/oda_reader/__init__.py b/src/oda_reader/__init__.py index 4d97c88..b7f1cb2 100644 --- a/src/oda_reader/__init__.py +++ b/src/oda_reader/__init__.py @@ -28,6 +28,7 @@ enable_http_cache, get_http_cache_info, ) +from oda_reader.cpa import download_cpa from oda_reader.crs import bulk_download_crs, download_crs, download_crs_file from oda_reader.dac1 import download_dac1 from oda_reader.dac2a import bulk_download_dac2a, download_dac2a @@ -111,6 +112,7 @@ def shim(*args: Any, **kwargs: Any) -> Any: "download_crs", "bulk_download_crs", "download_crs_file", + "download_cpa", "download_aiddata", "get_available_filters", # Cache configuration diff --git a/src/oda_reader/cpa.py b/src/oda_reader/cpa.py new file mode 100644 index 0000000..f20e364 --- /dev/null +++ b/src/oda_reader/cpa.py @@ -0,0 +1,78 @@ +import pandas as pd + +from oda_reader._cache import cache_info +from oda_reader.common import logger +from oda_reader.download.download_tools import download + +DATAFLOW_ID: str = "DSD_CPA@DF_CRS_CPA" +DATAFLOW_VERSION: str = "1.4" + +# CPA filter structure (dimension order mirrors CRS): +# donor, recipient, sector, measure, channel, +# modality, flow_type, price_base, md_dim, md_id, unit_measure, +# time_period + + +@cache_info +def download_cpa( + start_year: int | None = None, + end_year: int | None = None, + filters: dict | None = None, + pre_process: bool = True, + dotstat_codes: bool = True, + dataflow_version: str = DATAFLOW_VERSION, +) -> pd.DataFrame: + """ + Download the CPA (Country Programmable Aid) data from the API. + + CPA is sourced directly from the OECD (`DSD_CPA@DF_CRS_CPA`), activity-level, + and uses the same schema as CRS. Defaults to project-level microdata (`MD_DIM=DD`). + + Args: + start_year (int): The start year of the data to download. Optional + end_year (int): The end year of the data to download. Optional + filters (dict): Optional filters to pass to the download. + pre_process (bool): Whether to preprocess the data. Defaults to True. Preprocessing makes it comply with the .stat schema. + dotstat_codes (bool): Whether to convert the donor codes to the .stat schema. + dataflow_version (str): The version of the dataflow to download. + + Note: + CPA has no grant-equivalent dataflow, so ``as_grant_equivalent`` is not + available (unlike ``download_crs``). + + Returns: + pd.DataFrame: The CPA data. + + """ + + logger.info("Downloading CPA data. This may take a while — the OECD API is slow.") + + if filters is None: + filters = {} + + if filters.get("microdata") is False: + warning_message = "\nYou have requested aggregates.\n" + warnings = [w for w in ("channel", "modality") if w not in filters] + + if warnings: + warning_message += "\n".join( + f"Unless you specify {w}: '_T', the data will contain duplicates." + for w in warnings + ) + + logger.warning(warning_message) + + df = download( + version="cpa", + dataflow_id=DATAFLOW_ID, + dataflow_version=dataflow_version, + start_year=start_year, + end_year=end_year, + filters=filters, + pre_process=pre_process, + dotstat_codes=dotstat_codes, + ) + + df = df.dropna(axis=1, how="all") + + return df diff --git a/src/oda_reader/download/download_tools.py b/src/oda_reader/download/download_tools.py index c6043d5..48a3d21 100644 --- a/src/oda_reader/download/download_tools.py +++ b/src/oda_reader/download/download_tools.py @@ -167,6 +167,10 @@ def download( "filter_builder": qb.build_crs_filter, "convert_func": convert_crs_to_dotstat_codes, }, + "cpa": { + "filter_builder": qb.build_crs_filter, + "convert_func": convert_crs_to_dotstat_codes, + }, } try: diff --git a/src/oda_reader/schemas/schema_tools.py b/src/oda_reader/schemas/schema_tools.py index c6170bd..ccde20b 100644 --- a/src/oda_reader/schemas/schema_tools.py +++ b/src/oda_reader/schemas/schema_tools.py @@ -17,10 +17,13 @@ def read_schema_translation(version: str = "dac1") -> dict: """ logger.info(f"Reading the {version} schema translation") - schema = "schema" if version == "aidData" else "dotstat" + # CPA reuses the full CRS microdata schema; alias to avoid duplicating crs_dotstat.json. + file_version = "crs" if version == "cpa" else version + + schema = "schema" if file_version == "aidData" else "dotstat" # Load the schema translation - with open(ImporterPaths.mappings / f"{version}_{schema}.json") as f: + with open(ImporterPaths.mappings / f"{file_version}_{schema}.json") as f: mapping = json.load(f) return mapping diff --git a/src/oda_reader/tools.py b/src/oda_reader/tools.py index aef0932..ce3fb47 100644 --- a/src/oda_reader/tools.py +++ b/src/oda_reader/tools.py @@ -7,7 +7,7 @@ def get_available_filters(source: str, quiet: bool = False) -> dict: """ Get the available filters for a given source (printed and as a dictionary). - It can be "dac1", "dac2a", "multisystem", or "crs". + It can be "dac1", "dac2a", "multisystem", "crs", or "cpa". Args: source: The source to get the filters for. @@ -29,6 +29,8 @@ def get_available_filters(source: str, quiet: bool = False) -> dict: f = qb.build_multisystem_filter.__annotations__ case "crs": f = qb.build_crs_filter.__annotations__ + case "cpa": + f = qb.build_crs_filter.__annotations__ case _: raise ValueError(f"Source '{source}' not recognized.") diff --git a/tests/datasets/cpa/integration/test_cpa_e2e.py b/tests/datasets/cpa/integration/test_cpa_e2e.py new file mode 100644 index 0000000..f1fa306 --- /dev/null +++ b/tests/datasets/cpa/integration/test_cpa_e2e.py @@ -0,0 +1,47 @@ +"""Integration tests for CPA dataset.""" + +import pytest + +from oda_reader import download_cpa, enable_http_cache + + +@pytest.mark.integration +@pytest.mark.slow +class TestCPAIntegration: + """End-to-end tests for CPA with real API.""" + + def test_cpa_basic_query(self): + """Test CPA raw API query returns project-level data.""" + enable_http_cache() + + # Small query: US CPA data for 2022. + # CPA defaults to microdata=True (MD_DIM=DD, project-level). + # Using pre_process=False and dotstat_codes=False to test raw API output. + df = download_cpa( + start_year=2022, + end_year=2022, + filters={"donor": "USA"}, + pre_process=False, + dotstat_codes=False, + ) + + assert df is not None + assert len(df) > 0 + # Raw API columns + assert "TIME_PERIOD" in df.columns + assert "OBS_VALUE" in df.columns + + @pytest.mark.slow + def test_cpa_processed_query(self): + """Test CPA processed query applies schema translation and dotstat codes.""" + enable_http_cache() + + # Processed path: pre_process=True, dotstat_codes=True (defaults). + df = download_cpa( + start_year=2022, + end_year=2022, + filters={"donor": "USA"}, + ) + + assert df is not None + assert len(df) > 0 diff --git a/tests/datasets/cpa/unit/test_cpa_dispatch.py b/tests/datasets/cpa/unit/test_cpa_dispatch.py new file mode 100644 index 0000000..48c1346 --- /dev/null +++ b/tests/datasets/cpa/unit/test_cpa_dispatch.py @@ -0,0 +1,59 @@ +"""Unit tests for CPA dispatch wiring (offline, no network).""" + +import pytest + +import oda_reader.cpa as cpa_module +import oda_reader.download.download_tools as dt +from oda_reader import get_available_filters +from oda_reader.schemas.schema_tools import read_schema_translation + + +@pytest.mark.unit +class TestCPADispatch: + """Assert that CPA is correctly wired to the CRS filter/schema machinery.""" + + def test_schema_alias_matches_crs(self): + """CPA schema alias resolves to the same mapping as CRS.""" + assert read_schema_translation("cpa") == read_schema_translation("crs") + + def test_available_filters_match_crs(self): + """get_available_filters('cpa') returns the same surface as CRS.""" + assert get_available_filters("cpa", quiet=True) == get_available_filters( + "crs", quiet=True + ) + + def test_dataflow_constants(self): + """DATAFLOW_ID and DATAFLOW_VERSION are set to the confirmed live values.""" + assert cpa_module.DATAFLOW_ID == "DSD_CPA@DF_CRS_CPA" + assert cpa_module.DATAFLOW_VERSION == "1.4" + + def test_cpa_dispatch_uses_crs_converter(self, mocker): + """The 'cpa' dispatch in download() calls convert_crs_to_dotstat_codes.""" + import pandas as pd + + raw = pd.DataFrame({"x": [1]}) + + # Prevent any network call + mocker.patch.object(dt, "api_response_to_df", return_value=raw) + # preprocess must accept (df, schema_translation) and return a DataFrame + mocker.patch.object( + dt, "preprocess", side_effect=lambda df, schema_translation: df + ) + # Spy on the CRS converter to verify it is the one called + spy = mocker.patch.object( + dt, "convert_crs_to_dotstat_codes", side_effect=lambda df: df + ) + # Bypass the DataFrame cache so the call always reaches the converter + cache_instance = dt.dataframe_cache() + mocker.patch.object(cache_instance, "get", return_value=None) + mocker.patch.object(cache_instance, "set", return_value=None) + + dt.download( + version="cpa", + dataflow_id="DSD_CPA@DF_CRS_CPA", + dataflow_version="1.4", + pre_process=True, + dotstat_codes=True, + ) + + spy.assert_called_once() From 5988108a6e0979e6f3f87aa1b2c26153d4284b02 Mon Sep 17 00:00:00 2001 From: Jorge Rivera Date: Fri, 19 Jun 2026 08:36:57 +0200 Subject: [PATCH 2/2] docs: document CPA in docs site, drop bulk note from README Add a Country Programmable Aid section to the docs site (datasets, filtering, advanced) and update dataset counts/enumerations. Remove the per-year bulk caveat from the README CPA section. Co-authored-by: Claude --- README.md | 8 -------- docs/docs/advanced.md | 2 +- docs/docs/datasets.md | 36 +++++++++++++++++++++++++++++++++++- docs/docs/filtering.md | 5 ++++- docs/docs/getting-started.md | 2 +- docs/docs/index.md | 4 ++-- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 27c83c3..1745a7d 100644 --- a/README.md +++ b/README.md @@ -463,14 +463,6 @@ cpa_data = download_cpa(start_year=2022, end_year=2022, filters={"donor": "USA"} The available filters for CPA are the same as for CRS and can be retrieved with `get_available_filters("cpa")`. -**Note on per-year bulk files:** OECD per-year bulk `.txt` files for CPA are currently malformed -upstream (32–69 % of rows in 2020–2023 are corrupted), so per-year bulk download (a -`download_cpa_file` function) has not been added yet — it is not importable from `oda_reader`. -For a full year of CPA data, use a single-year API call such as -`download_cpa(start_year=2022, end_year=2022)`. Per-year bulk support is tracked in -[issue #39](https://github.com/ONEcampaign/oda_reader/issues/39) and will be added once OECD -fixes the upstream files. - ### Downloading Multisystem Data The `download_multisystem()` function allows you to download _Members total use of the diff --git a/docs/docs/advanced.md b/docs/docs/advanced.md index 1f50ff8..6068662 100644 --- a/docs/docs/advanced.md +++ b/docs/docs/advanced.md @@ -107,7 +107,7 @@ https://sdmx.oecd.org/public/rest/v2/data/dataflow/OECD.DCD.FSD/DF_DAC1/1.0/... ODA Reader uses the appropriate version for each dataset: - **DAC1, DAC2a**: API v2 -- **CRS, Multisystem**: Custom endpoint (CRS-specific API) +- **CRS, CPA, Multisystem**: Custom endpoint (CRS-specific API) You generally don't need to worry about this - ODA Reader handles it automatically. diff --git a/docs/docs/datasets.md b/docs/docs/datasets.md index a9bd3cc..94ba608 100644 --- a/docs/docs/datasets.md +++ b/docs/docs/datasets.md @@ -1,6 +1,6 @@ # Datasets Overview -ODA Reader provides access to five datasets covering official development assistance (ODA), other official flows (OOF), and development finance. Each dataset serves different analytical needs. +ODA Reader provides access to six datasets covering official development assistance (ODA), other official flows (OOF), and development finance. Each dataset serves different analytical needs. ## Quick Reference @@ -9,6 +9,7 @@ ODA Reader provides access to five datasets covering official development assist | **DAC1** | Aggregate flows by donor | Analyzing overall ODA trends, donor performance | | **DAC2a** | Bilateral flows by donor-recipient | Recipient-level analysis | | **CRS** | Project-level microdata | Sector analysis, project details, activity-level data | +| **CPA** | Country Programmable Aid | The share of aid donors programme at country level | | **Multisystem** | Multilateral system usage | Analyzing multilateral channels and contributions | | **AidData** | Chinese development finance | Chinese aid flows | @@ -154,6 +155,38 @@ semi_agg = download_crs( **Performance note**: The CRS API is slow for large queries. Consider using [bulk downloads](bulk-downloads.md) for full dataset access. +## CPA: Country Programmable Aid + +**What it contains**: The share of bilateral ODA that donors programme for individual partner countries. CPA strips out flows a partner country has no say over — debt relief, humanitarian aid, in-donor refugee and student costs, administrative costs, and other non-programmable items. The OECD publishes it as a separate dataflow (`DSD_CPA@DF_CRS_CPA`) derived from the CRS, so it shares the CRS schema, dimensions, and filter set. + +**Key dimensions**: Same as the CRS — donor, recipient, sector, channel, modality, flow type, and the microdata flag. + +**Use when**: + +- You want the country-programmable slice of aid rather than total bilateral ODA +- Comparing how much of each donor's aid is programmable at country level +- Tracking programmable aid to specific recipients or sectors over time + +**Important**: Like the CRS, `download_cpa` defaults to **microdata** (`microdata=True`, i.e. `MD_DIM=DD`), returning project-level records. There is no grant-equivalent dataflow for CPA, so `as_grant_equivalent` is not available. + +**Example**: + +```python +from oda_reader import download_cpa + +# Get all CPA records for 2022 +cpa = download_cpa(start_year=2022, end_year=2022) + +# Country-programmable aid from the United States to Nigeria +us_nga = download_cpa( + start_year=2022, + end_year=2022, + filters={"donor": "USA", "recipient": "NGA"} +) +``` + +The available filters match the CRS and can be listed with `get_available_filters("cpa")`. + ## Multisystem: Members' Use of the Multilateral System **What it contains**: Data on how DAC members use the multilateral aid system, including core contributions to multilateral organizations and earmarked funding. @@ -235,6 +268,7 @@ from oda_reader import get_available_filters dac1_filters = get_available_filters("dac1") dac2a_filters = get_available_filters("dac2a") crs_filters = get_available_filters("crs") +cpa_filters = get_available_filters("cpa") multisystem_filters = get_available_filters("multisystem") ``` diff --git a/docs/docs/filtering.md b/docs/docs/filtering.md index 0ec98a1..ccb8706 100644 --- a/docs/docs/filtering.md +++ b/docs/docs/filtering.md @@ -17,7 +17,7 @@ data = download_dac1( ) ``` -This pattern works across all datasets: DAC1, DAC2a, CRS, and Multisystem. +This pattern works across all datasets: DAC1, DAC2a, CRS, CPA, and Multisystem. ## Filtering with Multiple Values @@ -93,6 +93,9 @@ dac2a_filters = get_available_filters("dac2a") # CRS filters crs_filters = get_available_filters("crs") +# CPA filters (same as CRS) +cpa_filters = get_available_filters("cpa") + # Multisystem filters multisystem_filters = get_available_filters("multisystem") ``` diff --git a/docs/docs/getting-started.md b/docs/docs/getting-started.md index 640c8b3..695c4a1 100644 --- a/docs/docs/getting-started.md +++ b/docs/docs/getting-started.md @@ -115,7 +115,7 @@ When you ran these examples: Now that you've downloaded your first datasets, explore: -- **[Datasets Overview](datasets.md)** - Learn about all 5 available datasets and when to use each +- **[Datasets Overview](datasets.md)** - Learn about all 6 available datasets and when to use each - **[Filtering Data](filtering.md)** - Discover available filters and build complex queries - **[Bulk Downloads](bulk-downloads.md)** - Download full datasets efficiently for large-scale analysis - **[Caching & Performance](caching.md)** - Manage cache and configure rate limiting diff --git a/docs/docs/index.md b/docs/docs/index.md index efe88a5..4f2d5dc 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -10,7 +10,7 @@ ODA Reader eliminates these headaches. It provides a unified Python interface th **Key features**: -- **Access 5+ datasets** through simple functions: DAC1, DAC2a, CRS, Multisystem, AidData +- **Access 6+ datasets** through simple functions: DAC1, DAC2a, CRS, CPA, Multisystem, AidData - **Apply filters easily**: `filters={"donor": "USA", "recipient": "NGA"}` works across datasets - **Bulk download large files** with memory-efficient streaming for the full CRS (1GB+) - **Automatic rate limiting** and caching to work within API constraints @@ -39,4 +39,4 @@ us_uk_data = download_dac1( - [Why ODA Reader](why-oda-reader.md) - Understand the rationale and compare to alternatives - [Getting Started](getting-started.md) - Install and run your first queries in 5 minutes -- [Datasets Overview](datasets.md) - Learn about the 5 available datasets +- [Datasets Overview](datasets.md) - Learn about the 6 available datasets