元月's blog 元月's blog
首页
  • 基础
  • 并发编程
  • JVM
  • Spring
  • Redis篇
  • Nginx篇
  • Kafka篇
  • Otter篇
  • Shardingsphere篇
  • 设计模式
  • MySQL
  • Oracle
  • 基础
  • 操作系统
  • 网络
  • 数据结构
  • 技术文档
  • Git常用命令
  • GitHub技巧
  • 博客搭建
  • 开发工具
更多

元月

临渊羡鱼,不如退而结网
首页
  • 基础
  • 并发编程
  • JVM
  • Spring
  • Redis篇
  • Nginx篇
  • Kafka篇
  • Otter篇
  • Shardingsphere篇
  • 设计模式
  • MySQL
  • Oracle
  • 基础
  • 操作系统
  • 网络
  • 数据结构
  • 技术文档
  • Git常用命令
  • GitHub技巧
  • 博客搭建
  • 开发工具
更多
  • Redis

  • Nginx

  • Kafka

    • Kafka简介与安装使用
      • 一、kafka简介
      • 二、快速开始
        • 2.1、安装前环境准备
        • 2.1.1、安装JDK
        • 2.1.2、安装Zookeeper
        • 2.2、安装Kafka
        • 2.2.1、下载安装包
        • 2.2.2、修改配置文件config/server.properties
        • 2.2.3、启动服务
      • 三、Kafka集群
        • 3.1、创建配置文件并修改内容
        • 3.2、启动节点
      • 四、Kafka基本使用
        • 4.1、创建主题
        • 4.2、发送消息
        • 4.3、消费消息
      • 五、kafka可视化管理工具
        • 5.1、下载解压
        • 5.2、修改配置文件vi conf/application.conf
        • 5.3、启动
    • Kafka底层设计原理详解
    • Kafka的集成与应用
    • Kafka线上问题及优化
  • Shardingsphere

  • Otter

  • nexus

  • 中间件
  • Kafka
元月
2023-01-02
目录

Kafka简介与安装使用

# 一、kafka简介

Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等。

  • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反 馈,比如报警和报告。

# 二、快速开始 (opens new window)

# 2.1、安装前环境准备

# 2.1.1、安装JDK

Kafka是用Scala语言开发的,运行在JVM上,所以需要先安装JDK (opens new window)

# 2.1.2、安装Zookeeper

kafka依赖zookeeper,所以需要先安装Zookeeper

# 2.2、安装Kafka

# 2.2.1、下载安装包
#下载2.4.1 release版本,并解压
wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz #2.11是scala的版本,2.4.1是kafka的版本
tar -zxvf kafka_2.11‐2.4.1.tgz
1
2
3
# 2.2.2、修改配置文件config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://node0:9092 #kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=node0:2181
1
2
3
4
5
6
7
# 2.2.3、启动服务
# 启动kafka,运行日志在logs目录的server.log文件里
bin/kafka-server-start.sh -daemon config/server.properties#后台启动,不会打印日志到控制台或者用
bin/kafka-server-start.sh config/server.properties&

# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh
ls / #查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点

# 停止kafka
bin/kafka-server-stop.sh
1
2
3
4
5
6
7
8
9
10
11

kafka在zk节点数据图

# 三、Kafka集群

由于服务器资源有限,现在我们在一台机器上同时启动三个broker实例。

# 3.1、创建配置文件并修改内容

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
1
2

修改配置文件vi config/server-1.properties

#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://node0:9093 #kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址
zookeeper.connect=node0:2181
1
2
3
4
5
6
7

修改配置文件vi config/server-2.properties

#broker.id属性在kafka集群中必须要是唯一
broker.id=2
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://node0:9094 #kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs-2
#kafka连接zookeeper的地址
zookeeper.connect=node0:2181
1
2
3
4
5
6
7

# 3.2、启动节点

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

#查看zookeeper确认集群节点是否都注册成功
[zk: localhost:2181(CONNECTED) 13] ls /brokers/ids
[0,1,2]
1
2
3
4
5
6

# 四、Kafka基本使用

# 4.1、创建主题

#创建一个test-topic的主题
bin/kafka-topics.sh --create --zookeeper node0:2181 --replication-factor 1 --partitions 2 --topic test-topic

#查看所有主题
bin/kafka-topics.sh --list --zookeeper node0:2181

#增加分区数量(目前kafka不支持减少分区)
bin/kafka-topics.sh -alter --partitions 3 --zookeeper node0:2181 --topic test-topic

#删除主题
bin/kafka-topics.sh --delete --topic test-topic --zookeeper node0:2181
1
2
3
4
5
6
7
8
9
10
11

查看某个topic的情况

bin/kafka-topics.sh --describe --topic test-topic --zookeeper node0:2181
1

第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息

  • Leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)
  • Replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
  • Isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

# 4.2、发送消息

bin/kafka-console-producer.sh --broker-list node0:9092,node1:9093,node2:9094 --topic test-topic
>this is a msg1
>this is a msg2
1
2
3

# 4.3、消费消息

#默认是消费最新的消息
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic

#消费之前的消息`--from-beginning`
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic --from-beginning

#消费多主题
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --whitelist "test-topic|test-2"

#单播消费:一条消息只能被某一个消费者消费的模式,只需让所有消费者在同一个消费组里即可
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic --consumer-property group.id=testGroup1

#多播消费:一条消息能被多个消费者消费的模式,类似publish-subscribe模式,只要保证这些消费者属于不同的消费组即可
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic --consumer-property group.id=testGroup2

#查看消费组名
bin/kafka-consumer-groups.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

查看消费组的消费偏移量

bin/kafka-consumer-groups.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --describe --group testGroup
1

current-offset:当前消费组的已消费偏移量 log-end-offset:主题对应分区消息的结束偏移量(HW) lag:当前消费组未消费的消息数

# 五、kafka可视化管理工具

# 5.1、下载解压

unzip kafka-manager-1.3.3.7.zip
cd kafka-manager-1.3.3.7
1
2

# 5.2、修改配置文件vi conf/application.conf

#kafka-manager.zkhosts="localhost:2181"       ##注释这一行,下面换成zk集群地址
kafka-manager.zkhosts="node0:2181,node1:2181,node2:2181"
1
2

# 5.3、启动

#bin/kafka-manager
#kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &
1
2
3
4

更多安装使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html (opens new window)

#Kafka
Nginx安装和基本使用
Kafka底层设计原理详解

← Nginx安装和基本使用 Kafka底层设计原理详解→

最近更新
01
otter二次开发-支持按目标端主键索引Load数据
08-03
02
mvnw简介
06-21
03
gor流量复制工具
06-03
更多文章>
Theme by Vdoing | Copyright © 2022-2024 元月 | 粤ICP备2022071877号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式