文章

Kafka原理解析与实战

Kafka

Kafka原理解析与实战

Kafka

一、为什么使用消息队列

消息队列解决具体的是什么问题 -> 通信问题,所以消息队列是一种通信方案。

1.使用同步的通信方式来解决多个服务之间的通信

同步通信的问题 同步通信流程

同步的通信方式会存在性能和稳定性的问题。

2.使用异步的通信方式

异步通信的优势与问题 异步通信流程

  • 下一个订单后直接订单创建成功,将有具体业务信息的请求放置于消息队列中,例如队列1、队列2、队列3。
  • 下游的消费者会订阅队列,例如数据库里创建一条订单订阅队列1,给用户加积分订阅队列3。
  • 消费成功后,有具体业务信息的请求会到消费者内消费

针对于同步的通信方式来说,异步的方式,可以让上游快速成功,极大提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终一致性。

二、消息队列的流派

1. 什么是 MQ

Message Queue(MQ),消息队列中间件。很多人都说:MQ 通过将消息的发送和接收分离来实现应用程序的异步和解偶,这个给人的直觉是 —- MQ 是异步的,用来解耦的,但是这个只是 MQ 的效果而不是目的。MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间通讯要么是HTTP,要么是自己开发的(rpc) TCP,但是这两种协议其实都是原始的协议。HTTP协议很难实现两端通讯——模块 A可以调用B, B 也可以主动调用A,如果要做到这个两端都要背上 WebServer,而且还不支持长连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、心跳、私有的协议,想一想头皮就发麻。MQ 所要做的就是在这些协议之上构建一个简单的“协议”——生产者/消费者模型。MQ 带给我的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义了两个对象——发送数据的叫生产者;接收数据的叫消费者, 提供一个SDK 让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议

2. 有broker(中转站)

这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推送给消费者(或者消费者主动轮询)

重Topic就是在broker转发的时候以topic作为依据,当我消费者1订阅了主题1,消费者2定义了主题2,那消费者2就收不到主题1的内容了。轻topic可以用topic也可以不用。

  • 重Topic : kafka、RocketMQ(阿里)、ActiveMQ

kafka, JMS (ActiveMQ)就属于这个流派,生产者会发送key和数据到Broker,由Broker比较key之后决定给哪个消费者。这种模式是我们最常见的模式,是我们对 MQ 最多的印象。在这种模式下一个topic往往是一个比较大的概念,甚至一个系统中就可能只有一个topic,topic 某种意义上就是 queue,生产者发送 key 相当于说:“hi,把数据放到 key 的队列中”。

如上图所示, Broker定义了三个队列, key1, key2, key3,生产者发送数据的时候会发送key1和data, Broker在推送数据的时候则推送data (也可能把 key带上)。

虽然架构一样但是kafka的性能要比jms的性能不知道高到多少倍,所以基本这种类型的MQ只有kafka一种备选方案。如果你需要一条暴力的数据流(在乎性能而非灵活性)那么 kafka 是最好的选择。

整个broker,依据topic来进行消息的中转。在重topic的消息队列里必然需要topic的存在

  • 轻Topic (RabbitMQ)

topic只是一种中转模式。

3. 无broker(socket通信)

无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到——MQ 是更高级的 Socket,它是解决通讯问题的。所以ZeroMQ 被设计成了一个“库”而不是一个中间件,这种实现也可以达到——没有 Broker 的目的。

节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的API可以完成发送数据,读取数据

ZeroMQ其实就是一个跨语言的、重量级的Actor 模型邮箱库。你可以把自己的程序想象成一个Actor, ZeroMQ就是提供邮箱功能的库; ZeroMQ可以实现同一台机器的RPC通讯也可以实现不同机器的TCP、UDP通讯,如果你需要一个强大的、灵活、野蛮的通讯能力,别犹豫 ZeroMQ。

4. 各种MQ间的区别

  • rabbitMQ: 内部的可玩性(功能性)是非常强的
  • rocketMQ:阿里内部一个大神,根据kafka的内部执行原理,手写的一个消息队列中间件。性能是与Kafka相比肩,除此之外,在功能上封装了更多的功能。
  • kafka:全球消息处理性能最快的一款MQ
  • zeroMQ

三、Kafka的基本知识

  • Kafka介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica) ,基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎, web/nginx日志、访问日志,消息服务等等,用scala语言编写, Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

  • Kafka的使用场景
    • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
    • 消息系统:解耦和生产者和消费者、缓存消息等。
    • 用户活动跟踪: Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
    • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

