本系列文章为对《Kafka:The Definitive Guide》的学习整理,希望能够帮助到大家

本章我们探讨下以下三个问题:

  • Kafka复制机制是怎么工作的?
  • Kafka是怎么处理生产者和消费者的请求的?
  • Kafka是怎么存储文件或者索引的?

集群成员管理

Kafka使用Zookeeper管理集群成员状态,每一个broker都有一个唯一ID(在配置文件中指定或者自动生成),当broker启动时会在Zookeeper中注册相应的临时节点(可参考这篇文章)。如果集群中存在相同的ID,那么新的broker会启动失败。

Zookeeper中的节点注册路径为/broker/ids,Kafka的各个组件会监听此路径下的变更信息,当broker加入或者离开时,它们会收到通知。当节点离开(可能由于停机、网络故障、长GC等导致)时,Zookeeper中相应的节点会消失,但该broker的ID仍然会在某些数据结构中存在。比如,每个主题的副本列表会包含副本所在的broker ID,因此如果一个broker离开同时有一个新的broker拥有此相同的ID,那么新的broker会在集群中替代之前的broker,并且会被分配同样的主题和分区。

集群控制器(Controller)

集群控制器也是一个broker,在承担一般的broker职责外,它还负责选举分区的主副本(后面会讨论分区主副本的作用)。集群中的第一个broker通过在Zookeeper的/controller路径下创建一个临时节点来成为控制器。当其他broker启动时,也会试图创建一个临时节点,但是会收到一个“节点已存在”的异常,这样便知道当前已经存在集群控制器。这些broker会监听Zookeeper的这个控制器临时节点,当控制器发生故障时,该临时节点会消失,这些broker便会收到通知,然后尝试去创建临时节点成为新的控制器。

对于一个控制器来说,如果它发现集群中的一个broker离开时,它会检查该broker是否有分区的主副本,如果有则需要对这些分区选举出新的主副本。控制器会在分区的副本列表中选出一个新的主副本,并且发送请求给新的主副本和其他的跟随者;这样新的主副本便知道需要处理生产者和消费者的请求,而跟随者则需要向新的主副本同步消息。

如果控制器发现有新的broker(这个broker也有可能是之前宕机的)加入时,它会通过此broker的ID来检查该broker是否有分区副本存在,如果有则通知该broker向相应的分区主副本同步消息。

最后,每当选举一个新的控制器时,就会产生一个新的更大的控制器时间戳(controller epoch),这样集群中的broker收到控制器的消息时检查此时间戳,当发现消息来源于老的控制器,它们便会忽略,以避免脑裂(split brain)。

复制

复制,是Kafka架构的核心。通过复制机制,Kafka达到了高可用的要求,保证在少量节点发生故障时集群仍然可用。如前所述,每个主题都有若干个分区,而每个分区有多个副本,这些副本都存在broker中。Kafka中的副本有两种类型:

  • 主副本(leader replica):每个分区都有唯一的主副本,所有的生产者和消费者请求都由主副本处理,这样才能保证一致性。
  • 跟随者副本(follower replica):分区的其他副本为跟随者副本,跟随者副本不处理生产者或消费者请求,它们只是向主副本同步消息,当主副本所在的broker宕机后,跟随者副本会选举出新的主副本。

对于主副本来说,它还有一个职责,那就是关注跟随者副本的同步状态。每个跟随者副本都会保持与主副本同步,但在异常情况(例如网络阻塞、机器故障、重启等)下,跟随者的状态可能会同步失败。跟随者副本通过向主副本发送Fetch请求来进行同步(与消费者一样),请求中包含希望同步的下一个消息位移,该位移始终是有序的。比如,一个跟随者可能会按序请求消息1,消息2,消息3…如果跟随者请求消息N,那么主副本可以确定此跟随者已经接收到N-1及以前的消息了。因此根据请求中的位移信息,主副本知道跟随者的落后状态,如果副本超过10秒钟没有发送同步请求或者请求的位移属于10秒钟以前的位移,那么主副本会认为该跟随者是同步落后(out of sync)的。如果一个跟随者副本是同步落后的,那么在主副本发生故障时该跟随者不能成为新的主副本。而能够及时同步的跟随者副本则是in-sync状态的,这些跟随者副本有资格成为新的主副本。当然10秒钟不是固定的,可以通过replica.lag.time.max.ms来设置。

