百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

流式数据库 KSQL 概念详解 (二)

wptr33 2024-12-08 19:12 13 浏览

联接 Join 集合

可以使用具有SQL连接语法的JOIN语句,使用ksqlDB实时合并事件流。ksqlDB连接和关系数据库连接的相似之处在于,它们都基于通用值组合了来自两个或多个源的数据。ksqlDB连接的结果是一个新的流或表,其中填充了您在SELECT语句中指定的列值。

使用ksqlDB,无需围绕连接流和表编写低级逻辑,因此可以专注于用于组合流数据的业务逻辑。

可以通过以下方式联接流和表:

    • 连接多个流以创建新的流。
    • 连接多个表以创建一个新表。
    • 连接多个流和表以创建一个新的流。

Join 语句

ksqlDB JOIN子句具有SQL JOIN子句的熟悉语法。以下示例创建一个pageviews_enriched流,该流是pageviews流和users表的组合:

CREATE STREAM pageviews_enriched AS
  SELECT 
     users.userid AS userid, 
     pageid, 
     regionid, 
     gender 
  FROM pageviews
    LEFT JOIN users ON pageviews.userid = users.userid
  EMIT CHANGES;

当连接两个流时,必须指定WITHIN子句以匹配在指定时间间隔内都出现的记录。有关有效时间单位,请参见时间单位。

这里有一个例子流,信息流加入结合orders,payments与shipments流。结果shipped_orders流包含下订单后1小时内支付的所有订单,并在收到付款后2小时内发货。

CREATE STREAM shipped_orders AS
     SELECT 
        o.id as orderId 
        o.itemid as itemId,
        s.id as shipmentId,
        p.id as paymentId
     FROM orders o
        INNER JOIN payments p WITHIN 1 HOURS ON p.id = o.id
        INNER JOIN shipments s WITHIN 2 HOURS ON s.id = o.id;

Join和Windows

ksqlDB允许将具有相同键的记录进行分组,以便将有状态操作(例如联接)组合到windows中。您为窗口指定保留期,此保留期控制ksqlDB等待无序记录的时间。如果记录在窗口的保留期过后到达,则该记录将被丢弃,并且不会在该窗口中进行处理。

注意:仅对流连接流支持窗口。

每个记录键都跟踪Windows。在联接操作中,ksqlDB使用窗口状态存储区将到目前为止收到的所有记录存储在定义的窗口边界内。在指定的窗口保留期之后,将清除状态存储中的旧记录。

Join 需求

ksqlDB应用程序必须满足特定的需求,Join才能成功。

共分区的数据

连接时必须对输入数据进行共分区。这样可以确保在处理过程中,从连接的两侧将具有相同键的输入记录传递到同一流任务。联接时,用户有责任确保数据进行分区。

Join功能

ksqlDB支持大量的流和表联接操作,包括INNER,LEFT OUTER和FULL OUTER。通常,LEFT OUTER缩短为LEFT JOIN,而FULL OUTER缩短为OUTER JOIN。

不支持RIGHT OUTER JOIN。而是交换操作数并使用LEFT JOIN。

下表显示了支持的组合。


类型

INNER

LEFT OUTER

FULL OUTER

流-流

窗口式的

支持的

支持的

支持的

表-表

非窗口式

支持的

支持的

支持的

流-表

非窗口式

支持的

支持的

不支持

流流联接

ksqlDB支持流之间的INNER,LEFT OUTER和FULL OUTER连接。

所有这些操作都支持乱序记录。

要加入两个流,必须使用WITHIN子句指定一个窗口方案。一侧的新输入记录为另一侧的每个匹配记录生成联接输出,并且联接窗口中可以有多个此类匹配记录。

连接仅在将流标记为要重新分区时才对流进行数据重新分区。如果两个流都被标记,则都将被重新分区。

重要的提示

Kafka保证来自一个源分区的任何两个消息的相对顺序,前提是它们在重新分区后也都位于同一个分区中。否则,Kafka可能会交错插入事件。用例将确定这些订购保证是否可接受。

