1、尝试打宽表

2、创建流和表:

CREATE TABLE test1_users (user_id VARCHAR,user_name VARCHAR)
WITH (kafka_topic='test1_users', partitions=1, value_format='json', key='user_id');
CREATE STREAM test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)
WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='order_id');

用户表:由用户id,用户名组成

订单表:由订单id,产品名,用户id组成

3、尝试打宽表

SELECT o.order_id,o.product_name,u.user_name

FROM test1_orders o JOIN test1_users u ON o.user_id = u.user_id EMIT CHANGES;

4、插入实验用的用户数据

在另一个终端里:

INSERT INTO test1_users (user_id, user_name) VALUES ('001', '余柠');

INSERT INTO test1_users (user_id, user_name) VALUES ('002', '张三');

INSERT INTO test1_users (user_id, user_name) VALUES ('003', '李四');

INSERT INTO test1_users (user_id, user_name) VALUES ('004', '王五');

5、插入实验用的订单数据:

INSERT INTO test1_orders (order_id, product_name,user_id) VALUES ('dd0001','飞利浦抽湿机H20','001');

插入后得到了预期的数据:

+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+

|ORDER_ID |PRODUCT_NAME |USER_NAME |

+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+

|dd0001 |飞利浦抽湿机H20 |余柠 |

6、修改用户表:

INSERT INTO test1_users (user_id, user_name) VALUES ('001', '柠檬叔');

二号终端没有任何的反应,没有触发任何的更改;

8、触发一次流的更新:

INSERT INTO test1_orders (order_id, product_name,user_id) VALUES ('dd0001','飞利浦抽湿机H20','001');

|dd0001 |飞利浦抽湿机H20 |柠檬叔 |

9、再创建一对表流

CREATE TABLE t_test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='user_id');

CREATE STREAM s_test1_users (user_id VARCHAR,user_name VARCHAR)

WITH (kafka_topic='test1_users', partitions=1, value_format='json', key='user_id');

10、创建监听

SELECT o.order_id,o.product_name,u.user_name

FROM t_test1_orders o JOIN s_test1_users u ON o.user_id = u.user_id EMIT CHANGES;

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM t_test1_orders o JOIN s_test1_users u ON o.user_id = u.user_id EMIT CHANGES;

Join between invalid operands requested: left type: KTABLE, right type: KSTREAM

ksql>

失败了,重新写;

-------------------------------------------------------------------

SELECT o.order_id,o.product_name,u.user_name

FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;

Source table (O) key column (O.ORDER_ID) is not the column used in the join criteria (O.USER_ID). Only the table's key column or 'ROWKEY' is supported in the join criteria.

ksql>

-------------------------------------------------------------------

又失败了,原因是,user_id是user表的主键,但是却不是订单表的主键;

-------------------------------------------------------------------

ksql> CREATE TABLE t_test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

> WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='user_id');

Message

---------------

Table created

---------------

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;

Press CTRL-C to interrupt

成功了;

11、修改第一步的表:

INSERT INTO test1_users (user_id, user_name) VALUES ('001', '余柠');

哦呵呵,什么都没有发生;

INSERT INTO s_test1_users (user_id, user_name) VALUES ('001', '余柠');

还是没有任何事发生???

为啥嘞?

好吧,再来

INSERT INTO t_test1_orders (order_id, product_name,user_id) VALUES ('dd0001','飞利浦抽湿机H20','001');

INSERT INTO t_test1_orders (order_id, product_name,user_id) VALUES ('dd0002','逻辑K380键盘','001');

有趣的事情发生了,我在操作t_test1_orders的时候

第一步监听所建立的test1_orders(STREAM)的监听语句也被trigger了

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;

Exception while preparing statement: T_TEST1_ORDERS does not exist.

Statement: SELECT o.order_id,o.product_name,u.user_name

FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;

Caused by: T_TEST1_ORDERS does not exist.

ksql> CREATE TABLE t_test1_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

> WITH (kafka_topic='test1_orders', partitions=1, value_format='json', key='user_id');

Message

---------------

Table created

---------------

ksql> SELECT o.order_id,o.product_name,u.user_name

>FROM s_test1_users u JOIN t_test1_orders o ON u.user_id = o.user_id EMIT CHANGES;

+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+

|ORDER_ID |PRODUCT_NAME |USER_NAME |

+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+

|dd0002 |逻辑K380键盘 |余柠 |

|dd0002 |逻辑K380键盘 |余柠 |

|dd0002 |逻辑K380键盘 |余柠 |

|dd0002 |逻辑K380键盘 |余柠 |

|dd0002 |逻辑K380键盘 |柠檬叔 |

Press CTRL-C to interrupt

顺利触发了,但是很可惜,只会去关联出订单dd0002

这不符合我们的期望,这就是所谓的一对多?是不是要用left join?