Skip to content

Latest commit

 

History

History
527 lines (404 loc) · 12.2 KB

File metadata and controls

527 lines (404 loc) · 12.2 KB
name sqlite
description SQLite - embedded database, SQL queries, schema design, Python integration, optimization
metadata
author version tags
OSS AI Skills
1.0.0
sqlite
database
sql
embedded
python
db-api

SQLite

SQLite - self-contained, serverless, zero-configuration SQL database engine.

Overview

SQLite is an embedded relational database. The entire database is stored in a single cross-platform disk file. No server process needed.

  • Serverless - No separate server process
  • Zero config - No installation or setup
  • Single file - Entire database in one .db file
  • ACID - Full transactional support
  • Cross-platform - Works everywhere

See Slicker.me SQLite Features for a comprehensive feature overview.


Python Integration

Basic Usage

import sqlite3

# Connect (creates file if not exists)
conn = sqlite3.connect('myapp.db')

# Use as context manager (commits on success, rolls back on error; does NOT close the connection)
with sqlite3.connect('myapp.db') as conn:
    conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
    conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))
    conn.commit()

# In-memory database
conn = sqlite3.connect(':memory:')

Row Factory

# Access columns by name
conn = sqlite3.connect('myapp.db')
conn.row_factory = sqlite3.Row

cursor = conn.execute("SELECT * FROM users")
for row in cursor:
    print(row['name'], row['email'])

# Or use dict factory
def dict_factory(cursor, row):
    """Row factory that returns each row as a plain dict keyed by column name."""
    keys = [desc[0] for desc in cursor.description]
    return dict(zip(keys, row))

conn.row_factory = dict_factory

CRUD Operations

import sqlite3

conn = sqlite3.connect('app.db')
conn.row_factory = sqlite3.Row