每个分区除了有主副本之外,还有一个备选主副本(preferred leader),它是主题初始创建时的主副本。在主题初始创建时,Kafka使用一定的算法来分散所有主题的主副本与跟随者副本,因此备份主副本通常能保证集群的流量是均衡分布的。如果备份主副本是in-sync状态的,那么在主副本发生故障后,它会自动成为新的主副本。

请求处理

Kafka的broker主要工作是,当作为分区的主副本时,处理来自生产者/消费者客户端、跟随者副本以及控制器的请求。对于此Kafka使用自定义的二进制协议来进行通信,请求的头部包含如下信息:

  • 请求类型
  • 请求版本(通过此参数broker可以正确处理不同版本客户端的请求)
  • 关联ID:标识请求的全局唯一数字,也会在响应结果和错误日志中呈现。
  • 客户端ID:用来识别请求的来源。

同一个客户端发送的所有请求都是按序处理的,这保证了消息的有序性。

对于监听的每一个端口,broker都会运行一个接收者(acceptor)线程来建立连接,并且把连接交给一个处理器(processor)线程处理,处理器线程的数量是可配置的。处理器线程接收客户端的请求,把请求放进请求队列,最后从响应队列中取出结果返回给客户端。当请求放进请求队列后,IO线程负责进行处理,大部分的请求都属于两种类型:

  • 生产请求(produce request):由生产者发送,包含需要写入的消息。
  • 拉取请求(fetch request):由消费者和跟随者副本发送。

整体架构如下所示:

request

生产请求和拉取请求都需要发送给分区的主副本处理,如果一个跟随者副本收到这两种请求,它会返回“Not a Leader for Partition”的异常信息,客户端收到异常信息后向正确的主副本发送请求。

客户端怎么知道哪个是主副本呢?通过使用另一种类型的请求来实现,那就是元信息请求(metadata request)。Kafka的任意一个broker都可以处理这种请求,请求中包含客户端感兴趣的主题列表,broker会返回这些主题的分区列表、分区的副本列表以及主副本信息。客户端收到这些信息后,会进行一定时间的缓存(由metadata.max.age.ms指定),当超过时间或者broker返回请求的异常后,会刷新此信息。整体交互处理如下:

metadata

生产请求(produce request)

这篇文章曾经说过,acks参数控制多少个副本确认写入成功后生产者才认为消息生产成功。这个参数的取值可以为:

  • acks=0:消息发送完毕,生产者认为消息写入成功;
  • acks=1:主副本写入成功,生产者认为消息写入成功;
  • acks=all:所有in-sync副本写入成功,生产者认为消息写入成功。

如果主副本收到生产消息,它会执行一些检查逻辑,包含:

  • 发送的用户是否有权限写入主题?
  • 请求的acks参数取值是否合法(只允许0,1,all)?
  • 如果acks设置为all,是否有足够的in-sync副本来安全写入消息?(我们可以配置如果in-sync副本低于一定数量,主副本拒绝写入消息)

检查通过后主副本会持久化消息到本地磁盘,在Linux系统上消息只会写入到文件系统的缓存,因此并没有保证一定写入到磁盘;Kafka依赖复制机制来保证数据不丢失。

一旦消息本地持久化后,如果acks=1那么会返回结果给客户端,如果acks=all那么会将请求放置在一个称为purgatory的缓冲区中等待其他的副本写入完成。

拉取请求(fetch request)

主副本处理拉取请求和处理生产请求差不多。客户端发送一个拉取请求,包含主题、分区和位移信息,主副本返回相应的数据。另外,客户端也指定返回的最大数据量,防止数据量过大造成客户端内存溢出。同时,客户端也指定返回的最小数据量,当消息数据量没有达到最小数据量时,请求会一直阻塞直到有足够的数据返回,如下所示:

fetch

指定最小的数据量在负载不高的情况下非常有用,通过这种方式可以减轻网络往返的额外开销。当然请求也不能永远的阻塞,客户端可以指定最大的阻塞时间,如果到达指定的阻塞时间,即便没有足够的数据也会返回。