LEFT OUTER联接将在结果流中包含leftRecord-NULL记录,这意味着该联接包含从没有进行匹配的右侧流中选择的字段的NULL值。

FULL OUTER联接将在结果流中包含leftRecord-NULL或NULL-rightRecord记录,这意味着联接中包含来自未进行匹配的流的字段的NULL值。

流流连接的语义

下表显示了各种流-流连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:

  • 所有记录具有相同的键。
  • 所有记录都属于一个连接窗口。
  • 所有记录均按时间戳顺序处理。

收到新输入后,将在表中列出的条件下触发联接。具有NULL键或NULL值的输入记录将被忽略,并且不会触发联接。

时间戳记

左流

右流

内部联接 Inner

左联接 left

右联接 right

1个

Null





2个


Null




3

A



[A,Null]

[A,Null]

4


一个

[A,a]

[A,a]

[A,a]

5

B


[B,a]

[B,a]

[B,a]

6


b

[A,b],[B,b]

[A,b],[B,b]

[A,b],[B,b]

7

Null





8


Null




9

C


[C,a],[C,b]

[C,a],[C,b]

[C,a],[C,b]

10


C

[A,c],[B,c],[C,c]

[A,c],[B,c],[C,c]

[A,c],[B,c],[C,c]

11


Null




12

Null





13


Null




14


d

[A,d],[B,d],[C,d]

[A,d],[B,d],[C,d]

[A,d],[B,d],[C,d]

15

D


[D,a],[D,b],[D,c],[D,d]

[D,a],[D,b],[D,c],[D,d]

[D,a],[D,b],[D,c],[D,d]

流表联接

ksqlDB仅支持流和表之间的INNER和LEFT连接。

流表联接始终是非窗口联接。当新记录到达流上时,您可以针对表执行表查找。只有到达流侧的事件才触发下游更新并产生联接输出。表端的更新不会产生更新的联接输出。

流表联接仅在将流标记为要重新分区时才对流进行数据重新分区。

重要提示

ksqlDB当前在时间同步方面提供了最大的努力,但是没有保证,这可能会导致结果丢失或left Record-NULL结果。

流表联接的语义

下表显示了各种流表连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:

  • 所有记录具有相同的键。
  • 所有记录均按时间戳顺序处理。

仅左侧流的输入记录会触发联接。右侧表的输入记录仅更新内部右侧连接状态。

具有NULL值的表的输入记录被解释为对应键的逻辑删除,表示从表中删除了键。逻辑删除不会触发联接。

时间戳记

左流

右表

内部联接

左联接

1

Null




2


Null(墓碑)



3

A



[A,Null]

4


一个



5

B


[B,a]

[B,a]

6


b



7

Null




8


Null(墓碑)



9

C



[C,Null]

10


C



11


Null



12

Null




13


Null



14


d



15

D


[D,d]

[D,d]

请注意,即使表端稍后已填充,如果表端尚未包含键的值,INNER JOIN也不会产生任何输出。对于LEFT JOIN,相同的场景将导致leftRecord-NULL的输出。因此,重要的是接收流事件之前加载表数据。

ksqlDB尝试按事件时间顺序处理连接的两端,但是不能提供有力的保证,尤其是在存在乱序行的情况下。

为了最大程度地提高联接的可预测性,请确保源主题中提供了历史表数据,查询正在运行,并且ksqlDB开始生成流之前有足够的时间来处理表数据。

表表联接

ksqlDB支持表之间的INNER,LEFT OUTER和FULL OUTER连接。不支持与多个记录匹配的联接(一对多)。

表-表联接始终是非窗口联接。

表-表联接最终是一致的。

重要提示

ksqlDB当前在时间同步方面提供了最大的努力,但是没有保证,这可能会导致结果丢失或leftRecord-NULL结果。

表-表联接只能在其PRIMARY KEY字段上联接,并且不支持一对多(1:N)联接。

