kafka

kafka

Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力

  • 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
  • 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

概念

主题(Topic)与分区(Partition)

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-ba46359183324eb5d10f70d00736a4ce_r.jpg

在 Kafka 中,消息以**主题(Topic)**来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。

我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入**分区(Partition)**的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。

Broker 和集群(Cluster)

一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。

若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-9c8de1bed82a54799c4ef2cbfeedab61_1440w.jpg

Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化。

多集群

随着业务发展,我们往往需要多集群,通常处于下面几个原因:

  • 基于数据的隔离;
  • 基于安全的隔离;
  • 多数据中心(容灾)

当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。

上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。

术语总结

  • 消息记录 (record):由一个 key,一个 value 和一个时间戳构成,消息最终存储在主题下的分区中
    • 记录在生产者中称为生产者记录 (ProducerRecord),在消费者中称为消费者记录 (ConsumerRecord)
    • 在一个可配置的时间段,Kafka 集群保持所有的消息,直到它们过期, 无论消息是否被消费了
    • 比如,如果消息的保存策略被设置为 2 天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka 的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
  • 生产者 (producer):生产者用于发布 (send) 消息
  • 消费者 (consumer):消费者用于订阅 (subscribe) 消息
  • 消费者组 (consumer group):相同的 group.id 的消费者将视为同一个消费者组,每个消费者都需要设置一个组 id,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
  • 主题 (topic):消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
  • 分区 (partition):消息的一种物理分组, 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加
    • 分区中的每个消息都被分配了一个唯一的 id,称之为偏移量 (offset) 在每个分区中偏移量都是唯一的。
    • 每个分区对应一个逻辑 log,有多个 segment 组成。
  • 偏移量 (offset):分区中的每个消息都一个一个唯一 id,称之为偏移量,它代表已经消费的位置。
    • 可以自动或者手动提交偏移量(即自动或者手动控制一条消息是否已经被成功消费)
  • 代理 (broker):一台 kafka 服务器称之为一个 broker
  • 副本(replica):副本只是一个分区(partition)的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
  • 领导者(leader):Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当 Leader, producer 和 consumer 只跟 leader 交互
  • 追随者 (follower):跟随领导者指令的节点被称为 Follower。 如果领导失败,一个追随者将自动成为新的领导者。
    • 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。它是 replica 中的一个角色,从 leader 中复制数据。
  • zookeeper:Kafka 代理是无状态的,所以他们使用 ZooKeeper 来维护它们的集群状态。ZooKeeper 用于管理和协调 Kafka 代理

基础架构

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-4692429e9184ed4a93911fa3a1361d28_1440w.jpg

  • Producer:生产者,消息的产生者,是消息的入口。

  • kafka cluster

    • Broker:Broker 是 kafka实例,每个服务器上有一个或多个 kafka 的实例,我们姑且认为每个 broker 对应一台服务器。
      • 每个 kafka 集群内的 broker 都有一个不重复的编号,如图中的 broker-0、broker-1 等……
    • Topic:消息的主题,可以理解为消息的分类,kafka 的数据就保存在 topic。在每个 broker 上都可以创建多个 topic。
    • Partition:Topic 的分区,每个 topic 可以有多个分区,分区的作用是做负载,提高 kafka 的吞吐量。
      • 同一个 topic 在不同的分区的数据是不重复的,partition 的表现形式就是一个一个的文件夹!
    • Replication:每一个分区都有多个副本,副本的作用是做备胎。
      • 当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。
      • 在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,
      • follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

Message:每一条发送的消息主体。

Consumer:消费者,即消息的消费方,是消息的出口。

Consumer Group:我们可以将多个消费组组成一个消费者组,在 kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据,这也是为了提高 kafka 的吞吐量!

Zookeeper:kafka 集群依赖 zookeeper 来保存集群的的元信息,来保证系统的可用性。

Kafka 的一些设计与实现

Kafka 存储在文件系统上

是的,您首先应该知道 Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以与网络速度一样快。

现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。

上述的 Topic 其实是逻辑上的概念,面相消费者和生产者,物理上存储的其实是 Partition,每一个 Partition 最终对应一个目录,里面存储所有的消息和索引文件。默认情况下,每一个 Topic 在创建时如果不指定 Partition 数量时只会创建 1 个 Partition。比如,我创建了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认创建一个 test-0 的文件夹,这里的命名规则是:<topic_name>-<partition_id>

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-346cb3219087ad26746e18f410954d9f_1440w.jpg

任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。

每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition 中。

底层存储设计