1.Kafka的安装

  • 部署一台zookeeper服务器
  • 安装jdk
  • 下载kafka的安装包
  • 上传到kafka服务器上
  • 解压缩压缩包
  • 进入到config目录内,修改server.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    #broker.id属性在kafka集群中必须要是唯一
    broker.id=0
    
    #kafka部署的机器ip和提供服务的端口号
    listeners=PLAINTEXT://192.168.65.60:9092
    
    #kafka的消息存储文件
    log.dir=/usr/local/data/kafka-logs
    
    #kafka连接zookeeper的地址
    zookeeper.connect=192.168.65.60:2181
    
  • 进入到bin目录内,执行以下命令来启动kafka服务器(带着配置文件)

    1
    
    ./kafka-server-start.sh -daemon ../config/server.properties
    
  • 校验kafka是否启动成功 : 进入到zk客户端查看是否有kafka的节点/brokers/ids/0

以下是具体操作流程。

a) 下载与配置

官网下载2.4.1版本,二进制版本Scala 2.11,打开终端,进入超级用户模式sudo -s,在路径/usr/local/新建kafka文件mkdir kafka,解压缩tar -zxvf kafka_2.11-2.4.1.tgz,删除压缩包rm -rf kafka_2.11-2.4.1.tgz,配置服务器文件vim server.properties,使用ifconfig en0查看主机地址进行配置,启动zk服务器./zkServer.sh start ../conf/zoo.cfg

项目结构

  • bin : 可执行命令。重点关注kafka-server-start.sh。
  • config : 配置。重点关注server.properties。
关键配置解释样例
broker.id服务器id,必须唯一,需要布置集群的时候每个id是不一样的broker.id=0
listeners连接到kafka的地址,需要放开,9092是kafka默认的对外提供的端口listeners = PLAINTEXT://主机地址:9092
log.dir日志存放地址,消息存储文件,默认保存7天log.dirs=/usr/local/kafka/data/kafka-logs
zookeeper.connectkafka连接到zk的地址,需先启动zk服务器zookeeper.connect=zk主机地址:2181,查看../conf/zoo.cfg文件中的clientPort的端口号是否一致,一般zk默认是2181

为什么要使用zk?

因为broker0,broker1,broker2是kafka的集群,如果需要做到集群无状态,则需要zk,将实际数据保存到zk上。

其余配置解释样例
log.retention.hours每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样168
num.partitions创建topic的默认分区数1
default.replication.factor自动创建topic的默认副本数量,建议设置为大于等于21
min.insync.replicas当producer设置acks为—1时, min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常1
delete.topic.enable是否允许删除主题false

b) 启动kafka服务器

进入/usr/local/kafka/kafka-2.4.1-src/bin,启动./kafka-server-start.sh -daemon ../config/server.properties,检查是否启动成功,查看当前服务器进程是否关联到server.properties配置文件的ps aux | grep server.properties(-aux废弃,aux仍可使用,具体查看man ps),显示一大堆信息,启动成功,也可以去zk客户端查看,进入zk客户端/usr/local/zookeeper/apache-zookeeper-3.7.2-bin/bin,启动zk客户端./zkCli.sh,查看内部数据ls /,可以看到多了很多的zk节点,brokers、controller、cluster等等。重点看brokersls /brokers,其子节点是[ids, seqid, topics],再往下看ls /brokers/ids,可以看到[0],说明kafka节点为0的服务器上线了。

2.kafka中的一些基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确并没有完全遵循JMS规范

首先,让我们来看一下基础的消息(Message)相关术语:

名称解释
Broker消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
TopicKafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer消息生产者,向Broker发送消息的客户端
Consumer消息消费者,从Broker读取消息的客户端
ConsumerGroup每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的

基础消息 前4个术语的流程

3.创建topic

topic是什么概念? topic可以实现消息的分类,不同消费者订阅不同的topic。

通过kafka命令向zk中创建一个名为“test”的topic,这个topic只有一个partition,并且备份因子(副本)也设置为1:

1
2
3
# localhost : zk服务器ip
>>> ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.

查看当前kafka内有哪些topic :

1
2
3
# localhost : zk服务器ip
>>> ./kafka-topics.sh --list --zookeeper localhost:2181
test

4.发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

把消息发送给broker中的某个topic,打开kafka的客户端,并开始向kafka服务器发送消息 :

1
2
3
4
# localhost : kafka服务器ip
>>> ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 输入一条消息abc
> abc

5.消费消息

