初识kafka
# 初识kafka
# 什么是消息队列
消息队列是一种存放消息的容器,它遵循FIFO(先进先出)原则。它的出现是为了最大限度的解决短时间内有大量请求导致服务挂掉的问题,以及实现分布式系统中可靠的、高效的、实时的跨平台数据传输。
为了更好的理解它,我们来举个例子,一个很常见的场景——某个商品要进行降价活动,某天的某个时间该商品的价格会很低,但商品只有100份,先到先得。很多人都想要,于是在这个时间点,服务器就会收到成千上万的购买请求。但服务器性能不太好,一次只能处理100个请求,因此它会宕机,服务挂掉,没有人能成功买到商品,商家也无法卖出商品。
但是如果把第101个请求以及其后的请求都先放在一边,等服务器处理完当前的100个请求后,再去获取又100个请求,则服务能正常进行。那谁能存放这些多余的请求呢,没错就是消息队列组件
。
kafka
就是一个消息队列中间件,同时也是一种高吞吐量的分布式发布订阅消息系统。
# 用途
如今,消息队列的作用主要有以下几点:
- 解耦:在客户端发送消息到消息队列后,服务端再处理消息,服务端和客户端没有直接的联系,通过这种方式来解耦。
- 异步处理:服务端接到客户端的请求后,将其存储到消息队列中,就返回结果,不需要一直等待。
- 削峰:在短时间高并发产生的事务消息存储在消息队列中,然后去消费这些消息,防止压力过大导致系统崩溃。
当然引入新的东西在解决了一些问题时,同时也会产生一些新的问题,比如:系统可用性降低
- 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦消息队列组件(MQ)宕机,就会对业务造成影响。
简单介绍到此结束,相信现在对消息队列有了一个大概的认识了吧,下面就开始上手进行一个简单的kafka-demo实现。
# 安装kafka
# 下载
前往 Apache Kafka 官方网站,下载适用于你操作系统的 Kafka 版本。
这里注意 kafka 3.0开始就不再支持 java 8,我们下载解压后能直接运行的二进制文件
这里我们选择该版本:Scala 2.12 - [kafka_2.12-2.8.0.tgz]
kafka-jar包下载——官网下载 (opens new window)
官网中有Source 跟Binary两种类型的文件供下载
- Source下载后需要自行编译,再使用,但是你可以对Kafka源码进行二次修改,进行定制
- Binary-则是官网已经编译好后可执行运行,省去了繁琐的编译过程,方便更快使用
# 解压文件
tar -xf kafka_2.12-2.8.0.tgz -C ~/utils
将kafka解压缩到用户目录下的utils文件夹
# 修改配置文件
vi config/server.properties
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/dis/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=localhost:2181
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
注意:如果你不手动修改 broker.id 属性,Kafka 服务器会在启动时自动分配一个唯一的 ID。
Kafka 依赖于ZooKeeper用于协调和管理集群,要想启动kafka需要先启动zookeeper
# 启动ZooKeeper跟kafka
- 先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
不同版本的zookeeper启动脚本可能不同,最新的是zkServer.sh
- 再启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 简单的命令
# 创建topic
在集群中创建topic,topic是生产者
跟消费者
用来交换信息的中转站
- 创建一个topic
bin/kafka-topics.sh --zookeeper localhost:2181 --topic first --create --partitions 1 --replication-factor 1
这里创建了一个名为first的topic
,分区数为1,备用分区为1
- 查看topic的数量(当前存在哪些topic)
bin/kafka-topics.sh --zookeeper localhost:2181 --list
- 查看指定topic的详细信息
bin/kafka-topics.sh --zookeeper localhost:2181 --topic first --describe
# 生产者进行生产
通过控制台进行-生产
bin/kafka-console-producer.sh --topic first --broker-list PLAINTTEXT://localhost:9092
其中PLAINTTEXT://localhost:9092
是根据config/server.properties配置文件中所得
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
2
3
4
5
6
7
8
9
10
11
12
该配置可自行定义
执行该命令后,就可以进行消费生成
# 消费者进行消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
执行后,一旦生成者进行生效,这边就会收到对应消息
# 提醒
通过命令行的形式进行生产者-消费者的调用,在真实开发中其实大部分情况下都用不到,都是在Java中用代码去操作的,如果需要去看这些topic kafka的状态啥的 到时候都有插件去可视化的去看