可靠性是一个系统的属性,这种属性必须在系统设计之初就进行考虑。而Kafka支持不同程度的数据可靠性,这得取决于不同的使用场景。有的应用需要极强的数据可靠性,而有的应用则更倾向于性能和可扩展性…Kafka提供了非常灵活的配置和API来支持不同的用户场景。

也正是由于Kafka的灵活性,如果使用时不加以留意可能会导致问题。比如,你以为当前的系统是非常可靠的但实际却不然。下面我们来看下Kafka中的可靠性语义。

可靠性保证

当我们讨论可靠性的时候,我们总会提到保证这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。

Kafka中的可靠性保证有如下四点:

  • 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息A,然后写入消息B,那么消费者会先读取消息A再读取消息B。
  • 当消息写入所有in-sync状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有in-sync状态副本写入才返回。
  • 一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
  • 消费者只能读取到已提交的消息。

使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际。下面我们先来看下Kafka的复制机制是如何保证可靠性的,然后进一步讨论可靠性配置。

复制

复制机制是Kafka可靠性保证的核心,在这篇文章里面我们已经深入讨论过Kafka的可靠性机制了,这里再总结下。

每一个Kafka的主题都会分成几个分区,每个分区都单独存放在磁盘上。Kafka保证分区内的消息顺序。每个分区可以有若干个相同的副本,其中一个为主副本。生产者发送消息到主副本,消费者从主副本中读取消息。其他副本需要保持与主副本同步,如果主副本不可用,那么这些同步中的副本其中之一会成为新的主副本。

一个副本被认为是in-sync(也就是及时同步)状态,当且仅当它为主副本,或者是保持如下行为的其他副本:

  • 与Zookeeper的会话仍然活跃,即心跳间隔不超过6秒(时间可以配置)。
  • 向主副本发起拉取数据请求间隔不超过10秒(时间可以配置)。
  • 向主副本拉取的数据应当是10秒内产生的消息,也就是说不能拉取太老的消息。

如果一个副本不满足以上要求,那么它会变成out-of-sync状态直到它重新满足以上要求。

稍微有落后但仍然是in-sync状态的副本会拖慢生产者和消费者,因为它们需要等待所有in-sync状态的副本写入该消息。一旦某个副本变成out-of-sync状态,我们不需要等待它写入消息,这时候该副本不会造成任何性能影响,但同时由于in-sync状态副本减少,系统的冗余度也减少,数据丢失的可能性增大。

下面来看下实际中的具体应用及配置。

broker配置

Kafka中有三个broker的参数会影响数据可靠性。但就像其他的参数一样,这些参数可以在broker中设置(影响所有的主题),也可以在主题中设置(只影响该主题)。下面来看下这些参数。

复制因子(Replication Factor)

主题的复制因子配置参数为replication.factor;而对于自动创建的主题来说,broker中可以配置default.replication.factor来控制复制因子。

如果设置复制因子为N,那么我们能够容忍N-1个副本发生故障。因此更高的复制因子意味着更高的可用性以及数据可靠性;但另一方面,N个副本意味着需要存储N份相同的数据,需要N倍的存储空间。因此我们需要基于业务特点来决定复制因子。如果单个broker重启而导致主题不可用是可以接受的,那么复制因子为1就行。如果设置为2,那么我们可以容忍一个副本发生故障,但在此情况下系统只有1个副本运行,有丢失数据的风险。基于以上考虑,建议设置复制因子为3,对于大部分应用来说这种级别的容灾已经足够了,但特殊应用可能需要更高的冗余度,比如银行的关键数据存储会使用5个副本以防万一。

副本的位置也很重要。默认情况下,Kafka会将一个分区的不同副本放在不同的broker。但这仍然不够,如果所有的副本都在同一个机架上,那么无论复制因子为多少,该机架断电就会损失所有的数据。因此为了预防机架故障,在配置broker时建议配置broker.rack参数来指明机架信息,Kafka会利用此信息来将不同副本均衡到多个机架上。

有损主副本选举

