Kafka特性详解&场景介绍

概念介绍
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),==基于zookeeper==协调的分布式消息系统(官方称之为commit log);它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系 统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写, Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
名词解释
| 名称 | 含义 |
|---|---|
| Broker | 消息中间件处理节点,一个Kafka节点就是 一个broker,一个或者多个Broker可以组 成一个Kafka集群 |
| Topic | Kafka根据topic对消息进行归类,发布到 Kafka集群的每条消息都需要指定一个topic |
| Producer | 消息生产者,向Broker发送消息的客户端 |
| Consumer | 消息消费者,从Broker读取消息的客户端 |
| ConsumerGroup | 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的 Consumer Group消费,但是一个 Consumer Group中只能有一个Consumer 能够消费该消息 |
| Partition | 物理上的概念,一个topic可以分为多个 |
| LEO | Log end offset Kafka 分区消息的终点,消息终止偏移,下一条消息的插入位置 |
| HW | High water mark 高水位 用于控制消息可见性,HW 以下的消息对外可见,HW 的位置可能对应一条消息,但是对外不可见不可以消费,HW 的最大值是 LEO,分区 HW 就是 leader 副本的 HW 值=整个ISR分区所有副本的的LEO |
| LW | Low water mark,用于控制消息可见性,LW 及以上的消息对外可见 |
| ISR | In sync replica 指满足副本同步要求的副本集合,包括领导者副本 |
主题Topic和消息日志Log
让我们首先深入理解Kafka提出一个高层次的抽象概念-Topic。 可以理解Topic是一个类别的名称,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区

Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。每partition中的 消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message;每个partition,都对应一个commit log文件。一个partition中的message的offset都是唯一的,但是不同的 partition中的message的offset可能是相同的。
kafka集群,在配置的时间范围内,维护所有的由producer生成的消息,而不管这些消息有没有被消费。例如日志保留( log retention )时间被设置为2天。kafka会维护最近2天生产的所有消息,而2天前的消息会被丢弃。kafka的性能与保留的数据量的大小没有关系,因此保存大量的数据(日志信息)不会有什么影响。
每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息或者跳过某些消息。 这意味kafka中的consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对于集群或者其他consumer来说,都是没有影响的,因为每个consumer维护各自的offset。所以说kafka集群是无状态的,性能不会因为consumer数量受太多影响。kafka还将很多关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。
为什么要对Topic下数据进行分区存储?
1、commit log文件会受到所在机器的文件系统大小的限制,分区之后,理论上一个topic可以处理任意数量的数据。
2、为了提高并行度。
分布式Distribution
log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka 集群支持配置一个partition备份的数量。 针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。
leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果。如果这个leader失效了,其中 的一个follower将会自动的变成新的leader
Producers
生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过roundrobin做简单的 负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多
Consumers
传统的消息传递模式有2种:( queue) 和(publish-subscribe)
queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。
publish-subscribe模式:消息会被广播给所有的consumer。
Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group。
queue模式:所有的consumer都位于同一个consumer group 下。
publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

上图说明:由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个Consumer Group, A有2个 consumer instances ,B有四个。 通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logical subscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。
消费顺序
Kafka比传统的消息系统有着更强的顺序保证。 ==一个partition同一个时刻在一个consumer group中只有一个consumer instance在消费,从而保证顺序==。 consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的 consumer消费不到消息
严格有序
Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1。
Spring Boot整合Kafka
引入spring boot kafka依赖,详见项目实例:spring-boot-kafka
1 | |
application.yml配置如下:
1 | |
发送者代码:
1 | |
消费者代码:
1 | |
Kafka的使用场景
1、日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr等。
2、消息系统:解耦和生产者和消费者、缓存消息等。
3、用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这 些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
4、运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反 馈,比如报警和报告

普通ELK架构

flume替代logstash(Java等语言应用支持直接异步发送消息到kafka)

Kafka最佳实践
Kafka可视化管理工具kafka-manager

安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html
线上broker机器配置规划
1、假设某电商公司的日志平台日均10亿条。
2、按照二八原则,80%的流量产生在20%时间内,所以8亿数据分布在4.8个小时里,假设4小时,QPS=6w
3、一般需要在QPS均值上放大5-10倍对应瞬间的流量高峰,也就是 6 * 5 = 30w/s的数据流量
4、topic一般设置副本数=2,也就是10 * 2 = 20亿数据,假设数据默认保留7天,则需要最大容量 20 * 7 =140亿数据
5、假设一条数据平均1kb的大小,140亿 * 1kb = 14T
6、根据第3、5点,规划5-6台物理机部署kafka比较稳妥,每台机器承载几万的QPS,且每台物理机分配3-4个T的机械硬盘(无需SSD,因为Kafka是顺序读写,不需要像Mysql那样随机读写)
7、高峰每秒6w数据/s = 每秒传输60M数据,峰值300M,综合成本考虑用千兆网卡
8、因为Kafka消息处理时用到的大部分是年轻代,且启动后处理线程较多,应该分配16核,考虑到峰值时1分钟产生18G,可以采用32G内存,老年代空间分配少些。
9、最终采用 6台 16核32G内存的物理机,每台机器配有3T机械硬盘、千兆网卡
JVM参数设置
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题 修改bin/kafka-start-server.sh中的jvm设置
1 | |
使用G1垃圾收集器的原因:
1、这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长 时间
线上问题及优化
消息丢失情况
消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消 息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消 息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 (3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一 个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果 min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。
min.insync.replicas 该参数是 Broker 端或主题级别的配置,表示 允许处理 acks=all 请求的最低 ISR 副本数(默认值为 1),当新的publisher推新消息时,ISR副本数小于min.insync.replicas,则直接拒绝写入。
两者的协同工作流程
写入流程步骤
步骤 1:生产者发送消息到 Leader 副本。
步骤 2 Leader 检查当前 ISR 副本数是否 ≥ min.insync.replicas
若否,直接拒绝写入,返回错误
若是,继续等待所有 ISR 副本写入成功
步骤 3,所有 ISR 副本写入成功后,Leader 返回成功响应给生产者
示例场景
| ISR 副本数 | min.insync.replicas |
写入结果 |
|---|---|---|
| 3 | 2 | 需 3 个副本写入成功。 |
| 2 | 2 | 需 2 个副本写入成功。 |
| 1 | 2 | 拒绝写入(ISR 副本数 < min 值 |
消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了。
消息重复消费:
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第 一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了 所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送 端到消费端全链路有序。
消息积压
1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。 此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分 区),然后再启动多个消费者同时消费新主题的不同分区。
2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。 此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。
消息回溯
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消 费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移 的消息开始消费,参见上节课的内容。
分区数越多吞吐量越高吗
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个 临界值,当超过 这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。 当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。 注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错”java.io.IOException : Too many open files”。 异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线 程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查 看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535