假设我们现在 Kafka 集群只有一个 Broker,我们创建 2 个 Topic 名称分别为:「topic1」和「topic2」,Partition 数量分别为 1、2,那么我们的根目录下就会创建如下三个文件夹:

1
2
3
    | --topic1-0
    | --topic2-0
    | --topic2-1

在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 “.index” 和 “.log” 分表表示 Segment 索引文件和数据文件。

现在假设我们设置每个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生类似如下的一些文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    | --topic1-0
        | --00000000000000000000.index
        | --00000000000000000000.log
        | --00000000000000368769.index
        | --00000000000000368769.log
        | --00000000000000737337.index
        | --00000000000000737337.log
        | --00000000000001105814.index
        | --00000000000001105814.log
    | --topic2-0
    | --topic2-1

**Segment 是 Kafka 文件存储的最小单位。**Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log。

以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-fbd72266fb39101145c204d6e38f7f6b_1440w.jpg

其中以索引文件中元数据 <3, 497> 为例,依次在数据文件中表示第 3 个 message(在全局 Partition 表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497。

注意该 index 文件并不是从 0 开始,也不是每次递增 1 的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,使得能够把 index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。

因为其文件名为上一个 Segment 最后一条消息的 offset ,所以当需要查找一个指定 offset 的 message 时,通过在所有 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message 。

由于消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment 文件),这种顺序磁盘 IO 存储设计师 Kafka 高性能很重要的原因。

Kafka 是如何准确的知道 message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了以下三个属性:

  • offset:表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message,可以简单的认为是一个 id;
  • MessageSize:表示 message 内容 data 的大小;
  • data:message 的具体内容

生产者设计概要

当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?

举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。

再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。

不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-de25b264b72d1540b598c2362ddcc465_1440w.jpg

流程如下:

  1. 首先,我们需要创建一个 ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
  2. 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
  3. 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
  4. 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的 Kafka broker。
  5. 当 broker 接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的 RecordMetadata 对象,否则返回异常。
  6. 生产者接收到结果后,对于异常可能会进行重试。

消费者设计概要

消费者与消费组

假设这么个场景:我们从 Kafka 中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。

Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-3c4aef7b5d7f91ef1acaa7ec224a8272_1440w.jpg

如果我们增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息,如下所示:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-9c613777295125d63e23c91df4618cc9_1440w.jpg

如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-579ac0f4573a5060822e080e35a1f1c8_1440w.jpg

但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-f4bdede9192dce94c8b6f39c7f306ce6_1440w.jpg

总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。

另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

**Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。**换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-23ee29dc5c98ad266bc77fd08e42ac4f_1440w.jpg

在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。

最后,总结起来就是:

  • 如果应用需要读取全量消息,那么请为该应用设置一个消费组;
  • 如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

消费组与分区重平衡

可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)

重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。**不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。**而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

消费者通过定期发送心跳(heartbeat)到一个作为组协调者(group coordinator)的 broker 来保持在消费组内存活。这个 broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

Partition 与消费模型

上面提到,Kafka 中一个 topic 中的消息是被打散分配在多个 Partition 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?

答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。

下一个问题是:Partition 中的消息可以被(不同的 Consumer Group)多次消费,那 Partition 中被消费的消息是何时删除的? Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?

无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费,2 天后,消息自动被删除。 Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。 如下图:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-2b72c48cdb7708fa42dc7ac5cb4a091f_1440w.jpg

为什么 Kafka 是 pull 模型

消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣。

**push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。**push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。

**对于 Kafka 而言,pull 模式更合适。**pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

Kafka 如何保证可靠性

当我们讨论可靠性的时候,我们总会提到保证这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是 ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。

Kafka 中的可靠性保证有如下四点:

  • 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息 A,然后写入消息 B,那么消费者会先读取消息 A 再读取消息 B。
  • 当消息写入所有 in-sync 状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有 in-sync 状态副本写入才返回。
  • 一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
  • 消费者只能读取到已提交的消息。

使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际。

工作流程分析

发送数据

我们看上面的架构图中,producer 就是生产者,是数据的入口。注意看图中的红色箭头,Producer 在写入数据的时候永远的找leader,不会直接将数据写入 follower!那 leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-b7e72e9c5b9971e89ec174a2c2201ed9_1440w.jpg

发送的流程就在图中已经说明了;需要注意的一点是,消息写入 leader 后,follower 是主动的去 leader 进行同步的

producer 采用 push 模式将数据发布到 broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-87d558aaa349bf920711b9c157e11f6a_1440w.jpg

分区的目的

上面说到数据会写入到不同的分区,那 kafka 为什么要做分区呢?分区的主要目的是:

  1. 方便扩展。因为一个 topic 可以有多个 partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
  2. 提高并发。以 partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

