Generate the Kafka→ClickHouse ingestion pipeline (the `streams`, `raw`, and `datalake` tables plus the materialized views that connect them) from a single JSON sample. It is a drop-in replacement for the `create_ddl_for_kafka.sh` script and needs no ClickHouse binary.
Reliably ingesting Kafka events into ClickHouse takes three layers connected by two materialized views, which is exactly what `clickforge kafka` generates in one shot:
```text
Kafka topic
   ↓
streams.{name}    # Kafka engine cursor — holds no data, just reads from the topic
   ↓ (streams_mv)
raw.{name}        # durable replay buffer — original message string + Kafka metadata
   ↓ (raw_mv)
datalake.{name}   # typed, queryable table — fields JSONExtracted at write time
```
raw exists so you can rebuild datalake without re-consuming from Kafka if the schema changes.
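This is a conceptual model only. Plain Python lists stand in for the tables, and nothing here talks to Kafka or ClickHouse. It shows why `raw` keeps the original message string: `datalake` can be rebuilt from `raw` with a new field list, without re-reading the topic. The row layout and the `build_datalake` function are illustrative assumptions, not clickforge's output.

```python
import json

# Hypothetical stand-in for raw.{name}: original message string plus Kafka metadata.
raw_rows = [
    {"_topic": "video_events", "_offset": 0,
     "message": '{"event_id": "a1", "duration_ms": 1200}'},
    {"_topic": "video_events", "_offset": 1,
     "message": '{"event_id": "b2", "duration_ms": 800, "country": "DE"}'},
]


def build_datalake(raw, fields):
    """Re-extract the requested fields from each stored message string.

    Plays the role of raw_mv's JSONExtract step; a missing field becomes None,
    roughly what a Nullable column would hold.
    """
    rows = []
    for r in raw:
        msg = json.loads(r["message"])
        rows.append({f: msg.get(f) for f in fields})
    return rows


# Initial schema.
v1 = build_datalake(raw_rows, ["event_id", "duration_ms"])
# Schema change: add "country" and rebuild from raw, with no Kafka re-consumption.
v2 = build_datalake(raw_rows, ["event_id", "duration_ms", "country"])
print(v2)
```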
Download the prebuilt binary for your platform from the latest release. The latest/download URLs below always resolve to the newest release, so they never go stale.
macOS (Apple Silicon)
```sh
curl -L https://github.com/Maksim-Gr/clickforge/releases/latest/download/clickforge-macos-arm64.tar.gz | tar -xz
chmod +x clickforge && sudo mv clickforge /usr/local/bin/clickforge
```
macOS (Intel)
```sh
curl -L https://github.com/Maksim-Gr/clickforge/releases/latest/download/clickforge-macos-x86_64.tar.gz | tar -xz
chmod +x clickforge && sudo mv clickforge /usr/local/bin/clickforge
```
Linux (x86_64)
```sh
curl -L https://github.com/Maksim-Gr/clickforge/releases/latest/download/clickforge-linux-x86_64.tar.gz | tar -xz
chmod +x clickforge && sudo mv clickforge /usr/local/bin/clickforge
```
To install a specific version, replace `latest/download` with `download/<tag>`, e.g. `download/v0.5.0`.
macOS: the binary is unsigned, so Gatekeeper may block the first run. If you see "cannot be opened because the developer cannot be verified", clear the quarantine flag:
```sh
xattr -d com.apple.quarantine /usr/local/bin/clickforge
```
Verify:
```sh
clickforge --version
```
To build from source instead:
```sh
cargo build --release
```
The binary is at `./target/release/clickforge`.
Point clickforge kafka at a JSON sample of your Kafka messages to generate the whole pipeline:
```sh
clickforge kafka video_events.json
```
This writes `video_events_up.sql` (the `streams` Kafka table, the `raw` replay buffer, the typed `datalake` table, and both materialized views) and `video_events_down.sql` (which drops them in reverse order). Target your cluster and Kafka collection as needed:
```sh
clickforge kafka video_events.json -c clickhouse_datalake -k kafka -o migrations/
```
General usage:
```sh
clickforge <COMMAND> [OPTIONS] <INPUT>
```
The `kafka`, `scan`, and `table` commands take a single `<INPUT>`. The `diff` command takes two positionals, `<OLD> <NEW>` (see the diff section).
`<INPUT>` is a path to a JSON/NDJSON file, or `-` to read from stdin. When reading from stdin, pass `--name` to set the table name (it defaults to `table`):
```sh
cat video_events.json | clickforge scan -
cat video_events.json | clickforge table - --name video_events -o migrations/
```
`kafka` is the primary command: it generates the whole ingestion pipeline. The rest are helpers around it:
| Command | Role | Description |
|---|---|---|
| `kafka` | primary | Generate the full Kafka→ClickHouse pipeline (streams, raw, datalake + materialized views) |
| `scan` | helper | Inspect JSON fields and pick a ClickHouse engine for a one-off table |
| `table` | helper | Generate a single CREATE TABLE migration from JSON |
| `diff` | helper | Generate ALTER TABLE migrations to evolve a table as your JSON schema changes |
```sh
clickforge kafka [OPTIONS] <INPUT>
```
| Flag | Default | Description |
|---|---|---|
| `-n, --name <NAME>` | input filename stem | Override the table name |
| `-c, --cluster <CLUSTER>` | `clickhouse_datalake` | ClickHouse cluster name |
| `-k, --kafka <KAFKA>` | `kafka` | Kafka collection name |
| `-o, --output-dir <DIR>` | `.` | Output directory for generated SQL files |
| `--stdout` | off | Print migrations to stdout instead of writing files |
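The `-n` default can be summarized in a few lines. The helper below is hypothetical (`resolve_table_name` is not part of clickforge), but it follows the documented rules. An explicit `--name` wins. A file input falls back to its filename stem. Stdin (`-`) falls back to `table`. Output file names follow the `{name}_up.sql`/`{name}_down.sql` pattern.

```python
from pathlib import Path
from typing import Optional, Tuple


def resolve_table_name(input_arg: str, name: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the documented -n/--name defaults."""
    if name:                     # an explicit --name always wins
        return name
    if input_arg == "-":         # stdin has no filename, so fall back to "table"
        return "table"
    return Path(input_arg).stem  # "data/video_events.json" -> "video_events"


def migration_files(name: str) -> Tuple[str, str]:
    """File names written for a table, following the {name}_up/_down pattern."""
    return f"{name}_up.sql", f"{name}_down.sql"


print(migration_files(resolve_table_name("data/video_events.json")))
```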
```sh
clickforge kafka video_events.json
clickforge kafka video_events.json -n my_table -c my_cluster -k my_kafka -o migrations/
```
Writes `{name}_up.sql` (creates the streams table, raw table, datalake table, `raw_mv`, and `streams_mv`, in that order) and `{name}_down.sql` (drops all five in reverse order).
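The ordering matters. In ClickHouse, a Kafka engine table starts consuming in the background once a materialized view is attached to it. Creating `streams_mv` last therefore means no message is read until `raw`, `datalake`, and `raw_mv` exist to receive it. Dropping in reverse stops consumption first. The sketch below only models that ordering. The object labels, especially the view names, are assumptions and not the exact identifiers clickforge emits.

```python
from typing import List, Tuple


def up_order(name: str) -> List[Tuple[str, str]]:
    """Creation order documented for {name}_up.sql (labels are illustrative)."""
    return [
        ("table", f"streams.{name}"),   # Kafka engine table: the topic cursor
        ("table", f"raw.{name}"),       # durable replay buffer
        ("table", f"datalake.{name}"),  # typed, queryable table
        ("view", "raw_mv"),             # raw -> datalake
        ("view", "streams_mv"),         # streams -> raw; attaching it starts consumption
    ]


def down_order(name: str) -> List[Tuple[str, str]]:
    """{name}_down.sql drops the same five objects in reverse order."""
    return list(reversed(up_order(name)))


print(" -> ".join(obj for _, obj in down_order("video_events")))
```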
scan, table, and diff support the pipeline above — choose an engine, generate a one-off table, or evolve the datalake schema as your messages change. You don't need them for the basic kafka flow.
Analyzes JSON fields, classifies them (Timestamp-like, ID-like, Numeric), and prints engine suggestions with ORDER BY recommendations. When numeric metrics and a dimension (id/timestamp) are present, it also suggests SummingMergeTree with the metric columns to sum.
```sh
clickforge scan [OPTIONS] <INPUT>
```
| Flag | Default | Description |
|---|---|---|
| `-n, --name <NAME>` | input filename stem | Names the generated `{name}_up.sql`/`{name}_down.sql` files if you pick an engine at the prompt; otherwise it has no effect on the output |
| `-c, --cluster <CLUSTER>` | — | If set, suggests `ReplicatedMergeTree` variants |
```sh
clickforge scan video_events.json
clickforge scan video_events.json -c my_cluster
```
When run in a terminal, `scan` finishes by offering to generate a migration from one of its suggestions, so you don't have to retype a `table` command:
```text
Pick an engine to generate [1-3, Enter to skip]: 2
Written: ./video_events_up.sql
Written: ./video_events_down.sql
```
Picking a number writes `{name}_up.sql`/`{name}_down.sql` (using that suggestion's engine and ORDER BY) to the current directory; press Enter to skip. The prompt only appears in an interactive terminal; piped or scripted runs just print the suggestions.
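The prompt's rules fit in a small sketch: a number picks a suggestion, Enter skips, and non-interactive runs never prompt. `parse_choice` and `should_prompt` are hypothetical names. Checking both stdin and stdout for a TTY is an assumption about how an interactive terminal is detected.

```python
import sys
from typing import Optional


def parse_choice(answer: str, n_suggestions: int) -> Optional[int]:
    """Return a 0-based suggestion index, or None when the user just presses Enter."""
    answer = answer.strip()
    if not answer:
        return None  # Enter skips generation
    if answer.isdigit() and 1 <= int(answer) <= n_suggestions:
        return int(answer) - 1
    raise ValueError(f"expected 1-{n_suggestions} or Enter, got {answer!r}")


def should_prompt() -> bool:
    """Only prompt when attached to a terminal; piped or scripted runs skip it."""
    return sys.stdin.isatty() and sys.stdout.isatty()
```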
Example output (truncated):
```text
Field analysis: video_events.json (4 records, 13 fields)
event_id String required → ID-like
playback_position_ms Int64 required → Numeric
timestamp DateTime64(3) required → Timestamp-like
client_ip Nullable(String) nullable

Suggested engines:
1. MergeTree
ORDER BY (timestamp)
→ general purpose time-series table
2. ReplacingMergeTree
ORDER BY (event_id, timestamp)
→ deduplicates rows by `event_id` — good for upsert-like data

To generate a migration with the chosen engine, run:
clickforge table video_events.json --engine MergeTree --order-by timestamp
clickforge table video_events.json --engine ReplacingMergeTree --order-by event_id,timestamp
```
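Here is a sketch that makes the classification concrete. The rules are guesses, not clickforge's documented heuristics: a `DateTime` type means Timestamp-like, a name of `id` or ending in `_id` means ID-like, and integer or float types mean Numeric. They were chosen because they reproduce the first two suggestions in the example output. The SummingMergeTree entry follows the rule stated at the start of this section: it appears only when numeric metrics and an id/timestamp dimension are both present.

```python
def classify(field, ch_type):
    """Hypothetical heuristics; clickforge's real rules may differ."""
    if "DateTime" in ch_type:
        return "Timestamp-like"
    if field == "id" or field.endswith("_id"):
        return "ID-like"
    if "Int" in ch_type or "Float" in ch_type:
        return "Numeric"
    return None  # e.g. free-form strings such as client_ip


def suggest_engines(fields):
    """fields: ordered mapping of field name -> inferred ClickHouse type."""
    by_class = {"Timestamp-like": [], "ID-like": [], "Numeric": []}
    for name, ch_type in fields.items():
        cls = classify(name, ch_type)
        if cls:
            by_class[cls].append(name)
    ts, ids = by_class["Timestamp-like"], by_class["ID-like"]
    metrics = by_class["Numeric"]
    out = [{"engine": "MergeTree", "order_by": (ts or ids)[:1]}]
    if ids:
        # ReplacingMergeTree collapses rows that share the full ORDER BY key.
        out.append({"engine": "ReplacingMergeTree", "order_by": ids[:1] + ts[:1]})
    if metrics and (ids or ts):
        # Sum the numeric metric columns per dimension key.
        out.append({"engine": "SummingMergeTree",
                    "order_by": ids[:1] + ts[:1], "sum": metrics})
    return out


example = {"event_id": "String", "playback_position_ms": "Int64",
           "timestamp": "DateTime64(3)", "client_ip": "Nullable(String)"}
for s in suggest_engines(example):
    print(s)
```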
Generates a single CREATE TABLE / DROP TABLE migration. Use scan first to pick the right engine.
```sh
clickforge table [OPTIONS] <INPUT>
```
| Flag | Default | Description |
|---|---|---|
| `-n, --name <NAME>` | input filename stem | Override the table name |
| `-e, --engine <ENGINE>` | inferred (`MergeTree`) | `MergeTree`, `ReplicatedMergeTree`, `ReplacingMergeTree`, `SummingMergeTree` |
| `--order-by <FIELDS>` | inferred from field names | Comma-separated ORDER BY fields |
| `-c, --cluster <CLUSTER>` | — | Adds an ON CLUSTER clause; required for `ReplicatedMergeTree` |
| `-o, --output-dir <DIR>` | `.` | Output directory for generated SQL files |
| `--stdout` | off | Print migrations to stdout instead of writing files |
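The way `-e`, `--order-by`, and `-c` combine can be sketched as a small renderer. `create_table_sql` is hypothetical, and its exact formatting is an assumption (for example, it omits `ReplicatedMergeTree` engine arguments). Only two rules come from the table above: `-c` adds ON CLUSTER, and `ReplicatedMergeTree` requires a cluster.

```python
from typing import Optional, Sequence, Tuple


def create_table_sql(name: str,
                     columns: Sequence[Tuple[str, str]],
                     engine: str = "MergeTree",
                     order_by: Sequence[str] = (),
                     cluster: Optional[str] = None) -> str:
    """Hypothetical CREATE TABLE renderer; clickforge's real output may differ."""
    if engine.startswith("Replicated") and not cluster:
        raise ValueError(f"{engine} requires -c/--cluster")
    on_cluster = f" ON CLUSTER {cluster}" if cluster else ""
    cols = ",\n".join(f"    {col} {typ}" for col, typ in columns)
    # An empty sorting key is written as tuple() in ClickHouse.
    key = f"({', '.join(order_by)})" if order_by else "tuple()"
    return (f"CREATE TABLE {name}{on_cluster}\n(\n{cols}\n)\n"
            f"ENGINE = {engine}\nORDER BY {key};")


print(create_table_sql("video_events",
                       [("event_id", "String"), ("timestamp", "DateTime64(3)")],
                       order_by=["timestamp"]))
```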
```sh
clickforge table video_events.json
clickforge table video_events.json --engine ReplicatedMergeTree -c my_cluster
```
`diff` infers a schema from two JSON samples (an old and a new one) and generates additive ALTER TABLE migrations for the columns that appear in the new sample but not in the old one.
```sh
clickforge diff [OPTIONS] <OLD> <NEW>
```
| Argument | Default | Description |
|---|---|---|
| `<OLD>` | — | Existing/old JSON sample (or `-` for stdin) |
| `<NEW>` | — | New JSON sample (or `-` for stdin) |
| `-n, --name <NAME>` | new file stem | Override the table name |
| `-c, --cluster <CLUSTER>` | — | Adds ON CLUSTER to the statements |
| `-o, --output-dir <DIR>` | `.` | Output directory for generated SQL files |
| `--stdout` | off | Print migrations to stdout instead of writing files |
```sh
clickforge diff video_events.json video_events_v2.json -n video_events
```
Writes `{name}_alter_up.sql` (ADD COLUMN) and `{name}_alter_down.sql` (DROP COLUMN, in reverse order). Removed columns and type changes are not migrated automatically; they are reported as warnings on stderr so you can review them by hand (dropping or retyping a populated column is destructive).
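The additive rule can be sketched as a comparison of two inferred schemas. `diff_schemas` is hypothetical and the statement text is illustrative. The behavior it models comes from the description above. New columns become ADD COLUMN statements, and the down migration drops them in reverse. Removed or retyped columns only produce warnings, which clickforge writes to stderr.

```python
import sys
from typing import Dict, List, Optional, Tuple


def diff_schemas(old: Dict[str, str], new: Dict[str, str], table: str,
                 cluster: Optional[str] = None) -> Tuple[List[str], List[str], List[str]]:
    """Compare two insertion-ordered column->type mappings; build additive migrations."""
    on_cluster = f" ON CLUSTER {cluster}" if cluster else ""
    added = [(col, typ) for col, typ in new.items() if col not in old]
    up = [f"ALTER TABLE {table}{on_cluster} ADD COLUMN {col} {typ};" for col, typ in added]
    # Undo in reverse order so the down migration mirrors the up migration.
    down = [f"ALTER TABLE {table}{on_cluster} DROP COLUMN {col};" for col, _ in reversed(added)]
    # Destructive changes are reported, never migrated.
    warnings = [f"column {col} was removed; not migrated" for col in old if col not in new]
    warnings += [f"column {col} changed {old[col]} -> {new[col]}; not migrated"
                 for col in old if col in new and old[col] != new[col]]
    return up, down, warnings


old = {"event_id": "String", "duration_ms": "Int64", "legacy_flag": "Nullable(Bool)"}
new = {"event_id": "String", "duration_ms": "Float64",
       "country": "Nullable(String)", "device": "Nullable(String)"}
up, down, warnings = diff_schemas(old, new, "video_events")
for w in warnings:
    print("warning:", w, file=sys.stderr)
```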
Types are inferred by scanning every record and widening as needed:
| JSON value | ClickHouse type |
|---|---|
| string | Nullable(String) |
| ISO-8601 datetime string (YYYY-MM-DDThh:mm:ss…) | Nullable(DateTime64(3)) |
| integer | Nullable(Int64) |
| float | Nullable(Float64) |
| boolean | Nullable(Bool) |
| array | Array(T) — element type inferred |
| object with scalar values | Map(String, V) |
| null / nested object | Nullable(String) |
Array and Map columns are never wrapped in Nullable (ClickHouse forbids it). Date-only strings (2024-03-01) are left as String.
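The per-value rules in the table can be sketched as follows. The datetime regex approximates the documented `YYYY-MM-DDThh:mm:ss…` pattern. The table leaves two cases open, and the code marks both as assumptions: arrays whose elements are empty or of mixed types, and objects whose scalar values differ in type. Both fall back to String here. Combining types across records is a separate step.

```python
import re
from typing import Any, Optional

# Approximates "YYYY-MM-DDThh:mm:ss..."; date-only strings do not match.
ISO_DATETIME = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")


def scalar_type(v: Any) -> Optional[str]:
    """Base ClickHouse type for a JSON scalar, or None for null/array/object."""
    if isinstance(v, bool):   # check bool first: in Python, True is also an int
        return "Bool"
    if isinstance(v, int):
        return "Int64"
    if isinstance(v, float):
        return "Float64"
    if isinstance(v, str):
        return "DateTime64(3)" if ISO_DATETIME.match(v) else "String"
    return None


def value_type(v: Any) -> str:
    """Column type for a single parsed JSON value, following the table above."""
    if isinstance(v, list):
        elems = {scalar_type(x) for x in v}
        # Assumption: empty, mixed, or nested arrays fall back to String elements.
        inner = elems.pop() if len(elems) == 1 and None not in elems else "String"
        return f"Array({inner})"  # never wrapped in Nullable
    if isinstance(v, dict):
        vals = {scalar_type(x) for x in v.values()}
        if len(vals) == 1 and None not in vals:
            return f"Map(String, {vals.pop()})"  # never wrapped in Nullable
        # Nested objects (assumption: also empty or mixed ones) stay a JSON string.
        return "Nullable(String)"
    if v is None:
        return "Nullable(String)"
    return f"Nullable({scalar_type(v)})"
```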
String columns with few distinct values across a large-enough sample are emitted as LowCardinality(String), a ClickHouse storage optimization for columns that repeat a small set of values. This is conservative: it stays off for small samples and for columns with many distinct values.
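Here is a sketch of a conservative LowCardinality check. The thresholds (`MIN_SAMPLE`, `MAX_DISTINCT_RATIO`) are invented for illustration, since the actual cut-offs are not stated here. One ClickHouse detail is worth encoding. For a nullable column the valid nesting is `LowCardinality(Nullable(String))`, because ClickHouse rejects `Nullable(LowCardinality(...))`.

```python
from typing import List, Optional

# Hypothetical thresholds; clickforge's real cut-offs are not documented here.
MIN_SAMPLE = 100          # fewer non-null values than this: never LowCardinality
MAX_DISTINCT_RATIO = 0.1  # distinct values must be at most 10% of the sample


def string_column_type(values: List[Optional[str]], nullable: bool) -> str:
    seen = [v for v in values if v is not None]
    inner = "Nullable(String)" if nullable else "String"
    if len(seen) >= MIN_SAMPLE and len(set(seen)) <= MAX_DISTINCT_RATIO * len(seen):
        # LowCardinality must wrap Nullable, not the other way round.
        return f"LowCardinality({inner})"
    return inner
```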
If the same field appears as Int64 in one record and Float64 in another, it widens to Nullable(Float64). Any other type conflict widens to Nullable(String).
A field is non-nullable only if it is present in every record. Field order in the output matches the first record that introduced each field.
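The widening, nullability, and ordering rules combine into one pass over the records. In this sketch each record has already been reduced to base types, for example with a per-value mapping like the table above. `Nullable(...)` is applied at the end, only to fields missing from some record, and Array and Map columns stay unwrapped. `widen` and `merge_schema` are hypothetical names.

```python
from typing import Dict, List, Tuple


def widen(a: str, b: str) -> str:
    """Combine two base types seen for the same field."""
    if a == b:
        return a
    if {a, b} == {"Int64", "Float64"}:
        return "Float64"   # Int64 + Float64 widens to Float64
    return "String"        # any other conflict widens to String


def merge_schema(records: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """records: per-record mappings of field -> base type, in file order."""
    types: Dict[str, str] = {}  # dicts keep insertion order = first-seen field order
    seen: Dict[str, int] = {}
    for rec in records:
        for field, t in rec.items():
            types[field] = widen(types[field], t) if field in types else t
            seen[field] = seen.get(field, 0) + 1
    out = []
    for field, t in types.items():
        required = seen[field] == len(records)  # present in every record
        if required or t.startswith(("Array(", "Map(")):
            out.append((field, t))
        else:
            out.append((field, f"Nullable({t})"))
    return out
```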