有意思的是,不是所有主副本的数据都能够被读取。当数据被所有in-sync状态的副本写入成功后,它才能被客户端读取。对于主副本来说,这并不难实现,因为之前已经说过通过副本同步,它知道所有副本当前已经完成同步的消息位移。该机制描述如下:

sync

但为什么要这么做呢?这是因为如果没有足够的副本持久化消息,该消息是不安全的。如果主副本发生故障,然后其他副本成为新的主副本,这些消息将会在Kafka中莫名其妙的消失。也就是说,存在一个消费组能够读到某个消息,但另外的消费组读不到这个消息,从而导致不一致的行为。

这样的机制也导致了新消息到达消费者应用的高延迟,特别是存在副本之间网络拥塞的情况。我们可以通过replica.lag.time.max.ms来指定一个副本落后多少仍被视为in-sync状态的,减小该值可以使得消费者无需等待延迟较大的副本写入。

在读取消息上,Kafka使用零复制(zero-copy)来提高性能。也就是说,Kafka将文件(更准确的说,是文件系统缓存)的消息直接传给网络通道,并没有使用中间的buffer。这避免了内存的字节拷贝和buffer维护,极大地提高了性能。

其他请求

我们讨论了Kafka中最常见的三种请求类型:元信息请求,生产请求和拉取请求。这些请求都是使用的是Kafka的自定义二进制协议。集群中broker间的通信请求也是使用同样的协议,这些请求是内部使用的,客户端不能发送。比如在选举分区主副本过程中,控制器会发送LeaderAndIsr请求给新的主副本和其他跟随副本。

这个协议目前已经支持20种请求类型,并且仍然在演进以支持更多的类型。另外,Kafka也会修改已有的请求来增加扩展性。比如在0.9.0版本和0.10.0版本,Kafka决定在元信息请求中返回集群的控制器信息,因此在元信息请求和返回值中增加了一个版本号;0.9.0版本的客户端会发送版本号为0的元信息请求,而broker(无论是0.9.0或者0.10.0)也会返回版本号为0的结果,结果中不会包含控制器信息;而0.10.0版本的客户端会发送版本号为1的元信息请求,0.10.0版本的broker会返回版本号为1的结果,并且包含控制器信息。注意的是,如果0.10.0版本的客户端发送此请求到0.9.0版本的broker,那么请求会报错,这也是为什么在升级时先升级broker再升级客户端的理由。

物理存储

Kafka中基本的存储单元是一个分区副本。分区副本不能跨越多个broker,甚至不能跨越同一个broker的多个磁盘。当配置Kafka时,管理员需要设置log.dirs来指定分区存储的目录。

下面来看下存储的细节。

分区分配

当创建一个新主题时,Kafka首先需要决定如何分配分区到不同的broker。假设有6个broker,那么创建一个有10个分区且复制因子为3的主题的话,Kafka需要分配30个分区副本到这6个broker。分配策略主要的考虑因素如下:

  • 尽可能将分区副本均衡分配到集群的broker中;
  • 对于每个分区,它的所有副本需要在不同的broker;
  • 如果broker(0.10.0及更高版本)有机架信息,那么对于一个分区的所有副本,尽量分配这些副本到不同的机架。这保证了机架发生故障时集群仍然可用。

下面以一个例子来说明这几个策略。假设集群由6个broker,我们从一个随机的broker(不妨称为4号)开始以轮询的方式来分配分区的主副本。因此分区0的主副本在4号broker,分区1的主副本在5号broker,分区2的主副本在0号(总数量为6,下标从0开始)broker…然后对于每个分区的其他副本,从其主副本所在的broker开始分配。由于分区0的主副本在4号broker,分区0的第一个跟随者副本在5号broker,第二个跟随者副本在6号broker,以此类推…

如果考虑机架因素,那么我们需要首先根据broker的机架信息来对broker进行排序,而不是上面的按照序号递增来排序。假如0号、1号和2号broker在同一个机架,3号、4号和5号broker在另一个机架,那么broker的顺序可能为0,3,1,4,2,5。这个顺序穿插了不同机架的broker。那么对于这种情况,分区0的主副本在4号broker,分区1的主副本在2号broker…

