[官网 | 下载 | 快速上手]

安装

执行如下命令,直接下载kafka最新版本:

官网下载地址:官网下载。有的时候由于一些原因,导致链接失效,请至官网下载。

cd /usr/local
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz & tar -zxvf kafka_2.11-2.2.0.tgz

配置config/service.properties

zookeeper.connect=192.168.218.225:2181,192.168.218.231:2181,192.168.218.237:2181
listeners=PLAINTEXT://192.168.218.198:9092
advertised.listeners=PLAINTEXT://192.168.218.198:9092
注意:listenersadvertised.listeners建议加上。因为默认情况下,kafka是用hostname去查找服务器的IP。如果查找不到,或者查找到localhost等,则会导致外网连接不上。
如果没有安装zookeeper,可以使用 kafka 自带的启动即可
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

如果是自定义安装的java,那可能需要重新配置一下java执行环境。

配置kafka脚本执行的java环境:

vim bin/kafka-run-class.sh
#在首行添加如下代码
export JAVA_HOME=/usr/local/jdk1.8.0_151
export JRE_HOME=/usr/local/jdk1.8.0_151/jre

配置服务

kafka服务创建systemd service文件:

vim /etc/systemd/system/kafka.service

输入如下代码到该文件中:

[Unit]
Description=kafka
After=network.target remote-fs.target nss-lookup.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/bin/sh -c '/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties'
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

启动kafka服务:

systemctl start kafka

设置kafka服务为开机自启动

systemctl enable kafka

日志清理

在 kafka 的配置中 conf/server.properties 中设置 log.retention.byte 值。kafka 的硬盘使用量,建议设置为硬盘的 70%。根据不同的分区大小,进行区分。

# 设置日志保留大小为 7G,单位为:字节
log.retention.byte=7516192768

默认的日志保存天数为 168小时。即一周时间。此处可根据实际情况定义。

# 设置日志保留时间为1天
log.retention.hours=24
Kafka的日志清理-LogCleaner
kafka + how to calculate the value of log.retention.byte

副本配置

  • offsets.topic.replication.factor 用于配置offset记录的topic的partition的副本个数
  • transaction.state.log.replication.factor 事务主题的复制因子
  • transaction.state.log.min.isr 覆盖事务主题的min.insync.replicas配置
  • num.partitions 新建Topic时默认的分区数
  • default.replication.factor 自动创建topic时的默认副本的个数

注意:这些参数,设置得更高以确保高可用性!

其中 default.replication.factor 是真正决定,topi的副本数量的

offsets.topic.replication.factor=3                            
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
num.partitions=1
default.replication.factor=3   

关于kafka配置文件的更多解释,请参考链接:

https://blog.csdn.net/memoordit/article/details/78850086

参考:https://www.cnblogs.com/xiao987334176/p/10315176.html

命令

分区

分区一般是初始化的时候创建好的。所以在初始化之时,就必须规则好分区使用数量。

默认情况的分区配置在conf/service.properties下配置:

num.partitions=1

手工分配分区

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic foo

Topic

创建topic

./bin/kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 3 --partitions 3 --topic gilbert

--partitions 指定分区数量
--replication-factor 指定备份数量,默认值为1,不进行备份

在创建时间如果出现replication factor: 3 larger than available brokers: 0,则需要注意zookeeper的地址是否有引用对应的路径。例如--zookeeper master:2181/kafka
https://blog.csdn.net/qq_38976805/article/details/90577556

查看当前topic列表

./bin/kafka-topics.sh --zookeeper localhost:2181 --list

清空分区或队列

将缓存时间设置1秒,等待1分钟

kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw.beacon --config retention.ms=1000

1分钟后,将缓存时间设回原值。注意可以先查看原值是什么?

kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw.beacon --config retention.ms=86400000
kafka集群及与springboot集成
changing kafka retention period during runtime

查看分区信息

可查看分区数量等。

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic com.junbo.dw.beacon

结果:

