Kafka
如何使用Docker Compose部署Kafka
信息
借鉴github仓库
下面示例使用最简单的 zk-single-kafka-single.yml
以下为正确示例:
对仓库中的文件作了两处调整
- 调整了连接地址
- KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-192.168.3.163}:9092,DOCKER://host.docker.internal:29092
- 在Zoo1和kafka1中都添加了时区,可以解决时间不对的问题
+ TZ: Asia/Shanghai
点击展开查看完整配置文件
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
TZ: Asia/Shanghai
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-192.168.3.163}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
TZ: Asia/Shanghai
depends_on:
- zoo1
报错 could not be established. Broker may not be available
调整外部连接地址即可解决问题
[| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1]
Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
如何使用命令行操作集群
# 创建topic
docker run --rm --network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-topics --bootstrap-server=127.0.0.1:9092 \
--create \
--topic=my-topic \
--partitions=3 \
--replication-factor=1
# 生产消息
docker run --tty --interactive \
--rm \
--network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-console-producer --broker-list=127.0.0.1:9092 --topic=my-topic
# 消费消息
docker run --tty --interactive \
--rm \
--network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=my-topic
如何在Spring Boot中使用Kafka
仓库代码简单介绍了 Kafka创建字符串消息和Pojo消息,发送消息的示例。
参考资料涉及多种类型消息混合生产和消费。值得学习和借鉴。
生产者使用注意点
- 一个JVM下使用一个生产者即可,多个生产者可能OOM,一个生产者
buffer.memory=32MB。 - Batch设计,首先看
batch.size是否满足,不满足再看linger.ms,当缓存消息到达buffer.memory会直接触发消息发送到服务器。 - acks, 0 -不确认,1-所有主节点写入成功,all-所有节点写入成功
触发Coordinator Rebalance
- 组成员数量变化
- 订阅主题数量变化
- 订阅主题分区数变化
建议
session.timeout.ms > 3 * hearbeat.interval.ms- 控制消费者消费消息的时间,不要超过
max.poll.interval.ms配置时间
协议
本作品代码部分采用 Apache 2.0协议 进行许可。遵循许可的前提下,你可以自由地对代码进行修改,再发布,可以将代码用作商业用途。但要求你:
- 署名:在原有代码和衍生代码中,保留原作者署名及代码来源信息。
- 保留许可证:在原有代码和衍生代码中,保留Apache 2.0协议文件。
- 署名:应在使用本文档的全部或部分内容时候,注明原作者及来源信息。
- 非商业性使用:不得用于商业出版或其他任何带有商业性质的行为。如需商业使用,请联系作者。
- 相同方式共享的条件:在本文档基础上演绎、修改的作品,应当继续以知识共享署名 4.0国际许可协议进行许可。