<h2><a id="_0"></a>Official documentation</h2>
<p><a href="http://rocketmq.apache.org/docs/quick-start/" target="_blank">Apache RocketMQ</a></p>
<h2><a id="Linux_3"></a>Installing on Linux</h2>
<ol>
<li><a href="https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip" target="_blank">Download the installation package</a></li>
<li>Unzip it and change into the extracted directory</li>
</ol>
<pre><div class="hljs"><code class="lang-bash"><span class="hljs-built_in">cd</span> rocketmq-all-4.8.0
</code></div></pre>
<ol start="3">
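<li>Edit bin/runbroker.sh to lower the broker's JVM memory settings<br />
The original setting is JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"; on a machine with little memory, change it to:</li>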
</ol>
<pre><div class="hljs"><code class="lang-bash">JAVA_OPT=<span class="hljs-string">"<span class="hljs-variable">${JAVA_OPT}</span> -server -Xms256m -Xmx256m -Xmn125m"</span>
</code></div></pre>
<ol start="4">
<li>Edit the conf/broker.conf configuration file<br />
Add the following two settings</li>
</ol>
<pre><div class="hljs"><code class="lang-bash"><span class="hljs-comment"># IP address of this server</span>
brokerIP1=192.168.40.133
<span class="hljs-comment"># Allow topics to be created automatically</span>
autoCreateTopicEnable=<span class="hljs-literal">true</span>
</code></div></pre>
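<p>broker.conf is made of key=value lines, so java.util.Properties can read it. As a quick sanity check after editing, a minimal sketch along the following lines reports which required keys are still missing. The class name BrokerConfCheck and the method missingKeys are hypothetical helpers written for this post, not part of RocketMQ, and the default path conf/broker.conf assumes the program is run from the RocketMQ directory.</p>

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of RocketMQ): checks that broker.conf
// contains non-empty values for the keys this guide relies on.
class BrokerConfCheck {

    /** Returns the required keys that are absent or blank in the given config. */
    static List<String> missingKeys(Reader conf, String... requiredKeys) throws IOException {
        Properties props = new Properties();
        props.load(conf); // parses "key=value" and "key = value" lines
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location, relative to the RocketMQ directory.
        String path = args.length > 0 ? args[0] : "conf/broker.conf";
        try (Reader reader = Files.newBufferedReader(Paths.get(path))) {
            List<String> missing = missingKeys(reader, "brokerIP1", "autoCreateTopicEnable");
            System.out.println("Missing keys: " + missing);
        }
    }
}
```

<p>An empty list means both settings from this step are present; it does not verify that the IP address itself is correct.</p>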
<ol start="5">
<li>Open the firewall ports (9876 for the Name Server, 10911 for the broker)</li>
</ol>
<pre><div class="hljs"><code class="lang-bash">firewall-cmd --add-port=9876/tcp --permanent
firewall-cmd --add-port=10911/tcp --permanent
firewall-cmd --reload
</code></div></pre>
<ol start="6">
<li>Start the Name Server</li>
</ol>
<pre><div class="hljs"><code class="lang-bash">nohup sh bin/mqnamesrv &
<span class="hljs-comment"># View the log</span>
tail -f ~/logs/rocketmqlogs/namesrv.log
</code></div></pre>
<p>If startup fails with: ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!<br />
first check whether a JDK is installed, and install one if it is not.<br />
Then edit bin/runbroker.sh.<br />
The original lines are:</p>
<pre><div class="hljs"><code class="lang-bash">[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && JAVA_HOME=<span class="hljs-variable">$HOME</span>/jdk/java
[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && JAVA_HOME=/usr/java
[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && error_exit <span class="hljs-string">"Please set the JAVA_HOME variable in your environment, We need java(x64)!"</span>
</code></div></pre>
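<p>Add one line that points to your JDK installation (the path below is an example; adjust it to your machine):</p>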
<pre><div class="hljs"><code class="lang-bash">[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && JAVA_HOME=<span class="hljs-variable">$HOME</span>/jdk/java
[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && JAVA_HOME=/usr/java
[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && JAVA_HOME=/home/jdk1.8.0_281/jre
[ ! -e <span class="hljs-string">"<span class="hljs-variable">$JAVA_HOME</span>/bin/java"</span> ] && error_exit <span class="hljs-string">"Please set the JAVA_HOME variable in your environment, We need java(x64)!"</span>
</code></div></pre>
<p>Apply the same change to bin/runserver.sh, the script that bin/mqnamesrv uses.</p>
<ol start="7">
<li>Start the Broker</li>
</ol>
<pre><div class="hljs"><code class="lang-bash">nohup sh bin/mqbroker -n 192.168.40.133:9876 -c conf/broker.conf &
<span class="hljs-comment"># View the log</span>
tail -f ~/logs/rocketmqlogs/broker.log
</code></div></pre>
<ol start="8">
<li>Shutdown commands</li>
</ol>
<pre><div class="hljs"><code class="lang-bash">sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
</code></div></pre>
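<p>Before moving on to client code, it can help to confirm from the client machine that the two ports opened earlier are actually reachable. The following is a minimal sketch using only the JDK; the class name PortCheck and the 2-second timeout are assumptions made for illustration, and the default host is the server address used throughout this post.</p>

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of RocketMQ): tests whether a TCP port accepts connections.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "192.168.40.133";
        // 9876: Name Server, 10911: broker (the ports opened in the firewall step).
        for (int port : new int[] {9876, 10911}) {
            System.out.printf("%s:%d reachable: %b%n", host, port, isReachable(host, port, 2000));
        }
    }
}
```

<p>A successful connection only shows that something is listening on the port; the log files remain the place to confirm that the Name Server and broker started correctly.</p>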
<h2><a id="_82"></a>Testing with code</h2>
<ol>
<li>Add the dependency</li>
</ol>
<pre><div class="hljs"><code class="lang-xml"><span class="hljs-tag"><<span class="hljs-name">dependency</span>></span>
<span class="hljs-tag"><<span class="hljs-name">groupId</span>></span>org.apache.rocketmq<span class="hljs-tag"></<span class="hljs-name">groupId</span>></span>
<span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>rocketmq-client<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span>
<span class="hljs-tag"><<span class="hljs-name">version</span>></span>4.8.0<span class="hljs-tag"></<span class="hljs-name">version</span>></span>
<span class="hljs-tag"></<span class="hljs-name">dependency</span>></span>
</code></div></pre>
<ol start="2">
<li>Producer</li>
</ol>
<pre><div class="hljs"><code class="lang-java"><span class="hljs-keyword">import</span> org.apache.rocketmq.client.producer.DefaultMQProducer;
<span class="hljs-keyword">import</span> org.apache.rocketmq.client.producer.SendResult;
<span class="hljs-keyword">import</span> org.apache.rocketmq.common.message.Message;
<span class="hljs-keyword">import</span> org.apache.rocketmq.remoting.common.RemotingHelper;

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">SyncProducer</span> </span>{
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> <span class="hljs-keyword">throws</span> Exception </span>{
        <span class="hljs-comment">// Instantiate with a producer group name.</span>
        DefaultMQProducer producer = <span class="hljs-keyword">new</span> DefaultMQProducer(<span class="hljs-string">"DefaultCluster"</span>);
        <span class="hljs-comment">// Specify name server addresses.</span>
        producer.setNamesrvAddr(<span class="hljs-string">"192.168.40.133:9876"</span>);
        <span class="hljs-comment">// Launch the instance.</span>
        producer.start();
        <span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i < <span class="hljs-number">100</span>; i++) {
            <span class="hljs-comment">// Create a message instance, specifying topic, tag and message body.</span>
            Message msg = <span class="hljs-keyword">new</span> Message(<span class="hljs-string">"TopicTest1"</span> <span class="hljs-comment">/* Topic */</span>,
                <span class="hljs-string">"TagA"</span> <span class="hljs-comment">/* Tag */</span>,
                (<span class="hljs-string">"Hello RocketMQ "</span> + i).getBytes(RemotingHelper.DEFAULT_CHARSET) <span class="hljs-comment">/* Message body */</span>
            );
            <span class="hljs-comment">// Send the message synchronously to one of the brokers.</span>
            SendResult sendResult = producer.send(msg);
            System.out.printf(<span class="hljs-string">"%s%n"</span>, sendResult);
        }
        <span class="hljs-comment">// Shut down once the producer instance is no longer in use.</span>
        producer.shutdown();
    }
}
</code></div></pre>
<ol start="3">
<li>Consumer</li>
</ol>
<pre><div class="hljs"><code class="lang-java"><span class="hljs-keyword">import</span> java.util.List;

<span class="hljs-keyword">import</span> org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
<span class="hljs-keyword">import</span> org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
<span class="hljs-keyword">import</span> org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
<span class="hljs-keyword">import</span> org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
<span class="hljs-keyword">import</span> org.apache.rocketmq.client.exception.MQClientException;
<span class="hljs-keyword">import</span> org.apache.rocketmq.common.message.MessageExt;

<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Consumer</span> </span>{
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> <span class="hljs-keyword">throws</span> InterruptedException, MQClientException </span>{
        <span class="hljs-comment">// Instantiate with specified consumer group name.</span>
        DefaultMQPushConsumer consumer = <span class="hljs-keyword">new</span> DefaultMQPushConsumer(<span class="hljs-string">"DefaultCluster"</span>);
        <span class="hljs-comment">// Specify name server addresses.</span>
        consumer.setNamesrvAddr(<span class="hljs-string">"192.168.40.133:9876"</span>);
        <span class="hljs-comment">// Subscribe to one or more topics; the topic must match the one the producer sends to.</span>
        consumer.subscribe(<span class="hljs-string">"TopicTest1"</span>, <span class="hljs-string">"*"</span>);
        <span class="hljs-comment">// Register callback to execute on arrival of messages fetched from brokers.</span>
        consumer.registerMessageListener(<span class="hljs-keyword">new</span> MessageListenerConcurrently() {
            <span class="hljs-meta">@Override</span>
            <span class="hljs-function"><span class="hljs-keyword">public</span> ConsumeConcurrentlyStatus <span class="hljs-title">consumeMessage</span><span class="hljs-params">(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context)</span> </span>{
                System.out.printf(<span class="hljs-string">"%s Receive New Messages: %s %n"</span>, Thread.currentThread().getName(), msgs);
                <span class="hljs-keyword">return</span> ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        <span class="hljs-comment">// Launch the consumer instance.</span>
        consumer.start();
        System.out.printf(<span class="hljs-string">"Consumer Started.%n"</span>);
    }
}
</code></div></pre>
<h2><a id="_158"></a>Delayed messages</h2>
<blockquote>
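<p>RocketMQ does not support delayed messages with arbitrary delays; it only supports a fixed set of delay levels:<br />
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br />
Levels are numbered starting from 1, so level 2 is a 5-second delay and level 5 is a 1-minute delay. If the defaults do not meet your needs, add the messageDelayLevel parameter to the broker configuration file to override the default delay levels.</p>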
</blockquote>
<pre><div class="hljs"><code class="lang-java"><span class="hljs-comment">// Set the message delay level; level 3 means the message is delivered 10 seconds after it is sent</span>
msg.setDelayTimeLevel(<span class="hljs-number">3</span>);
</code></div></pre>
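<p>To make the 1-based level numbering concrete, here is a small sketch that maps a level to its delay using the default messageDelayLevel string quoted above. The class DelayLevels and its methods are hypothetical helpers, not RocketMQ API; the sketch only understands the s, m and h units that appear in the default value, and it rejects out-of-range levels rather than reproducing whatever the broker does with them.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of RocketMQ): translates a delay level into milliseconds.
class DelayLevels {

    // Default messageDelayLevel value as quoted in this post.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses a space-separated level string into delays in milliseconds. */
    static List<Long> parse(String levels) {
        List<Long> delays = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + token);
            }
            delays.add(value * unitMillis);
        }
        return delays;
    }

    /** Returns the delay in milliseconds for a 1-based level. */
    static long delayMillis(String levels, int level) {
        List<Long> delays = parse(levels);
        if (level < 1 || level > delays.size()) {
            throw new IllegalArgumentException("Level must be between 1 and " + delays.size());
        }
        return delays.get(level - 1); // levels start at 1, list indexes at 0
    }

    public static void main(String[] args) {
        for (int level : new int[] {2, 3, 5}) {
            System.out.printf("level %d = %d ms%n", level, delayMillis(DEFAULT_LEVELS, level));
        }
    }
}
```

<p>With the default string there are 18 levels, so the longest delay available without changing the broker configuration is 2 hours.</p>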