编辑
2020-07-15
后端
00
请注意,本文编写于 1974 天前,最后修改于 631 天前,其中某些信息可能已经过时。

目录

三. Kafka核心API与设计理念详解
1.Kafka的架构
2.Topics和Partition
2.1 创建Topic
2.2 删除topic
3.Producer消息路由
3.1 写入方式
3.2 消息路由
3.3 写入流程
4.Consumer Group
5.Push vs. Pull
6.Kafka delivery guarantee

本文主要讲述了 Kafka 相关核心 Api 和其设计的理解,为进一步理解 kafka 打好基础

三. Kafka核心API与设计理念详解

1.Kafka的架构

image.png

如上图所示,该图是一个典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

2.Topics和Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

image.png

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示:

properties
# The minimum age of a log file to be eligible for deletion 符合删除条件的日志文件的最小生存时间 log.retention.hours=168 # The maximum size of a log segment file. When this size is reached a new log segment will be created.日志段文件的最大大小。达到此大小后,将创建一个新的日志段。 log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according to the retention policies 检查日志段以了解是否可以根据保留策略将其删除的时间间隔 log.retention.check.interval.ms=300000 # 300s # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. 如果设置了log.cleaner.enable = true,则将启用清理器,然后可以标记单个日志以进行日志压缩。 log.cleaner.enable=false

因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

2.1 创建Topic

创建 topic 的序列图如下所示:

image.png

流程说明:

1、 controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。

2、 controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:

2.1、 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR

2.2、 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state

3、 controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

2.2 删除topic

删除 topic 的序列图如下所示:

image.png

流程说明:

1、 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。

2、 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。

3.Producer消息路由

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。

Math.abs("routerKey or groupName".hashCode()) % ParititionNum

3.1 写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

3.2 消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

1、 指定了 patition,则直接使用; 2、 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition 3、 patition 和 key 都未指定,使用轮询选出一个 patition。

3.3 写入流程

producer 写入消息序列图如下所示:

image.png

流程说明:

1、 producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader

2、 producer 将消息发送给该 leader

3、 leader 将消息写入本地 log

4、 followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK

5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

4.Consumer Group

使用Consumer high level API的时候,同一个Topic的一条消息只能被同一个Consumer Group中的一个Consumer消费,但是多个Consumer Group则可以同时消费这一信息.

  1. 保证消息消费的顺序性

传统消息中间件存在一个queue中, c1, c2, c3三个消费者从queue中取得的顺序在消息中间件看起来是顺序一致的,但是三个消费者实际处理时可能不一定是顺序的,尤其是多个消费者之间存在业务严格明确依赖的情况下, 比如c1用户下单后,c2再优惠,c3再送积分,此时传统的消息中间件是没有一个很好发办法去处理的。Kafka就可以解决这个问题。

Kafka解决这个问题的办法:

  1. 首先规定了一个分区只能有一个消费者,我们可以把有业务依赖性的消息往一个分区中发送
  2. 多个分区可以提高并发

既可以满足并发,也可以满足消息的一致性

image.png

这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。

5.Push vs. Pull

作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

6.Kafka delivery guarantee

有这么几种可能的delivery guarantee:

At most once   消息可能会丢,但绝不会重复传输

At least one    消息绝不会丢,但可能会重复传输

Exactly once    每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。

当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。

接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。

本文作者:CodeJump

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!