Apache Kafka分布式流处理平台及大厂面试宝典v3.0.0

概述

**本人博客网站 **IT小神 www.itxiaoshen.com

定义

Apache Kafka官网地址 http://kafka.apache.org/ 最新版本为 3.0.0

Apache Kafka是一个开源的分布式事件流平台,使用Scala和Java混合编写,Kafka最初由Linkedin公司开发,2011年贡献给了Apache基金会并成为顶级开源项目。消息队列就是用于数据生产方和消费方解耦合的中间件。顾名思义,主体就是一个队列的形式收集消息,数据在消费端按照FIFO的原则被消费。

Apache Kafka主要以Java 8、11和17源码构建及测试,但Java 8支持从Apache Kafka 3.0开始就已弃用,并将在Apache Kafka 4.0中被移除。

近几天连续学习两个Apache的开源项目,今天我们又来学习另外一个Apache顶级开源项目Kafka,可以见得Apache在开源世界的绝对大佬地位。Kafka是一个基于Zookeeper协调的支持分区(partition)、多副本(replica)的分布式消息系统,最大特性是可以实时处理大量数据以满足各种需求场景,常用于大数据场景消息流中间件;其他消息队列有ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ,目前比较主流消息中间件是Kafka、RocketMQ和RabbitMQ。

核心能力

应用场景

Kafka作为一个消息中间件最基本的用途为解耦、异步、削峰、消息通讯,下面为其常用的场景:

安装

Kafka官方下载地址 http://kafka.apache.org/downloads

我们这里规划部署192.168.50.34、192.168.50.35、192.168.50.36共3个节点的Kafka集群,当然是需要有基本JDK环境,Kafka部署还需依赖ZooKeeper,刚好上一篇文章我们也非常愉快的学习和部署Zookeeper集群,直接拿来使用,奥利给!

#官网下载
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
#解压
tar -zxvf kafka_2.13-3.0.0.tgz 
#进入kafka目录下创建logs文件夹
cd kafka_2.13-3.0.0
mkdir logs
#并修改config目录的server.properties
vim config/server.properties

修改server.properties配置文件内容如下:

// 作为当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=1
// 监听地址和端口号,默认是9092
listeners=PLAINTEXT://192.168.50.34:9092
// 消息存放的目录
log.dirs=/home/commons/kafka_2.13-3.0.0/logs
// zookeeper集群地址
zookeeper.connect=192.168.50.34:2181,192.168.50.35:2181,192.168.50.36:2181
#然后将上整个kafka_2.13-3.0.0拷贝到另外两台192.168.50.35、192.168.50.36上的相同目录(我们这里是指/home/commons),配置文件主要修改broker.id和listeners,broker.id唯一那我们就顺序编号为2和3,listeners中的host就分配与主机地址对应即可,修改完成后我们可以在kafka根目录下分别启动三台kafka server,后台常驻方式带上参数 -daemon
./bin/kafka-server-start.sh -daemon config/server.properties
#启动后可以通过ps或者jps检查进程信息和logs下的日志文件如server.log
ps -ef | grep kafka
#停止kafka server
./bin/kafka-server-stop.sh

集群简单测试

#3.0版本官方使用-–bootstrap-server替代–-zookeeper,创建名称为mytopic1的Topic,指定分区数为1,分区的副本数为1
bin/kafka-topics.sh --create --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --replication-factor 1 --partitions 1 --topic mytopic1
#创建名称为mytopic2的Topic,指定分区数为2,分区的副本数为2
bin/kafka-topics.sh --create --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --replication-factor 2 --partitions 2 --topic mytopic2
#列出所有的topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092
#查看指定topic
$bin/kafka-topics.sh --describe --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1
$bin/kafka-topics.sh --describe --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic2

#将mytopic1的分区数量扩充到5个
bin/kafka-topics.sh --alter --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --partitions 5 --topic mytopic1
#time 为 -1 时表示最大值,为 -2 时表示最小值:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic1 --time -1 --broker-list 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --partitions 0 

#删除mytopic2
kafka-topics.sh --delete --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic2 
#往mytopic1里生产消息,这里可以使用--broker-list也可以使用--bootstrap-server
./bin/kafka-console-producer.sh --broker-list 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1
#从头开始消费消息
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --from-beginning
#从尾部开始消费消息,从尾部开始取数据,必需要指定分区:如果需要取指定个数消息可以加上如--max-messages 5的参数 
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --offset latest --partition 1

发送多条消息,最终在该分区下收到分发的该分区下的消息

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 -group mygroup1 --from-beginning

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --list

#消费者组详情./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --group mygroup1 --describe#删除消费者组./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --group mygroup1 --delete#平衡Leader,--partition:指定需要重新分配leader的partition编号./bin/kafka-leader-election.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --partition=2 --election-type preferred#此外还有kafka自带压测工具./bin/kafka-producer-perf-test.sh --topic mytopic1 --num-records 1000 --record-size 1 --throughput 1000 --producer-props bootstrap.servers=192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 

架构原理面试宝典

Kafka的架构和组成

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

Kafka的四个核心API

消息队列的模式?

Kafka的存储模型

说说Kafka消费者端使用的技术点?

Kafka发送数据的流程和消息结构?

Kafka数据分区的目的和原则?

Kafka如何保证消息不丢失或者可靠性?

为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个 Partition 收到 Producer 发送的数据后,都需要向 Producer 发送 ACK 确认收到,如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。

Kafka Broker节点故障时处理细节?

Kafka如何保证消息不重复消费?

Kafka如何保证消费数据的一致性?

Kafka如何处理消息积压?

增加新topic,增大分区,将原来topic数据消费转移到这个新topic,然后开多个消费者去处理新topic。

Kafka如何保证消息顺序消费?

说说Kafka的事务?

Kafka事务在0.11版本后引入,主要解决的是Producer在Exactly Once语义上跨分区跨会话的精准一次写入,要么成功要么失败。

Kafka如何通过ZooKeeper来进行选举和状态更新?

Kafka高效读写的保证?

本篇主要是以Kafka的基础理论、架构原理和面试为主,后续我们再分享Kafka有关 API 以及事务、拦截器、监控等高级篇,已达到Kafka实战编程和应用的目的。