1、打宽表实验2
基于实验1
https://www.douban.com/note/746216957/
尝试重开全新的topic,stream和table来实验,一对多这块的问题;
2、建立流和表
CREATE TABLE t_test2_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
WITH (kafka_topic='test2_orders', partitions=1, value_format='json', key='order_id');
CREATE STREAM s_test2_users (user_id VARCHAR,user_name VARCHAR)
WITH (kafka_topic='test2_users', partitions=1, value_format='json', key='user_id');
3、插入试验用的订单数据;
INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0001','DRETEC温湿度计','001');
INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0002','华硕路由器TEK','001');
INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0003','BOSE音箱 MINI','002');
4、建立所谓的监听
SELECT o.order_id,o.product_name,u.user_name
FROM s_test2_users u JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;
5、插入用户数据
INSERT INTO s_test2_users (user_id, user_name) VALUES ('001', '余柠');
INSERT INTO s_test2_users (user_id, user_name) VALUES ('002', '张三');
INSERT INTO s_test2_users (user_id, user_name) VALUES ('003', '李四');
INSERT INTO s_test2_users (user_id, user_name) VALUES ('004', '王五');
瞬间即被触发了:
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM s_test2_users u JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;
+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|ORDER_ID |PRODUCT_NAME |USER_NAME |
+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|dd0002 |华硕路由器TEK |余柠 |
|dd0003 |BOSE音箱 MINI |张三
6、更新维表数据:
INSERT INTO s_test2_users (user_id, user_name) VALUES ('001', 'yuning');
|dd0002 |华硕路由器TEK |yuning
7、如同实验1,与topic看来是无关了;
8、修改监听语句:
SELECT o.order_id,o.product_name,u.user_name
FROM s_test2_users u LEFT JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;
修改为LEFT JOIN后无效果
SELECT o.order_id,o.product_name,u.user_name
FROM s_test2_users u LEFT JOIN t_test2_orders o ON o.user_id = u.user_id EMIT CHANGES;
9、导致实验失败的原因
ksql> SELECT o.order_id,o.product_name,u.user_name
>FROM s_test2_users u LEFT JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;
Source table (O) key column (O.ORDER_ID) is not the column used in the join criteria (O.USER_ID). Only the table's key column or 'ROWKEY' is supported in the join criteria.
ksql>
左边的表是user表,是流表
右边的表是order表,是table维度表
他们之间的join要求,在建表的时候
key值必须是一样的
CREATE TABLE t_test2_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
WITH (kafka_topic='test2_orders', partitions=1, value_format='json', key='order_id');
ORDER表这么建是没有问题的
但是如果改成:
CREATE TABLE t_test2_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
WITH (kafka_topic='test2_orders', partitions=1, value_format='json', key='user_id');
倒是可以join了,因为key值是一样的
但是,问题来了
INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0001','DRETEC温湿度计','001');
INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0002','华硕路由器TEK','001');
两条订单数据
如果均已,001为key 的话,
('dd0002','华硕路由器TEK','001');
会覆盖掉第一条
所以join,永远只能join一条数据;