Installing Kafka with Docker
Pull the images
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
Start Zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# Check the Zookeeper container's IP address:
sudo docker inspect zookeeper | grep IPAddress
*********************************************
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.4",
"IPAddress": "172.17.0.4",
*********************************************
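Before starting Kafka, it can help to confirm that Zookeeper is actually accepting connections. A minimal check, assuming the `ruok` four-letter command is enabled in this Zookeeper build (it is by default in the older versions shipped in wurstmeister/zookeeper) and that `nc` is installed on the host:

```shell
# Tail the container logs to confirm Zookeeper bound to port 2181
docker logs zookeeper | tail -n 5

# Send the "ruok" four-letter word; a healthy server answers "imok"
echo ruok | nc 172.17.0.4 2181
```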
Start Kafka
# The IP can be the host IP (192.168.x.x) or the Docker container IP (172.17.0.4); the host IP is preferred.
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.239.129:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.239.129:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
Parameter notes:
- -e KAFKA_BROKER_ID=0 In a Kafka cluster, every broker needs a unique BROKER_ID to identify itself.
- -e KAFKA_ZOOKEEPER_CONNECT=192.168.239.129:2181 Tells Kafka where Zookeeper is. Use an externally reachable IP here: 127.0.0.1 or 0.0.0.0 would refer to the Kafka container itself, so Zookeeper could not be reached from inside it.
- -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 The listener address the broker actually binds to inside the container.
- -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.239.129:9092 The listener address the broker publishes to Zookeeper and hands back to clients; if unset, it falls back to the value of KAFKA_LISTENERS.
- -v /etc/localtime:/etc/localtime (Optional, not used in the command above.) Mounts the host's time settings so the container clock stays in sync with the host.
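To confirm the broker started and registered itself, you can run the Kafka CLI scripts that ship inside the wurstmeister/kafka image (they are on the container's PATH). This is a sketch: the exact flag depends on the Kafka version in the image, with newer versions using `--bootstrap-server` and older ones `--zookeeper`:

```shell
# Create a test topic, then list topics to verify the broker is reachable
docker exec -it kafka kafka-topics.sh --create \
  --bootstrap-server 192.168.239.129:9092 \
  --topic test --partitions 1 --replication-factor 1

docker exec -it kafka kafka-topics.sh --list \
  --bootstrap-server 192.168.239.129:9092
```

If `test` appears in the list output, the broker is up and the advertised listener is reachable from clients.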
Testing
Subscribe (consumer)
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyConsumer {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "mysql");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
        // Subscribe to the topic defined in MyProducer
        kafkaConsumer.subscribe(Collections.singletonList(MyProducer.topic));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s, offset:%d, message:%s",
                        record.topic(), record.offset(), record.value()));
            }
        }
    }
}
Publish (producer)
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {
    public static String topic = "test"; // topic to publish to
    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        // Kafka broker addresses; separate multiple brokers with commas
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.129:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
        try {
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                kafkaProducer.send(record);
                System.out.println("Message sent: " + msg);
                Thread.sleep(1000);
            }
        } finally {
            kafkaProducer.close();
        }
    }
}
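As a sanity check independent of the Java clients, the console tools inside the kafka container can run the same publish/subscribe round trip. A sketch, assuming the wurstmeister/kafka image (the producer flag varies by Kafka version: older releases use `--broker-list`, newer ones `--bootstrap-server`):

```shell
# Terminal 1: consume everything on the "test" topic
docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server 192.168.239.129:9092 --topic test --from-beginning

# Terminal 2: each line typed here is published to the topic
docker exec -it kafka kafka-console-producer.sh \
  --broker-list 192.168.239.129:9092 --topic test
```

Lines typed in terminal 2 should appear in terminal 1; if they do, the Java producer and consumer above will work against the same broker address.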