kafka

2 0 0 0

docker安装kafka

拉取镜像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

启动Zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

# 查看zookeeper端口:
sudo docker inspect zookeeper | grep IPAddress
*********************************************
"SecondaryIPAddresses": null,
            "IPAddress": "172.17.0.4",
                    "IPAddress": "172.17.0.4",
*********************************************

启动Kafka

#ip可以填宿主主机ip(192.168.x.x) 也可以填写docker容器ip(172.17.0.4) 最好宿主主机ip
docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.239.129:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.239.129:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

参数说明:

  • -e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
  • -e KAFKA_ZOOKEEPER_CONNECT=192.168.139.187:2181/kafka 将配置中 KAFKA_ZOOKEEPER_CONNECT的zookeeper的端口设置为外网ip,因为127.0.0.1或0.0.0.0相对kafka镜像来说是自己的镜像内部的内部ip,docker镜像外部无法访问。
  • -e KAFKA_LISTENERS=PLAINTEXT://:9092 配置kafka的监听端口 就是主要用来定义Kafka Broker的Listener的配置项。 是kafka真正bind的地址
  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.187:9092 参数的作用就是将Broker的Listener信息发布到Zookeeper中 是暴露给外部的listeners,如果没有设置,会用listeners
  • -v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

测试

订阅

public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "mysql");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
        // 订阅消息
        kafkaConsumer.subscribe(Collections.singletonList(MyProducer.topic));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s",
                        record.topic(), record.offset(), record.value()));
            }
        }
    }

发布

public static String topic = "test";//定义主题

    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        //kafka地址,多个地址用逗号分割
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

        try {
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                kafkaProducer.send(record);
                System.out.println("消息发送成功:" + msg);
                Thread.sleep(1000);
            }
        } finally {
            kafkaProducer.close();
        }

    }
目录