1. Try building a wide table (a denormalized join result)
2. Create the stream and the table:
CREATE TABLE test1_users (user_id VARCHAR,user_name VARCHAR)
WITH (kafka_topic='test1_users', partitions=1, value_format='json', key='user_id');
CREATE STREAM test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='order_id');
Users table: user ID and user name.
Orders stream: order ID, product name, and user ID.
3. Run the join that builds the wide table
SELECT o.order_id,o.product_name,u.user_name
FROM test1_orders o JOIN test1_users u ON o.user_id = u.user_id EMIT CHANGES;
4. Insert test user data
In another terminal:
INSERT INTO test1_users (user_id, user_name) VALUES ('001', '余柠');
INSERT INTO test1_users (user_id, user_name) VALUES ('002', '张三');
INSERT INTO test1_users (user_id, user_name) VALUES ('003', '李四');
INSERT INTO test1_users (user_id, user_name) VALUES ('004', '王五');
5. Insert test order data:
INSERT INTO test1_orders (order_id, product_name,user_id) VALUES ('dd0001','飞利浦抽湿机H20','001');
After the insert, the query printed the expected row:
+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|ORDER_ID |PRODUCT_NAME |USER_NAME |
+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|dd0001 |飞利浦抽湿机H20 |余柠 |
6. Update the users table:
INSERT INTO test1_users (user_id, user_name) VALUES ('001', '柠檬叔');
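The terminal running the query showed no reaction; the update emitted nothing. In a stream-table join only new records on the stream side produce output; a table update just changes what later stream records will be joined with.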
8. Trigger an update on the stream side by inserting the order again:
INSERT INTO test1_orders (order_id, product_name,user_id) VALUES ('dd0001','飞利浦抽湿机H20','001');
|dd0001 |飞利浦抽湿机H20 |柠檬叔 |
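9. Create another table/stream pair on the same topics, with the roles swapped (orders as the table, users as the stream)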
CREATE TABLE t_test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='order_id');
CREATE STREAM s_test1_users (user_id VARCHAR,user_name VARCHAR)
WITH (kafka_topic='test1_users', partitions=1, value_format='json', key='user_id');
10. Start the listening query
SELECT o.order_id,o.product_name,u.user_name
FROM t_test1_orders o JOIN s_test1_users u ON o.user_id = u.user_id EMIT CHANGES;
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM t_test1_orders o JOIN s_test1_users u ON o.user_id = u.user_id EMIT CHANGES;
Join between invalid operands requested: left type: KTABLE, right type: KSTREAM
ksql>
That failed: in a stream-table join the stream has to be on the left. Rewrite it:
-------------------------------------------------------------------
SELECT o.order_id,o.product_name,u.user_name
FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;
Source table (O) key column (O.ORDER_ID) is not the column used in the join criteria (O.USER_ID). Only the table's key column or 'ROWKEY' is supported in the join criteria.
ksql>
-------------------------------------------------------------------
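Failed again. The reason: user_id is the key of the users table but not the key of the orders table, and a table can only be joined on its key column. So the orders table is re-created with key='user_id':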
-------------------------------------------------------------------
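The transcript does not show it, but ksqlDB will not create a table under a name that is already registered, so the earlier t_test1_orders presumably had to be dropped before the CREATE TABLE below could succeed. A minimal sketch of that missing step, assuming no running query still references the table:

```sql
-- Assumed step, not shown in the original session:
-- remove the old definition so the name can be reused.
-- Without DELETE TOPIC this only drops the ksqlDB metadata;
-- the Kafka topic test1_orders and its records are left in place.
DROP TABLE t_test1_orders;
```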
ksql> CREATE TABLE t_test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
> WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='user_id');
Message
---------------
Table created
---------------
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;
Press CTRL-C to interrupt
It worked: the query started.
11. Update the users table created at the start:
INSERT INTO test1_users (user_id, user_name) VALUES ('001', '余柠');
Hmm, nothing happened.
INSERT INTO s_test1_users (user_id, user_name) VALUES ('001', '余柠');
Still nothing happened???
Why?
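Two things are worth checking here; both are guesses about this session, not confirmed causes. First, key='user_id' in the WITH clause only declares which column mirrors the Kafka record key; it does not re-key data already in the topic. The order from step 5 was inserted through the test1_orders stream, so its record key is most likely the order_id, and a lookup by user_id '001' would find nothing in t_test1_orders. Second, unless auto.offset.reset is set to earliest, a query only reads records that arrive after it starts. A sketch for inspecting both from the CLI:

```sql
-- Show the raw records in the topic, including each record's key,
-- to see whether orders are keyed by order_id or by user_id.
-- PRINT keeps running; press CTRL-C to stop it.
PRINT 'test1_orders' FROM BEGINNING;

-- Make queries started in this session read each topic from its earliest
-- offset instead of only records that arrive after the query starts.
SET 'auto.offset.reset' = 'earliest';
```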
OK, try again:
INSERT INTO t_test1_orders (order_id, product_name,user_id) VALUES ('dd0001','飞利浦抽湿机H20','001');
INSERT INTO t_test1_orders (order_id, product_name,user_id) VALUES ('dd0002','逻辑K380键盘','001');
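Something interesting happened: when I inserted into t_test1_orders,
the step 3 query on test1_orders (the STREAM) was triggered as well. Both objects are declared on the same Kafka topic, test1_orders, so an insert through either one lands in that topic.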
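The shared topic can be confirmed from the CLI. The listing commands below print the backing Kafka topic of every registered stream and table:

```sql
-- Each listing includes the Kafka topic behind every registered object;
-- test1_orders (stream) and t_test1_orders (table) should both
-- report the topic test1_orders.
SHOW STREAMS;
SHOW TABLES;
```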
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;
Exception while preparing statement: T_TEST1_ORDERS does not exist.
Statement: SELECT o.order_id,o.product_name,u.user_name
FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;
Caused by: T_TEST1_ORDERS does not exist.
ksql> CREATE TABLE t_test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
> WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='user_id');
Message
---------------
Table created
---------------
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;
+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|ORDER_ID |PRODUCT_NAME |USER_NAME |
+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|dd0002 |逻辑K380键盘 |余柠 |
|dd0002 |逻辑K380键盘 |余柠 |
|dd0002 |逻辑K380键盘 |余柠 |
|dd0002 |逻辑K380键盘 |余柠 |
|dd0002 |逻辑K380键盘 |柠檬叔 |
Press CTRL-C to interrupt
The join fired, but unfortunately it only ever joins order dd0002.
That is not what we wanted. Is this the so-called one-to-many problem? Do we need a LEFT JOIN?
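A likely explanation, offered as a reading of the output and not a confirmed diagnosis: a ksqlDB table keeps only the latest row per key. With t_test1_orders keyed by user_id, dd0001 and dd0002 share the key '001', so dd0002 overwrote dd0001 and is the only order left to join. A LEFT JOIN would not change that; it only decides whether a left-side record with no match is still emitted (with NULL for the right-side columns). To get one output row per order, the orders would have to stay on the stream side and the users on the table side, as in step 3. A sketch of that query with LEFT JOIN, so that orders whose user is not yet known still show up:

```sql
-- Orders remain a stream (every order is an event); users remain a table
-- (latest name per user_id). Each new order yields one output row.
-- LEFT JOIN keeps orders whose user_id has no row in test1_users yet;
-- u.user_name is NULL for those rows.
SELECT o.order_id, o.product_name, u.user_name
FROM test1_orders o
LEFT JOIN test1_users u ON o.user_id = u.user_id
EMIT CHANGES;
```

As seen in steps 6 and 8, this form picks up a renamed user only on the next order for that user; rows already emitted are not revised.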