对于consumer, kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息。可创建多个终端会话方便查看。

打开一个消费消息的客户端,向kafka服务器的某个主题消费消息 :

方式一(最新消息):从最后一条消息的偏移量+1开始消费,执行命令后,可在发送消息的会话窗口输入信息,则可在消费消息的会话窗口看到。

1
2
# localhost : kafka服务器ip
>>> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

方式二(所有消息):从头开始消费,只多了一个from beginning指令。

1
2
# localhost : kafka服务器ip
>>> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

几个注意点:

  • 消息会被存储
  • 消息是顺序存储
  • 消息是有偏移量的
  • 消费时可以指明偏移量进行消费

6.关于消息的细节

  • 生产者将消息发送给broker, broker会将消息保存在本地的日志文件中/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

其他两个文件 :

  • 稀疏索引 : /usr/local/kafka/data/kafka-logs/主题-分区/00000000.index
  • 时间索引 : /usr/local/kafka/data/kafka-logs/主题-分区/00000000.timeindex24小时内的索引具体是在.log文件内的哪一条

7.单播消息 (同一组只有一个)

如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息。

配置消费组 : –consumer-property group.id=testGroup

1
2
# localhost : kafka服务器ip
>>> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

8.多播消息 (不同组中只有一个)

不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息。

1
2
3
# localhost : kafka服务器ip
>>> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test
>>> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test

单播和多播 单播和多播的区别

9. 查看消费组的详细信息

1
2
3
4
5
# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
testGrouptest0990---

重点关注以下几个信息:

  • Current—offset: 最后被消费的消息的偏移量(当前偏移量)
  • Log—end—offset: 消息总量(最后一条消息的偏移量)
  • Lag:积压了多少条消息(剩余多少条消息未被消费)

四、Kafka中主题和分区的概念

1. 主题Topic

主题Topic可以理解成是一个类别的名称。在kafka中是一个逻辑的概念, kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费者消费。

但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的。为了解决这个文件过大的问题, kafka提出了Partition分区的概念。

2. 分区Partition

  • 1) 分区的概念

    通过partition将一个topic中的消息分区来存储。这样的好处有多个:

    • 分区存储,可以解决统一存储文件过大的问题
    • 提供了读写的吞吐量:读和写可以同时在多个分区中进行

分区 分区

  • 2) 创建多分区的主题
1
2
3
4
5
# 为一个主题创建多个分区
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1

# 可以通过这样的命令查看topic的分区信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

3. kafka中消息日志文件中保存的内容

  • 00000.log : 这个文件中保存的就是消息,存在data/kafka—logs/test—0 和 test—1中的0000000.log文件中
  • consumer_offsets-49 : kafka内部自己创建了_consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。
    • 提交到哪个分区: hash(consumerGroupld) % _consumer_offsets主题的分区数
    • 提交到该主题中的内容是: key是consumerGroupld+topic+分区号,value就是当前offset的值
  • 文件中保存的消息,默认保存7天,7天到后消息会被删除。

另外两个文件 :

  • 稀疏索引 : /usr/local/kafka/data/kafka-logs/主题-分区/00000000.index
  • 时间索引 : /usr/local/kafka/data/kafka-logs/主题-分区/00000000.timeindex24小时内的索引具体是在.log文件内的哪一条

五、Kafka集群操作

1. 搭建kafka集群(三个broker)

  • 停止之前的单点kafka节点: 查询跟server.properties配置文件相关的进程ps aux | grep server.properties,根据进程号停止kill 11075

  • 创建三个server.properties文件(server.properties, server1.properties, server2.properties)
    1
    2
    3
    4
    5
    6
    
    // 0 1 2
    broker.id=0
    // 9092 9093 9094
    listeners=PLAINTEXT://localhost:9092
    // kafka-logs kafka-logs-1 kafka-logs-2
    log.dir=/usr/local/data/kafka-logs
    
  • 使用如下命令来启动3台服务器
1
2
3
4
# 进入/bin文件
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
  • 打开zk服务器与客户端查看是否启动成功 : ls /brokers/ids,看到有kafka的服务器ids[0, 1, 2]则启动成功。

2.副本的概念

副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower。

1
2
3
4
5
# 创建1个主题、2个分区、3个副本
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

# 查看topic情况
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

副本流程 my-replicated-topic的详细信息与副本流程

