1、打宽表实验2

基于实验1

https://www.douban.com/note/746216957/

尝试重开全新的topic,stream和table来实验,一对多这块的问题;

2、建立流和表

CREATE TABLE t_test2_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

WITH (kafka_topic='test2_orders', partitions=1, value_format='json', key='order_id');

CREATE STREAM s_test2_users (user_id VARCHAR,user_name VARCHAR)

WITH (kafka_topic='test2_users', partitions=1, value_format='json', key='user_id');

3、插入试验用的订单数据;

INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0001','DRETEC温湿度计','001');

INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0002','华硕路由器TEK','001');

INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0003','BOSE音箱 MINI','002');

4、建立所谓的监听

SELECT o.order_id,o.product_name,u.user_name

FROM s_test2_users u JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;

5、插入用户数据

INSERT INTO s_test2_users (user_id, user_name) VALUES ('001', '余柠');

INSERT INTO s_test2_users (user_id, user_name) VALUES ('002', '张三');

INSERT INTO s_test2_users (user_id, user_name) VALUES ('003', '李四');

INSERT INTO s_test2_users (user_id, user_name) VALUES ('004', '王五');

瞬间即被触发了:

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM s_test2_users u JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;

+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+

|ORDER_ID |PRODUCT_NAME |USER_NAME |

+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+

|dd0002 |华硕路由器TEK |余柠 |

|dd0003 |BOSE音箱 MINI |张三

6、更新维表数据:

INSERT INTO s_test2_users (user_id, user_name) VALUES ('001', 'yuning');

|dd0002 |华硕路由器TEK |yuning

7、如同实验1,与topic看来是无关了;

8、修改监听语句:

SELECT o.order_id,o.product_name,u.user_name

FROM s_test2_users u LEFT JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;

修改为LEFT JOIN后无效果

SELECT o.order_id,o.product_name,u.user_name

FROM s_test2_users u LEFT JOIN t_test2_orders o ON o.user_id = u.user_id EMIT CHANGES;

9、导致实验失败的原因

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM s_test2_users u LEFT JOIN t_test2_orders o ON u.user_id = o.user_id EMIT CHANGES;

Source table (O) key column (O.ORDER_ID) is not the column used in the join criteria (O.USER_ID). Only the table's key column or 'ROWKEY' is supported in the join criteria.

ksql>

左边的表是user表,是流表

右边的表是order表,是table维度表

他们之间的join要求,在建表的时候

key值必须是一样的

CREATE TABLE t_test2_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

WITH (kafka_topic='test2_orders', partitions=1, value_format='json', key='order_id');

ORDER表这么建是没有问题的

但是如果改成:

CREATE TABLE t_test2_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

WITH (kafka_topic='test2_orders', partitions=1, value_format='json', key='user_id');

倒是可以join了,因为key值是一样的

但是,问题来了

INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0001','DRETEC温湿度计','001');

INSERT INTO t_test2_orders (order_id, product_name,user_id) VALUES ('dd0002','华硕路由器TEK','001');

两条订单数据

如果均已,001为key 的话,

('dd0002','华硕路由器TEK','001');

会覆盖掉第一条

所以join,永远只能join一条数据;