Topic:com.junbo.dw.beacon       PartitionCount:10       ReplicationFactor:1     Configs:
        Topic: com.junbo.dw.beacon      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 3    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 4    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 5    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 6    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 7    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 8    Leader: 0       Replicas: 0     Isr: 0
        Topic: com.junbo.dw.beacon      Partition: 9    Leader: 0       Replicas: 0     Isr: 0

查看分区消费信息

sh kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:6667 --new-consumer --group mojing --describe

自动均衡

在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也j均分在不同的broker上。

每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。

但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。

./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

一般情况下,在生产环境可开启配置打开自动均衡。

修改副本信息

编辑文件replication-factor-junbo-dw2-5.json

{"version":1, "partitions":[{"topic":"com.junbo.dw2","partition":5,"replicas":[1001,1002,1003]}] }

执行命令:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication-factor-junbo-dw2-5.json --execute

验证结果:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication-factor-junbo-dw2-5.json --verify

重新分配分区

重分配步骤:

  1. 确认brokers数
  2. 使用generate生成迁移计划
  3. 使用execute执行迁移计划
  4. 使用verify检查是否迁移完成

查看 brokers

在kafka的bin目录使用zookeeper查看所有的kafka brokers id。

./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

获取borders id对应的机器:

./bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/1003

生成迁移计划

手动创建一个json文件:topic.json

{
  "topics": [
    {"topic": "com.junbo.dw2"},
    {"topic": "flink-user-action"},
    {"topic": "flink-user-info"}
  ],
  "version":1
}

使用--generate生成计划:

./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --topics-to-move-json-file ./topic.json --broker-list "1001,1002,1003" --generate

生成结果:


Current partition replica assignment

{"version":1,
"partitions":[....]
}

Proposed partition reassignment configuration

{"version":1,
"partitions":[.....]
}

Proposed partition reassignment configuration为迁移后的计划。将该计划保存到一个reassign.json文件。

执行计划

./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --execute

如果自定义计划,需要注意log_dirs列表的长度必须和replicas的长度一致。

partitions-reassignment-is-failing-in-kafka

确认计划

./bin/kafka-reassign-partitions.sh --zookeeper bigdata1:2181 --reassignment-json-file ./reassign.json --verify

中断正在执行的计划

// 中止正在进行的重分配任务
// 登录zookeeper
./zkCli.sh
get /admin/reassign_partitions
delete /admin/reassign_partitions

迁移优化

如果数据量较多,可以先删除指定分区3个小时前的数据:

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw2 --config retention.ms=86400000

迁移完成后,再改回来,清除3天前的数据:

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic com.junbo.dw2 --config retention.ms=259200000

或者使用kafka-configs.sh进行删除该配置。

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name com.junbo.dw2 --delete-config retention.ms

在修改完成后,可以使用describe命令来查看配置是否生效。

changing-kafka-retention-period-during-runtime

重新指定分区leader

有的时候节点down了,但是新选的leader并不合适,于是需要重新指定。

手动编辑文件topicPartitionList.json

{"partitions":[{"topic":"com.junbo.dw2","partition":5}]}

执行命令:

$ ./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 -path-to-json-file ~/kafka/topicPartitionList.json
如果指定的leader本身不存在该分区的副本,则需要手动创建副本文件夹。一般在logs.dir文件夹下。

手动指定leader

查看指定分区的状态:

在zookeeper的zkCli.sh执行:

get /brokers/topics/com.junbo.dw2/partitions/5/state

修改返回的json,并执行:

set /brokers/topics/com.junbo.dw2/partitions/5/state {"controller_epoch":66,"leader":1003,"version":1,"leader_epoch":31,"isr":[1002]}
kafka partition offline

导出指定主题的数据

使用以下命令导出最近的100W条数据:

./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:6667 --topic com.junbo.dw2  --max-messages 1000000  > /opt/data/com-junbo-dw2.json

也可以指定分区,以使用偏移量。

./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:6667 --topic com.junbo.dw2 --offset 1 --partition 0 > /opt/data/com-junbo-dw2.json

如果需要从开始导出则加上参数:--from-beginning

将数据导入主题

