From 531e315d01408e25e98efdb629451c3218855162 Mon Sep 17 00:00:00 2001 From: Finleyh Date: Wed, 6 May 2026 08:37:10 -0400 Subject: [PATCH] Adding postgres connector --- README.md | 10 ++ dev_env/postgres/test_asyncpostgres.py | 32 +++++ dev_env/postgres/test_postgres.py | 20 +++ docker-compose.yml | 1 + poetry.lock | 127 +++++++++++++++++- pyproject.toml | 3 + src/pyapiary/dbms_connectors/postgres.py | 104 ++++++++++++++ .../tests/test_postgres/test_unit_postgres.py | 80 +++++++++++ 8 files changed, 374 insertions(+), 3 deletions(-) create mode 100644 dev_env/postgres/test_asyncpostgres.py create mode 100644 dev_env/postgres/test_postgres.py create mode 100644 src/pyapiary/dbms_connectors/postgres.py create mode 100644 src/pyapiary/tests/test_postgres/test_unit_postgres.py diff --git a/README.md b/README.md index 4a22ec7..c3849bb 100644 --- a/README.md +++ b/README.md @@ -330,6 +330,16 @@ results = conn.query("search index=_internal | head 5") pytest -m integration ``` + +### Manual Testing +- Located in dev_env +- Do not use pytest or mocking +- run the docker-compose.yaml to stand up services, change directories into the folder in dev_env you wish to test +- execute the following to test your module +``` +poetry run python .py +``` + ### 🧼 Suppress warnings Add this to `pytest.ini`: diff --git a/dev_env/postgres/test_asyncpostgres.py b/dev_env/postgres/test_asyncpostgres.py new file mode 100644 index 0000000..b10e77f --- /dev/null +++ b/dev_env/postgres/test_asyncpostgres.py @@ -0,0 +1,32 @@ +import asyncio +from pyapiary.dbms_connectors.postgres import AsyncPostgresConnector +from pyapiary.helpers import combine_env_configs, setup_logger +from typing import Dict, Any + +async def test_async_db(): + # Load config and setup logging + env_config: Dict[str, Any] = combine_env_configs() + logger = setup_logger("pg logger") + + # Use 'async with' to handle the background worker threads and connection pool + async with AsyncPostgresConnector(conn_str=env_config["PGSQL_DSN"], logger=logger) as conn: + + logger.info("inserting one row (async)") + # Await the execution of the insert + await conn.async_bulk_insert("employees", [{"name": "rob", "department": "hr"}]) + + base_query = "SELECT * FROM employees" + logger.info("Querying with pagination (async):") + + # Await the coroutine to get the actual list of results + rows = await conn.async_query(base_query) + + # Standard loop through the returned list + for i, row in enumerate(rows): + print(row) + if i >= 9: + break + +if __name__ == "__main__": + # Standard entry point for async scripts + asyncio.run(test_async_db()) \ No newline at end of file diff --git a/dev_env/postgres/test_postgres.py b/dev_env/postgres/test_postgres.py new file mode 100644 index 0000000..e43fd8a --- /dev/null +++ b/dev_env/postgres/test_postgres.py @@ -0,0 +1,20 @@ +from pyapiary.dbms_connectors.postgres import PostgresConnector, AsyncPostgresConnector +from pyapiary.helpers import combine_env_configs, setup_logger +from typing import Dict, Any + +env_config: Dict[str, Any] = combine_env_configs() + +logger = setup_logger("pg logger") +with PostgresConnector(conn_str=env_config["PGSQL_DSN"], logger=logger) as conn: + + # Optional insert test + logger.info("inserting one row") + + conn.bulk_insert("employees", [{"name": "rob", "department": "hr"}]) + base_query = "SELECT * FROM employees" + + logger.info("Querying with pagination:") + for i, row in enumerate(conn.query(base_query)): + print(row) + if i >= 9: + break \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 89e7546..f304506 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,6 +47,7 @@ services: - ./dev_env/splunk/init_app.sh:/init_app.sh entrypoint: [ "/bin/bash", "-c", "/init_app.sh & /sbin/entrypoint.sh start-service" ] + # both PG and ODBC connector use this data source, this is why dev_env/postgres does not have an init.sql odbc_db: image: postgres:15-alpine container_name: odbc_db diff --git a/poetry.lock b/poetry.lock index 1dfb399..8b53c06 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.4.0 and should not be changed by hand. [[package]] name = "anyio" @@ -566,6 +566,114 @@ files = [ {file = "propcache-0.4.1.tar.gz", hash = "sha256:f48107a8c637e80362555f37ecf49abe20370e557cc4ab374f04ec4423c97c3d"}, ] +[[package]] +name = "psycopg" +version = "3.3.4" +description = "PostgreSQL database adapter for Python" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "psycopg-3.3.4-py3-none-any.whl", hash = "sha256:b6bbc25ccf05c8fad3b061d9db2ef0909a555171b84b07f29458a447253d679a"}, + {file = "psycopg-3.3.4.tar.gz", hash = "sha256:e21207764952cff81b6b8bdacad9a3939f2793367fdac2987b3aac36a651b5bc"}, +] + +[package.dependencies] +psycopg-pool = {version = "*", optional = true, markers = "extra == \"pool\""} +typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""} +tzdata = {version = "*", markers = "sys_platform == \"win32\""} + +[package.extras] +binary = ["psycopg-binary (==3.3.4) ; implementation_name != \"pypy\""] +c = ["psycopg-c (==3.3.4) ; implementation_name != \"pypy\""] +dev = ["ast-comments (>=1.1.2)", "black (>=26.1.0)", "codespell (>=2.2)", "cython-lint (>=0.16)", "dnspython (>=2.1)", "flake8 (>=4.0)", "isort-psycopg", "isort[colors] (>=6.0)", "mypy (>=1.19.0)", "pre-commit (>=4.0.1)", "types-setuptools (>=57.4)", "types-shapely (>=2.0)", "wheel (>=0.37)"] +docs = ["Sphinx (>=9.1)", "furo (==2025.12.19)", "sphinx-autobuild (>=2025.8.25)", "sphinx-autodoc-typehints (>=3.10.2)"] +pool = ["psycopg-pool"] +test = ["anyio (>=4.0)", "mypy (>=1.19.0) ; implementation_name != \"pypy\"", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"] + +[[package]] +name = "psycopg-binary" +version = "3.3.4" +description = "PostgreSQL database adapter for Python -- C optimisation distribution" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "psycopg_binary-3.3.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b7bfff1ca23732b488cbca3076fc11bc98d520ee122514fdb17a8e20d3338f5a"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:32a6fbf8481e3a370d0d72b860d35948a693cb01281da217f7b2f307636e591a"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:bdef84570ebbce1d42b4e7ea952d21c414c5f118ad02fee00c5625f35e134429"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:fa1cbc10768a796c96d3243656016bf4e337c81c71097270bb7b0ad6210d9765"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:cf7f73a4a792bc5db58a4b385d8a1467e8d468f7548702fb0ed1e9b7501b1c13"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:d7b4d40c153fa352ab3cca530f3a0baedf7621b2ebcbd7f084009522c21788fc"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:f9b1c2533af01cd7648378599f82b0b8ae32f293296e6eec5753a625bc97ef28"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:ad3bc94054876155549fdaedf4a46d1ec69d39a5bcee377148afe498e84c4b8e"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-musllinux_1_2_riscv64.whl", hash = "sha256:eb4eed2079c01a4850bf467deacfab56d356d4225040170af03dc9958321242d"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:f80e3f2b5331dbbf0901bcb658056c03eeb2c1ef31d774afb0d61598b242e744"}, + {file = "psycopg_binary-3.3.4-cp310-cp310-win_amd64.whl", hash = "sha256:574ea21a9651958f1535c5a1c649c7409e9168bcbffa29a3f2f961f58b322949"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:612a627d733f695b1de1f9b4bd511c15f999a5d8b915d444bbd7dd71cf3370da"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:13a7f380824c35896dcac7fe0f61440f7ca49d6dc73f3c13a9a4471e6a3b302e"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:276904e3452d6a23d474ef9a21eee19f20eed3d53ddd2576af033827e0ba0992"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:ab8cca8ef8fb1ccf5b048ae5bd78ba55b9e4b5d472e3ce5ca39ff4d2a9c249e4"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:7465bfe6087d2d5b42d4c53b9b11ca9f218e477317a4a162a10e3c19e984ba8e"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:22cdbf5f91ef7bb91fe0c5757e1962d3127a8010256eefd9c61fcaf441802097"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:e2631da29253a98bd496e6c4813b24e09a4fe3fb2a9e88513305d6f8747cce95"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:7f7668f30b9dd5163197e5cbf4e0efd54e00f0a859cc566ce56cfc31f4054839"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-musllinux_1_2_riscv64.whl", hash = "sha256:cffc3408d77a27973f33e5d909b624cce683db5fc25964b02fe0aae7886c1007"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:0579252a1202cd73e4da137a1426e2dae993ae44e757605344282af3a082848c"}, + {file = "psycopg_binary-3.3.4-cp311-cp311-win_amd64.whl", hash = "sha256:41f2ec0fea529832982bcb6c9415de3c86264ebe562b77a467c0fbcd7efbba8d"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:5ab28a2a7649df3b72e6b674b4c190e448e8e77cf496a65bd846472048de2089"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6402a9d8146cf4b3974ded3fd28a971e83dc6a0333eb7822524a3aa20b546578"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:580ae30a5f95ccd90008ec697d3ed6a4a2047a516407ad904283fa42086936e9"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e7510c37550f91a187e3660a8cc50d4b760f8c3b8b2f89ebc5698cd2c7f2c85d"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:77df19583501ea288eaf15ac0fe7ad01e6d8091a91d5c41df5c718f307d8e31b"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:018fbed325936da502feb546642c982dcc4b9ffdea32dfef78dbf3b7f7ad4070"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:17a21953a9e5ff3a16dab692625a3676e2f101db5e40072f39dbee2250194d68"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:eb05ee1c2b817d27c537333224c9e83c7afb86fe7296ba970990068baf819b16"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:773d573e11f437ce0bdb95b7c18dc58390494f96d43f8b45b9760436114f7652"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:71e55ccbdfae79a2ed9c6369c3008a3025817ff9d7e27b32a2d84e2a4267e66e"}, + {file = "psycopg_binary-3.3.4-cp312-cp312-win_amd64.whl", hash = "sha256:494ca54901be8cf9eb7e02c25b731f2317c378efa44f43e8f9bd0e1184ae7be4"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:fbd1d4ed566895ad2d3bf4ddfd8bae90026930ddf29df3b9d91d32c8c47866a7"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:75a9067e236f9b9ae3535b66fe99bddb33d39c0de10112e49b9ab11eee53dc31"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:b56b603ebcea8aa10b46228b8410ba7f13e7c2ee54389d4d9be0927fd8ce2a70"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c677c4ad433cb7150c8cd304a0769ae3bcfbe5ea0676eb53faa7b1443b16d0d3"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:26df2717e59c0473e4465a97dfb1b7afebaa479277870fd5784d1436470db47c"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:1dc1f79fd16bb1f3f4421417a514607539f17804d95c7ed617265369d1981cae"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:136f199a407b5348b9b857c504aff60c77622a28482e7195839ce1b51238c4cc"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:b6f5a29e9c775b9f12a1a717aa7a2c80f9e1db6f27ba44a5b59c80ac61d2ffcf"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:ee17a2cf4943cde261adfad1bbc5bf38d6b3776d7afff74c7cabcbeaeb08c260"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5c4ab71be17bdca30cb34c34c4e1496e2f5d6f20c199c12bad226070b22ef9bf"}, + {file = "psycopg_binary-3.3.4-cp313-cp313-win_amd64.whl", hash = "sha256:dbfdb9b6cc79f31104a7b162a2b921b765fcc62af6c00540a167a8de47e4ed38"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:28b7398fdd19db3232c884fb24550bdfe951221f510e195e233299e4c9b78f97"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:1fbaa292a3c8bb61b45df1ad3da1908ccee7cb889db9425e3557d9e34e2a4829"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:94596f9e7633ee3f6440711d43bb70aa31cc0a46a900ab8b4201a366ace5c9e7"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:8c0056529e68dbe9184cd4019a1f3d8f3a4ead2f6fc7a5afcf27d3314edd1277"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:2c09aad7051326e7603c14e50636db9c01f78272dc54b3accff03d46370461e6"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:514404ed543efd620c85602b747df2a23cf1241b4067199e1a66f2d2757aaa41"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:46893c26858be12cc49ca4226ed6a60b4bfccadd946b3bebb783a60b38788228"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:df1d567fc430f6df15c9fcf67d87685fc49bdb325adc0db5af1adfb2f44eb5c9"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:6b9016b1714da4dd5ecaaa75b82098aa5a0b87854ce9b092e21c27c4ae23e014"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:47c656a8a7ba6eb0cff1801a4caaa9c8bdc12d03080e273aff1c8ac39971a77e"}, + {file = "psycopg_binary-3.3.4-cp314-cp314-win_amd64.whl", hash = "sha256:c37e024c07308cd06cf3ec51bfd0e7f6157585a4d84d1bce4a7f5f7913719bf8"}, +] + +[[package]] +name = "psycopg-pool" +version = "3.3.1" +description = "Connection Pool for Psycopg" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "psycopg_pool-3.3.1-py3-none-any.whl", hash = "sha256:2af5b432941c4c9ad5c87b3fa410aec910ec8f7c122855897983a06c45f2e4b5"}, + {file = "psycopg_pool-3.3.1.tar.gz", hash = "sha256:b10b10b7a175d5cc1592147dc5b7eec8a9e0834eb3ed2c4a92c858e2f51eb63c"}, +] + +[package.dependencies] +typing-extensions = ">=4.6" + +[package.extras] +test = ["anyio (>=4.0)", "mypy (>=1.14)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"] + [[package]] name = "pygments" version = "2.19.2" @@ -1051,6 +1159,19 @@ files = [ ] markers = {dev = "python_version < \"3.13\""} +[[package]] +name = "tzdata" +version = "2026.2" +description = "Provider of IANA time zone data" +optional = false +python-versions = ">=2" +groups = ["main"] +markers = "sys_platform == \"win32\"" +files = [ + {file = "tzdata-2026.2-py2.py3-none-any.whl", hash = "sha256:bbe9af844f658da81a5f95019480da3a89415801f6cc966806612cc7169bffe7"}, + {file = "tzdata-2026.2.tar.gz", hash = "sha256:9173fde7d80d9018e02a662e168e5a2d04f87c41ea174b139fbef642eda62d10"}, +] + [[package]] name = "urllib3" version = "1.26.20" @@ -1103,8 +1224,8 @@ files = [ [package.dependencies] PyYAML = "*" urllib3 = [ - {version = "<2", markers = "platform_python_implementation == \"PyPy\""}, {version = "*", markers = "platform_python_implementation != \"PyPy\" and python_version >= \"3.10\""}, + {version = "<2", markers = "platform_python_implementation == \"PyPy\""}, ] wrapt = "*" yarl = "*" @@ -1350,4 +1471,4 @@ odbc = ["pyodbc"] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "7c335bd1bffc7815b9c7e4c7c8869fb9dc389befdd7c3ab841eea8ffe799284d" +content-hash = "8b7bc284ac3f35165d4f8e3c7cac4a6e2c589bf4c2d7fef73f44cd4ba732d700" diff --git a/pyproject.toml b/pyproject.toml index 78695b5..d2806c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,9 @@ splunk-sdk = "^2.1.1" httpx = "^0.28.1" tenacity = "^9.1.2" pyodbc = "^5.3.0" +psycopg-pool = {extras = ["binary", "pool"], version = "^3.3.1"} +psycopg = {extras = ["pool"], version = "^3.3.4"} +psycopg-binary = "^3.3.4" [tool.poetry.extras] odbc = ["pyodbc"] diff --git a/src/pyapiary/dbms_connectors/postgres.py b/src/pyapiary/dbms_connectors/postgres.py new file mode 100644 index 0000000..6c48c73 --- /dev/null +++ b/src/pyapiary/dbms_connectors/postgres.py @@ -0,0 +1,104 @@ +from typing import List, Dict, Any, Optional, Generator, Type, Union +from types import TracebackType +from pyapiary.helpers import setup_logger +from psycopg_pool import AsyncConnectionPool, ConnectionPool + +class PostgresConnector: + def __init__(self,conn_str, logger=None, min_size=5, max_size=30): + self.dsn = conn_str + self.min_size = min_size + self.max_size = max_size + self.connection_pool = ConnectionPool(self.dsn, kwargs={"autocommit":True}, min_size=self.min_size, max_size=self.max_size) + self.logger = logger if logger else setup_logger(__name__) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def close(self): + """Close the PG connection.""" + if self.connection_pool: + self.connection_pool.close() + self._log("PG connection closed") + + def _log(self, msg: str, level: str = "info"): + if self.logger: + log_method = getattr(self.logger, level, self.logger.info) + log_method(msg) + + def query(self, query: str, params=None): + """ + query - string query representing the work the user wants done + params - must be legal for psycopog_pool AsyncConnectionPool object + https://www.psycopg.org/psycopg3/docs/api/pool.html#module-psycopg_pool + """ + with self.connection_pool.connection() as conn: + with conn.transaction(): + # claude recommended a transaction wrapper here + return conn.execute(query, params).fetchall() + + def bulk_insert(self, table: str, data: List[Dict[str, Any]]): + if not data: + return + + self._log(f"Inserting {len(data)} rows into table {table}") + + columns = list(data[0].keys()) + copy_query = f"COPY {table} ({', '.join(columns)}) FROM STDIN" + + with self.connection_pool.connection() as conn: + with conn.cursor() as cur: + with cur.copy(copy_query) as copy: + for row in data: + copy.write_row(tuple(row[col] for col in columns)) + # Note: pool is configured for autocommit, commit will happen before the with block ends + +# Async Version +## Need to write an async_bulk_insert +class AsyncPostgresConnector: + def __init__(self,conn_str, min_size=5, max_size=30, logger=None): + self.dsn = conn_str + self.min_size = min_size + self.max_size = max_size + self.connection_pool = AsyncConnectionPool(self.dsn, kwargs={"autocommit":True}, min_size=self.min_size, max_size=self.max_size, open=False) + self.logger = logger if logger else setup_logger(__name__) + + async def __aenter__(self): + # for async with calls + await self.connection_pool.open() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # for async with calls + await self.connection_pool.close() + + async def async_query(self, query: str, params=None): + """ + query - string query representing the work the user wants done + params - must be legal for psycopog_pool AsyncConnectionPool object + https://www.psycopg.org/psycopg3/docs/api/pool.html#module-psycopg_pool + """ + async with self.connection_pool.connection() as conn: + cur = await conn.execute(query, params) + return await cur.fetchall() + + async def async_bulk_insert(self, table_name: str, data: List[Dict[str, Any]]): + if not data: + return + + columns = list(data[0].keys()) + + # Google recommended using an cursor.copy command here to process the dict, better performance over a high volume of rows, more efficient + # than the odbc write/execute_many paradigm. + async with self.connection_pool.connection() as aconn: + async with aconn.cursor() as acur: + # using COPY is the most performative for millions of rows + copy_query = f"COPY {table_name} ({', '.join(columns)}) FROM STDIN" + + async with acur.copy(copy_query) as copy: + for record in data: + row = tuple(record[col] for col in columns) + await copy.write_row(row) + # Note: since asyncpool passes autocommit kwarg, commits will happen before the with block ends \ No newline at end of file diff --git a/src/pyapiary/tests/test_postgres/test_unit_postgres.py b/src/pyapiary/tests/test_postgres/test_unit_postgres.py new file mode 100644 index 0000000..fe63d5f --- /dev/null +++ b/src/pyapiary/tests/test_postgres/test_unit_postgres.py @@ -0,0 +1,80 @@ +from unittest.mock import MagicMock, patch +import pytest +import pyapiary.dbms_connectors.postgres as pg_module +from pyapiary.dbms_connectors.postgres import PostgresConnector + + +@pytest.fixture +def mock_pyodbc(): + with patch("pyapiary.dbms_connectors.odbc._get_pyodbc") as mock_loader: + pyodbc_mock = MagicMock() + mock_loader.return_value = pyodbc_mock + yield pyodbc_mock + + +def test_odbcconnector_init(mock_pyodbc): + mock_logger = MagicMock() + connector = PostgresConnector("DSN=testdb", logger=mock_logger) + mock_pyodbc.connect.assert_called_once_with("DSN=testdb") + assert connector.logger == mock_logger + + +def test_odbcconnector_query_returns_rows(mock_pyodbc): + """Test that query returns rows as dictionaries.""" + mock_cursor = MagicMock() + mock_cursor.description = [("id",), ("name",)] + mock_cursor.fetchall.return_value = [(1, "Alice"), (2, "Bob")] + mock_pyodbc.connect.return_value.cursor.return_value = mock_cursor + connector = PostgresConnector("DSN=testdb") + + results = list(connector.query("SELECT * FROM users")) + + assert results == [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] + + +def test_odbcconnector_bulk_insert(mock_pyodbc): + mock_cursor = MagicMock() + mock_pyodbc.connect.return_value.cursor.return_value = mock_cursor + connector = PostgresConnector("DSN=testdb") + + data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] + connector.bulk_insert("users", data) + + assert mock_cursor.executemany.called + assert mock_pyodbc.connect.return_value.commit.called + + +def test_odbcconnector_bulk_insert_empty_data(mock_pyodbc): + mock_cursor = MagicMock() + mock_pyodbc.connect.return_value.cursor.return_value = mock_cursor + connector = PostgresConnector("DSN=testdb") + + connector.bulk_insert("users", []) + + mock_cursor.executemany.assert_not_called() + + +def test_odbcconnector_context_manager_closes_connection(mock_pyodbc): + """Test that the context manager closes the connection.""" + mock_conn = MagicMock() + mock_pyodbc.connect.return_value = mock_conn + + with PostgresConnector("DSN=testdb") as connector: + assert isinstance(connector, PostgresConnector) + + mock_conn.close.assert_called_once() + + +def test_odbcconnector_raises_helpful_error_when_pyodbc_missing(monkeypatch): + """Ensure that using the connector without the extra raises a clear error.""" + monkeypatch.setattr(pg_module, "_PG_MODULE", None) + monkeypatch.setattr( + pg_module, + "import_module", + MagicMock(side_effect=ImportError("pyodbc missing")), + ) + + with pytest.raises(ImportError) as excinfo: + PostgresConnector("DSN=testdb") + + assert "pyodbc is not installed" in str(excinfo.value)