-
Notifications
You must be signed in to change notification settings - Fork 0
KSQL Streams Example
Tmr edited this page Jun 17, 2022
·
1 revision
Почистить всё, если нужно начать сначала
DROP STREAM IF EXISTS accepted DELETE TOPIC;
DROP STREAM IF EXISTSaccepts DELETE TOPIC;
DROP STREAM IF EXISTS trades DELETE TOPIC;Создадим стрим сделок (client_id, amount, time)
CREATE OR REPLACE STREAM trades (id INT KEY, client_id INT, amount INT, t TIMESTAMP)
WITH(kafka_topic='trades', value_format='json', partitions=1, timestamp='t');Запросим события (с фильтром)
-- Можно посмотреть как влияет KEY в поле client_id на порядок записей в результате
-- В одном окне
SELECT id,
client_id,
amount,
FORMAT_TIMESTAMP(t, 'HH:mm:ss') t
FROM trades WHERE client_id = 1 EMIT CHANGES;
-- В другом окне
SELECT client_id,
count(*) cnt
FROM trades
GROUP BY client_id
EMIT CHANGES;Добавим сделки
python3 << EOD #| docker exec -i ksqldb-cli ksql http://ksqldb-server:8088
print("\n".join([f"INSERT INTO trades (id, client_id, amount, t) VALUES ({n+1}, {n % 2 + 1}, 10, '2022-06-18T00:00:{n:02d}.000');" for n in range(0, 5)]))
EOD INSERT INTO trades (id, client_id, amount, t) VALUES (1, 1, 10, '2022-06-18T00:00:00.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (2, 2, 10, '2022-06-18T00:00:01.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (3, 1, 10, '2022-06-18T00:00:02.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (4, 2, 10, '2022-06-18T00:00:03.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (5, 1, 10, '2022-06-18T00:00:04.000'); Посмотрим на DESCRIBE и топик
print trades;
describe trades extended;Создадим стрим подтверждений сделок
CREATE OR REPLACE STREAM accepts (id INT KEY, t TIMESTAMP)
WITH(kafka_topic='accepts', value_format='json', partitions=1, timestamp='t');Опубликуем события подтвеждения сделок
python3 << EOD #| docker exec -i ksqldb-cli ksql http://ksqldb-server:8088
print("\n".join([f"INSERT INTO accepts (id, t) VALUES ({n+1}, '2022-06-18T00:00:{n:02d}.100');" for n in range(0, 5)]))
EODINSERT INTO accepts (id, t) VALUES (1, '2022-06-18T00:00:00.100');
INSERT INTO accepts (id, t) VALUES (2, '2022-06-18T00:00:01.100');
INSERT INTO accepts (id, t) VALUES (3, '2022-06-18T00:00:02.100');
INSERT INTO accepts (id, t) VALUES (4, '2022-06-18T00:00:03.100');
INSERT INTO accepts (id, t) VALUES (5, '2022-06-18T00:00:04.100'); Создадим Stream c join сделок и подтверждений: подтвержденные сделки
-- Чтобы запросить все изменения с начала времени
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM accepted AS
SELECT client_id,
a.id as id,
amount,
t.t,
a.t
FROM trades t INNER JOIN accepts a
WITHIN 24 HOURS GRACE PERIOD 0 SECONDS
ON a.id = t.id
PARTITION BY a.id
EMIT CHANGES;Запросим новый стрим
SELECT client_id,
id,
amount,
FORMAT_TIMESTAMP(t_t, 'mm:ss:SSS') as trade_time,
FORMAT_TIMESTAMP(a_t, 'mm:ss:SSS') as accept_time,
ROWPARTITION
FROM accepted
EMIT CHANGES;Новые сделки без подтверждения не видно
INSERT INTO trades (id, client_id, amount, t) VALUES (6, 2, 10, '2022-06-18T00:00:05.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (7, 1, 10, '2022-06-18T00:00:06.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (8, 2, 10, '2022-06-18T00:00:07.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (9, 1, 10, '2022-06-18T00:00:08.000');
INSERT INTO trades (id, client_id, amount, t) VALUES (10, 2, 10, '2022-06-18T00:00:09.000'); Пришлём подтверждение
INSERT INTO accepts (id, t) VALUES (6, '2022-06-18T00:00:05.100');
INSERT INTO accepts (id, t) VALUES (7, '2022-06-18T00:00:06.100');
INSERT INTO accepts (id, t) VALUES (8, '2022-06-18T00:00:07.100');
INSERT INTO accepts (id, t) VALUES (9, '2022-06-18T00:00:08.100');
INSERT INTO accepts (id, t) VALUES (10, '2022-06-18T00:00:09.100'); Посмотрим группировку по клиентам. Здесь мы превращаем поток событий в поток изменений (change log) значений key/value, где ключ client_id, а значение - агрегаты sum, cnt.
SELECT client_id,
SUM(amount) as sum,
COUNT(amount) as cnt
FROM accepted
GROUP BY client_id EMIT CHANGES;Здесь (в конце) мы сделаем таблицу с последними значениями тих агрегатов: KSQL Table Example