./bin/kafka-console-producer.sh --bootstrap-server bigdata1:6667 --topic com.kafka.test1 < test.txt

优化篇

jstat 查看 gc 信息

参数说明:
S0C:第一个幸存区的大小
S1C:第二个幸存区的大小
S0U:第一个幸存区的使用大小
S1U:第二个幸存区的使用大小
EC:伊甸园区的大小
EU:伊甸园区的使用大小
OC:老年代大小
OU:老年代使用大小
MC:方法区大小
MU:方法区使用大小
CCSC:压缩类空间大小
CCSU:压缩类空间使用大小
YGC:年轻代垃圾回收次数
YGCT:年轻代垃圾回收消耗时间
FGC:老年代垃圾回收次数
FGCT:老年代垃圾回收消耗时间
GCT:垃圾回收消耗总时间

  1. 使用 jps 查看 kafka 的进程ID
  2. 使用jstat -gc <pid> 1s 30 查看当前broker在kafka集群的gc频率
  3. 使用 jmap 查看 kafka 当前的堆内存信息

    1. jmap -heap
    2. 观察指标

      1. Survivor Space 使用情况
      2. G1 Old Generation 使用情况

优化内存

修改kafka配置,在kafka-server-start.sh修改export KAFKA_HEAP_OPTS="-Xmx1G -Xms 1G"改为对应的内存。

重启后再次查看内存gc频率。

https://www.cnblogs.com/yinzhengjie/p/9884552.html

优化记录

说明
PID:kafka进程ID
TIME:kafka运行时长(单位1/100秒)
YGC:年轻代垃圾回收次数
YGCT:年轻代垃圾回收消耗时间
2020-11-09
机器PIDTIMEYGCYGCT回收速度
bigdata129661114962:144543762102034.8570.395次/秒
bigdata212089102601:30343334673693.9260.334次/秒
bigdata41824456538.15186454639092.4380.329次/秒

与 Spring Boot 的结合

引用

build.gradle中加下kafka的依赖:

dependencies {
    compile("org.springframework.kafka:spring-kafka:2.2.0.RELEASE")
}

兼容

  • Apache Kafka Clients 2.0.0
  • Spring Framework 5.1.x
  • Java 8+

配置

java:  
  kafka:
    bootstrap-servers: 192.168.218.198:9092
    consumer:
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      #key-deserializer: #默认情况下是字符串序列化
      #value-deserializer: 
      properties:
        session.timeout.ms: 15000
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      #value-serializer: com.junbo.database.serializer.JsonSerializer #建议使用自定义的JsonSerializer
      properties:
        session.timeout.ms: 15000

监听 Topic

Kafka直接多种类型的序列化与反序列化。此处建议均使用默认的字符串序列化方式。所以在producer的配置处将value-serizlizer配置为JsonSerizlizer。由于Kafka自带库的Json序列化方式存在问题,所以使用自定义的序列化:

public class JsonSerializer<T> implements Serializer<T> {
    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);

    @Override
    public void configure(Map configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            return JsonMapperUtil.MAPPER.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            logger.warn("Json序列化失败", e);
        }
        return new byte[0];
    }

    @Override
    public void close() {}
}

监听Topic,代码如下:

@Component
public class KafkaReceiver {
    private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

    @KafkaListener(topics = "kafkaTopic}")
    public void listen(ConsumerRecord<String, String> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("listen record: {}", record);
            logger.info("listen key:{}, message: {}", record.key(), message);
        }
    }
}

我们可以根据发送的key名,去做数据类型的区分。

发送 Topic

注入KafkaTemplate即可执行消息的发送:

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private KafkaTemplate<String, UserEntity> kafkaTemplate;

发送一条测试的userEntity

UserEntity userEntity = new UserEntity();
userEntity.setId(1L);
userEntity.setBirthDay(new Date());
userEntity.setDataSource("test");
userEntity.setGender(0);
userEntity.setEmail("a@a.com");
this.kafkaTemplate.send(this.topic, "key-name", userEntity);

引用

标签: 大数据

添加新评论