RocketMQ

17 0 0 0

官方文档

Apache RocketMQ

Linux安装

  1. 点击下载安装包
  2. 进入解压后目录
cd rocketmq-all-4.8.0
  1. 修改bin/runbroker.sh配置 这是以前配置JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"改成:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
  1. 修改conf/broker.conf配置文件 新增两个配置
# 配置服务器ip地址
brokerIP1=192.168.40.133
# 允许自动创建Topic
autoCreateTopicEnable=true
  1. 开放端口
firewall-cmd --add-port=9876/tcp --permanent
firewall-cmd --add-port=10911/tcp --permanent
firewall-cmd --reload
  1. 启动 Start Name Server
nohup sh bin/mqnamesrv &
# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

启动后如果报错:ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !! 先检查有没有装jdk,没有装的话先装jdk 修改bin/runbroker.sh文件 这是原来的

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

新加一行

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/home/jdk1.8.0_281/jre
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

修改bin/runserver.sh文件 修改方法同runbroker.sh

  1. Start Broker
nohup sh bin/mqbroker -n 192.168.40.133:9876 -c conf/broker.conf &
# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log
  1. 关闭命令
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

代码测试

  1. 添加依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>
  1. 生产者
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("DefaultCluster");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.40.133:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest1" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}
  1. 消费者
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultCluster");

        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.40.133:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

延时消息

RocketMQ不支持任意时间间隔的延时消息,只支持特定级别的延时消息 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 这个配置下标从1开始 比如级别2是延时5秒、级别5是延时1分钟。默认配置在不满足需求的情况下,可以在broker配置文件加入messageDelayLevel参数覆盖默认的延时级别配置

//设置消息延时级别  3对应10秒后发送
msg.setDelayTimeLevel(3);
目录