表-表联接的语义

下表显示了各种表-表连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:

  • 所有记录具有相同的键。
  • 所有记录均按时间戳顺序处理。

具有NULL值的输入记录被解释为相应键的逻辑删除,表示从表中删除了键。逻辑删除不会触发联接。如果接收到输入逻辑删除,则输出逻辑删除将直接转发到连接结果表(如果连接结果表中已经存在相应的键)。

时间戳记

左表

右表

内部联接

左联接

外连接

1

Null(墓碑)





2


Null(墓碑)




3

A



[A,Null]

[A,Null]

4


一个

[A,a]

[A,a]

[A,a]

5

B


[B,a]

[B,a]

[B,a]

6


b

[B,b]

[B,b]

[B,b]

7

Null(墓碑)


Null(墓碑)

Null(墓碑)

[null,b]

8


Null(墓碑)



Null(墓碑)

9

C



[C,Null]

[C,Null]

10


C

[C,c]

[C,c]

[C,c]

11


Null(墓碑)

Null(墓碑)

[C,Null]

[C,Null]

12

Null(墓碑)



Null(墓碑)

Null(墓碑)

13


Null(墓碑)




14


d



[n,d]

15

D


[D,d]

[D,d]

[D,d]

N向联接 Join

ksqlDB支持在单个语句中连接两个以上的源。这些连接在语义上等效于连续连接N个源,并且连接的顺序由写入连接的顺序控制。

以下面的查询为例,其中A是事件流,B并且C都是表:

CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B ON A.id = B.product_id
    JOIN C ON A.id = C.purchased_id;

该查询的输出是流,中间连接结果将是stream A ? B。如果C是流而不是表,那么您将通过添加一个WITHIN子句来相应地重写连接,因为A ? Bwith与C是流-流连接:

CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B ON A.id = B.product_id
    JOIN C WITHIN 10 SECONDS ON A.id = C.purchased_id;

N向联接的局限性

前面各节中对N向联接的每个中间步骤所描述的限制和限制。例如,FULL OUTER不支持流和表之间的联接。这意味着,如果N向联接中的任何阶段都解析为FULL OUTER流和表之间的联接,则整个查询将失败:

--- This JOIN fails with the following exception:
--- Join between invalid operands requested: left type: KTABLE, right type: KSTREAM
CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B WITHIN 10 SECONDS ON A.id = B.product_id
    FULL OUTER JOIN C ON A.id = C.purchased_id;

分区要求

使用ksqlDB联接流数据时,必须确保流和表是共分区的,这意味着联接两侧的输入记录的分区配置都相同。

要连接两个数据源,流或表,ksqlDB需要根据连接列比较它们的记录。为确保具有相同联接列的记录在同一流任务上共置一处,联接列必须与源分区所在的列重合。

键 Keys

表始终按其分区PRIMARY KEY,而ksqlDB不允许对表进行重新分区,这意味着您只能将表的主键用作连接列。

流没有主键,但是有一个可选的 KEY 列。当存在KEY列时,定义了分区列。

流允许对除键列之外的表达式进行联接。当连接条件与KEY列不同时,ksqlDB在内部对流进行重新分区,这将隐式定义正确的键和分区。

重要提示

Kafka保证来自一个源分区的任何两个消息的相对顺序,前提是它们在重新分区后都位于同一个分区中。否则,Kafka可能会交错插入事件。用例将确定这些订购保证是否可接受。

以下示例显示了一个users表,该表clicks在点击的userId列上与流连接在一起。该users表具有id相同SQL类型的正确主键。该clicks流没有定义的键,因此ksqlDBuserId在执行连接之前在连接列()上对其内部重新分区以分配键。

-- clicks stream, with no or unknown key.
-- the schema of stream clicks is: USERID BIGINT | URL STRING
CREATE STREAM clicks (
    userId BIGINT, 
    url STRING
  ) WITH (
    kafka_topic='clickstream', 
    value_format='json'
  );