通过查看主题信息,其中的关键数据:

  • replicas: 当前副本存在的broker节点
  • leader: kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader
  • follower: 接收leader的同步的数据
  • isr: 可以同步的broker节点和已同步的broker节点,存放在isr集合中。但是如果有节点性能不好,会被剔除isr集合。leader选举也是在isr内选举的。

3. (重点)主题、分区、副本的总结

集群cluster中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里。

  • 一个集群可以有多个broker
  • 一个broker通过主题将消息分类
  • 一个主题可以有多个分区
  • 一个分区可以有多个副本
  • 不同的副本放在不同的broker内
  • 一个分区内的多个副本中会有一个leader

4. 关于集群消费

  • 1) 向集群发送消息

与zk的发消息不同的是,往集群的topic发送

1
2
# localhost切换成kafka服务器节点
./kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic
  • 2) 从集群中消费消息
1
2
# 未带消费组
./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic my-replicated-topic
  • 3) 指定消费组来消费消息
1
2
# 带有消费组
./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic
  • 4) 分区分消费组的集群消费中的细节

分区分消费组的集群消费中的细节 分区分消费组的集群消费中的细节

  • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺序性呢?

  • partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息

  • 如果消费者挂了,那么会触发rebalance机制(后面介绍),会让其他消费者来消费该分区

    例如 : 如果在group B中多了个Consumer7,那这个Consumer7肯定消费不到消息,因为上面最多只有4个分区。这时候Consumer7的意义就在于,如果前面的Consumer5挂了,Consumer7可以顶上。 但是如果Consumer5挂了,Consumer7并没有创建的情况下,Partition1的消息就无法消费了,这时候可以通过kafka集群的rebalance机制进行重新分配。

六、Kafka的java客户端-生产者的实现

1. 引入依赖

1
2
3
4
5
<dependency>
  <groupId>org.apache.kafka</groupId> 
  <artifactId>kafka-clients</artifactId> 
  <version>2.4.1</version>
</dependency>

1. 生产者的基本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MySimpleProducer {

	private final static String TOPIC_NAME = "my-replicated-topic";

	public static void main(String[] args) throws ExecutionException, InterruptedException {

	//1.设置参数
	Properties props = new Properties();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");

	//把发送的key从字符串序列化为字节数组
	props.put(EroducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

	//把发送消息value从字符串序列化为字节数组
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

	//2,创建生产消息的客户端,传入参数
	Producer<String,String> producer = new KafkaProducer<String, String>(props);

	//3.创建消息
	//key:作用是决定了往哪个分区上发, value:具体要发送的消息内容
	ProducerRecord<String,String> producerRecord = new ProducerRecord<> (TOPIC_NAME,"mykeyvalue","hellokafka");

	//4.发送消息,得到消息发送的元数据并输出
	RecordMetadata metadata = producer.send(producerRecord).get();

	System.out.println("同步方式发送消息结果:"+ "topic-" + metadata.topic() +"Ipartition-"+metadata.partition() + "loffset-" + metadata.offset());

	}
}
  • 发送到指定分区上
1
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, order.getorderId().toString(), JSON.toJSONString(order));
  • 未指定分区,则会通过业务key的hash运算,算出消息往哪个分区上发
1
ProducerRecord<String, String> producerRecord= new ProducerRecord<String, String>(TOPIC_NAME, order.getorderId ().toString(), JSON.toJSONString(order));

2. 生产者的同步发送消息

发送消息的同步发送和异步发送,也就是生产者给kafka发送消息的过程是同步或异步的。与之前讲的异步操作是不一样的概念。

发送消息的同步发送 发送消息的同步发送

如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。

1
2
3
//等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());

3. 生产者的异步发送消息

使用异步发送可能会消息丢失,所以使用同步的方式会更多一些

发送消息的异步发送 发送消息的异步发送

异步发送,生产者发送完消息后就可以执行之后的业务, broker在收到消息后异步调用生产者提供的callback回调方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
		//异步回调方式发送消息
		producer.send(producerRecord, new Callback(){

			public void onCompletion(RecordMetadata metadata, Exception exception){

				if (exception != null) {
					System.err.println("发送消息失败: " + exception.getStackTrace());
				}

				if (metadata != null) {
					System.out.println("异步方式发送消息结果: " + "topic-" + metadata.topic() + "Ipartition-" + metadata.partition() + "Ioffset-" + metadata.offset());
				}

			}
		});

4. 生产者中的ack的配置(同步发送)

在同步发消息的场景下,生产者发动broker上后, ack会有3种不同的选择,官网说明3.3 Producer Configs

