17 0 0 0
cd rocketmq-all-4.8.0
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
# Server IP address configuration
brokerIP1=192.168.40.133
# Allow topics to be created automatically
autoCreateTopicEnable=true
# Permanently open TCP port 9876 (the name server port used by the -n option and the clients in these notes)
firewall-cmd --add-port=9876/tcp --permanent
# Permanently open TCP port 10911 (presumably the broker's listen port -- confirm against the broker configuration)
firewall-cmd --add-port=10911/tcp --permanent
# Reload the firewall configuration
firewall-cmd --reload
# Start the name server in the background
nohup sh bin/mqnamesrv &
# View the log
tail -f ~/logs/rocketmqlogs/namesrv.log
启动后如果报错:ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !! 先检查有没有安装JDK,没有安装的话先安装JDK;如果已安装JDK仍然报错,则修改bin/runbroker.sh文件。这是原来的内容:
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
新加一行(第三行,指向本机JDK的实际安装路径)后的内容:
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/home/jdk1.8.0_281/jre
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
同样修改bin/runserver.sh文件,修改方法同runbroker.sh。
# Start the broker in the background, passing the name server address (-n) and the config file (-c)
nohup sh bin/mqbroker -n 192.168.40.133:9876 -c conf/broker.conf &
# View the log
tail -f ~/logs/rocketmqlogs/broker.log
# Shut down the broker
sh bin/mqshutdown broker
# Shut down the name server
sh bin/mqshutdown namesrv
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
/**
 * Example producer that synchronously sends 100 messages to topic {@code TopicTest1}
 * with tag {@code TagA} through the name server at {@code 192.168.40.133:9876},
 * printing each {@code SendResult} to standard output.
 */
public class SyncProducer {
    /**
     * Starts the producer, sends the messages one by one and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer fails to start or a send fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("DefaultCluster");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.40.133:9876");
        // Launch the instance.
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message(
                    "TopicTest1" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Shut down even when a send fails, so the started producer is always released.
            producer.shutdown();
        }
    }
}
/**
 * Example push consumer that subscribes to every tag of topic {@code TopicTest1}
 * (the topic the SyncProducer example in these notes publishes to) through the name
 * server at {@code 192.168.40.133:9876} and prints each received batch.
 */
public class Consumer {
    /**
     * Configures the consumer, registers the message listener and starts consuming.
     *
     * @param args unused
     * @throws InterruptedException declared by the example signature
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultCluster");
        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.40.133:9876");
        // Subscribe to one or more topics to consume.
        // The topic must match the one the producer sends to ("TopicTest1"),
        // otherwise no message is ever delivered to this consumer.
        consumer.subscribe("TopicTest1", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Report the batch as consumed successfully.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
RocketMQ不支持任意时间间隔的延时消息,只支持特定级别的延时消息。默认的延时级别配置为 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ,级别下标从1开始,比如级别2是延时5秒、级别5是延时1分钟。默认配置不满足需求时,可以在broker配置文件中加入messageDelayLevel参数,覆盖默认的延时级别配置。
// Set the message delay level; level 3 corresponds to delivery after 10 seconds.
msg.setDelayTimeLevel(3);