# Create
conn.execute("""
    CREATE TABLE IF NOT EXISTS products (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        price REAL NOT NULL,
        category TEXT,
        in_stock INTEGER DEFAULT 1,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Insert
conn.execute("INSERT INTO products (name, price, category) VALUES (?, ?, ?)",
             ("Widget", 9.99, "gadgets"))

# Bulk insert
products = [("A", 1.0, "cat1"), ("B", 2.0, "cat2"), ("C", 3.0, "cat1")]
conn.executemany("INSERT INTO products (name, price, category) VALUES (?, ?, ?)", products)
conn.commit()

# Read
cursor = conn.execute("SELECT * FROM products WHERE price > ?", (2.0,))
for row in cursor:
    print(dict(row))

# Update
conn.execute("UPDATE products SET price = ? WHERE name = ?", (12.99, "Widget"))
conn.commit()

# Delete
conn.execute("DELETE FROM products WHERE id = ?", (1,))
conn.commit()

conn.close()

Schema Design

Data Types

SQLite uses dynamic typing with storage classes:

  • NULL - Null value
  • INTEGER - Signed integer (1-8 bytes)
  • REAL - Floating point (8-byte IEEE)
  • TEXT - UTF-8, UTF-16BE, or UTF-16LE string
  • BLOB - Binary data

Table Creation

-- Basic table
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT NOT NULL UNIQUE,
    email TEXT NOT NULL UNIQUE,
    password_hash TEXT NOT NULL,
    is_active INTEGER DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- With foreign key
CREATE TABLE orders (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    total REAL NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);

-- Enable foreign key enforcement (off by default; must be set on every new connection)
PRAGMA foreign_keys = ON;

-- Index
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);

-- Unique constraint
CREATE TABLE tags (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL UNIQUE
);

-- Composite index
CREATE INDEX idx_products_cat_price ON products(category, price);

Alter Table

-- SQLite supports limited ALTER TABLE
ALTER TABLE users ADD COLUMN avatar TEXT;
ALTER TABLE users RENAME COLUMN username TO handle;
ALTER TABLE users RENAME TO accounts;

-- For complex changes, recreate:
BEGIN TRANSACTION;
CREATE TABLE users_new (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE
);
INSERT INTO users_new SELECT id, name, email FROM users;
DROP TABLE users;
ALTER TABLE users_new RENAME TO users;
COMMIT;

Transactions

# Manual transaction
conn = sqlite3.connect('app.db')
conn.execute("PRAGMA foreign_keys = ON")

try:
    conn.execute("BEGIN")
    conn.execute("INSERT INTO orders (user_id, total) VALUES (?, ?)", (1, 99.99))
    conn.execute("UPDATE users SET is_active = 1 WHERE id = ?", (1,))
    conn.commit()
except Exception as e:
    conn.rollback()
    raise

# Context manager (auto-commit or rollback)
with conn:
    conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
    # Auto-commits on success, auto-rollback on exception

Advanced Queries

Joins

-- Inner join
SELECT o.id, u.username, o.total, o.status
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.status = 'completed';

-- Left join
SELECT u.username, COUNT(o.id) as order_count, COALESCE(SUM(o.total), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id
HAVING order_count > 0;

-- Subquery
SELECT * FROM products
WHERE price > (SELECT AVG(price) FROM products);

-- Common Table Expression (CTE)
WITH active_users AS (
    SELECT id, username FROM users WHERE is_active = 1
)
SELECT au.username, COUNT(o.id) as orders
FROM active_users au
LEFT JOIN orders o ON au.id = o.user_id
GROUP BY au.id;

Window Functions

-- Row number
SELECT name, price,
    ROW_NUMBER() OVER (ORDER BY price DESC) as rank
FROM products;

-- Partitioned aggregation
SELECT name, category, price,
    RANK() OVER (PARTITION BY category ORDER BY price DESC) as category_rank,
    AVG(price) OVER (PARTITION BY category) as avg_category_price
FROM products;

-- Running total
SELECT id, total,
    SUM(total) OVER (ORDER BY id) as running_total
FROM orders;

Upsert (INSERT OR REPLACE)

-- Insert or replace
INSERT OR REPLACE INTO users (id, name, email)
VALUES (1, 'Alice', 'alice@new.com');

-- Insert or ignore (skip if conflict)
INSERT OR IGNORE INTO users (id, name, email)
VALUES (1, 'Alice', 'alice@example.com');

-- UPSERT with DO UPDATE (SQLite 3.24+)
INSERT INTO users (username, email)
VALUES ('bob', 'bob@example.com')
ON CONFLICT(username) DO UPDATE SET
    email = excluded.email;

Full-Text Search (FTS5)

-- Create FTS table
CREATE VIRTUAL TABLE articles_fts USING fts5(title, body, content='articles', content_rowid='id');

-- Populate
INSERT INTO articles_fts (rowid, title, body) SELECT id, title, body FROM articles;

-- Search
SELECT * FROM articles_fts WHERE articles_fts MATCH 'sqlite AND python';
SELECT * FROM articles_fts WHERE articles_fts MATCH 'sqlite OR database';
SELECT * FROM articles_fts WHERE articles_fts MATCH '"full text search"';

-- Ranked results
SELECT rank, * FROM articles_fts WHERE articles_fts MATCH 'sqlite' ORDER BY rank;

-- Keep FTS in sync with triggers
CREATE TRIGGER articles_ai AFTER INSERT ON articles BEGIN
    INSERT INTO articles_fts (rowid, title, body) VALUES (new.id, new.title, new.body);
END;

CREATE TRIGGER articles_ad AFTER DELETE ON articles BEGIN
    INSERT INTO articles_fts (articles_fts, rowid, title, body) VALUES ('delete', old.id, old.title, old.body);
END;

JSON Support

-- Store JSON in TEXT column
CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    data TEXT
);

-- Extract values
SELECT json_extract(data, '$.name') FROM events;
SELECT json_extract(data, '$.tags[0]') FROM events;

-- Insert JSON
INSERT INTO events (data) VALUES (json('{"name": "click", "tags": ["ui", "btn"]}'));

-- JSON functions
SELECT json_type(data) FROM events;          -- 'object'
SELECT json_array_length(data, '$.tags') FROM events;  -- 2
SELECT json_insert(data, '$.count', 1) FROM events;
SELECT json_set(data, '$.count', 42) FROM events;

Optimization

PRAGMA Settings

conn = sqlite3.connect('app.db')

# Performance
conn.execute("PRAGMA journal_mode = WAL")         # Write-Ahead Logging
conn.execute("PRAGMA synchronous = NORMAL")        # Balance safety/speed
conn.execute("PRAGMA cache_size = -64000")          # 64MB cache
conn.execute("PRAGMA temp_store = MEMORY")          # Temp tables in memory
conn.execute("PRAGMA mmap_size = 268435456")        # 256MB memory map

# Safety
conn.execute("PRAGMA foreign_keys = ON")            # Enable FK constraints
conn.execute("PRAGMA busy_timeout = 5000")          # Wait 5s on lock

Query Analysis

-- Explain query plan
EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = 'test@example.com';

-- Check indexes
SELECT * FROM pragma_index_list('users');

-- Table info
PRAGMA table_info(users);
PRAGMA index_list(users);
PRAGMA index_info(idx_users_email);

Bulk Operations

# Fast bulk insert
conn.execute("PRAGMA synchronous = OFF")
conn.execute("PRAGMA journal_mode = MEMORY")

conn.executemany(
    "INSERT INTO large_table (col1, col2) VALUES (?, ?)",
    data  # list of tuples
)
conn.commit()

conn.execute("PRAGMA synchronous = NORMAL")
conn.execute("PRAGMA journal_mode = WAL")

Backups

import sqlite3

def backup_database(src_path, dst_path):
    """Copy the live database at *src_path* to *dst_path*.

    Uses SQLite's online backup API (Connection.backup), which is safe to
    run while the source database is in use. Both connections are closed
    even if the backup fails (the original leaked them on error).
    """
    src = sqlite3.connect(src_path)
    try:
        dst = sqlite3.connect(dst_path)
        try:
            src.backup(dst)
        finally:
            dst.close()
    finally:
        src.close()

# Or via command line
# sqlite3 app.db ".backup backup.db"

Common Patterns

Connection Pool (Thread-safe)

import sqlite3
import threading
from contextlib import contextmanager

class SQLitePool:
    """Thread-local SQLite connection manager.

    Despite the name, this is not a bounded pool: each thread lazily opens
    (and then reuses) exactly one connection, which is the safe pattern for
    sqlite3's default one-thread-per-connection model.
    """

    def __init__(self, db_path, max_connections=5):
        self.db_path = db_path
        # Kept for interface compatibility. The original accepted this
        # argument but never used it; connections are strictly per-thread,
        # so no hard cap is enforced.
        self.max_connections = max_connections
        self._local = threading.local()

    def get_connection(self):
        """Return the calling thread's connection, creating it on first use."""
        if not hasattr(self._local, 'conn'):
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode = WAL")
            conn.execute("PRAGMA foreign_keys = ON")
            self._local.conn = conn
        return self._local.conn

    def close(self):
        """Close the calling thread's connection, if one was opened."""
        conn = getattr(self._local, 'conn', None)
        if conn is not None:
            conn.close()
            del self._local.conn

    @contextmanager
    def cursor(self):
        """Yield a cursor; commit on success, roll back on any exception."""
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

Migration Helper

import sqlite3

# Ordered schema migrations: version number -> SQL script.
# New migrations are appended with the next integer key; applied versions
# are recorded in the _migrations table so each script runs exactly once.
MIGRATIONS = {
    1: """
        CREATE TABLE users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        );
    """,
    2: """
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            total REAL NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users(id)
        );
        CREATE INDEX idx_orders_user ON orders(user_id);
    """,
}

def run_migrations(db_path):
    """Apply any pending MIGRATIONS to the database at *db_path*.

    Idempotent: versions already recorded in _migrations are skipped.
    The connection is always closed, even if a migration script fails
    (the original leaked it on error). Note that executescript() issues
    an implicit COMMIT before running, so a failing script may leave
    earlier statements of that script applied but unrecorded.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS _migrations (version INTEGER PRIMARY KEY)")

        current = conn.execute("SELECT MAX(version) FROM _migrations").fetchone()[0] or 0

        for version, sql in sorted(MIGRATIONS.items()):
            if version > current:
                conn.executescript(sql)
                conn.execute("INSERT INTO _migrations (version) VALUES (?)", (version,))
                conn.commit()
                print(f"Migration {version} applied")
    finally:
        conn.close()

CLI Commands

# Open database
sqlite3 myapp.db

# Execute SQL
sqlite3 myapp.db "SELECT * FROM users;"

# Export query results as CSV
sqlite3 myapp.db -csv -header "SELECT * FROM users;" > output.csv

# Export schema
sqlite3 myapp.db ".schema"

# Dump database
sqlite3 myapp.db ".dump" > backup.sql

# Restore from dump
sqlite3 new.db < backup.sql

# List tables
sqlite3 myapp.db ".tables"

# Describe table
sqlite3 myapp.db ".schema users"

References