1
2
3
(1) acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
(2) acks=1: 至少要等待Leader已经成功将数据写入本地Log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时Leader又挂掉,则消息会丢失。性能和安全性最均衡。
(3) acks=—1或all:  需要等待min.insync.replicas(默认为1,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。最安全但性能最差。
1
2
3
4
5
6
7
props.put(ProducerConfig.ACKS_CONFIG, "1");

//发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
props.put(ProducerConfig.RETRIES_CONFIG, 3);

//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

5. 关于消息发送的缓冲区

消息发送的缓冲区 消息发送的缓冲区

发送的消息会先进入到本地缓冲区(32mb),kakfa会跑一个线程,该线程去缓冲区中取16k的数据,发送到kafka,如果到10毫秒后,数据没取满16k,也会发送一次。

1
2
3
4
5
6
7
8
9
10
11
12
// 设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// kafka本地线程会从缓冲区取数据,批量发送到broker,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

/*
  默认值是θ,意思就是消息必须立即被发送,但这样会影响性能
  一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
  如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

6. 整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//消息的发送方
public class MyProducer {

	private final static String TOPIC_NAME = "my-replicated-topic";

	public static void main(String[] args) throws ExecutionException, InterruptedException{
		// 存放属性
		Properties props = new Properties();

		// 更换localhost为kafka机器
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093,localhost:9094");

		/*
			发出消息持久化机制参数
			(1) acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
			(2) acks=1: 至少要等待Leader已经成功将数据写入本地Log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时Leader又挂掉,则消息会丢失。
			(3) acks=—1或aLL: 需要等待min.insync.replicas(默认为1,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
		*/
		prdps.put(ProducerConfig.ACKS_CONFIG, "1");

		//发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
		props.put(ProducerConfig.RETRIES_CONFIG, 3);

		//重试间隔设置
		props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

		//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
		props.put (ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

		// kafka本地线程会从缓冲区取数据,批量发送到broker,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
		// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

		/*
			默认值是θ,意思就是消息必须立即被发送,但这样会影响性能
			一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
			如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
		*/
		props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

		//把发送的key从字符串序列化为字节数组
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

		//把发送消息value从字符串序列化为字节数组
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


		// ======== 发消息的客户端 ========
		Producer<String, String> producer = new KafkaProducer<String, String>(props);

		//要发送5条消息
		int msgNum = 5;

		// 并发编程 : CountDownLatch合并多线程的查询结果
		final CountDownLatch countDownLatch = new CountDownLatch(msgNum);

		for (int i = 1; i <= 5; i++) {

			Order order = new Order((Long) i, i);

			//指定发送分区
			//ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(ToPIC_NAME, B, order.getOrderId().toString(), JSON.toJSONString(order));

			//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
			ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getorderId().toString(), JSON.toJSONString(order));
		}

		/*
		try {
			//等待消息发送成功的同步阻塞方法
			RecordMetadata metadata = producer,send(producerRecord).get(); 

			//=====阻塞
			System.out.printin("同步方式发送消息结果: " + "topic-" + metadata.topic() + "Ipartition-"+ metadata.partition() + "/offset-" + metadata.offset());
		} catch (InterruptedException e) {
			e.printStackTrace();

			//1.记录日志 预警系统 +1
			//2,设置时间间隔1s同步的方式再次发送,如果还不行日志预警人工介入1Thread. sleep(1000);

			try {
				//等待消息发送成功的同步阻塞方法
				RecordMetadata metadata = producer.send(producerRecord).get(); 
			} catch (Exception e1) {
				//人工介入了
			}

		} catch (ExecutionException e) {
			e.printStackTrace(); 
		}
		*/

		//异步回调方式发送消息
		producer.send(producerRecord, new Callback(){

			public void onCompletion(RecordMetadata metadata, Exception exception){

				if (exception != null) {
					System.err.println("发送消息失败: " + exception.getStackTrace());
				}

				if (metadata != null) {
					System.out.println("异步方式发送消息结果: " + "topic-" + metadata.topic() + "Ipartition-" + metadata.partition() + "Ioffset-" + metadata.offset());
				}

				ountDownLatch.countDown(); //-1 = 4 ==0 5个回复全部都执行完毕了
			}

		});

		//判断countDownLatch是不是0,如果不是0,就等待5秒
		countDownLatch.await(5, TimeUnit.SECONDS); 
		producer.close();

	}
}

七、Java客户端消费者的实现细节

