From be41bddc71342d6420f5ac766586b2eb58803789 Mon Sep 17 00:00:00 2001 From: David Fridrich Date: Wed, 15 Apr 2026 22:08:19 +0200 Subject: [PATCH] sqlite database template --- AGENTS.md | 1 + python/sqlite/README.md | 103 ++++++++++++++ python/sqlite/function/__init__.py | 1 + python/sqlite/function/database.py | 85 ++++++++++++ python/sqlite/function/func.py | 197 +++++++++++++++++++++++++++ python/sqlite/pyproject.toml | 27 ++++ python/sqlite/tests/test_func.py | 209 +++++++++++++++++++++++++++++ 7 files changed, 623 insertions(+) create mode 100644 python/sqlite/README.md create mode 100644 python/sqlite/function/__init__.py create mode 100644 python/sqlite/function/database.py create mode 100644 python/sqlite/function/func.py create mode 100644 python/sqlite/pyproject.toml create mode 100644 python/sqlite/tests/test_func.py diff --git a/AGENTS.md b/AGENTS.md index d0c023e..412cc38 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -48,6 +48,7 @@ Python-specific: | `python/pdf-processing` | PDF operations (extract text, metadata, split, merge) via HTTP. | | `python/mcp` | MCP server exposing basic tools (hello, add_numbers) via Model Context Protocol. | | `python/mcp-ollama-rag` | RAG via MCP — combines Ollama with Chroma vector DB for document Q&A. Needs Ollama. | +| `python/sqlite` | REST API backed by SQLite. CRUD on tables with zero external dependencies. | | `python/keycloak-auth` | Validates Keycloak JWT Bearer tokens via OIDC/JWKS. Protects endpoints with auth. | For contributing to this repo, see [CONTRIBUTING.md](CONTRIBUTING.md). diff --git a/python/sqlite/README.md b/python/sqlite/README.md new file mode 100644 index 0000000..f3a7a2a --- /dev/null +++ b/python/sqlite/README.md @@ -0,0 +1,103 @@ +# Python HTTP Function - SQLite Database + +A Knative Function with a REST API backed by a SQLite database. Shows how +to use persistent storage in a serverless function — no external database +server needed. + +## Quick Start + +### 1. Create the function + +```bash +func create myfunc \ + -r https://github.com/functions-dev/templates \ + -l python -t sqlite +cd myfunc +``` + +### 2. Run the function + +```bash +func run --builder=host +``` + +### 3. Try it + +```bash +# Function info +curl -s http://localhost:8080/ | jq . + +# Create a table +curl -s -X POST http://localhost:8080/tables \ + -H "Content-Type: application/json" \ + -d '{"table": "tasks", "columns": {"title": "TEXT", "status": "TEXT", "priority": "TEXT"}}' | jq . + +# Insert rows +curl -s -X POST http://localhost:8080/tables/tasks \ + -H "Content-Type: application/json" \ + -d '{"title": "Fix login bug", "status": "open", "priority": "high"}' | jq . + +curl -s -X POST http://localhost:8080/tables/tasks \ + -H "Content-Type: application/json" \ + -d '{"title": "Update docs", "status": "open", "priority": "low"}' | jq . + +# Query all rows +curl -s http://localhost:8080/tables/tasks | jq . + +# Filter rows +curl -s 'http://localhost:8080/tables/tasks?priority=high' | jq . + +# Delete a row +curl -s -X DELETE 'http://localhost:8080/tables/tasks?id=2' | jq . + +# List all tables +curl -s http://localhost:8080/tables | jq . + +# Table schema +curl -s http://localhost:8080/tables/tasks/schema | jq . +``` + +## Configuration + +| Variable | Required | Description | Default | +|---|---|---|---| +| `SQLITE_DB_PATH` | No | Path to the SQLite database file | `data.db` | + +## Endpoints + +| Method | Path | Description | +|---|---|---| +| GET | `/` | Function info and list of tables | +| GET | `/tables` | List all tables | +| POST | `/tables` | Create a table | +| GET | `/tables/` | Query rows (`?col=val` to filter, `?limit=N`) | +| POST | `/tables/` | Insert a row | +| DELETE | `/tables/` | Delete rows (`?col=val` to filter, at least one required) | +| GET | `/tables//schema` | Column info for a table | + +## Deploying to a Cluster + +SQLite stores data inside the container — it's lost on restart unless you +mount a persistent volume. + +Add to `func.yaml`: + +```yaml +run: + envs: + - name: SQLITE_DB_PATH + value: /data/data.db + volumes: + - persistentVolumeClaim: + claimName: sqlite-data + path: /data +``` + +## Development + +```bash +pip install -e '.[dev]' +pytest tests/ +``` + +For more, see [the complete documentation](https://github.com/knative/func/tree/main/docs) diff --git a/python/sqlite/function/__init__.py b/python/sqlite/function/__init__.py new file mode 100644 index 0000000..c16dbac --- /dev/null +++ b/python/sqlite/function/__init__.py @@ -0,0 +1 @@ +from .func import new diff --git a/python/sqlite/function/database.py b/python/sqlite/function/database.py new file mode 100644 index 0000000..f2d8cd9 --- /dev/null +++ b/python/sqlite/function/database.py @@ -0,0 +1,85 @@ +"""SQLite database wrapper. + +Provides a class for creating tables, inserting rows, querying data, +and inspecting schema. All operations go through a single SQLite file +on disk so data persists across restarts. +""" + +import pysqlite3 as sqlite3 + +ALLOWED_TYPES = {"TEXT", "INTEGER", "REAL", "BLOB", "NUMERIC"} + + +def _quote_id(identifier: str) -> str: + """Quote a SQL identifier to prevent injection.""" + return f'"{identifier.replace(chr(34), "")}"' + + +class Database: + def __init__(self, db_path: str): + self.db_path = db_path + self.conn = sqlite3.connect(db_path) + self.conn.row_factory = sqlite3.Row + + def close(self) -> None: + self.conn.close() + + def list_tables(self) -> list[str]: + rows = self.conn.execute( + "SELECT name FROM sqlite_master" + " WHERE type='table' AND name NOT LIKE 'sqlite_%'" + " ORDER BY name" + ).fetchall() + return [row["name"] for row in rows] + + def describe_table(self, table: str) -> list[dict]: + rows = self.conn.execute( + f"PRAGMA table_info({_quote_id(table)})" + ).fetchall() + return [ + {"name": r["name"], "type": r["type"], "notnull": bool(r["notnull"])} + for r in rows + ] + + def create_table(self, table: str, columns: dict[str, str]) -> str: + for col_name, col_type in columns.items(): + if col_type.upper() not in ALLOWED_TYPES: + raise ValueError( + f"Invalid column type '{col_type}' for '{col_name}'. " + f"Allowed: {', '.join(sorted(ALLOWED_TYPES))}" + ) + cols = ", ".join(f"{_quote_id(k)} {v}" for k, v in columns.items()) + sql = f"CREATE TABLE IF NOT EXISTS {_quote_id(table)} (id INTEGER PRIMARY KEY AUTOINCREMENT, {cols})" + self.conn.execute(sql) + self.conn.commit() + return f"Table '{table}' created with columns: {', '.join(columns.keys())}" + + def insert(self, table: str, data: dict) -> str: + keys = list(data.keys()) + placeholders = ", ".join("?" for _ in keys) + cols = ", ".join(_quote_id(k) for k in keys) + sql = f"INSERT INTO {_quote_id(table)} ({cols}) VALUES ({placeholders})" + cursor = self.conn.execute(sql, list(data.values())) + self.conn.commit() + return f"Inserted row {cursor.lastrowid} into '{table}'" + + def query(self, table: str, filters: dict | None = None, limit: int = 100) -> list[dict]: + sql = f"SELECT * FROM {_quote_id(table)}" + params: list = [] + if filters: + clauses = [f"{_quote_id(k)} = ?" for k in filters] + sql += " WHERE " + " AND ".join(clauses) + params = list(filters.values()) + sql += " LIMIT ?" + params.append(limit) + rows = self.conn.execute(sql, params).fetchall() + return [dict(row) for row in rows] + + def delete(self, table: str, filters: dict[str, str]) -> str: + if not filters: + return "Error: at least one filter is required for delete" + clauses = [f"{_quote_id(k)} = ?" for k in filters] + sql = f"DELETE FROM {_quote_id(table)} WHERE " + " AND ".join(clauses) + cursor = self.conn.execute(sql, list(filters.values())) + self.conn.commit() + return f"Deleted {cursor.rowcount} row(s) from '{table}'" diff --git a/python/sqlite/function/func.py b/python/sqlite/function/func.py new file mode 100644 index 0000000..205feb7 --- /dev/null +++ b/python/sqlite/function/func.py @@ -0,0 +1,197 @@ +"""HTTP function with SQLite persistence. + +A Knative Function that provides a REST API backed by a SQLite database. +Shows how to use persistent storage in a serverless function. + +Endpoints: + GET / -> function info + list of tables + GET /tables -> list all tables + POST /tables -> create a table + GET /tables/ -> query rows (filter via query params) + POST /tables/ -> insert a row + DELETE /tables/ -> delete rows (filter via query params) + GET /tables//schema -> column info for a table + +Configuration (environment variables): + SQLITE_DB_PATH -> path to SQLite database file (default: data.db) +""" + +import json +import logging +from urllib.parse import unquote + +from .database import Database + + +def new(): + """Entry point -- called once by the Knative Functions runtime.""" + return Function() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def send_json(send, body, status: int = 200) -> None: + payload = json.dumps(body).encode() + await send({ + "type": "http.response.start", + "status": status, + "headers": [[b"content-type", b"application/json"]], + }) + await send({ + "type": "http.response.body", + "body": payload, + }) + + +async def read_body(receive) -> bytes: + body = b"" + while True: + message = await receive() + body += message.get("body", b"") + if not message.get("more_body", False): + break + return body + + +def parse_path(path: str) -> tuple[str, str, str]: + """Parse /tables/name/schema into ("tables", "name", "schema").""" + parts = [p for p in path.strip("/").split("/") if p] + while len(parts) < 3: + parts.append("") + return (parts[0], parts[1], parts[2]) + + +# --------------------------------------------------------------------------- +# The Function +# --------------------------------------------------------------------------- + +class Function: + def __init__(self): + self.db = None + + async def handle(self, scope, receive, send) -> None: + method = scope.get("method", "GET") + path = scope.get("path", "/") + query_string = unquote(scope.get("query_string", b"").decode()) + resource, name, sub = parse_path(path) + + # GET / -> function info + if path == "/" and method == "GET": + return await send_json(send, { + "name": "sqlite", + "description": "HTTP function with SQLite database", + "database": self.db.db_path, + "tables": self.db.list_tables(), + "endpoints": { + "GET /tables": "List all tables", + "POST /tables": "Create a table", + "GET /tables/": "Query rows (?col=val to filter, ?limit=N)", + "POST /tables/": "Insert a row", + "DELETE /tables/": "Delete rows (?col=val to filter)", + "GET /tables//schema": "Column info for a table", + }, + }) + + if resource != "tables": + return await send_json(send, {"error": "Not found"}, status=404) + + # GET /tables -> list tables + if not name and method == "GET": + return await send_json(send, {"tables": self.db.list_tables()}) + + # POST /tables -> create table + # Body: {"table": "tasks", "columns": {"title": "TEXT", "done": "INTEGER"}} + if not name and method == "POST": + raw = await read_body(receive) + try: + body = json.loads(raw) if raw else {} + except json.JSONDecodeError: + return await send_json(send, {"error": "Invalid JSON body"}, status=400) + table = body.get("table", "") + columns = body.get("columns", {}) + if not table or not columns: + return await send_json(send, { + "error": "Required: {\"table\": \"name\", \"columns\": {\"col\": \"TYPE\"}}" + }, status=400) + try: + result = self.db.create_table(table, columns) + return await send_json(send, {"result": result}, status=201) + except Exception as e: + return await send_json(send, {"error": str(e)}, status=400) + + # GET /tables//schema -> column info + if name and sub == "schema" and method == "GET": + columns = self.db.describe_table(name) + return await send_json(send, {"table": name, "columns": columns}) + + if sub: + return await send_json(send, {"error": "Not found"}, status=404) + + # GET /tables/ -> query rows + # Filter via query params: ?done=0&priority=high (equality filters) + # Special param: ?limit=N (default 100) + if name and method == "GET": + params = dict(p.split("=", 1) for p in query_string.split("&") if "=" in p) + try: + limit = int(params.pop("limit", "100")) + except ValueError: + return await send_json(send, {"error": "limit must be an integer"}, status=400) + try: + rows = self.db.query(name, filters=params or None, limit=limit) + return await send_json(send, { + "table": name, + "count": len(rows), + "rows": rows, + }) + except Exception as e: + return await send_json(send, {"error": str(e)}, status=400) + + # POST /tables/ -> insert row + # Body: {"title": "Fix bug", "done": 0} + if name and method == "POST": + raw = await read_body(receive) + try: + body = json.loads(raw) if raw else {} + except json.JSONDecodeError: + return await send_json(send, {"error": "Invalid JSON body"}, status=400) + try: + result = self.db.insert(name, body) + return await send_json(send, {"result": result}, status=201) + except Exception as e: + return await send_json(send, {"error": str(e)}, status=400) + + # DELETE /tables/?id=5 -> delete rows + # Filter via query params (at least one required) + if name and method == "DELETE": + params = dict(p.split("=", 1) for p in query_string.split("&") if "=" in p) + if not params: + return await send_json(send, { + "error": "Required: filter params (e.g. ?id=5)" + }, status=400) + try: + result = self.db.delete(name, params) + return await send_json(send, {"result": result}) + except Exception as e: + return await send_json(send, {"error": str(e)}, status=400) + + return await send_json(send, {"error": "Method not allowed"}, status=405) + + def start(self, cfg) -> None: + db_path = cfg.get("SQLITE_DB_PATH", "data.db") + self.db = Database(db_path) + logging.info("SQLite function ready: database=%s", db_path) + + def stop(self) -> None: + if self.db: + self.db.close() + logging.info("Function stopping") + + def alive(self) -> tuple: + return True, "Alive" + + def ready(self) -> tuple: + if self.db is None: + return False, "Database not initialized" + return True, "Ready" diff --git a/python/sqlite/pyproject.toml b/python/sqlite/pyproject.toml new file mode 100644 index 0000000..a3d73ac --- /dev/null +++ b/python/sqlite/pyproject.toml @@ -0,0 +1,27 @@ +[project] +name = "function" +description = "HTTP function with SQLite database" +version = "0.1.0" +requires-python = ">=3.9" +readme = "README.md" +license = "MIT" +dependencies = [ + "pysqlite3-binary>=0.5.4", +] +authors = [ + { name = "Your Name", email = "you@example.com" }, +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.pytest.ini_options] +asyncio_mode = "strict" +asyncio_default_fixture_loop_scope = "function" diff --git a/python/sqlite/tests/test_func.py b/python/sqlite/tests/test_func.py new file mode 100644 index 0000000..b2deff8 --- /dev/null +++ b/python/sqlite/tests/test_func.py @@ -0,0 +1,209 @@ +"""Tests for the sqlite template. + +Tests both the database layer and the HTTP handler. +Each test gets a fresh temporary database via the configured_function fixture. +""" + +import json + +import pytest + +from function import new + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +class ResponseCapture: + def __init__(self): + self.status = None + self.body = b"" + + async def __call__(self, message): + if message["type"] == "http.response.start": + self.status = message["status"] + elif message["type"] == "http.response.body": + self.body += message.get("body", b"") + + @property + def json(self) -> dict: + return json.loads(self.body) + + +def make_scope(path="/", method="GET", query_string=""): + return { + "method": method, + "path": path, + "headers": [], + "query_string": query_string.encode(), + } + + +async def call(f, path="/", method="GET", body=None, query_string=""): + resp = ResponseCapture() + body_bytes = json.dumps(body).encode() if body else b"" + sent = False + + async def receive(): + nonlocal sent + if not sent: + sent = True + return {"body": body_bytes, "more_body": False} + return {"body": b"", "more_body": False} + + await f.handle(make_scope(path, method, query_string), receive, resp) + return resp + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def configured_function(tmp_path): + """Create a Function with a temporary SQLite database.""" + f = new() + f.start({"SQLITE_DB_PATH": str(tmp_path / "test.db")}) + yield f + f.stop() + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio(loop_scope="function") +async def test_root_info(configured_function): + resp = await call(configured_function, "/") + assert resp.status == 200 + assert resp.json["name"] == "sqlite" + assert "tables" in resp.json + + +@pytest.mark.asyncio(loop_scope="function") +async def test_list_tables_empty(configured_function): + resp = await call(configured_function, "/tables") + assert resp.status == 200 + assert resp.json["tables"] == [] + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_table(configured_function): + resp = await call(configured_function, "/tables", method="POST", body={ + "table": "tasks", + "columns": {"title": "TEXT", "done": "INTEGER"}, + }) + assert resp.status == 201 + assert "tasks" in resp.json["result"] + + resp = await call(configured_function, "/tables") + assert "tasks" in resp.json["tables"] + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_table_invalid_type(configured_function): + resp = await call(configured_function, "/tables", method="POST", body={ + "table": "bad", + "columns": {"name": "VARCHAR(255)"}, + }) + assert resp.status == 400 + assert "Invalid column type" in resp.json["error"] + + +@pytest.mark.asyncio(loop_scope="function") +async def test_insert_and_query(configured_function): + await call(configured_function, "/tables", method="POST", body={ + "table": "notes", + "columns": {"text": "TEXT"}, + }) + + resp = await call(configured_function, "/tables/notes", method="POST", body={"text": "hello"}) + assert resp.status == 201 + + resp = await call(configured_function, "/tables/notes") + assert resp.status == 200 + assert resp.json["count"] == 1 + assert resp.json["rows"][0]["text"] == "hello" + + +@pytest.mark.asyncio(loop_scope="function") +async def test_query_with_filter(configured_function): + await call(configured_function, "/tables", method="POST", body={ + "table": "tasks", + "columns": {"title": "TEXT", "done": "INTEGER"}, + }) + await call(configured_function, "/tables/tasks", method="POST", body={"title": "a", "done": 0}) + await call(configured_function, "/tables/tasks", method="POST", body={"title": "b", "done": 1}) + await call(configured_function, "/tables/tasks", method="POST", body={"title": "c", "done": 0}) + + resp = await call(configured_function, "/tables/tasks", query_string="done=0") + assert resp.json["count"] == 2 + + +@pytest.mark.asyncio(loop_scope="function") +async def test_delete(configured_function): + await call(configured_function, "/tables", method="POST", body={ + "table": "items", + "columns": {"name": "TEXT"}, + }) + await call(configured_function, "/tables/items", method="POST", body={"name": "keep"}) + await call(configured_function, "/tables/items", method="POST", body={"name": "remove"}) + + resp = await call(configured_function, "/tables/items", method="DELETE", query_string="name=remove") + assert resp.status == 200 + assert "1 row" in resp.json["result"] + + resp = await call(configured_function, "/tables/items") + assert resp.json["count"] == 1 + + +@pytest.mark.asyncio(loop_scope="function") +async def test_delete_requires_filter(configured_function): + resp = await call(configured_function, "/tables/items", method="DELETE") + assert resp.status == 400 + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_table_missing_fields(configured_function): + resp = await call(configured_function, "/tables", method="POST", body={"table": "x"}) + assert resp.status == 400 + + +@pytest.mark.asyncio(loop_scope="function") +async def test_query_bad_limit(configured_function): + await call(configured_function, "/tables", method="POST", body={ + "table": "items", + "columns": {"name": "TEXT"}, + }) + resp = await call(configured_function, "/tables/items", query_string="limit=abc") + assert resp.status == 400 + + +@pytest.mark.asyncio(loop_scope="function") +async def test_not_found(configured_function): + resp = await call(configured_function, "/unknown") + assert resp.status == 404 + + +@pytest.mark.asyncio(loop_scope="function") +async def test_schema(configured_function): + await call(configured_function, "/tables", method="POST", body={ + "table": "tasks", + "columns": {"title": "TEXT", "done": "INTEGER"}, + }) + resp = await call(configured_function, "/tables/tasks/schema") + assert resp.status == 200 + assert resp.json["table"] == "tasks" + col_names = [c["name"] for c in resp.json["columns"]] + assert "id" in col_names + assert "title" in col_names + assert "done" in col_names + + +@pytest.mark.asyncio(loop_scope="function") +async def test_not_ready_before_start(): + f = new() + ok, msg = f.ready() + assert not ok + assert "not initialized" in msg.lower()