unclean.leader.election.enable这个参数只能在broker级别设置,默认为true。这个参数有什么用?如前所述,如果分区的主副本不可用,那么in-sync副本中会选举出新的主副本,这样的选举是无损(clean)的,因为主副本切换没有导致数据丢失。但如果除了主副本外,没有in-sync副本存在呢?这种情况可能由于如下两种场景导致:

  • 分区有多个副本,除了主副本外其他副本均发生故障,但我们仍然往主副本中写入消息,这会导致其他副本同步落后。如果主副本不幸也发生故障,而在故障恢复中其他副本先恢复,那么现在集群中只有落后的副本可用。
  • 分区有多个副本,但由于网络原因,其他副本落后于主副本变成out-of-sync状态。假如主副本发生故障,集群中同样只有落后的副本可用。

对于这样的情况,我们需要从可用性与一致性之间作出选择:

  • 如果我们不允许落后副本成为新的主副本,那么集群是不可用的直到我们启动老的主副本。在实际情况中,这可能不能快速恢复,而且也不一定能恢复。
  • 如果我们允许落后副本成为新的主副本,那么丢失了部分已经提交的数据,而且导致消费者看到不一致的状态。

设置unclean.leader.election.enable为true,会允许落后副本成为新的主副本,这也是Kafka的默认设置。对于数据可靠性以及一致性非常重要的场景,用户可以设置为false。

最少in-sync副本数

对于最少in-sync副本数,主题与broker维度的参数都为min.insync.replicas。

上面可以看到,即便我们有多个副本,但最后可能只有一个副本(也就是主副本)是in-sync状态的。而且诡异的是,即便我们设置acks=all,也只有成功写入到一个副本中;也就是说,这里的all指的是所有in-sync状态副本,而不是所有副本。如果主副本故障,那么就面临上面说的问题,需要在一致性和可用性中做艰难取舍。

为了避免陷入这种情况,对于3个副本的分区来说,我们可以设置min.insync.replicas至少为2,这样可以保证数据至少写入了2份。如果系统中in-sync状态副本只剩下主副本,由于不满足min.insync.replicas,这时候生产者生产消息会失败,但消费者仍然可以读取之前的消息。这保证了一致性,牺牲了部分的可用性(但没有完全不可用)。

可靠的生产者

即使我们使用最可靠的方式来配置broker以保证数据不丢失,但如果使用生产者的方式不对,那么仍然可能会造成丢失。

下面从两个场景来说明这个问题:

  • 我们配置broker使用3个副本来进行冗余,并且禁止了有损主副本选举,这保证了一旦消息提交成功,那么消息将不会丢失。但是,假如我们配置生产者使用acks=1的确认方式,那么消息发送到主副本并且写入成功后,主副本返回结果,这时候生产者认为消息已经提交成功了。如果这时候主副本返回结果后就出现故障,并且没有把该消息同步到in-sync副本(in-sync副本允许有短暂的消息落后),那么in-sync副本会选举出新的主副本,而新的主副本并没有包含该消息。这样便出现生产者“认为”消息提交成功,但消息丢失的情况。
  • 我们配置broker使用3个副本来进行冗余,并且禁止了有损主副本选举,而且生产者使用acks=all的配置。假如在写入消息时,主副本发生故障(并进行新主副本选举),那么broker会返回“主副本不可用”的异常。如果我们程序没有处理这个错误并进行重试,那么消息将会丢失。这样的数据丢失并不是由于broker本身可靠性导致的,因为broker就没有成功收到这个消息。

因此对于可靠性非常重要的系统来说,在使用生产者写入消息时需要注意如下两点:

  • 使用正确的acks参数来满足业务的可靠性需要;
  • 正确处理写入消息的异常信息。

下面来对不同的acks取值进行深入的讨论。

消息确认

生产者可以选择三种acks取值:

  • acks=0:这种方式意味着,一旦生产者将消息通过网络传输成功,那么就认为消息已经写入到Kafka。假如这时候分区并不可用或者正在进行主副本选举,那么消息将会丢失,但生产者并不知晓。因为生产者在传输成功就进行后续处理了,没有关注Kafka的处理结果。当然,如果是消息序列化失败或者网卡发生故障,生产者还是能够得知这些错误的,因为这些错误发生在传输过程中。但这种方式也是性能最高的方式,虽然可能会导致消息丢失。
  • acks=1:这种方式意味着,生产者传输消息到主副本,并等待主副本写入磁盘(注意,可能只是写入到文件系统缓存,不一定刷新到磁盘),如果写入完成则返回成功,否则返回失败。如果发送消息时分区正在进行主副本选举,那么生产者会收到主副本不可用异常,在收到此异常信息后,生产者可以进行重试来避免消息丢失。但在主副本发生故障时,那些没有及时同步到其他副本的消息会丢失。
  • acks=all:这种方式意味着,生产者传输消息到主副本,并且在所有in-sync副本写入成功后主副本返回确认信息,否则返回错误。这个参数同时配合broker的min.insync.replica参数使用,能够实现“一旦返回成功,意味着最少写入N个副本”的语义,其中N为min.insync.replica的取值。如果生产者接收到错误,那么需要进行重试以防止消息丢失。这种方式也是最慢的处理方式,因为生产者需要等待所有in-sync副本写入成功。

