1、继承上一个实验
https://docs.ksqldb.io/en/latest/concepts/stream-processing/
2、创建流和表:
CREATE TABLE products (product_name VARCHAR, cost DOUBLE)
WITH (kafka_topic='products', partitions=1, value_format='json', key='product_name');
CREATE STREAM orders (product_name VARCHAR)
WITH (kafka_topic='orders', partitions=1, value_format='json', key='product_name');
产品为一个表,orders是一个流,这倒是很符合现实,因为订单建模的时候应该就是一个不可变数据项
-----------------------
ksql> CREATE TABLE products (product_name VARCHAR, cost DOUBLE)
> WITH (kafka_topic='products', partitions=1, value_format='json', key='product_name');
>
>CREATE STREAM orders (product_name VARCHAR)
> WITH (kafka_topic='orders', partitions=1, value_format='json', key='product_name');
CREATE TABLE products (product_name VARCHAR, cost DOUBLE)
WITH (kafka_topic='products', partitions=1, value_format='json', key='product_name');
Message
---------------
Table created
---------------
CREATE STREAM orders (product_name VARCHAR)
WITH (kafka_topic='orders', partitions=1, value_format='json', key='product_name');
Message
----------------
Stream created
----------------
3、创建流表关联
使用(2)的终端,来创建一张新表:
CREATE TABLE order_metrics AS
SELECT p.product_name, COUNT(*) AS count, SUM(p.cost) AS revenue
FROM orders o JOIN products p ON p.product_name = o.product_name
GROUP BY p.product_name EMIT CHANGES;
-----------------------
----------------
ksql> CREATE TABLE order_metrics AS
> SELECT p.product_name, COUNT(*) AS count, SUM(p.cost) AS revenue
> FROM orders o JOIN products p ON p.product_name = o.product_name
> GROUP BY p.product_name EMIT CHANGES;
Message
-----------------------------------------------------------------------------------------------
Table ORDER_METRICS created and running. Created by query with query ID: CTAS_ORDER_METRICS_4
-----------------------------------------------------------------------------------------------
ksql>
4、插入产品数据:
INSERT INTO products (product_name, cost) VALUES ('罗技k380键盘', 130);
INSERT INTO products (product_name, cost) VALUES ('iPhone 11 Pro', 13000);
INSERT INTO products (product_name, cost) VALUES ('飞利浦抽湿机H210', 2300);
INSERT INTO products (product_name, cost) VALUES ('HP显示器', 1650);
结果:
ksql> INSERT INTO products (product_name, cost) VALUES ('罗技k380键盘', 130);
>INSERT INTO products (product_name, cost) VALUES ('iPhone 11 Pro', 13000);
>INSERT INTO products (product_name, cost) VALUES ('飞利浦抽湿机H210', 2300);
>INSERT INTO products (product_name, cost) VALUES ('HP显示器', 1650);
ksql>
5、插入流数据:
INSERT INTO orders (product_name) VALUES ('罗技k380键盘');
6、取得这个聚合物化视图表的结果:
ksql> SELECT * FROM order_metrics WHERE ROWKEY='罗技k380键盘';
+---------------------------------------------------------------+---------------------------------------------------------------+---------------------------------------------------------------+---------------------------------------------------------------+
|ROWKEY |P_PRODUCT_NAME |COUNT |REVENUE |
+---------------------------------------------------------------+---------------------------------------------------------------+---------------------------------------------------------------+---------------------------------------------------------------+
|罗技k380键盘 |罗技k380键盘 |5 |650.0 |
Query terminated
ksql>
7、概念笔记:
订单表==orders表
产品表==普通的表
用groupby建立了一个聚合表
order_metrics
是用产品名做的,订单数的count,以及对应的利润数据,REVENUE
当然,还有一个,就是最终的query
这里唯一让我有点懵逼的地方是,这个ROWKEY,就是PRODUCT_NAME,但是却必须需要指定;
ROWKEY | P_PRODUCT_NAME |COUNT |REVENUE
罗技k380键盘 | 罗技k380键盘 |5 |650.0|
一共产生了5条订单,并且最终的结果是650的利润
8、这里有一个冗余表的问题
就是说,如果罗技k380键盘 ,作为一个产品,它的价格是130,然后产生的订单,关联后得到的一刻,就是1x130=130,然后得到了它的价格
如果罗技k380键盘 的价格变化后,理论上,之前的订单不应当理会这个价格改变,所以需要看最后这个流式处理怎么搞
如果不行就只能触发重算,这就是物化视图的麻烦之处;
如果要修改以前发生的订单价格的话就会很麻烦;
当然,这个例子本身就不好,因为价格这个东西,应该是订单的一部分做出来作为冗余;这是具体的建模问题;
9、sink的问题
将group过后的数据,做沉降,沉降到es结果表里面去,这样就省去了es的聚合操作;
这也很重要;
另外,怎样做数据重放,这也是个问题,最直观的方法就是设置超大数据硬盘,讲所有的changes,原封不动得落硬盘
用kafka作为数据表的审计表