1.消费者的基本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MyConsumer {

	private final static String TOPIC_NAME = "my-replicated-topic"; 

	private final static String CONSUMER_GROUP_NAME = "testGroup";

	public static void main(String[] args) {

		Properties props = new Properties();

		props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");

		// 消费分组名
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

		// 创建一个消费者端的客户端 
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

		// 消费者订阅主题列表
		consumer.subscribe(Arrays.asList(TOPIC_NAME));
		while (true) {
			// poll() API 是拉取消息的长轮询
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("收到消息: partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
			}
		}
	}
}

2.关于消费者自动提交和手动提交offset

  • 1) 提交的内容

消费者offset的自动提交和手动提交 消费者offset的自动提交和手动提交

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。

  • 2) 自动提交(消息poll拉下来以后直接提交offset)

设置自动提交参数— 默认

1
2
3
4
// 是否自动提交offset,默认就是true
props.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注意:自动提交会丢消息。因为消费者在消费前提交offset,有可能提交完后还没消费时消费者挂了。

  • 3) 手动提交(在消费消息时/后再提交offset)

需要把自动提交的配置改成false

1
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手动提交又分成了两种:

  • 手动同步提交(一般用同步就可以了)

在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
		while (true) {
			// poll() API 是拉取消息的长轮询
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("收到消息: partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
			}

			if (records.count() > 0) { //有消息
        // 手动同步提交offset,当前线程会阻塞直到offset提交成功
			  // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
			  // 添加try catch 写业务逻辑 
			  consumer.commitSync();//=======阻塞=== 提交成功
			}
		}
  • 手动异步提交

在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
		while (true) {
			// poll() API 是拉取消息的长轮询
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("收到消息: partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
			}

			if (records.count() > 0) { //有消息
			// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
				consumer.commitAsync(new OffsetCommitCallback() {
					@Override
					public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception){
						if (exception != null) {

							System.err.println("Commit failed for " + offsets);

							System.err.println("Commit failed exception: " + exception.getStackTrace());
						}
					}
				});
			}
		}

3.长轮询poll消息

消费者建立了与broker之间的长连接,开始poll消息。

默认一次poll500条消息

1
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。

可以通过这个值进行设置:

1
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

如果1s内每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次长轮询结束。

1
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

触发rebalance机制, rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点。

1
2
3
4
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置,如果消费速度慢,条数可以设置小一点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次pol1的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

4.消费者的健康状态检查

消费者每隔1s向kafka发送心跳的时间间隔

1
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。

1
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

5.指定分区和偏移量、时间消费

  • 指定分区消费
1
2
// 往0号分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, O)));
  • 从头消费
1
2
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, O)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); // 从0号分区的从头开始消费
  • 指定offset消费
1
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, O))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
  • 从指定时间消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
		//从指定时间点开始消费
		List<PartitionInfo> topχcPartitions = consumer.partitionsFor(TOPIC_NAME);

		//从1小时前开始消费
		Long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; 
		Map<TopicPartition, Long> map = new HashMap<>();
		for (PartitionInfo par : topicPartitions) {
			map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
		}

		Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
		for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()){
			TopicPartition key = entry.getKey();
			OffsetAndTimestamp value = entry.getValue();
			if (key == null Il value == null) continue;
			Long offset = value.offset();
			System.out.println("partition-" + key.partition() + "loffset-" + offset);
			System.out.println();

			//根据消费里的timestamp确定offset
			if (value != null) {
				consumer, assign(Arrays.asList(key));
				consumer.seek(key, offset);
			}
		}

6.新消费组的消费offset规则