-- users table, with userId primary key. 
-- the schema of table users is: USERID BIGINT PRIMARY KEY | FULLNAME STRING
CREATE TABLE users (
    id BIGINT PRIMARY KEY, 
    fullName STRING
  ) WITH (
    kafka_topic='users', 
    value_format='json'
  );

-- join of users table with clicks stream, joining on the table's primary key and the stream's userId column:
-- join will automatically repartition clicks stream:
SELECT 
  c.userId,
  c.url, 
  u.fullName 
FROM clicks c
  JOIN users u ON c.userId = u.id;

共分区要求

使用ksqlDB联接流数据时,必须确保流和表是共分区的,这意味着联接两侧的输入记录的分区配置都相同。

  • 联接的输入记录必须具有相同的键架构。
  • 输入记录的两侧必须具有相同数量的分区。
  • 连接的两端必须具有相同的分区策略。

当您对输入进行共同分区时,在连接过程中,来自连接两侧的具有相同键的记录将被传递到同一流任务。

记录具有相同的键Schema

为了使联接生效,两端的键必须具有相同的SQL类型。

例如,您可以STRING将以用户ID为关键字的用户点击流与也以STRING用户ID为键的用户配置文件表结合在一起。双方具有完全相同的用户ID的记录将被合并。

如果您希望加入的列的架构不匹配,则CAST一侧可能会匹配另一侧。例如,如果INT联接的一侧有一个userId列,而另一侧是a LONG,那么您可以选择将INT一侧转换为a LONG:

-- stream with INT userId
CREATE STREAM clicks (
    userId INT KEY, 
    url STRING
  ) WITH (
    kafka_topic='clickstream', 
    value_format='json'
  );

-- table with BIGINT id stored in the key:
CREATE TABLE users (
    id BIGINT PRIMARY KEY, 
    fullName STRING
  ) WITH (
    kafka_topic='users', 
    value_format='json'
  );

-- Join utilising a CAST to convert the left sides join column to match the rights type.
SELECT 
  clicks.url, 
  users.fullName 
FROM clicks 
  JOIN users ON CAST(clicks.userId AS BIGINT) = users.id;

在现有Kafka主题之上创建的表(例如,使用CREATE TABLE语句创建的表)将根据Kafka主题中记录的键中所保存的数据进行键入。ksqlDB在PRIMARY KEY列中显示此数据。

在ksqlDB内部从其他源创建的表(例如,使用CREATE TABLE AS SELECT语句创建的表)将从其源复制键,除非存在显式GROUP BY或JOIN子句,否则该显式或子句可以更改键入该表的内容。

注意

如果联接需要,则ksqlDB会自动对流进行重新分区,但是ksqlDB会拒绝不是键的表列上的任何联接。这是因为ksqlDB不支持外键上的联接,并且对表的主题进行重新分区有可能对事件进行重新排序并错误地解释逻辑删除,这可能导致意外或意外的副作用。

如果在多个联接中使用相同的源,并且需要对数据进行重新分区,则您可能希望手动重新分区,以避免ksqlDB多次重新分区。

要对流进行重新分区,请使用PARTITION BY子句。请注意,只有在重新分区之后,Kafka才能保证来自一个源分区的任何两个消息的相对顺序,它们也都位于同一个分区中。否则,Kafka可能会交错插入邮事件。用例将确定这些订购保证是否可接受。

重要提示

如果PARTITION BY表达式的计算结果为NULL,则结果行将产生一个随机分区。您可能需要使用COALESCE来包装表达式并将所有NULL值转换为默认值,例如PARTITION BY COALESCE(MY_UDF_THAT_MAY_FAIL(Col0), 0)。

例如,如果您需要对要由product_id字段进行键控的流进行重新分区,并且需要将键分布在6个分区上才能进行联接,请使用以下SQL语句:

CREATE STREAM products_rekeyed 
  WITH (PARTITIONS=6) AS 
  SELECT * 
   FROM products
   PARTITION BY product_id;