下面对生产者遇到错误后的重试策略进行深入讨论。

配置生产者的重试机制

在使用生产者发送消息的时候,我们将会两类错误:

  • 可恢复错误:对于这种错误,生产者内部会进行重试。比如说broker返回主副本不可用异常,当生产者收到此异常后会进行重试。
  • 不可恢复错误:对于这种错误,即便生产者进行重试也不会成功,因此需要应用本身进行处理。比如说broker返回配置非法异常,当生产者收到此异常后进行重试也于事无补,需要应用自身进行处理。

如果你希望一条消息也不丢失,那么对于可恢复异常来说,可能会希望生产者一直重试直到成功。如果是因为主副本选举或者网络抖动而导致的异常,那么这种策略没什么问题。但如果网络一时半会恢复不了,我们也可以放弃重试并记录异常(比如记录日志、持久化到数据库等等),后续再进行处理。这取决于应用本身。注意到Kafka的跨机房复制工具(MirrorMaker)默认采取无限重试的策略,这是因为作为高可靠性的复制工具来说,它不应该丢失任何一条消息。

另外,生产者重试也可能会导致消息重复。假如消息发送到broker并且所有in-sync副本都写入成功,但在返回结果时网络发生故障,这时候生产者由于没收到回复认为消息没有发送成功,然后进行重试,这样便导致消息重复。异常处理和重试能够保证消息“最少一次”的语义,但无法保证“有且仅有一次”的语义(至少0.10.0版本Kafka如此)。应用本身如果需要实现“有且仅有一次”的语义,可以在消息中加入全局唯一标识符,这样在消费消息时可以进行去重。或者,应用生产幂等的消息,也就是说发送重复的消息没有影响,比如说“账户余额为110元”是幂等消息(因为发送多次也不会对账户造成影响),而“账户增加110元”则不是幂等消息(因为发送多次的结果并不一样)。

其他的错误处理

生产者的内部重试机制已经能解决大部分问题,但还有一些错误是需要应用进行处理的:

  • 不可恢复异常(比如消息大小非法、权限认证失败);
  • 消息发送到broker之前发生的异常(比如序列化异常);
  • 生产者达到重试上限的异常,或者由于消息重试导致消息堆积最终内存溢出的异常。

对于这些异常,我们可以记录日志,持久化到数据库或者简单的抛弃。但如果说我们的处理方式仍然为不断重试,那么建议把这样的重试策略下沉到生产者内部重试机制。

可靠的消费者

上面讨论了如何使得生产者更可靠,现在来看下消息消费端的可靠处理方式。之前说到,当消息变成已提交状态(也就是写入到所有in-sync副本)后,它才能被消费端读取。这保证了消费者读取到的数据始终是一致的,为了达到高可靠,消费者需要保证在消费消息时不丢失数据。

在处理分区消息时,消费者一般的处理流程为:拉取批量消息,处理完成后提交位移,然后再拉取下一批消息。提交位移保证了当前消费者发生故障或重启时,其他消费者可以接着上一次的消息位移来进行处理。需要提醒的是,消费端丢失消息的一个主要原因为:消费者拉取消息后还没处理完就提交位移,一旦在消息处理过程中发生故障,新的消费者会从已提交的位移接着处理,导致发生故障时的消息丢失。

下面来看下消费端处理流程中的一些需要注意的细节。

重要的可靠性配置

如果希望设计一个高可靠的消费者,那么消费者中有4个重要的属性需要慎重考虑。

第一个属性是group.id,这个属性在这篇文章中讨论过,大概的作用是:如果有多个消费者拥有相同的group.id并且订阅相同的主题,那么每个消费者会负责消费一部分的消息。如果消费组内存在多个消费者,那么一个消费者发生故障那么其他消费者可以接替其工作,保证高可用。

