rocketmq 创建groupgroup不同为什么能消费

博客分类:
原创文章,转载请注明出处:http://jameswxx.iteye.com/blog/2091971
1.1. 控制台使用
RocketMQ 提供有控制台及一系列控制台命令,用于管理员对主题,集群,broker 等信息的管理;
登录控制台
首先进入RocketMQ 工程,进入/RocketMQ/bin
在该目录下有个mqadmin 脚本
在mqadmin 下可以查看有哪些命令
sh mqadmin
查看具体命令的使用
sh mqadmin help 命令名称
例如,查看updateTopic 的使用
sh mqadmin help updateTopic
1.2. 详细命令
1.2.1. 创建Topic
updateTopic
com.alibaba.rocketmq.tools.command.topic.UpdateTopicSubCommand
如果-c为空,则必填
broker 地址,表示topic 建在该broker
如果-b为空,则必填
cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)
nameserve 服务地址列表,格式ip:ip:...
指定新topic 的权限限制( W|R|WR )
可读队列数(默认为8)
可写队列数(默认为8)
opic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
1.2.2. 删除Topic
deleteTopic
com.alibaba.rocketmq.tools.command.topic.DeleteTopicSubCommand
cluster 名称,表示删除某集群下的某个topic (集群可通过clusterList 查询)
nameserve 服务地址列表,格式ip:ip:…
topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
1.2.3. 创建(修订)订阅组
updateSubGroup
com.alibaba.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand
如果 –c 为空,则必填
broker 地址,表示订阅组建在该broker
如果 –b 为空,则必填
cluster名称,表示topic 建在该集群(集群可通过clusterList查询)
是否容许广播方式消费
从哪个broker 开始消费
是否容许从队列的最小位置开始消费,默认会设置为false
消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列
重试消费最大次数,超过则投递到死信队列,不再投递,并报警
消费功能是否开启
发现消息堆积后,将Consumer 的消费请求重定向到另外一台Slave 机器
nameserve 服务地址列表,格式ip:ip:...
1.2.4. 删除订阅组配置
deleteSubGroup
com.alibaba.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
如果–c 为空,则必填
broker 地址,表示订阅组建在该broker
如果–b 为空,则必填
cluster 名称,表示topic建在该集群(集群可通过clusterList查询)
nameserve 服务地址列表,格式ip:ip:...
1.2.5. 更新Broker 配置文件
updateBrokerConfig
com.alibaba.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand
如果–c为空,则必填
broker 地址,表示订阅组建在该broker
如果–b 为空,则必填
cluster名称,表示topic 建在该集群(集群可通过clusterList查询)
nameserve 服务地址列表,格式ip:ip:...
1.2.6. 查看Topic 列表信息
com.alibaba.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand
nameserve 服务地址列表,格式ip:ip:...
1.2.7. 查看Topic 路由信息
topicRoute
com.alibaba.rocketmq.tools.command.topic.TopicRouteSubCommand
topic 名称
nameserve 服务地址列表,格式ip:ip:...
1.2.8. 查看Topic 统计信息
topicStats
com.alibaba.rocketmq.tools.command.topic.TopicStatsSubCommand
topic 名称
nameserve 服务地址列表,格式ip:ip:...
1.2.9. 查看Broker 统计信息
brokerStats
com.alibaba.rocketmq.tools.command.broker.BrokerStatsSubCommanD
broker 地址
nameserve 服务地址列表,格式ip:ip:...
1.2.10. 根据消息ID 查询消息
queryMsgById
com.alibaba.rocketmq.tools.command.message.QueryMsgByIdSubCommand
nameserve 服务地址列表,格式ip:ip:...
1.2.11. 根据消息Key 查询消息
queryMsgByKey
com.alibaba.rocketmq.tools.command.message.QueryMsgByKeySubCommand
被查询消息的截止时间
topic 名称
nameserve 服务地址列表,格式ip:ip:...
1.2.12. 根据Offset 查询消息
queryMsgByOffset
com.alibaba.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand
Broker 名称,表示订阅组建在该broker(这里需要注意填写的是broker 的名称,不是broker 的地址,broker名称可以在clusterList 查到
query 队列id
topic 名称
nameserve 服务地址列表,格式ip:ip:...
1.2.13. 查询Producer 的网络连接
该命令只打印当前与cluster 连接的producer 网络连接信息
producerConnection
com.alibaba.rocketmq.tools.command.connection.ProducerConnectionSubCommand
生产者所属组名
topic 名称
nameserve 服务地址列表,格式ip:ip:...
1.2.14. 查询Consumer 的网络连接
该命令只打印当前与cluster 连接的consumer 网络连接信息
consumerConnection
com.alibaba.rocketmq.tools.command.connection.ConsumerConnectionSubCommand
消费者所属组名
nameserve 服务地址列表,格式ip:ip:...
1.2.15. 查看订阅组消费状态
consumerProgress
com.alibaba.rocketmq.tools.command.consumer.ConsumerProgressSubCommand
消费者所属组名
nameserve 服务地址列表,格式ip:ip:...
1.2.16. 查看集群消息
clusterList
com.alibaba.rocketmq.tools.command.cluster.ClusterListSubCommand
打印更多信息
nameserve 服务地址列表,格式ip:ip:...
1.2.17. 添加(更新)KV 配置信息
updateKvConfig
com.alibaba.rocketmq.tools.command.namesrv.UpdateKvConfigCommand
Namespace 值
nameserve 服务地址列表,格式ip:ip:...
1.2.18. 删除KV 配置信息
deleteKvConfig
com.alibaba.rocketmq.tools.command.namesrv.DeleteKvConfigCommand
Namespace 值
nameserve 服务地址列表,格式ip:ip:...
1.2.19. 添加(更新)Project group 配置信息
指令 updateProjectGroup
类路径 com.alibaba.rocketmq.tools.command.namesrv.UpdateProjectGroupCommand
project group 名
nameserve 服务地址列表,格式ip:ip:...
1.2.20. 删除Project group 配置信息
deleteProjectGroup
com.alibaba.rocketmq.tools.command.namesrv.DeleteProjectGroupCommand
project group 名
nameserve 服务地址列表,格式ip:ip:...
1.2.21. 取得Project group 配置信息
getProjectGroup
com.alibaba.rocketmq.tools.command.namesrv.GetProjectGroupCommand
project group 名
nameserve 服务地址列表,格式ip:ip:...
1.2.22. 设置消费进度
根据时间来设置消费进度,设置之前要关闭这个订阅组的所有consumer,设置完再启动,方可生效
resetOffsetByTime
com.alibaba.rocketmq.tools.command.offset.ResetOffsetByTimeSubCommand
通过时间戳强制回滚(true|false),默认为true
消费者所属组名
topic 名称
nameserve 服务地址列表,格式ip:ip:...
1.2.23. 清除特定Broker权限
wipeWritePerm
com.alibaba.rocketmq.tools.command.namesrv.WipeWritePermSubCommand
broker 地址
nameserve 服务地址列表,格式ip:ip:...
1.2.24. 获取Consumer消费进度
该命令只打印当前与cluster 连接的consumer 的消费进度
getConsumerStatus
com.alibaba.rocketmq.tools.command.offset.GetConsumerStatusCommand
消费者所属组名
Consumer 客户端ip
nameserve 服务地址列表,格式ip:ip:...
浏览 41865
浏览: 569372 次
来自: 杭州
非常赞,帮助理解了问题。今天也是遇到了这样的问题
请问楼主,新增一个broker的话应该怎么做?给新的broke ...
此文:有意义!
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'豆丁微信公众号
君,已阅读到文档的结尾了呢~~
RocketMQ 原理简介
扫扫二维码,随身浏览文档
手机或平板扫扫即可继续访问
RocketMQ 原理简介
举报该文档为侵权文档。
举报该文档含有违规或不良信息。
反馈该文档无法正常浏览。
举报该文档为重复文档。
推荐理由:
将文档分享至:
分享完整地址
文档地址:
粘贴到BBS或博客
flash地址:
支持嵌入FLASH地址的网站使用
html代码:
&embed src='http://www.docin.com/DocinViewer--144.swf' width='100%' height='600' type=application/x-shockwave-flash ALLOWFULLSCREEN='true' ALLOWSCRIPTACCESS='always'&&/embed&
450px*300px480px*400px650px*490px
支持嵌入HTML代码的网站使用
您的内容已经提交成功
您所提交的内容需要审核后才能发布,请您等待!
3秒自动关闭窗口他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)博客分类:
一、部署1.从https://github.com/alibaba/RocketMQ下载安装包。2.tar -xf ***.tar 解压tar包。安装git yum install git3.echo $JAVA_HOME 确认安装java环境变量。4.export JAVA_HOME='*****' 设置环境变量。5.安装nameserver,打开安装路径的bin目录,执行 nohup mqnamesrv & 命令。6.设置环境nameserver环境变量,export NAMESRV_ADDR=192.168.0.1:9876。7.设置RocketMQ的的安装位置环境变量ROCKATMQ_HOME8.安装broker,打开安装路径的bin目录,运行 mqbroker -n "192.168.0.1:9876"(如果设置了环境变量,-n参数可以省略)。ps:通过nohup.out可以查看安装启动日志。二、 Broker集群部署
推荐的几种 Broker 集群部署方式,这里的Slave 不可写,但可读,类似与 Mysql 主备方式。1.单个 Master
这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。2.多 Master 模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。
先启动 NameServer,例如机器 IP 为:192.168.1.1:9876nohup sh mqnamesrv &
在机器 A,启动第一个 Masternohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
在机器 B,启动第二个 Masternohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &3.多 Master 多 Slave 模式,异步复制
每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
先启动 NameServer,例如机器 IP 为:192.168.1.1:9876nohup sh mqnamesrv &
在机器 A,启动第一个 Masternohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
在机器 B,启动第二个 Masternohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
在机器 C,启动第一个 Slavenohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
在机器 D,启动第二个 Slavenohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &4.
多 Master 多 Slave 模式,同步双写
每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。
优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
先启动 NameServer,例如机器 IP 为:192.168.1.1:9876nohup sh mqnamesrv &
在机器 A,启动第一个 Masternohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
在机器 B,启动第二个 Masternohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
在机器 C,启动第一个 Slavenohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
在机器 D,启动第二个 Slavenohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
以上 Broker 与 Slave 配对是通过指定相同的brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。
$ROCKETMQ_HOST 指的 RocketMQ 安装目录,需要用户自己设置此环境变量三、启动1.启动nameserver。 nohup sh mqnamesrv &2.启动broker。nohup sh mqbroker &四、使用1.构造消息的生成者producer和消息的消费者consumer。2.在maven中添加如下dependency.
&dependency&
&groupId&com.alibaba.rocketmq&/groupId&
&artifactId&rocketmq-client&/artifactId&
&version&3.0.2&/version&
&/dependency&
&dependency&
&groupId&com.alibaba.rocketmq&/groupId&
&artifactId&rocketmq-remoting&/artifactId&
&version&3.0.2&/version&
&/dependency&
&dependency&
&groupId&com.alibaba.rocketmq&/groupId&
&artifactId&rocketmq-broker&/artifactId&
&version&3.0.4-open&/version&
&/dependency&
&dependency&
&groupId&com.alibaba.rocketmq&/groupId&
&artifactId&rocketmq-common&/artifactId&
&version&3.0.2&/version&
&/dependency&
&dependency&
&groupId&io.netty&/groupId&
&artifactId&netty&/artifactId&
&version&3.8.0.Final&/version&
&/dependency&
&dependency&
&groupId&io.netty&/groupId&
&artifactId&netty-common&/artifactId&
&version&4.0.7.Final&/version&
&/dependency&
&dependency&
&groupId&org.apache.httpcomponents&/groupId&
&artifactId&httpclient&/artifactId&
&version&4.0.1&/version&
&/dependency&
&dependency&
&groupId&com.qq.sdk&/groupId&
&artifactId&qzone-sdk&/artifactId&
&version&1.0.0&/version&
&/dependency&
&dependency&
&groupId&io.netty&/groupId&
&artifactId&netty-buffer&/artifactId&
&version&4.0.7.Final&/version&
&/dependency&
&dependency&
&groupId&io.netty&/groupId&
&artifactId&netty-all&/artifactId&
&version&4.0.10.Final&/version&
&/dependency&
&dependency&
&groupId&com.alibaba&/groupId&
&artifactId&fastjson&/artifactId&
&version&1.1.41&/version&
&/dependency&
3.Producer代码如下所示:
import com.alibaba.rocketmq.client.exception.MQClientE
import com.alibaba.rocketmq.client.producer.DefaultMQP
import com.alibaba.rocketmq.client.producer.SendR
import com.alibaba.rocketmq.common.message.M
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例&br&
* 注意:ProducerGroupName需要由应用来保证唯一&br&
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("10.10.0.102:9876");
* Producer对象在使用之前必须要调用start初始化,初始化一次即可&br&
* 注意:切记不可以在每次发送消息时,都调用start方法
producer.start();
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,&br&
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,&br&
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
for (int i = 0; i & 10; i++)
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}catch (Exception e) {
e.printStackTrace();
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
producer.shutdown();
4.Consumer代码如下所示:
import java.util.L
import com.alibaba.rocketmq.client.consumer.DefaultMQPushC
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyC
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyS
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerC
import com.alibaba.rocketmq.client.exception.MQClientE
import com.alibaba.rocketmq.common.consumer.ConsumeFromW
import com.alibaba.rocketmq.common.message.MessageE
public class PushConsumer {
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。&br&
* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法&br&
public static void main(String[] args) throws InterruptedException, MQClientException {
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例&br&
* 注意:ConsumerGroupName需要由应用来保证唯一
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_001");
consumer.setNamesrvAddr("10.10.0.102:9876");
// consumer.setNamesrvAddr("127.0.0.1:9876");
* 订阅指定topic下tags分别等于TagA或TagC或TagD
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
* 订阅指定topic下所有消息&br&
* 注意:一个consumer对象可以订阅多个topic
consumer.subscribe("TopicTest2", "*");
consumer.subscribe("TopicTest3", "*");
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费&br&
* 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
public ConsumeConcurrentlyStatus consumeMessage(List&MessageExt& msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println("TagA开始。");
else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
System.out.println("TagC开始。");
// 执行TagC的消费
else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
// 执行TagD的消费
System.out.println("TagD开始。");
else if (msg.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
System.out.println("TopicTest2");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可&br&
consumer.start();
System.out.println("Consumer Started.");
RocketMQ开发文档见附件。
(785.5 KB)
下载次数: 226
浏览 10727
浏览: 15929 次
来自: 北京
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)}

我要回帖

更多关于 rocketmq groupname 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信