119.kafka单机测试

发表于 linux 分类,标签:
在Kafka2.8之后,引入了基于Raft协议的KRaft模式,支持取消对Zookeeper的依赖。
在此模式下,一部分KafkaBroker被指定为Controller,另一部分则为Broker。
这些Controller的作用就是以前由Zookeeper提供的共识服务,并且所有的元数据都将存储在Kafka主题中并在内部进行管理。

下面研究这种方式启动一个单机版kafka。

1. 下载
curl -O https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz

2. 解压安装
生成集群ID
[root@VM-8-16-centos kafka_2.13-3.3.1]# KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
[root@VM-8-16-centos kafka_2.13-3.3.1]# echo $KAFKA_CLUSTER_ID
lQAOBsE1Q6irBeQFkt5Rqg

用这个唯一ID格式化Kafka存储目录
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
Formatting /tmp/kraft-combined-logs with metadata.version 3.3-IV3.

启动kafka
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-server-start.sh config/kraft/server.properties
[root@VM-8-16-centos ~]# jps | grep ka
4629 Kafka

3.接下来我们以守护进程方式启动kafka
[root@VM-8-16-centos kafka_2.13-3.3.1]# nohup bin/kafka-server-start.sh config/kraft/server.properties &
[1] 5641

4.shell 测试kafka
创建topic然后查看描述信息
# bin/kafka-topics.sh --create --topic test1 --bootstrap-server localhost:9092
Created topic test1.

# bin/kafka-topics.sh --describe --topic test1 --bootstrap-server localhost:9092
Topic: test1  TopicId: sbJcTX91TAqD9fczBXvH9g PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
Topic: test1  Partition: 0  Leader: 1 Replicas: 1 Isr: 1

当我们创建完topic 之后会在当前服务器的目录创建topic对应的目录信息
[root@VM-8-16-centos kraft-combined-logs]# ll
总用量 28
-rw-r--r-- 1 root root  249 12月  5 21:42 bootstrap.checkpoint
-rw-r--r-- 1 root root    0 12月  5 21:44 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 12月  5 21:47 __cluster_metadata-0
-rw-r--r-- 1 root root    4 12月  5 21:53 log-start-offset-checkpoint
-rw-r--r-- 1 root root   86 12月  5 21:42 meta.properties
-rw-r--r-- 1 root root   14 12月  5 21:53 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   14 12月  5 21:54 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 12月  5 21:51 test1-0

生产几条消息
# bin/kafka-console-producer.sh --topic test1 --bootstrap-server localhost:9092
>hello kafka
>myname is kafka

消费者消费消息
# bin/kafka-console-consumer.sh --topic test1 --from-beginning --bootstrap-server localhost:9092
hello kafka
myname is kafka

删除test1
# bin/kafka-topics.sh --delete --topic test1 --bootstrap-server localhost:9092
再次查看目录(可以看到其是先标记位逻辑删除,然后再删除掉目录)
drwxr-xr-x 2 root root 4096 12月  5 21:51 test1-0.55585b77a4644fb8bb0ed567a82b29ba-delete
[root@VM-8-16-centos kraft-combined-logs]# ll | grep test1

尝试直接向一个不存在的topic写入数据,测试会直接创建topic
# bin/kafka-console-producer.sh --topic test2 --bootstrap-server localhost:9092
>hello test21
[2022-12-05 22:05:35,059] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test2=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
>hellotest22

消费消息
# bin/kafka-console-consumer.sh --topic test2 --from-beginning --bootstrap-server localhost:9092
hello test21
hellotest22

创建topic指定分区数量
创建一个分区位4,副本为1的主题,副本必须小于集群broker数量
# bin/kafka-topics.sh --create --topic test1 --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
Created topic test1.
# bin/kafka-topics.sh --describe --topic test1 --bootstrap-server localhost:9092
Topic: test1  TopicId: YvB0zKr5R-K3Qq7DAw9kMA PartitionCount: 4 ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test1  Partition: 0  Leader: 1 Replicas: 1 Isr: 1
  Topic: test1  Partition: 1  Leader: 1 Replicas: 1 Isr: 1
  Topic: test1  Partition: 2  Leader: 1 Replicas: 1 Isr: 1
  Topic: test1  Partition: 3  Leader: 1 Replicas: 1 Isr: 1
  
查看目录
[root@VM-8-16-centos kraft-combined-logs]# ll | grep test
drwxr-xr-x 2 root root 4096 12月  5 22:09 test1-0
drwxr-xr-x 2 root root 4096 12月  5 22:09 test1-1
drwxr-xr-x 2 root root 4096 12月  5 22:09 test1-2
drwxr-xr-x 2 root root 4096 12月  5 22:09 test1-3
drwxr-xr-x 2 root root 4096 12月  5 22:05 test2-0.4043937575044d579345a7a5fc2cb8e7-delete

查看所有的topic以及查看指定topic 消息数据数量、从偏移量消费数据
# ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
quickstart-events
test1
# ./bin/kafka-run-class.sh
USAGE: ./bin/kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]
# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test1  --time -1 --bootstrap-server localhost:9092
test1:0:24
test1:1:12
test1:2:24
test1:3:0

指定offset,指定offset,必须指定partition来进行消费(如下是分区1,偏移位10,所以读出来2条)
# bin/kafka-console-consumer.sh --topic test1 --partition 1 --offset 10 --bootstrap-server localhost:9092
mymsg_5
mymsg_6

消费消息指定group,然后查看消费者组进度记录
# bin/kafka-console-consumer.sh --topic test1 --group g1 --bootstrap-server localhost:9092
# bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
g1
# bin/kafka-consumer-groups.sh --describe --group g1 --bootstrap-server localhost:9092
Consumer group 'g1' has no active members.
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
g1              test1           0          28              28              0               -               -               -
g1              test1           1          14              14              0               -               -               -
g1              test1           2          28              28              0               -               -               -
g1              test1           3          0               0               0               -               -               -
__consumer_offsets 好像是保存消费者进度的一个topic
# bin/kafka-console-consumer.sh --topic __consumer_offsets --from-beginning --bootstrap-server localhost:9092

关闭自动创建topic
上面看到,当我们发送消息的时候,如果topic 不存在kafka 默认会帮我们创建topic。有时候我们需要关闭自动创建,创建topic 必须走申请创建的流程。

编辑配置文件 
config/kraft/server.properties 文件增加如下配置,然后重启
auto.create.topics.enable=false

安装web管理端
docker run -d -p 8080:8080 -v /opt/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --name kafka-map --restart always dushixiang/kafka-map:latest
然后用默认的账号密码登录进去,创建集群后即可查看相关的集群以及topic信息,也可以从管理端消费消息与发送消息。


0 篇评论

发表我的评论