1、建表

CREATE STREAM s_test4_orders (order_id VARCHAR,product_name VARCHAR,user_id VARCHAR)

WITH (kafka_topic='s_test4_orders', partitions=1, value_format='json', key='order_id');

CREATE STREAM s_test4_users (user_id VARCHAR,user_name VARCHAR)

WITH (kafka_topic='s_test4_users', partitions=1, value_format='json', key='user_id');

2、监听

SELECT o.order_id,o.product_name,u.user_name

FROM s_test4_orders o JOIN s_test4_users u WITHIN 10 minute ON o.user_id = u.user_id EMIT CHANGES;

3、插入订单

INSERT INTO s_test4_orders (order_id, product_name,user_id) VALUES ('dd0001','DRETEC温湿度计','001');

INSERT INTO s_test4_orders (order_id, product_name,user_id) VALUES ('dd0002','华硕路由器TEK','001');

INSERT INTO s_test4_orders (order_id, product_name,user_id) VALUES ('dd0003','BOSE音箱 MINI','002');

4、插入用户

INSERT INTO s_test4_users (user_id, user_name) VALUES ('001', '余柠');

INSERT INTO s_test4_users (user_id, user_name) VALUES ('002', '张三');

INSERT INTO s_test4_users (user_id, user_name) VALUES ('003', '李四');

INSERT INTO s_test4_users (user_id, user_name) VALUES ('004', '王五');

5、这才是我真正想要的东西......

可又有什么用呢?

|dd0001 |DRETEC温湿度计 |余柠 |

|dd0002 |华硕路由器TEK |余柠 |

|dd0003 |BOSE音箱 MINI |张三 |