记录具有相同数量的分区

联接的输入记录两侧必须具有相同数量的分区。

ksqlDB会检查这部分分区需求,并拒绝分区计数不同的任何连接。

使用DESCRIBE EXTENDED <source name>CLI中的命令确定源下的Kafka主题,并使用CLI中的SHOW TOPICS命令列出主题及其分区数。

如果联接的两面具有不同的分区数,则可能要更改源主题的分区数,或重新分配一面以匹配另一面的分区数。

以下示例创建一个重新分区的流,并使用指定数量的分区来维护现有键。

CREATE STREAM products_rekeyed 
  WITH (PARTITIONS=6) AS 
  SELECT * FROM products;

记录具有相同的分区策略

连接两侧的记录必须具有相同的分区策略。如果在所有应用程序中使用默认分区程序设置,并且生产者未指定显式分区,则无需担心分区策略。

但是,如果您的记录的生产者应用程序在配置中指定了自定义分区程序,则必须对联接两侧的记录使用相同的自定义分区程序逻辑。写入联接输入的应用程序必须具有相同的分区策略,以便具有相同键的记录将传递到相同的分区号。

这意味着输入记录必须在连接的两侧都位于同一分区中。例如,在流表联接中,如果具有userId键值的键alice123在流的分区1中,但alice123在表的分区2中,则即使两端都由键键入,联接也不会匹配userId。

ksqlDB无法验证两个连接输入的分区策略是否相同,因此必须确保这一点。

该DefaultPartitioner类实现了以下的分区策略:

  • 如果生产者在记录中指定了分区,请使用它。
  • 如果生产者指定键而不是分区,请根据键的哈希值选择一个分区。
  • 如果生产者未指定分区或键,请以循环方式选择一个分区。

自定义分区程序类实现了Partitioner接口,并在生产者配置属性中分配partitioner.class。


综合键列

某些联接的结果中有一个合成键列。此列并非来自任何来源。这是一个示例,可以帮助解释什么是合成键列以及为什么需要它们:

CREATE TABLE OUTPUT AS
  SELECT * FROM L FULL OUTER JOIN R ON L.ID = R.ID;

前面的语句似乎很简单:创建一个新表,该表是对两个源表进行完全外部联接并在它们的ID列上联接的结果。但是在完全外部联接中,一个L.ID或R.ID可能会丢失(NULL),或者两个都可能具有相同的值。由于生成给ApacheKafka?的数据应始终具有非空消息键,因此ksql选择要使用的第一个非空键:

L.ID

R.ID

Kafka消息键

10

null

10

null

7

7

8

8

8

Kafka消息键中存储的数据可能与两个源ID列都不匹配。相反,它是一个新列:合成列,这意味着该列不属于任何一个源表。

哪些联接导致合成键列?

结果中的键列与任何源列都不匹配的任何联接都称为具有合成键列。

下列类型的联接会导致将合成键列添加到结果模式:

  1. FULL OUTER JOIN,例如:
    sql CREATE TABLE OUTPUT AS SELECT * FROM L FULL OUTER JOIN R ON L.ID = R.ID;
  2. 在联接ON条件中使用的所有表达式都不是简单列引用的任何联接。例如:
    sql -- join on expressions other than column references:
    CREATE TABLE OUTPUT AS SELECT * FROM L JOIN R ON ABS(L.ID) = ABS(R.ID);

合成键列分配了什么名称?

合成键列的默认名称为ROWKEY。但是,如果联接中使用的任何源已经包含名为的列ROWKEY,则合成键列的名称为ROWKEY_1,或者ROWKEY_2存在存在名为的源列ROWKEY_1等。

-- given sources:
CREATE STREAM S1 (ROWKEY INT KEY, V0 STRING) WITH (...);
CREATE STREAM S2 (ID INT KEY, ROWKEY_1 INT) WITH (...);

CREATE STREAM OUTPUT AS
  SELECT * 
  FROM S1 JOIN S2 
  WITHIN 30 SECONDS 
  ON ABS(S1.ROWKEY) = ABS(S2.ID);

