2 0 0 0
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# Look up the zookeeper container's IP address:
sudo docker inspect zookeeper | grep IPAddress
*********************************************
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.4",
"IPAddress": "172.17.0.4",
*********************************************
# The IP may be the host machine's IP (192.168.x.x) or the docker container's IP (172.17.0.4); the host IP is preferred.
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.239.129:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.239.129:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
Parameter notes (consumer and producer sample code follows):
/**
 * Kafka consumer demo: connects to the broker at 192.168.239.129:9092,
 * subscribes to {@code MyProducer.topic}, and prints every received record
 * (topic, offset, value) forever.
 */
public static void main(String[] args) {
    Properties p = new Properties();
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "mysql");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
    try {
        // Subscribe to the producer's topic.
        kafkaConsumer.subscribe(Collections.singletonList(MyProducer.topic));
        while (true) {
            // poll(long) is deprecated since Kafka 2.0 and removed in newer
            // clients; use the Duration overload instead.
            ConsumerRecords<String, String> records =
                    kafkaConsumer.poll(java.time.Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s",
                        record.topic(), record.offset(), record.value()));
            }
        }
    } finally {
        // Ensure the consumer leaves the group cleanly if the loop ever exits
        // (e.g. via an unchecked exception from poll()).
        kafkaConsumer.close();
    }
}
// Topic name shared with the consumer (referenced as MyProducer.topic).
public static String topic = "test";

/**
 * Kafka producer demo: connects to the broker at 192.168.239.129:9092 and
 * sends one "Hello,<random 0-99>" message per second, forever.
 *
 * @throws InterruptedException if the sleeping send loop is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    Properties p = new Properties();
    // Kafka broker addresses; separate multiple addresses with commas.
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
    // Create the RNG once instead of once per loop iteration.
    Random random = new Random();
    try {
        while (true) {
            String msg = "Hello," + random.nextInt(100);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
            kafkaProducer.send(record);
            System.out.println("消息发送成功:" + msg);
            Thread.sleep(1000);
        }
    } finally {
        // Flush buffered records and release network resources.
        kafkaProducer.close();
    }
}