Kafka's performance analysis boils down to four points:
- NIO
- Zero copy
- Sequential disk reads and writes
- Heavy use of the queue data structure
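Of these, zero copy is the easiest to show in code: when serving fetch requests, Kafka hands log-segment bytes to the socket via FileChannel.transferTo, which lets the kernel do the copy (sendfile) instead of dragging the data through user space. A minimal sketch of that call, with a made-up segment file name and destination address purely for illustration:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopyDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical segment file and consumer address, just to show the API shape.
        try (FileChannel segment = FileChannel.open(Paths.get("00000000000000000000.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0;
            long remaining = segment.size();
            while (remaining > 0) {
                // transferTo may move fewer bytes than requested, so loop until done.
                long sent = segment.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```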
Today I heard a classmate say that he ran into problems setting up Kafka and couldn't consume messages. My guess was that the problem lay in ZooKeeper. By the time I got home it was late and I still couldn't puzzle out the cause, so I simply set up a pseudo-cluster myself and stepped in a few pits along the way.

Note: ZooKeeper 3.3.6 conflicts with Kafka 2.12; combining them leaves the service unusable. Four hours of hard-won lessons.

Kafka version: kafka_2.11-2.3.0.tgz
ZooKeeper version: apache-zookeeper-3.5.5-bin.tar.gz
Setting up ZooKeeper first

- Download and unpack the tarball.
- Make three copies, named apache-zookeeper-3.5.5-bin0, apache-zookeeper-3.5.5-bin1, and apache-zookeeper-3.5.5-bin2.
- Start configuring: go into apache-zookeeper-3.5.5-bin0/conf, copy zoo_sample.cfg to zoo.cfg, open it, delete everything, and paste in the following lines.
tickTime=2000
initLimit=10
syncLimit=5
# change the next two paths to your own ZooKeeper directory
dataDir=xxxxx/apache-zookeeper-3.5.5-bin0/data
dataLogDir=xxxxx/apache-zookeeper-3.5.5-bin0/zk1/log
admin.serverPort=14110
# pick any client port that does not conflict; this is what I used
clientPort=12180
# the ports ZooKeeper uses for leader election; in a pseudo-cluster they must not repeat either
server.1=localhost:12888:13888
server.2=localhost:12899:13899
server.3=localhost:12877:13877
I won't explain each line; there is plenty of material online. One line does deserve emphasis: the newest versions embed a Jetty-based AdminServer, so the admin.serverPort=14110 line is mandatory, and any non-conflicting port will do. Frustratingly, I was stuck here for a long time before a search finally turned up the fix.

Make the same changes for the other two instances, but change clientPort=12180 (and admin.serverPort) to different, non-conflicting ports on each.
- Create the data directory

Since this is a cluster, each ZooKeeper instance needs an id to tell it apart. The dataDir path was already set in the previous step, so this step is simple: in the ZooKeeper root directory, run

sudo mkdir data && echo 1 > data/myid

(the myid file has to live inside dataDir). The other two instances need the same step with different ids: run sudo mkdir data && echo 2 > data/myid and sudo mkdir data && echo 3 > data/myid respectively.
- Start

This part is much simpler: in each of the three instance directories, run

sudo ./bin/zkServer.sh --config ./conf start

- Verify the services and find the leader

In each instance's root directory, run: sudo ./bin/zkServer.sh status — one instance should report Mode: leader and the other two Mode: follower.
ZooKeeper is installed successfully. Next up: installing Kafka.
Installing Kafka

- Unpack the tarball and make three copies.
- Start configuring. Kafka's configuration is comparatively simple (basic usage only; this walkthrough does not touch the more advanced settings).

Go into kafka_2.11-2.3.0/config and open server.properties to configure the broker.

Configure the points sketched below, then copy the same file to the other two services, changing broker.id, the port, and the log path for each.
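A minimal sketch of the server.properties entries that matter for this pseudo-cluster; the log path is a placeholder, and the broker ports (9092/9091/9090) are my assumption, chosen to match the bootstrap.servers list used in the Java config later:

```properties
# unique per broker: 0, 1, 2
broker.id=0
# unique per broker on one machine: 9092, 9091, 9090
listeners=PLAINTEXT://localhost:9092
# unique per broker
log.dirs=xxxxx/kafka-logs-0
# all three brokers point at the same ZooKeeper ensemble (client ports from the setup above)
zookeeper.connect=localhost:12180,localhost:12181,localhost:12182
```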
- Start the three Kafka brokers with:

sudo ./bin/kafka-server-start.sh ./config/server.properties             # foreground
sudo ./bin/kafka-server-start.sh -daemon ./config/server.properties     # background
Testing

## Kafka commands

Note: the tab character cannot be used in Kafka commands: https://blog.csdn.net/lzm1340458776/article/details/45576825

## Create a topic
./bin/kafka-topics.sh --create --zookeeper localhost:12180,localhost:12181,localhost:12182 --replication-factor 3 --partitions 3 --topic test
## Show topic info
./bin/kafka-topics.sh --describe --zookeeper localhost:12180,localhost:12181,localhost:12182 --topic test
## List topics
./bin/kafka-topics.sh --list --zookeeper localhost:12180,localhost:12181,localhost:12182
## Create a producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## Create a consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Java connection test

- Import the Kafka client dependencies via Maven:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.3</version>
</dependency>
Kafka configuration file

Create a kafkaConfig.conf file under the resources directory, paste in the configuration below, and adjust it to your environment:
producer {
    bootstrap.servers = "localhost:9092,localhost:9091,localhost:9090"
    partitioner.class = "org.apache.kafka.clients.producer.internals.DefaultPartitioner"
    key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
    value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
    compression.type = none
    request.timeout.ms = 1000
    linger.ms = 0
    client.id = "test"
    max.request.size = 1024
    batch.size = 16348
    buffer.memory = 33554432
    transactional.id = "123"
}
consumer {
    bootstrap.servers = "localhost:9092,localhost:9091,localhost:9090"
    group.id = "abs"
    enable.auto.commit = true
    auto.commit.interval.ms = 1000
    session.timeout.ms = 30000
    key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    topic = "test"
}
Kafka Java demo
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
public class KafkaClient {
    private static final Properties PROPERTIES_KAFKA_PRODUCER = new Properties();
    private static final Properties PROPERTIES_KAFKA_CONSUMER = new Properties();
    private static ArrayList<String> TOPICS = null;

    static {
        // Load kafkaConfig.conf from the classpath (Typesafe Config resolves the extension from the basename).
        Config load = ConfigFactory.load("kafkaConfig");
        Config producer = load.getConfig("producer");
        String string = producer.getString("bootstrap.servers");
        PROPERTIES_KAFKA_PRODUCER.put("bootstrap.servers", Arrays.asList(string.split(",")));
        PROPERTIES_KAFKA_PRODUCER.put("partitioner.class", producer.getString("partitioner.class"));
        PROPERTIES_KAFKA_PRODUCER.put("key.serializer", producer.getString("key.serializer"));
        PROPERTIES_KAFKA_PRODUCER.put("value.serializer", producer.getString("value.serializer"));
        PROPERTIES_KAFKA_PRODUCER.put("compression.type", producer.getString("compression.type"));
        PROPERTIES_KAFKA_PRODUCER.put("request.timeout.ms", producer.getString("request.timeout.ms"));
        PROPERTIES_KAFKA_PRODUCER.put("linger.ms", producer.getString("linger.ms"));
        PROPERTIES_KAFKA_PRODUCER.put("client.id", producer.getString("client.id"));
        PROPERTIES_KAFKA_PRODUCER.put("max.request.size", producer.getString("max.request.size"));
        PROPERTIES_KAFKA_PRODUCER.put("batch.size", producer.getString("batch.size"));
        PROPERTIES_KAFKA_PRODUCER.put("buffer.memory", producer.getString("buffer.memory"));
        // transactional.id is optional; only set it when it is actually configured.
        String tid = producer.getString("transactional.id");
        if (tid != null && !tid.isEmpty()) {
            PROPERTIES_KAFKA_PRODUCER.put("transactional.id", tid);
        }
        Config consumer = load.getConfig("consumer");
        String str = consumer.getString("bootstrap.servers");
        PROPERTIES_KAFKA_CONSUMER.put("bootstrap.servers", Arrays.asList(str.split(",")));
        PROPERTIES_KAFKA_CONSUMER.put("group.id", consumer.getString("group.id"));
        PROPERTIES_KAFKA_CONSUMER.put("enable.auto.commit", consumer.getString("enable.auto.commit"));
        PROPERTIES_KAFKA_CONSUMER.put("auto.commit.interval.ms", consumer.getString("auto.commit.interval.ms"));
        PROPERTIES_KAFKA_CONSUMER.put("key.deserializer", consumer.getString("key.deserializer"));
        PROPERTIES_KAFKA_CONSUMER.put("value.deserializer", consumer.getString("value.deserializer"));
        TOPICS = new ArrayList<String>(Arrays.asList(consumer.getString("topic").split(",")));
    }

    private static volatile KafkaProducer<String, String> kafkaProducer = null;

    // Lazily create a singleton producer with double-checked locking.
    public static KafkaProducer<String, String> builderProducer() {
        if (kafkaProducer == null) {
            synchronized (KafkaProducer.class) {
                if (kafkaProducer == null) {
                    kafkaProducer = new KafkaProducer<>(PROPERTIES_KAFKA_PRODUCER);
                }
            }
        }
        return kafkaProducer;
    }

    private static volatile KafkaConsumer<String, String> kafkaConsumer = null;

    // Lazily create a singleton consumer and subscribe it exactly once.
    public static KafkaConsumer<String, String> builderConsumer() {
        if (kafkaConsumer == null) {
            synchronized (KafkaConsumer.class) {
                if (kafkaConsumer == null) {
                    kafkaConsumer = new KafkaConsumer<>(PROPERTIES_KAFKA_CONSUMER);
                    kafkaConsumer.subscribe(TOPICS);
                }
            }
        }
        return kafkaConsumer;
    }
}
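The two builder methods use double-checked locking on volatile fields, so at most one producer and one consumer get created even if several threads race on the first call. One caveat worth knowing: KafkaProducer is thread-safe and can be shared freely, but KafkaConsumer is not, so the consumer singleton must only ever be polled from a single thread.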
Invocation
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Client {
    public static void main(String[] args) {
        producer();
        consumer();
    }

    /**
     * Consumer: poll in a loop and print every record that arrives.
     */
    private static void consumer() {
        KafkaConsumer<String, String> consumer = KafkaClient.builderConsumer();
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(100);
            poll.forEach(record -> {
                System.err.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            });
        }
    }

    /**
     * Producer: send one record inside a transaction (transactional.id is set in the config).
     */
    private static void producer() {
        KafkaProducer<String, String> kafkaProducer = KafkaClient.builderProducer();
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        kafkaProducer.send(new ProducerRecord<>("test", "value"));
        kafkaProducer.commitTransaction();
    }
}
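One caveat when running this demo: the record is sent before the consumer's first poll, and with a brand-new group.id the consumer starts from the latest offset by default (auto.offset.reset=latest), so it may never print the message it just produced. Setting auto.offset.reset=earliest in the consumer properties, or producing again while the consumer is already polling, makes the output show up.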