Kafka


Installing Kafka with Docker

Pull the images

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
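If you want to confirm that both images are present before continuing, a quick check:

docker images | grep wurstmeister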

Start ZooKeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

# Look up the ZooKeeper container's IP address:
sudo docker inspect zookeeper | grep IPAddress
*********************************************
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.4",
        "IPAddress": "172.17.0.4",
*********************************************
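Before starting Kafka it is worth confirming that the ZooKeeper container is actually up; the standard Docker commands are enough for a quick check:

docker ps --filter name=zookeeper
docker logs --tail 20 zookeeper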

Start Kafka

# The IP can be the host IP (192.168.x.x) or the Docker container IP (172.17.0.4); the host IP is preferred
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=192.168.239.129:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.239.129:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -t wurstmeister/kafka

Parameter notes:

  • -e KAFKA_BROKER_ID=0: in a Kafka cluster, every broker has a unique BROKER_ID that identifies it.
  • -e KAFKA_ZOOKEEPER_CONNECT=192.168.239.129:2181: points the broker at ZooKeeper. Use an address that is reachable from outside the container; 127.0.0.1 or 0.0.0.0 would refer to the Kafka container itself and cannot be reached from outside it.
  • -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: defines the broker's listener, i.e. the address Kafka actually binds to.
  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.239.129:9092: the listener address published to ZooKeeper and handed out to clients; if it is not set, the value of listeners is used.
  • -v /etc/localtime:/etc/localtime: mounts the host's time file so the container's clock matches the host.
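With the broker running, it can be smoke-tested from the command line before touching any Java code. This is a sketch assuming the wurstmeister/kafka image keeps the Kafka CLI scripts on its PATH (they ship under /opt/kafka/bin) and bundles Kafka 2.5 or newer; on older versions replace --bootstrap-server with --zookeeper / --broker-list:

# Create the topic the Java examples below use (with default settings Kafka will also auto-create it on first use)
docker exec -it kafka kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server 192.168.239.129:9092

# Terminal 1: type a few messages
docker exec -it kafka kafka-console-producer.sh --topic test --bootstrap-server 192.168.239.129:9092

# Terminal 2: they should be echoed back
docker exec -it kafka kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 192.168.239.129:9092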

Testing

Subscribe (consumer)

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyConsumer {

    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "mysql");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
        // Subscribe to the topic defined in MyProducer
        kafkaConsumer.subscribe(Collections.singletonList(MyProducer.topic));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s, offset:%d, message:%s",
                        record.topic(), record.offset(), record.value()));
            }
        }
    }
}

Publish (producer)

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {

    public static String topic = "test"; // topic the consumer subscribes to

    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        // Kafka broker addresses; separate multiple addresses with commas
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
        try {
            // Send one random message per second
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                kafkaProducer.send(record);
                System.out.println("Message sent: " + msg);
                Thread.sleep(1000);
            }
        } finally {
            kafkaProducer.close();
        }
    }
}
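To run the test, start MyProducer and MyConsumer in separate processes: the producer sends a random message every second, and the consumer should print each one with its topic and offset. Both classes assume the org.apache.kafka:kafka-clients library is on the classpath (version 2.0 or newer, since the consumer uses poll(Duration)).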