在分配分区和副本到broker之后,下一步需要决定使用哪个目录来存储分区。策略非常简单:统计每个目录的分区数,把新分区分配到最少分区数的目录中。因此如果新增加一个磁盘,所有的新分区都会分配到在这个磁盘上,因为它的分区数最少。

文件管理

消息保留,是Kafka中一个很重要的概念。Kafka不会永远保留数据,也不会等待所有的消费组读取了消息才删除消息。只要数据量达到上限或者数据达到过期时间,Kafka会删除老的消息数据。

因为在一个大文件中查找需要清理的数据并进行删除是非常耗时而且容易出错的,Kafka将每个分区切割成段(segment)。默认每个段大小不超过1G,且只包含7天的数据。如果段的消息量达到1G,那么该段会关闭,同时打开一个新的段进行写入。

正在写入的段称为活跃段(active segment),活跃段不会被删除。因此,假如设置日志保留时间为1天,但是日志段包含5天的数据,那么我们实际上会保留5天的数据,因为活跃段正在使用而且在关闭前不会删除。

对于每个分区的每个段(包括不活跃的段),broker都会维护文件句柄,因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。

文件格式

每个段都单独存为一个文件,文件内存放消息和位移。磁盘上的数据格式、生产者发送的数据格式和消费者读取的数据格式都是一样的,使用相同的数据格式使得Kafka可以进行零拷贝优化,以及避免压缩和解压缩。

除了key/value和位移之外,每个消息还包含消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip或者LZ4)和时间戳(0.10.0新增)。

如果发送者发送压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:

wrapper

因此如果生产者使用压缩,那么发送更大的批量消息可以得到更好的网络传输效率,并且节省磁盘存储空间。

索引

Kafka允许消费者从任意合法的位移拉取消息,也就是说如果消费者希望从位移为100的地方开始读取1MB的消息,broker需要在该分区的所有段中定位到该消息的位置然后开始读取数据。为了帮助broker迅速定位,Kafka对于每个分区都维护一个索引,该索引将位移映射到段以及段内偏移。

索引也是按照段来切割的,因此清理过期消息时相应的索引也可以很容易清理。另外,索引并没有维护校验和,因此如果索引损坏了,Kafka会重新读取段文件生成索引。

压缩(compaction)

正常情况下,Kafka存储一定数量的消息,并且如果消息超过一定时间,这些消息会被清除。此外,Kafka还支持压缩的消息保留策略,使用这种策略会使得对于主题内的每个键,Kafka只会保留最新的消息内容。显然,压缩的策略对同时包含键和消息内容的主题才生效,如果主题内的消息键为null,那么压缩的策略不会生效。

压缩是怎么实现的?

每个分区都可以分为两部分:

  • 干净(clean):这部分消息之前已经被压缩过,对于每个key来说这部分只存在一个value。
  • 脏(dirty):在上一次压缩后写入的新消息。

如下所示:

compaction

如果使用压缩,那么每个broker会启动一个压缩管理器线程和若干个压缩线程,每个线程负责一个分区。

在压缩分区时,压缩线程会首先读取脏的部分,并且建立一个key的哈希和位移的映射,对于相同的键,只保留最新的位移。其中key的哈希大小为16字节,位移大小为8个字节。也就是说,一个映射只有24字节,假设消息大小为1KB,那么1GB的段有1百万条消息,建立这个段的映射只需要24MB的内存,映射的内存效率是非常高效的。

在配置Kafka时,管理员需要设置这些压缩线程可以使用的总内存。如果设置1GB的总内存同时有5个压缩线程,那么每个线程只有200MB的内存可用。在压缩线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka会一次压缩最老的几个脏段,然后在下一次再处理其他的脏段。

一旦建立完脏段的键与位移的映射后,压缩线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。

对于一个段,处理前和处理后的效果如下:

segment

删除事件

对于只保留最新消息的压缩策略来说,Kafka还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个null的消息内容。当压缩线程发现这条消息时,它首先仍然进行一个正常的压缩并且保留这个包含null的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,压缩线程会删除这条消息,这个键会从分区中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。

什么时候压缩?

与过期清除的策略一样,压缩策略也不会对活跃段进行压缩。在0.10.0以及更老的版本,Kafka会在主题包含50%脏记录的时候进行压缩,目的是为了既不频繁压缩(影响性能),也不留存太多脏记录。