跳到主要内容

Kafka

如何使用Docker Compose部署Kafka

信息

借鉴github仓库
下面示例使用最简单的 zk-single-kafka-single.yml

以下为正确示例:

对仓库中的文件作了两处调整

  • 调整了连接地址
-  KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-192.168.3.163}:9092,DOCKER://host.docker.internal:29092
  • 在Zoo1和kafka1中都添加了时区,可以解决时间不对的问题
+      TZ: Asia/Shanghai
点击展开查看完整配置文件
version: '2.1'

services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
TZ: Asia/Shanghai

kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-192.168.3.163}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
TZ: Asia/Shanghai
depends_on:
- zoo1

报错 could not be established. Broker may not be available

调整外部连接地址即可解决问题

[| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] 
Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.

如何使用命令行操作集群

# 创建topic
docker run --rm --network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-topics --bootstrap-server=127.0.0.1:9092 \
--create \
--topic=my-topic \
--partitions=3 \
--replication-factor=1
# 生产消息
docker run --tty --interactive \
--rm \
--network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-console-producer --broker-list=127.0.0.1:9092 --topic=my-topic

# 消费消息
docker run --tty --interactive \
--rm \
--network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=my-topic

如何在Spring Boot中使用Kafka

参考资料

参考仓库代码

仓库代码简单介绍了 Kafka创建字符串消息和Pojo消息,发送消息的示例。

参考资料涉及多种类型消息混合生产和消费。值得学习和借鉴。

生产者使用注意点

  1. 一个JVM下使用一个生产者即可,多个生产者可能OOM,一个生产者buffer.memory=32MB
  2. Batch设计,首先看batch.size 是否满足,不满足再看linger.ms ,当缓存消息到达buffer.memory 会直接触发消息发送到服务器。
  3. acks, 0 -不确认,1-所有主节点写入成功,all-所有节点写入成功

触发Coordinator Rebalance

  • 组成员数量变化
  • 订阅主题数量变化
  • 订阅主题分区数变化

建议

  1. session.timeout.ms > 3 * hearbeat.interval.ms
  2. 控制消费者消费消息的时间,不要超过max.poll.interval.ms配置时间
协议
本作品代码部分采用 Apache 2.0协议 进行许可。遵循许可的前提下,你可以自由地对代码进行修改,再发布,可以将代码用作商业用途。但要求你:
  • 署名:在原有代码和衍生代码中,保留原作者署名及代码来源信息。
  • 保留许可证:在原有代码和衍生代码中,保留Apache 2.0协议文件。
本作品文档部分采用 知识共享署名 4.0 国际许可协议 进行许可。遵循许可的前提下,你可以自由地共享,包括在任何媒介上以任何形式复制、发行本作品,亦可以自由地演绎、修改、转换或以本作品为基础进行二次创作。但要求你:
  • 署名:应在使用本文档的全部或部分内容时候,注明原作者及来源信息。
  • 非商业性使用:不得用于商业出版或其他任何带有商业性质的行为。如需商业使用,请联系作者。
  • 相同方式共享的条件:在本文档基础上演绎、修改的作品,应当继续以知识共享署名 4.0国际许可协议进行许可。