kafka的性能分析

NIO
Zero Copy
磁盘顺序读写
Queue数据结构的极致使用

今天听到右同学说搭建kafka的时候出现了问题,导致消费不了。我想了想,这问题可能出在zookeeper中,
晚上回来比较晚了,想想也猜不出来到底是什么原因,索性自己搭了一个伪集群,其中也踩了一些坑。
注意: zookeeper 3.3.6 和 kafka 2.12冲突,搭建会服务不可用,4个小时血粼粼的教训

kafka采用的版本是kafka_2.11-2.3.0.tgz

zookeeper采用版本是 apache-zookeeper-3.5.5-bin.tar.gz

首先进行zookeeper的搭建

下载解压缩包

  • 复制三份,分别起名apache-zookeeper-3.5.5-bin0,apache-zookeeper-3.5.5-bin1,apache-zookeeper-3.5.5-bin2
    三个zookeeper解压包

  • 开始配置, 进入apache-zookeeper-3.5.5-bin0/conf下,将 zoo_sample.cfg复制一份改名为zoo.cfg,并打开删除所有,粘贴一下这几行。

      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=xxxxx/apache-zookeeper-3.5.5-bin0/data // 改成自己zoopeeker的路径下
      dataLogDir=xxxxx/apache-zookeeper-3.5.5-bin0/zk1/log // 同上
      admin.serverPort=14110
      clientPort=12180  //端口自己改,这里是我写的
      server.1=localhost:12888:13888 `//这里是zookeeper在进行选举需要的端口,因为是伪集群,所以也不能重复`
      server.2=localhost:12899:13899
      server.3=localhost:12877:13877
    

这里不做解释,网上太多了,但是着重解释一下,因为最新版引入了jetty,所以admin.serverPort=14110 这一行必须加,端口改为只要不冲突就可以。郁闷,我在这纠结了好久,最后还是万能的度娘告诉我怎么解。赞赞赞
其他两台服务,同样修改下,但是需要修改clientPort=12180为不同的端口,以不冲突为准。
度娘万岁

  • 创建data文件夹

因为是搭建集群,所以zookeeper就需要一个标示id来区分,上边一步已经设置了dataDir的路径,所以这一步就比较简单了,只需要执行在zookeeper的根目录里执行
sudo mkdir data && echo 1 >> myid
同样的次步另外两台服务也需要执行,只是myid不能一样,分别执行 sudo mkdir data && echo 2 >> myid 和 sudo mkdir data && echo 3 >> myid

  • 启动

相对简单的多分别在三个服务下执行
sudo ./bin/zkServer.sh --config ./conf start

  • 验证服务是否,并查看leader

在不同的服务根目录下执行: sudo ./bin/zkServer.sh status
leader
follower

zookeeper安装成功。剩下下准备安装kafka。

kafka的安装

  • 解压压缩包并分成三份。
    kafka服务三部分

  • 开始配置,kafka的配置就相对简单的多(基本使用,复杂的另算,本教程只是简单使用,不涉及复杂配置)

进入kafka_2.11-2.3.0/config中开始对kafka的服务配置,打开server.properties
我们配置一下几点,其他两个服务同样复制,但是需要修改broker.id和port和日志路径
配置基本broker.id , 端口,日志之类
非伪集群需要这样配置
这里配置zookeeper连接

  • 使用命令分别启动三台kafka

sudo ./bin/kafka-server-start.sh ./config/server.properties 前台启动
sudo ./bin/kafka-server-start.sh -daemon ./config/server.properties 后台启动

Test测试使用

##kafka的命令

注释:kafka的命令不能使用tab制表符:https://blog.csdn.net/lzm1340458776/article/details/45576825

##创建topic

./bin/kafka-topics.sh --create --zookeeper localhost:12180,localhost:12181,localhost:12182 --replication-factor 3 --partitions 3 --topic test
创建topic


##显示topic信息

./bin/kafka-topics.sh --describe --zookeeper localhost:12180,localhost:12181,localhost:12182 --topic test
显示topic信息


##列出topic

./bin/kafka-topics.sh --list --zookeeper localhost:12180,localhost:12181,localhost:12182
列出topic


##创建producer

./bin/kafka-console-producer.sh --broker-list localhost:9092 -topic test
生产者


##创建消费者consumer

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
消费者

Java连接测试

  • 采用maven导入kafka驱动jar包
<dependency>       
    <groupId>org.apache.kafka</groupId>        
    <artifactId>kafka-clients</artifactId>         
    <version>1.0.1</version>        
 </dependency>          
<dependency>       
     <groupId>com.typesafe</groupId>        
     <artifactId>config</artifactId>        
     <version>1.3.3</version>           
</dependency>          

kafka配置文件