1
2
3
4
5
6
/*
  当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
  latest(默认):只消费自己启动之后发送到主题的消息
  earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

7.整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class MyConsumer {
	private final static String TOPIC_NAME = "my-replicated-topic";
	private final static String CONSUMER_GROUP_NAME = "testGroup9999";

	public static void main(String[] args) {

		Properties props = new Properties();

		props.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");

		//消费分组名
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

		//是否自动提交offset,默认就是true
		//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

		//自动提交offset的间隔时间
		//props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

		/*
			当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
			latest(默认):只消费自己启动之后发送到主题的消息
			earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
		*/

		//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
		//consumer给broker发送心跳的间隔时间

		props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

		//kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

		//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置,如果消费速度慢,条数可以设置小一点
		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

		//如果两次pol1的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
    //踢出消费组则会rebalance,rebalance会消耗性能,所以比较好的是根据实际情况设置时间间隔或者条数
		props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,  30 * 1000);

		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName); 
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

		consumer.subscribe (Arrays.asList (TOPIC_NAME));

		//消费指定分区
		consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

		//消息回溯消费
		//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
		//consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, O)));

		//指定offset消费
		//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

		//从指定时间点开始消费
		List<PartitionInfo> topχcPartitions = consumer.partitionsFor(TOPIC_NAME);

		//从1小时前开始消费
		Long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; 
		Map<TopicPartition, Long> map = new HashMap<>();
		for (PartitionInfo par : topicPartitions) {
			map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
		}

		Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
		for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()){
			TopicPartition key = entry.getKey();
			OffsetAndTimestamp value = entry.getValue();
			if (key == null Il value == null) continue;
			Long offset = value.offset();
			System.out.println("partition-" + key.partition() + "loffset-" + offset);
			System.out.println();

			//根据消费里的timestamp确定offset
			if (value != null) {
				consumer, assign(Arrays.asList(key));
				consumer.seek(key, offset);
			}
		}


		while (true) {
			// poll() API 是拉取消息的长轮询
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("收到消息: partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
			}

			if (records.count() > 0) { //有消息

			// 手动同步提交offset,当前线程会阻塞直到offset提交成功
			// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
			// 添加try catch 写业务逻辑 
			//consumer.commitSync();//=======阻塞=== 提交成功

			// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
				consumer.commitAsync(new OffsetCommitCallback() {
					@Override
					public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception){
						if (exception != null) {

							System.err.println("Commit failed for " + offsets);

							System.err.println("Commit failed exception: " + exception.getStackTrace());
						}
					}
				});
			}
		}
	}
}

八、Springboot中使用Kafka

项目源码 : Kafka demo

1.引入依赖

maven重新拉一下

1
2
3
4
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>

2.编写配置文件

修改application.properties为.yml,集群ip和redis ip都需要更换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 应用服务 web 访问端口
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094 # 集群地址
    # 取决于该项目是生产者还是消费者,该项目用于测试所以两者都配置
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384 # 拉满16k
      buffer-memory: 33554432 # 32m内存
      acks: 1 # 默认同步发送, 1个leader拿到后再进行下一步业务
      # 指定消息key和消息体的编码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: default-group # 消费组id
      enable-auto-commit: false # 自动提交关闭
      auto-offset-reset: earliest # 第一次从头开始,往后从最新offset读取
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500 # 一次最多拉500条消息
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH
      #当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于CoUNT时提交
      # COUNT
      # TIME / COUNT 有一个条件满足时提交
      # COUNT_TIME
      #当每一批polL()的数据被消费者监听器(ListenerConsumer)处理之后,手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE
  redis:
    host: localhost # redis主机

3.编写消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.deng.kafka.spring.boot.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/msg")
public class MyKafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String sendMessage(){

        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message!");

        return "send success!";
    }
}

4.编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.deng.kafka.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsummer {

    @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { // record 对每一条消息进行操作 ack 只有手动提交才有用
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        // 手动提交offset 如果不做手动提交 消息会被重复消费
        ack.acknowledge();
    }

}

5.消费者中配置消费主题、分区和偏移量

1
2
3
4
5
6
7
8
9
10
11
12
    @KafkaListener(groupId = "testGroup", topicPartitions = { // 多主题消费
            @TopicPartition(topic = "topic1", partitions = {"0", "1"}), // 可消费topic1主题的0号和1号分区
            @TopicPartition(topic = "topic2", partitions = "0", // 可消费topic2主题的0号分区
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}, concurrency = "3") // 指定消费 : 消耗1号分区的时候偏移量是100,也就是1号分区的第100条开始消费
    // concurrency就是创建同组(testGroup)的消费者个数(3个),就是并发消费数,建议小于等于分区总数
    public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手动提交offset
        ack.acknowledge();
    }

九、Kafka集群中的controller、rebalance、HW

1.controller

  • 集群中谁来充当controller

每个broker启动时会向zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller,负责这么几件事:

  • 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leader,选举的规则是从isr集合中最左边获得。

  • 当集群中有broker新增或减少,controller会同步信息给其他broker

  • 当集群中有分区新增或减少,controller会同步信息给其他broker

2.rebalance机制

  • 前提:消费组中的消费者没有指明分区来消费

  • 触发的条件:当消费组中的消费者和分区的关系发生变化的时候

  • 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略

    • range:根据公示计算得到每个消费消费哪几个分区:前面的消费者是分区总数/消费者数量+1,之后的消费者是分区总数/消费者数量
    • 轮询:大家轮着来
    • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。

