Pull the image
docker pull apache/rocketmq:5.3.2
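Create a shared network for the containers
RocketMQ consists of several services, each running in its own container. Creating a Docker network lets the containers communicate with each other.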
docker network create rocketmq
Start NameServer
# Start NameServer
docker run -d --name rmqnamesrv -p 9876:9876 --network rocketmq apache/rocketmq:5.3.2 sh mqnamesrv
# Check whether NameServer started successfully
docker logs -f rmqnamesrv
When the log shows 'The Name Server boot success…', NameServer has started successfully.
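In a script, `docker logs -f` would keep following the log and never return. A minimal sketch of a non-blocking check, assuming a helper named `namesrv_started` (a hypothetical name, not part of RocketMQ or Docker) that reads log text from standard input and looks for the success line:

```shell
# Hypothetical helper: succeeds if the NameServer success line appears on stdin.
namesrv_started() {
  grep -q 'The Name Server boot success'
}

# Usage (assumes the rmqnamesrv container started above is running):
#   docker logs rmqnamesrv 2>&1 | namesrv_started && echo "NameServer is up"
```

Reading from standard input keeps the helper independent of Docker, so the same check works on a saved log file.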
Start Broker + Proxy
Configure the Broker's IP address
echo "brokerIP1=127.0.0.1" > broker.conf
Replace 127.0.0.1 with the host's public IP address; this is the address the Broker registers with NameServer.
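Writing the file and substituting the address can be done in one step. A minimal sketch, assuming a shell variable named `PUBLIC_IP` (a name chosen here for illustration) and using the documentation-only address 203.0.113.10 as a placeholder:

```shell
# PUBLIC_IP is an illustrative variable; 203.0.113.10 is only a placeholder address.
PUBLIC_IP="${PUBLIC_IP:-203.0.113.10}"

# Write the address the Broker should advertise.
printf 'brokerIP1=%s\n' "$PUBLIC_IP" > broker.conf

# Show the result for a quick visual check.
cat broker.conf
```

Set `PUBLIC_IP` in the environment before running the snippet to use the real address.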
Change the ports
Create the file rmq-proxy.json:
{
  "rocketMQClusterName": "DefaultCluster",
  "remotingAccessAddr": "v-kun.com",
  "remotingListenPort": 17080,
  "grpcServerPort": 17081
}
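remotingListenPort (default 8080) is the port on which the Proxy listens for the remoting protocol.
grpcServerPort (default 8081) is the gRPC port. According to the documentation, RocketMQ recommends the gRPC interface, and later development here will mainly be based on gRPC.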
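The ports in rmq-proxy.json have to match the ports published by the container, so it can help to define them once. A minimal sketch, assuming the variable names `ACCESS_ADDR`, `REMOTING_PORT`, and `GRPC_PORT` (all chosen here for illustration) with the values used in this post:

```shell
# Illustrative variables; the values mirror the ones used in this post.
ACCESS_ADDR="v-kun.com"
REMOTING_PORT=17080
GRPC_PORT=17081

# Generate rmq-proxy.json from the variables above.
cat > rmq-proxy.json <<EOF
{
  "rocketMQClusterName": "DefaultCluster",
  "remotingAccessAddr": "${ACCESS_ADDR}",
  "remotingListenPort": ${REMOTING_PORT},
  "grpcServerPort": ${GRPC_PORT}
}
EOF

# The same variables can then feed the published ports, e.g.
#   -p "${REMOTING_PORT}:${REMOTING_PORT}" -p "${GRPC_PORT}:${GRPC_PORT}"
```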
Start the Broker
docker run -d \
--name rmqbroker \
--network rocketmq \
-p 10912:10912 -p 10911:10911 -p 10909:10909 \
-p 17080:17080 -p 17081:17081 \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
-v ./broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.conf \
-v ./rmq-proxy.json:/home/rocketmq/rocketmq-5.3.2/conf/rmq-proxy.json \
apache/rocketmq:5.3.2 sh mqbroker \
--enable-proxy \
-c /home/rocketmq/rocketmq-5.3.2/conf/broker.conf
Adjust memory
- Copy the startup script out of the container
docker cp rmqbroker:/home/rocketmq/rocketmq-5.3.2/bin/runbroker.sh ./
- Change the default memory settings
DEFAULT_HEAP_OPTS="-Xms256m -Xmx256m -Xmn128m -XX:MaxDirectMemorySize=1g"
- Copy the file back into the container
docker cp ./runbroker.sh rmqbroker:/home/rocketmq/rocketmq-5.3.2/bin/
- Restart the Broker
docker stop rmqbroker
docker start rmqbroker
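The manual edit in the second step can also be scripted. A minimal sketch, assuming the copied runbroker.sh contains a line that starts with `DEFAULT_HEAP_OPTS=` as shown above; the helper name `set_heap_opts` is hypothetical:

```shell
# Hypothetical helper: rewrite the DEFAULT_HEAP_OPTS line of a copied runbroker.sh.
# Assumes the file has an unindented line that starts with DEFAULT_HEAP_OPTS=.
set_heap_opts() {
  file="$1"
  opts="$2"
  # -i.bak edits in place and keeps a backup; this form works with GNU and BSD sed.
  sed -i.bak "s|^DEFAULT_HEAP_OPTS=.*|DEFAULT_HEAP_OPTS=\"${opts}\"|" "$file"
}

# Usage, between the two docker cp steps:
#   set_heap_opts ./runbroker.sh "-Xms256m -Xmx256m -Xmn128m -XX:MaxDirectMemorySize=1g"
```

The backup file (runbroker.sh.bak) makes it easy to compare or restore the original settings.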
Client
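import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
// The code below runs inside a method of the consumer class; topic, group, and exec are defined elsewhere.
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints("v-kun.com:17081")
        .build();
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
try {
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group.
            .setConsumerGroup(group)
            // Set the pre-bound subscription.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // Set the message listener.
            .setMessageListener(this::exec)
            .build();
    System.out.println("Consumer started: " + topic + "\t" + group);
} catch (ClientException e) {
    throw new RuntimeException(e);
}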