Skip to content

KSQL Table Example

Tmr edited this page Jun 17, 2022 · 1 revision

KSQL Table Example

Сначала выплнить KSQL Streams Example

Удалить объекты, если нужно начать сначала

DROP TABLE IF EXISTS last_agg DELETE TOPIC;
DROP STREAM IF EXISTS accepted_agg_changes;
DROP TABLE IF EXISTS accepted_agg DELETE TOPIC;
DROP STREAM IF EXISTS clients_deleted DELETE TOPIC;
DROP TABLE IF EXISTS clients DELETE TOPIC;
DROP TABLE IF EXISTS clients_raw DELETE TOPIC;

Создадим таблицу с клиентами (client_id, name), по сути это change log таблицы, но в виде compacted log.

CREATE OR REPLACE TABLE clients_raw (client_id INT PRIMARY KEY, name VARCHAR) 
 WITH (kafka_topic='clients_raw', value_format='json', partitions=1);

Вставим данные

INSERT INTO clients_raw (client_id, name) VALUES (1, 'Client 1');
INSERT INTO clients_raw (client_id, name) VALUES (2, 'Client 2');
INSERT INTO clients_raw (client_id, name) VALUES (3, 'Client 3');
INSERT INTO clients_raw (client_id, name) VALUES (4, 'Client 4');
INSERT INTO clients_raw (client_id, name) VALUES (5, 'Client 5');

Создадим материализованное представление для Clients, теперь мы можем выполнять к нему Pull запросы и видеть последние значения (а не change log, как в clients_raw).

CREATE TABLE clients AS
SELECT * FROM clients_raw;

Посмотрим на DESCRIBE

describe clients extended;

Запросим данные (Pull)

SELECT * FROM clients;

Обновим запись

INSERT INTO clients_raw (client_id, name) VALUES (3, 'Cli 3');

И посмотрим результат

SELECT * FROM clients

Чтобы удалить запись нужно вставить в топик tombstone для ключа удаляемой записи, сделаем это через другой стрим с форматом KAFKA: stackoverflow

CREATE STREAM clients_deleted (client_id INT KEY, dummy VARCHAR) 
  WITH (kafka_topic='clients_raw', value_format='KAFKA');

INSERT INTO clients_deleted (client_id, dummy) VALUES (5, null);  

И посмотрим результат

SELECT * FROM clients

Сджоиним со стримом сделок: для каждой сделки привяжем клиента по client_id

SET 'auto.offset.reset' = 'earliest';
SELECT id,
       amount,
       a.client_id,
       c.name,
       FORMAT_TIMESTAMP(t_t, 'mm:ss:SSS') as trade_time,
       FORMAT_TIMESTAMP(a_t, 'mm:ss:SSS') as accept_time
  FROM accepted a INNER JOIN clients c ON a.client_id = c.client_id
  EMIT CHANGES;

Посчитаем Tumbling Windows по сделкам клиента 1, и увидим поток изменений (change log), в котором некоторые промежуточные события могут "схлопнуться" из-за буфферизации.

SET 'auto.offset.reset' = 'earliest';
SELECT SUM(amount) as sum,
       COUNT(amount) as cnt,
       FORMAT_TIMESTAMP(from_unixtime(WINDOWSTART), 'mm:ss:SSS') as window_start,
       FORMAT_TIMESTAMP(from_unixtime(WINDOWEND), 'mm:ss:SSS') as window_end
  FROM trades WINDOW TUMBLING (SIZE 5 SECONDS)
 WHERE client_id = 1
 GROUP BY client_id 
 EMIT CHANGES;

Посчитаем Hopping Windows по клиентам, и увидим поток изменений (change log) по окнам с перемешанным порядком событий, здесь ключем являются поля client_id, windowstart, windowend, и в рамках этого ключа события идут в строгом порядке.

-- Изменения в окнах
SET 'auto.offset.reset' = 'earliest';
SELECT client_id,
       SUM(amount) as sum,
       COUNT(amount) as cnt,
       FORMAT_TIMESTAMP(from_unixtime(WINDOWSTART), 'mm:ss:SSS') as window_start,
       FORMAT_TIMESTAMP(from_unixtime(WINDOWEND), 'mm:ss:SSS') as window_end
FROM accepted WINDOW HOPPING (SIZE 5 SECONDS, ADVANCE BY 2 SECOND)
 WHERE client_id = 1
 GROUP BY client_id EMIT CHANGES;

"Схлопнем"(reduce или fold или aggregate) change log в конечные значения в таблицу с ключем (client_id, windowstart, windowend) и значениями

CREATE TABLE accepted_agg AS
SELECT a.client_id as client_id, 
       AS_VALUE(a.client_id) as c_id,
       SUM(amount) as sum,
       COUNT(amount) as cnt
FROM accepted a
WINDOW HOPPING (SIZE 5 SECONDS, ADVANCE BY 2 SECOND)
 GROUP BY a.client_id
 EMIT FINAL;

Посмотрим результат для клиента Client 1

SELECT client_id,
       --name,
       sum,
       cnt, 
       FORMAT_TIMESTAMP(from_unixtime(windowstart), 'mm:ss:SSS') as window_start,
       FORMAT_TIMESTAMP(from_unixtime(windowend), 'mm:ss:SSS') as window_end
FROM accepted_agg
 WHERE client_id = 1;

Превратим таблицу с сключем (client_id, windowstart, windowend) в поток изменений для таблицы с ключем client_id, которую сделаем следующим шагом

CREATE STREAM accepted_agg_changes (c_id INT, sum INT, cnt INT)
WITH (kafka_topic='ACCEPTED_AGG', value_format='json');

Сделаем таблицу с последними значениями агрегатов и "обогащенную" именами клиентов.

CREATE TABLE last_agg WITH (key_format='json')  
AS
SELECT c_id as client_id,
       name,
       LATEST_BY_OFFSET(sum) as sum,
       LATEST_BY_OFFSET(cnt) as cnt
FROM accepted_agg_changes a INNER JOIN clients c ON a.с_id = c.client_id
 GROUP BY c_id, name;

Запросим последние значения агрегатов sum и cnt

SELECT * FROM accepted_agg_changes;

Сделаем Push запрос агрегатов в одном окне

SET 'auto.offset.reset' = 'earliest';
SELECT * FROM accepted_agg_changes EMIT CHANGES;

А в другом вставим несколько сделок и подтверждений

INSERT INTO trades (id, client_id, amount, t) VALUES (11, 1, 10, '2022-06-18T00:00:10.000');  
INSERT INTO trades (id, client_id, amount, t) VALUES (12, 2, 10, '2022-06-18T00:00:11.000');  
INSERT INTO trades (id, client_id, amount, t) VALUES (13, 1, 10, '2022-06-18T00:00:12.000');  
INSERT INTO trades (id, client_id, amount, t) VALUES (14, 2, 10, '2022-06-18T00:00:13.000');  
INSERT INTO trades (id, client_id, amount, t) VALUES (15, 1, 10, '2022-06-18T00:00:14.000');  

INSERT INTO accepts (id, t) VALUES (11, '2022-06-18T00:00:10.100');  
INSERT INTO accepts (id, t) VALUES (12, '2022-06-18T00:00:11.100');  
INSERT INTO accepts (id, t) VALUES (13, '2022-06-18T00:00:12.100');  
INSERT INTO accepts (id, t) VALUES (14, '2022-06-18T00:00:13.100');  
INSERT INTO accepts (id, t) VALUES (15, '2022-06-18T00:00:14.100'); 

Clone this wiki locally