第二个属性是auto.offset.reset,这个属性在如下场景中起作用:当消费者读取消息,Kafka中没有提交的位移(比如消费者所属的消费组第一次启动)或者希望读取的位移不合法(比如消费组曾经长时间下线导致位移落后)时,消费者如何处理?当设置为earliest,消费者会从分区的起始端开始读取,这可能会导致消费者重复处理消息,但也将消息丢失可能性降低到最小;当设置为latest,消费者会从分区末端开始读取,这会导致消息丢失可能性加大,但会降低消息重复处理的概率。

第三个属性是enable.auto.commit,这个属性需要慎重考虑,那就是:你希望消费者定期自动提交位移,还是应用手动提交位移?自动提交位移可以让应用在处理消息时不用实现提交位移的逻辑,并且如果我们是在poll循环中使用相同的线程处理消息(poll循环详见这篇文章),那么自动提交位移可以保证在消息处理完成后才提交位移。如果我们在poll循环中使用另外的线程处理消息,那么自动提交位移可能会导致提交还没完成处理的消息位移。

第四个属性是auto.commit.interval.ms,它与第三个属性有关。如果选择了自动提交位移,那么这个属性控制提交位移的时间间隔。默认值是5秒,通常来说降低间隔可以降低消息重复处理的可能性。

手动提交位移

如果我们选择手动提交位移,下面来根据不同场景来讨论如何实现更可靠的消费者。

处理完消息后立即提交

如果在poll循环中进行消息处理,并且处理完后提交位移,那么提交位移的实现方式非常简单。对于这种场景,可以考虑使用自动提交而不是手动提交。

在处理消息过程中多次提交

消费者拉取批量消息后处理消息时,在处理过程中可以使用手动提交位移方式来多次更新位移。这种方式可以使得消息重复处理可能性降低。不过在这个场景中,如果不加以注意,那么可能会提交上一次拉取的最大位移而不是当前已经处理的消息位移。这篇文章中已经讨论过相应的处理方法,这里不再赘述。

重平衡

在设计应用时,我们需要记得正确处理重平衡。当重平衡发生时,消费者当前处理的分区可能会被回收,我们需要记得在回收前提交位移。

消费者的重试

在某些场景下,消费者拉取消息后进行处理时会遇到一些问题,可能希望这些消息可以延迟处理。比如,对于从Kafka拉取消息然后持久化到数据库的应用来说,如果某个时刻数据库不可用,我们可能希望延后重试。延后重试的策略可以分成如下两大类:

第一种处理方式是,我们提交已经处理成功的位移,然后将处理失败的消息存储到一个缓冲区,并不断进行重试处理这些消息。另外,在处理这些消息时可能poll循环仍然在继续,我们可以使用pause()方法来使得poll不会返回新的数据,这样使得重试更容易。

第二种处理方式是,我们把处理失败的消息写入到另外的主题,然后继续处理当前的消息。对于失败消息的主题,我们既可以使用同一个消费组进行处理,也可以使用不同的消费组进行处理。这种主题类似于其他消息系统使用的死信队列(dead-letter-queue)。

持久化状态

在某些场景下,我们可能需要在拉取消息时维护状态。比如,对于计算滑动平均数(moving average),我们每次拉取新消息时需要更新相应的平均数。当消费者重启时,我们不但需要从上一次提交的位移开始消费,同时还需要从相应的滑动平均数中恢复。一种处理方式是,我们提交位移时将滑动平均数写入到一个用于保存结果的主题,这样应用重启时可以获取上一次的处理结果。但由于Kafka不支持多操作的事务性,因此这种方式并不严谨。我们当然可以自己加以处理,但这个问题解决起来比较复杂,建议可以使用Kafka Streams这样的开源库。

消息处理时间长

某些应用拉取消息回来后处理消息时间比较长(比如依赖于一个阻塞服务或者进行复杂的计算),而某些版本的消费者如果长时间不poll消息会导致会话超时,因此使用这些版本的应用需要不断的拉取消息来发送心跳包到broker。一种常见的处理方式是,我们使用多线程来处理消息,然后当前线程调用pause()来使得既可以调用poll()而且消费者不会拉取新的消息;当消息处理完成后,再调用resume()来使得消费者恢复正常拉取逻辑。

有且仅有一次的语义

