分布式-MQ-05Kafka特性详解&场景介绍

Kafka特性详解&场景介绍

image-20220611121058072

概念介绍

Kafka是最初由Linkedin公司开发,是一个分布式支持分区的(partition)、多副本的(replica),==基于zookeeper==协调的分布式消息系统(官方称之为commit log);它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系 统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写, Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

名词解释

名称 含义
Broker 消息中间件处理节点,一个Kafka节点就是 一个broker,一个或者多个Broker可以组 成一个Kafka集群
Topic Kafka根据topic对消息进行归类,发布到 Kafka集群的每条消息都需要指定一个topic
Producer 消息生产者,向Broker发送消息的客户端
Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的 Consumer Group消费,但是一个 Consumer Group中只能有一个Consumer 能够消费该消息
Partition 物理上的概念,一个topic可以分为多个
LEO Log end offset Kafka 分区消息的终点,消息终止偏移,下一条消息的插入位置
HW High water mark 高水位 用于控制消息可见性,HW 以下的消息对外可见,HW 的位置可能对应一条消息,但是对外不可见不可以消费,HW 的最大值是 LEO,分区 HW 就是 leader 副本的 HW 值=整个ISR分区所有副本的的LEO
LW Low water mark,用于控制消息可见性,LW 及以上的消息对外可见
ISR In sync replica 指满足副本同步要求的副本集合,包括领导者副本

主题Topic和消息日志Log

让我们首先深入理解Kafka提出一个高层次的抽象概念-Topic。 可以理解Topic是一个类别的名称,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区

image-20220611140717313

Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。每partition中的 消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message;每个partition,都对应一个commit log文件。一个partition中的message的offset都是唯一的,但是不同的 partition中的message的offset可能是相同的。

kafka集群,在配置的时间范围内,维护所有的由producer生成的消息,而不管这些消息有没有被消费。例如日志保留( log retention )时间被设置为2天。kafka会维护最近2天生产的所有消息,而2天前的消息会被丢弃。kafka的性能与保留的数据量的大小没有关系,因此保存大量的数据(日志信息)不会有什么影响。

每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息或者跳过某些消息。 这意味kafka中的consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对于集群或者其他consumer来说,都是没有影响的,因为每个consumer维护各自的offset。所以说kafka集群是无状态的,性能不会因为consumer数量受太多影响。kafka还将很多关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。

为什么要对Topic下数据进行分区存储?

1、commit log文件会受到所在机器的文件系统大小的限制,分区之后,理论上一个topic可以处理任意数量的数据。

2、为了提高并行度

分布式Distribution

log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka 集群支持配置一个partition备份的数量。 针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。

leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果。如果这个leader失效了,其中 的一个follower将会自动的变成新的leader

Producers

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round­robin做简单的 负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多

Consumers

传统的消息传递模式有2种:( queue) 和(publish-subscribe)

queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。

publish-subscribe模式:消息会被广播给所有的consumer。

Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group。

​ queue模式:所有的consumer都位于同一个consumer group 下。

​ publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

image-20220611142326746

上图说明:由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个Consumer Group, A有2个 consumer instances ,B有四个。 通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logical subscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。

消费顺序

Kafka比传统的消息系统有着更强的顺序保证。 ==一个partition同一个时刻在一个consumer group中只有一个consumer instance在消费,从而保证顺序==。 consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的 consumer消费不到消息

严格有序

Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1。

Spring Boot整合Kafka

引入spring boot kafka依赖,详见项目实例:spring-boot-kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka-study</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>

application.yml配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8080

spring:
kafka:
bootstrap‐servers: localhost:9092,localhost:9093
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch‐size: 16384
buffer‐memory: 33554432
# 指定消息key和消息体的编解码方式
key‐serializer: org.apache.kafka.common.serialization.StringSerializer
value‐serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group‐id: mygroup
enable‐auto‐commit: true

发送者代码:

1
2
3
4
5
6
7
8
9
10
@RestController
public class KafkaController {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@RequestMapping("/send")
public void send() {
kafkaTemplate.send("mytopic", 0, "key", "this is a msg");
}
}

消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class MyConsumer {
/**
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
* @param record
*/
@KafkaListener(topics = "mytopic",groupId = "testGroup")
public void listen(ConsumerRecord<String, String> record) {
String value = record.value();
System.out.println(value);
System.out.println(record);
}
}

Kafka的使用场景

1、日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr等。

2、消息系统:解耦和生产者和消费者、缓存消息等。

3、用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这 些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。

4、运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反 馈,比如报警和报告

image-20220611135409534

普通ELK架构

img

flume替代logstash(Java等语言应用支持直接异步发送消息到kafka)

image-20220611134540976

Kafka最佳实践

Kafka可视化管理工具kafka-manager

image-20220612164527184

安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html

线上broker机器配置规划

1、假设某电商公司的日志平台日均10亿条。

2、按照二八原则,80%的流量产生在20%时间内,所以8亿数据分布在4.8个小时里,假设4小时,QPS=6w

3、一般需要在QPS均值上放大5-10倍对应瞬间的流量高峰,也就是 6 * 5 = 30w/s的数据流量

4、topic一般设置副本数=2,也就是10 * 2 = 20亿数据,假设数据默认保留7天,则需要最大容量 20 * 7 =140亿数据

5、假设一条数据平均1kb的大小,140亿 * 1kb = 14T

6、根据第3、5点,规划5-6台物理机部署kafka比较稳妥,每台机器承载几万的QPS,且每台物理机分配3-4个T的机械硬盘(无需SSD,因为Kafka是顺序读写,不需要像Mysql那样随机读写)

7、高峰每秒6w数据/s = 每秒传输60M数据,峰值300M,综合成本考虑用千兆网卡

8、因为Kafka消息处理时用到的大部分是年轻代,且启动后处理线程较多,应该分配16核,考虑到峰值时1分钟产生18G,可以采用32G内存,老年代空间分配少些。

9、最终采用 6台 16核32G内存的物理机,每台机器配有3T机械硬盘、千兆网卡

JVM参数设置

kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题 修改bin/kafka-start-server.sh中的jvm设置

1
export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn12G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50"

使用G1垃圾收集器的原因:

1、这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长 时间

线上问题及优化

消息丢失情况

消息发送端:

(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消 息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。

(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消 息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 (3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一 个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果 min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

min.insync.replicas 该参数是 Broker 端或主题级别的配置,表示 允许处理 acks=all 请求的最低 ISR 副本数(默认值为 1),当新的publisher推新消息时,ISR副本数小于min.insync.replicas,则直接拒绝写入。

两者的协同工作流程

  1. 写入流程步骤

    步骤 1:生产者发送消息到 Leader 副本。

    步骤 2 Leader 检查当前 ISR 副本数是否 ≥ min.insync.replicas

    若否,直接拒绝写入,返回错误

    若是,继续等待所有 ISR 副本写入成功

    步骤 3,所有 ISR 副本写入成功后,Leader 返回成功响应给生产者

  2. 示例场景

ISR 副本数 min.insync.replicas 写入结果
3 2 需 3 个副本写入成功。
2 2 需 2 个副本写入成功。
1 2 拒绝写入(ISR 副本数 < min 值

消息消费端
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了。

消息重复消费:

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第 一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了 所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送 端到消费端全链路有序。

消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。 此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分 区),然后再启动多个消费者同时消费新主题的不同分区。

2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。 此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消 费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移 的消息开始消费,参见上节课的内容。

分区数越多吞吐量越高吗

网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个 临界值,当超过 这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。 当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。 注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错”java.io.IOException : Too many open files”。 异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线 程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查 看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535