分区的寻找

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在 kafka 中,如果某个 topic 有多个 partition,producer 又怎么知道该将数据发往哪个 partition 呢?kafka 中有几个原则:

  1. partition 在写入的时候可以指定需要写入的 partition,如果有指定,则写入对应的 partition。

  2. 如果没有指定 partition,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition。

  3. 如果既没指定 partition,又没有设置 key,则会轮询选出一个 partition。

保证消息不丢失

保证消息不丢失是一个消息队列中间件的基本保证,那 producer 在向 kafka 写入消息的时候,怎么保证消息不丢失呢?通过 ACK 应答机制

在 producer 向队列写入数据的时候可以设置参数来确定是否确认 kafka 接收到数据,这个参数可设置的值为 01all

  • 0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
  • 1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条,只确保 leader 发送成功。
  • all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条,确保 leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的 topic 写数据,能不能写入成功呢?kafka 会自动创建 topic,分区和副本的数量根据默认配置都是 1

保存数据

Producer 将数据写入 kafka 后,集群就需要对数据进行保存了。

kafka 将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka 初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

Partition 结构

前面说过了每个 topic 都可以分为一个或多个 partition,如果你觉得 topic 比较抽象,那 partition 就是比较具体的东西了。

Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 的文件夹下面会有多组 segment 文件,每组 segment 文件又包含 .index 文件、.log 文件、.timeindex 文件(早期版本中没有)三个文件,log 文件就实际是存储 message 的地方,而 index 和 timeindex 文件为索引文件,用于检索消息。

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-72e50c12fd9c6fbf58d3b5ca14c90623_1440w.jpg

如上图,这个 partition 有三组 segment 文件,每个 log 文件的大小是一样的,但是存储的 message 数量是不一定相等的(每条的 message 大小不一致)。文件的命名是以该 segment 最小 offset 来命名的,如 000.index 存储 offset 为 0~368795 的消息,kafka 就是利用分段 + 索引的方式来解决查找效率的问题。

Message 结构

上面说到 log 文件就实际是存储 message 的地方,我们在 producer 往 kafka 写入的也是一条一条的 message

message 主要包含消息体、消息大小、offset、压缩类型…… 重点需要知道的是下面三个:

1、 offset:offset 是一个占 8 byte 的有序 id 号,它可以唯一确定每条消息在 parition 内的位置

2、 消息大小:消息大小占用 4 byte,用于描述消息的大小。

3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

存储策略

无论消息是否被消费,kafka 都会保存所有的消息。那对于旧数据有什么删除策略呢?

1、 基于时间,默认配置是 168 小时(7 天)。

2、 基于大小,默认配置是 1073741824。

需要注意的是,kafka 读取特定消息的时间复杂度是 O(1),所以这里删除过期的文件并不会提高 kafka 的性能

消费数据

消息存储在 log 文件后,消费者就可以进行消费了。

与 producer 相同的是,消费者在拉取消息的时候也是找 leader 去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组 id,同一个消费组者的消费者可以消费同一 topic 下不同分区的数据,但是不会组内多个消费者消费同一分区的数据

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-75a79cba9cfafe5c2f4d5349acb72207_1440w.jpg

图示是消费者组内的消费者小于 partition 数量的情况,所以会出现某个消费者消费多个 partition 数据的情况,消费的速度也就不及只处理一个 partition 的消费者的处理速度

如果是消费者组的消费者多于 partition 的数量,那会不会出现多个消费者消费同一个 partition 的数据呢?答案是不会。多出来的消费者不消费任何 partition 的数据。

所以在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量一致

在保存数据的小节里面,我们聊到了 partition 划分为多组 segment,每个 segment 又包含 .log、.index、.timeindex 文件,存放的每条 message 包含 offset、消息大小、消息体……我们多次提到 segment 和 offset,查找消息的时候是怎么利用 segment + offset 配合查找的呢?假如现在需要查找一个 offset 为 368801 的 message 是什么样的过程呢?我们先看看下面的图:

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/v2-87051d884344edf9f8fd97a3dacb32d0_1440w.jpg

  1. 先找到 offset 的 368801 message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件。

  2. 打开找到的 segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796+5=368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为 5 的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset 为 4 的这个索引。

  3. 根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 offset 为 368801 的那条 Message。

这套机制是建立在 offset 为有序的基础上,利用 segment + 有序 offset + 稀疏索引 + 二分查找 + 顺序查找等多种手段来高效的查找数据

至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的 offset 维护 zookeeper 中,consumer 每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的 offset 已经直接维护在 kafk 集群的 __consumer_offsets 这个 topic 中