3.HW(高水位)和LEO(最后一个信息得偏移量)

LEO是某个副本最后消息的消息位置(log—end—offset)

HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后,hw才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失。

高水位和最后一个信息偏移量 高水位和最后一个信息的偏移量

十、Kafka中的优化问题

1.如何防止消息丢失

  • 生产者:ack是1 或者—1/all 可以防止消息丢失,如果要做到99.9999%, ack设成all,把min.insync.replicas配置成分区备份数。也就是1)使用同步发送 2)把ack设成1或者all,并且设置同步的分区数>=2

  • 消费者:把自动提交改成手动提交

2.如何防止重复消费

在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。

此时生产者会进行重试,于是broker就会收到多条相同的消息,而造成消费者的重复消费。

怎么解决:

  • 生产者关闭重试:会造成丢消息(不建议)
  • 消费者解决非幂等性消费问题:

    所谓的幂等性:多次访问的结果是一样的。对于rest的请求(get (幂等)、post (非幂等)、put (幂等)、delete (幂等) )

    解决方案:

    • 在数据库中创建联合主键,防止相同的主键 创建出多条记录
    • 使用分布式锁,以业务id(order_id)为锁。保证只有一条记录能够创建成功

防止重复消费方案 防止重复消费方案

3.如何做到消息的顺序消费

背景 : 例如在购买的场景里,需要先创建订单,支付,发货。不能一购买就发货了。

  • 生产者:保证消息按顺序消费,且消息不丢失——使用同步的发送,ack设置成非0的值。

  • 消费者:主题只能设置一个分区,消费组中只能有一个消费者

因此, kafka的顺序消费会牺牲掉性能。但是比如rocketmq在这一块有专门的功能已设计好。

4.如何解决消息积压问题

消息积压 消息积压

  • 1) 消息积压问题的出现

消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩。

  • 2) 消息积压的解决方案

消息积压的解决方案 消息积压的问题在生产环境经常会碰到的

  • 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。

  • 通过业务的架构设计,提升业务层面消费的性能。(业务代码层面)

  • 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度

  • 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将poll下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。-> 不常用

5.实现延时队列的效果

  • 1) 应用场景

订单创建后,超过30分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现

  • 2) 具体方案

实现延时队列的效果 实现延时队列的效果

  • kafka中创建创建相应的主题
  • 消费者消费该主题的消息(轮询)
  • 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没支付)
    • 如果是:去数据库中修改订单状态为已取消
    • 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复。

十一、Kafka—eagle监控平台

1.搭建

  • 去kafka-eagle官网下载压缩包

  • 分配一台虚拟机

  • 虚拟机中安装jdk

  • 解压缩kafka—eagle的压缩包

  • 给kafka—eagle配置环境变量,打开环境配置文件vim /etc/profile,使配置文件生效source /etc/profile

1
2
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
  • 需要修改kafka—eagle内部的配置文件: 在路径./conf, vim system—config.properties

修改里面的zk的地址和mysql的地址,注释掉jdbc Lite

1
2
3
# 单服务器就如下,集群可用逗号分割开
efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
  • 启动前需要确保java和maven都正确安装且环境也配置好了

java环境配置

1
2
3
4
5
6
7
8
9
10
11
12
# 查看java路径,有则复制路径,若无则安装
/usr/libexec/java_home -V

# 打开环境配置文件
vim /etc/profile

# 设置JAVA_HOME,修改路径
export JAVA_HOME="/Library/Path/to/Java/Contents/Home"
export PATH=$PATH:$JAVA_HOME/bin

# 退出vim,并使环境配置文件生效
source /etc/profile

官网下载maven的Binary archive版本,解压到/usr/local或其他路径

1
2
3
4
# 打开环境配置文件,填入路径,再生效配置文件
export M2_HOME="/Path/to/Maven/apache-maven-3.6.3"
PATH="${M2_HOME}/bin:${PATH}"
export PATH
  • 进入到bin中,通过命令来启动
1
./ke.sh start

kafka-eagle启动成功 kafka-eagle启动成功

通过连接登录用户名和密码

1
2
3
4
5
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://127.0.0.1:8048'
* Account:admin ,Password:123456
*******************************************************************

2.平台的使用

一般查看Consumers,里面的消费组,查看消费能力如何。也可以点进去查看running,再点进主题里,查看有几个分区,每个分区的logSize(消息数量)是多少,lag(消息积压)是多少 等等。

本文由作者按照 CC BY 4.0 进行授权