-- result in OUTPUT with synthetic key column name: ROWKEY_2

与其他任何键列一样,合成键列必须包含在流查询的投影中。如果投影缺少合成键,则将返回类似以下的错误,指示丢失的键列的名称:

Key missing from projection.
The query used to build `OUTPUT` must include the join expression ROWKEY in its projection.
ROWKEY was added as a synthetic key column because the join criteria did not match any source column. This expression must be included in the projection and may be aliased. 

(可选)可以为投影中的键列提供别名。推荐这样做,因为不能保证系统生成的名称在版本之间保持一致。例如:

CREATE STREAM OUTPUT AS
   SELECT ROWKEY AS ID, S1.C0, S2.C1 FROM S1 FULL OUTER JOIN S2 ON S1.ID = S2.ID;

相关推荐

【推荐】一款开源免费、美观实用的后台管理系统模版

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!项目介绍...

Android架构组件-App架构指南,你还不收藏嘛

本指南适用于那些已经拥有开发Android应用基础知识的开发人员,现在想了解能够开发出更加健壮、优质的应用程序架构。首先需要说明的是:AndroidArchitectureComponents翻...

高德地图经纬度坐标批量拾取(高德地图批量查询经纬度)

使用方法在桌面上新建一个index.txt文件,把下面的代码复制进去保存,再把文件名改成index.html保存,双击运行打开即可...

flutter系列之:UI layout简介(flutter ui设计)

简介对于一个前端框架来说,除了各个组件之外,最重要的就是将这些组件进行连接的布局了。布局的英文名叫做layout,就是用来描述如何将组件进行摆放的一个约束。...

Android开发基础入门(一):UI与基础控件

Android基础入门前言:...

iOS的布局体系-流式布局MyFlowLayout

iOS布局体系的概览在我的CSDN博客中的几篇文章分别介绍MyLayout布局体系中的视图从一个方向依次排列的线性布局(MyLinearLayout)、视图层叠且停靠于父布局视图某个位置的框架布局(M...

TDesign企业级开源设计系统越发成熟稳定,支持 Vue3 / 小程序

TDesing发展越来越好了,出了好几套组件库,很成熟稳定了,新项目完全可以考虑使用。...

WinForm实现窗体自适应缩放(winform窗口缩放)

众所周知,...

winform项目——仿QQ即时通讯程序03:搭建登录界面

上两篇文章已经对CIM仿QQ即时通讯项目进行了需求分析和数据库设计。winform项目——仿QQ即时通讯程序01:原理及项目分析...

App自动化测试|原生app元素定位方法

元素定位方法介绍及应用Appium方法定位原生app元素...

61.C# TableLayoutPanel控件(c# tabcontrol)

摘要TableLayoutPanel在网格中排列内容,提供类似于HTML元素的功能。TableLayoutPanel控件允许你将控件放在网格布局中,而无需精确指定每个控件的位置。其单元格...

想要深入学习Android性能优化?看完这篇直接让你一步到位

...

12个python数据处理常用内置函数(python 的内置函数)

在python数据分析中,经常需要对字符串进行各种处理,例如拼接字符串、检索字符串等。下面我将对python中常用的内置字符串操作函数进行介绍。1.计算字符串的长度-len()函数str1='我爱py...

如何用Python程序将几十个PDF文件合并成一个PDF?其实只要这四步

假定你有一个很无聊的任务,需要将几十个PDF文件合并成一个PDF文件。每一个文件都有一个封面作为第一页,但你不希望合并后的文件中重复出现这些封面。即使有许多免费的程序可以合并PDF,很多也只是简单的将...

Python入门知识点总结,Python三大数据类型、数据结构、控制流

Python基础的重要性不言而喻,是每一个入门Python学习者所必备的知识点,作为Python入门,这部分知识点显得很庞杂,内容分支很多,大部分同学在刚刚学习时一头雾水。...