resources目录下创建kafkaConfig.conf文件,并复制一下配置,修改
producer {
bootstrap.servers = “localhost:9092,localhost:9091,localhost:9090”
partitioner.class = “org.apache.kafka.clients.producer.internals.DefaultPartitioner”
key.serializer = “org.apache.kafka.common.serialization.StringSerializer”
value.serializer = “org.apache.kafka.common.serialization.StringSerializer”
compression.type = none
request.timeout.ms = 1000
linger.ms = 0
Client.id = “test”
max.request.size = 1024
batch.size = 16348
buffer.memory = 33554432
transactional.id = 123
}
consumer {
bootstrap.servers = “localhost:9092,localhost:9091,localhost:9090”
group.id = “abs”
enable.auto.commit = true
auto.commit.interval.ms = 1000
session.timeout.ms = 30000
key.deserializer = “org.apache.kafka.common.serialization.StringDeserializer”
value.deserializer = “org.apache.kafka.common.serialization.StringDeserializer”
topic=“test”
}

kafka javaDemo

    
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
public class KafkaClient {
    private static final Properties PROPERTIES_KAKFA_PRODUCER = new Properties();
    private static final Properties PROPERTIES_KAKFA_CONSUMER = new Properties();
    private static ArrayList<String> TOPICS = null;
    static {
        Config load = ConfigFactory.load("Application.conf");
        Config producer = load.getConfig("producer");
        String string = producer.getString("bootstrap.servers");
        PROPERTIES_KAKFA_PRODUCER.put("bootstrap.servers", Arrays.asList(string.split(",")));
        PROPERTIES_KAKFA_PRODUCER.put("partitioner.class", producer.getString("partitioner.class"));
        PROPERTIES_KAKFA_PRODUCER.put("key.serializer", producer.getString("key.serializer"));
        PROPERTIES_KAKFA_PRODUCER.put("value.serializer", producer.getString("value.serializer"));
        PROPERTIES_KAKFA_PRODUCER.put("compression.type", producer.getString("compression.type"));
        PROPERTIES_KAKFA_PRODUCER.put("request.timeout.ms", producer.getString("request.timeout.ms"));
        PROPERTIES_KAKFA_PRODUCER.put("linger.ms", producer.getString("linger.ms"));
        PROPERTIES_KAKFA_PRODUCER.put("Client.id", producer.getString("Client.id"));
        PROPERTIES_KAKFA_PRODUCER.put("max.request.size", producer.getString("max.request.size"));
        PROPERTIES_KAKFA_PRODUCER.put("batch.size", producer.getString("batch.size"));
        PROPERTIES_KAKFA_PRODUCER.put("buffer.memory", producer.getString("buffer.memory"));
        String tid = producer.getString("transactional.id");
        if (tid.length() > 0 && tid != null) {
            PROPERTIES_KAKFA_PRODUCER.put("transactional.id", tid);
        }
        Config consumer = load.getConfig("consumer");
        String str = producer.getString("bootstrap.servers");
        PROPERTIES_KAKFA_CONSUMER.put("bootstrap.servers", Arrays.asList(str.split(",")));
        PROPERTIES_KAKFA_CONSUMER.put("group.id", consumer.getString("group.id"));
        PROPERTIES_KAKFA_CONSUMER.put("enable.auto.commit", consumer.getString("enable.auto.commit"));
        PROPERTIES_KAKFA_CONSUMER.put("auto.commit.interval.ms", consumer.getString("auto.commit.interval.ms"));
        PROPERTIES_KAKFA_CONSUMER.put("key.deserializer", consumer.getString("key.deserializer"));
        PROPERTIES_KAKFA_CONSUMER.put("value.deserializer", consumer.getString("value.deserializer"));
        TOPICS = new ArrayList<String>(Arrays.asList(consumer.getString("topic").split(",")));
    }
    private static volatile KafkaProducer<String, String> kafkaProducer = null;
    public static KafkaProducer<String, String> builderProducer() {
        if (kafkaProducer == null) {
            synchronized (KafkaProducer.class) {
                if (kafkaProducer == null)
                    kafkaProducer = new KafkaProducer(PROPERTIES_KAKFA_PRODUCER);
            }
        }
        return kafkaProducer;
    }
    private static volatile KafkaConsumer<String, String> kafkaConsumer = null;
    public static KafkaConsumer<String, String> builderConsumer() {
        if (kafkaConsumer == null) {
            synchronized (KafkaProducer.class) {
                if (kafkaConsumer == null)
                    kafkaConsumer = new KafkaConsumer(PROPERTIES_KAKFA_CONSUMER);
                kafkaConsumer.subscribe(TOPICS);
            }
        }
        return kafkaConsumer;
    }
}

调用


import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Client {
    public static void main(String[] args) {
        producer();
        consumer();
    }
    /**
     * 消费者
     */
    private static void consumer() {
        KafkaConsumer<String, String> consumer = KafkaClient.builderConsumer();
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(100);
            poll.forEach(record -> {
                System.err.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            });
        }
    }
    /**
     * 生产者
     */
    private static void producer() {
        KafkaProducer<String, String> kafkaProducer = KafkaClient.builderProducer();
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        kafkaProducer.send(new ProducerRecord<>("test", "value"));
        kafkaProducer.commitTransaction();
    }
}