Kafka不支持有且仅有一次的语义,但可以支持至少一次的语义。因此对于需要实现有且仅有一次语义的应用来说,我们需要自己额外处理。

一种常见的处理方式为,我们使用支持唯一键的外部系统(比如关系型数据库、Elasticsearch等)来进行结果去重。我们可以自己实现唯一键并且在消息中加入此属性,也可以根据消息的主题、分区以及位移信息来生成唯一键。另外,如果该外部系统支持事务,那么我们可以在一个事务中同时保存消息处理结果和位移。消费者重启时可以从该系统中获取位移,并且使用seek()方法来开始从相应的位移开始消费。

验证系统的可靠性

当完成整个系统(生产者、broker、消费者)的高可靠设计后,我们下一步需要去验证设计和配置是否正确,验证工作可以分为三层:验证配置、验证应用和在线上监控应用。

验证Kafka配置

在验证配置时,建议将Kafka整体配置与业务逻辑分离,这样做有两大原因:

  • 这样可以验证配置是否满足需求。
  • 这样可以猜测Kafka系统行为并且进行验证。

Kafka中有两个工具来协助我们完成配置验证,那就是org.apache.kafka.tools包中的VerifiableProducer和VerifiableConsumer。使用VerifiableProducer时,我们可以像配置一般的生产者一样配置acks和retries,并且可以配置消息生产速率;当运行VerifiableProducer起来后,它会打印出消息写入的结果。而VerifiableConsumer则可以进行写入消息的检查,它按序打印出消费的消息以及提交、重平衡等信息。

在使用前,我们先定义好测试场景,比如:

  • 主副本选举:如果杀死主副本会怎么样?生产者和消费者多长时间才恢复正常?
  • 控制器选举:控制器重启后系统多长时间才恢复正常?
  • broker重启:逐一重启broker是否会导致消息丢失?
  • 有损主副本选举:如果杀死所有的分区副本然后启动一个out-of-sync状态的副本会怎么样?如果在此场景下希望系统恢复那么配置是否正确?行为是否符合预期?

在定义好测试场景后,我们便可以启动VerifiableProducer和VerifiableConsumer,并且验证相应的场景。

验证应用

在验证完生产者、broker和消费者配置后,我们需要验证整体的业务逻辑,主要包含异常处理、消息提交、重平衡监听器以及其他Kafka客户端交互逻辑。在验证业务逻辑上,Kafka无法提供更多的支持,建议应用考虑多种故障场景下的逻辑验证:

  • 客户端与broker断开连接;
  • 主副本选举;
  • broker重启;
  • 消费者重启;
  • 生产者重启。

对于每种场景,我们应该猜测预期的行为并进行结果验证。比如,在进行消费者重启的验证时,可能会预期消费者会短暂的暂停消费(由于重平衡),然后快速恢复,而且不会导致超过1000条记录重复处理。

线上监控可靠性

最后,我们需要实时监控线上系统的运行情况。

Kafka客户端(生产者/消费者)包含有监控状态和事件的JMX指标。对于生产者来说,最重要的两个指标为平均每个消息的错误率和重试率,这两个指标上升意味着系统出现了问题。另外,我们也需要关注生产者的日志,如果日志中出现“Got error produce response with correlation id 5689 on topic-partition [topic-1,3], retrying (two attempts left). Error: …”这样的异常信息并且日志中的重试次数为0,那么意味着该消息已经达到了重试次数的上限,基于此我们可能会希望提高重试次数的阈值。

对于客户端来说,最重要的指标为消息延迟(consumer lag)。这个指标反映了消费者当前拉取的消息与最新消息的差距,在理想情况下应该为0,但是因为我们拉取消息后需要进行处理,因此消息延迟不大也是正常的。由于消息延迟的上下波动是正常的,因此对于这个指标设置报警可能比较复杂,我们可以使用LinkedIn开源的Burrow来减轻开发量。

此外,为了跟踪消息的整体生产及处理流程,Kafka从0.10.0版本开始加入了消息生产的时间戳。为了验证消息在特定时间内被消费,我们需要在生产端和消费端同时记录消息数,然后消费端根据生产的时间戳来计算整体消息的延迟时间。这些数据需要有一个实时系统负责收集和监控,这样的系统实现起来会比较复杂,目前除了Confluent公司的商业化产品Confluent Control Center外并没有开源的实现。