mq java mq队列怎么判断队列为空

12874人阅读
java(290)
&dependency&
&groupId&org.apache.activemq&/groupId&
&artifactId&activemq-core&/artifactId&
&version&5.7.0&/version&
&/dependency&
import org.apache.activemq.ActiveMQConnectionF
import javax.jms.*;
下载安装/activemq/
启动命令:bin\win64\activemq.bat
/phoebus0501/archive//1964228.html
public class MessageReceiver implements Runnable {
private final String QUEUE;
public MessageReceiver(String queue, String url, String user, String password) {
this.url =
this.user =
this.password =
this.QUEUE =
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Session session =
Destination receiveQ
Connection connection = connectionFactory.createConnection();
session = connection
.createSession(true, Session.SESSION_TRANSACTED);
receiveQueue = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(receiveQueue);
connection.start();
System.out.println(Thread.currentThread().getName()+& start&);
while (true) {
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage receiveMessage = (TextMessage)
System.out.println(&我是Receiver,收到消息如下: \r\n&
+ receiveMessage.getText());
connection.close();
System.out.println(Thread.currentThread().getName()+& close&);
} catch (JMSException e) {
e.printStackTrace();
public String getUrl() {
public void setUrl(String url) {
this.url =
public String getUser() {
public void setUser(String user) {
this.user =
public String getPassword() {
public void setPassword(String password) {
this.password =
import org.apache.activemq.ActiveMQConnectionF
import javax.jms.*;
import java.util.D
* 消息发送器
* @author xiaochuanyu
public class MessageSender implements Runnable {
private final String QUEUE;
public MessageSender(String queue, String url, String user, String password) {
this.url =
this.user =
this.password =
this.QUEUE =
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Session session =
Destination sendQ
Connection connection =
int messageCount = 0;
connection = connectionFactory.createConnection();
connection.start();
System.out.println(Thread.currentThread().getName()+& start&);
while (true) {
session = connection.createSession(true,
Session.SESSION_TRANSACTED);
sendQueue = session.createQueue(QUEUE);
MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText(new Date() + &现在发送是第& + messageCount + &条消息&);
sender.send(outMessage);
sender.close();
if ((++messageCount) == 10) {
// 发够十条消息退出
Thread.sleep(1000);
connection.close();
System.out.println(Thread.currentThread().getName()+& close&);
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
public String getUrl() {
public void setUrl(String url) {
this.url =
public String getUser() {
public void setUser(String user) {
this.user =
public String getPassword() {
public void setPassword(String password) {
this.password =
java通过ActiveMQ实现JMS的消息队列实例
下载安装/activemq/
启动命令:bin\win64\activemq.bat
/phoebus0501/archive//1964228.html
public class MyActiveMQDemo {
//.cn/s/blog_a459dcf501017oml.html需要安装ActiveMQ 然后启动bin\win64\activemq.bat
public static void main(String[] args) {
String url = &tcp://localhost:61616&;
String user = &xxx&;
String password = &xxx&;
String query = &MyQueue&;
new Thread(new MessageReceiver(query,url,user,password), &Name-Receiver&).start();
new Thread(new MessageSender(query,url,user,password), &Name-Sender&).start();
官网例子:
import org.apache.activemq.ActiveMQConnectionF
import javax.jms.C
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.ExceptionL
import javax.jms.JMSE
import javax.jms.M
import javax.jms.MessageC
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
* Hello world!
public class App {
public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
public static class HelloWorldProducer implements Runnable {
public void run() {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(&vm://localhost&);
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(&TEST.FOO&);
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
String text = &Hello world! From: & + Thread.currentThread().getName() + & : & + this.hashCode();
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
System.out.println(&Sent message: &+ message.hashCode() + & : & + Thread.currentThread().getName());
producer.send(message);
// Clean up
session.close();
connection.close();
catch (Exception e) {
System.out.println(&Caught: & + e);
e.printStackTrace();
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(&vm://localhost&);
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(&TEST.FOO&);
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)
String text = textMessage.getText();
System.out.println(&Received: & + text);
System.out.println(&Received: & + message);
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println(&Caught: & + e);
e.printStackTrace();
public synchronized void onException(JMSException ex) {
System.out.println(&JMS Exception occured.
Shutting down client.&);
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1640493次
积分:24000
积分:24000
排名:第262名
原创:779篇
转载:382篇
评论:206条
我的其他技术博客站点,欢迎关注。
(1)(3)(4)(7)(18)(17)(26)(13)(39)(31)(17)(18)(34)(21)(42)(49)(10)(14)(10)(11)(21)(21)(26)(24)(15)(17)(24)(9)(24)(14)(17)(15)(24)(39)(28)(14)(19)(27)(35)(10)(16)(31)(12)(13)(41)(32)(16)(18)(14)(2)(10)(37)(9)(29)(3)(30)(12)(16)(10)(1)&1&PCFMessageAgent&&&&
&2&PCFMessage&&&&&
&3&PCFMessage&[]&&&&&
&4&//&Connect&a&PCFAgent&to&the&specified&queue&manager
&5&agent&=&new&PCFMessageAgent&("134.175.7.84",&14146,&"SYSTEM.ADMIN.SVRCONN");
&6&//&Build&the&request
&7&request&=&new&PCFMessage&(CMQCFC.MQCMD_INQUIRE_Q);
&8&request.addParameter&(CMQC.MQCA_Q_NAME,&"*");
&9&request.addParameter&(CMQC.MQIA_Q_TYPE,&CMQC.MQQT_LOCAL);
10&request.addParameter&(CMQCFC.MQIACF_Q_ATTRS,&
11&new&int&[]&{&CMQC.MQCA_Q_NAME,&CMQC.MQIA_CURRENT_Q_DEPTH&});
12&//&Use&the&agent&to&send&the&request
13&responses&=&agent.send&(request);
14&//&Display&the&results
15&for&(int&i&=&0;&i&&&responses.&i++)
17&&&String&name&=&responses&[i].getStringParameterValue&(CMQC.MQCA_Q_NAME);
18&&&int&depth&=&responses&[i].getIntParameterValue&(CMQC.MQIA_CURRENT_Q_DEPTH);
20&//&Disconnect
21&agent.disconnect&();
&  我弄错的代码主要是PCFMessageAgent ("134.175.7.84", 14146, "SYSTEM.ADMIN.SVRCONN");
  最后附上IBM工程师给我的代码,很简单,但是好在有关键的注解:
MQEnvironment.CCSID = 1381;&&&&&&&& //要与队列管理器的一样&&&&&&&&&&&&&&&&&& &
MQEnvironment.hostname = "localhost";&&&&&&&&& // 队列管理器所在的机器名,要能ping通
MQEnvironment.port = 1414;&&&&&&&&&&&&&&&&&&&&&&&&& // 队列管理器的监听端口
MQEnvironment.channel& = "CHTEST"; &
MQQueueManager qmgr=new MQQueueManager("TESTQM");&& //队列管理器名称
MQQueue queue = qmgr.accessQueue("QTEST",MQC.MQOO_INPUT_AS_Q_DEF);& //队列
MQMessage& theMessage&&&& = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
queue.get(theMessage,gmo);
//System.out.println("the message length is:"+theMessage.getDataLength());
//int i=theMessage.getDataLength();
System.out.println("the message is:"+theMessage.readLine());
queue.close();
qmgr.disconnect();
Badwood's Blogjava连接远程MQ的问题 - ITeye问答
用java连接远程MQ一个往队列put消息,一个从队列取消息,如果消息是用java放进去的,取出来没有问题。
可是如果消息是从队列那边右键放进去的或是从主题发布的消息,用java拿出来就会出现乱码
取消息代码:
public class RemoteSample2 {
private static final String qManager = "QM_SRV";
private static final String qName = "IN.Q";
public static void main(String args[]) {
& try {
&&&& Hashtable ht = new Hashtable();
&& ht.put(MQC.PORT_PROPERTY, 1415);
&& ht.put(MQC.CCSID_PROPERTY, 1208);
&& ht.put(MQC.HOST_NAME_PROPERTY, "172.18.73.210");
&& ht.put(MQC.CHANNEL_PROPERTY, "CLIENT.QM_SRV");
&& ht.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
&& MQQueueManager qMgr = new MQQueueManager(qManager, ht);
&& int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF
&&&& | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
&& MQQueue queue = qMgr.accessQueue(qName, openOptions);
&&&& System.out.println("CurrentDepth is : " + queue.getCurrentDepth());
&& MQMessage rcvMessage = new MQMessage();
&& MQGetMessageOptions gmo = new MQGetMessageOptions();
&& queue.get(rcvMessage, gmo);
&& System.out.println(rcvMessage.getDataLength());
&&& int strLen = rcvMessage.getMessageLength();
&& byte[] strData = new byte[strLen];
&& rcvMessage.readFully(strData);
&& String msgText = new String(strData);
&& System.out.println("The message is: " + msgText);
&& queue.close();
&&& qMgr.disconnect();
& } catch (MQException ex) {
&& System.out
&&&& .println("A WebSphere MQ Error occured : Completion Code "
&&&&&& + ex.completionCode + " Reason Code "
&&&&&& + ex.reasonCode);
&& ex.printStackTrace();
&& for (Throwable t = ex.getCause(); t != t = t.getCause()) {
&&& System.out.println("... Caused by ");
&&& t.printStackTrace();
&& }
& } catch (java.io.IOException ex) {
&& System.out
&&&& .println("An IOException occured whilst writing to the message buffer: "
&&&&&& + ex);
& }
&
}
}
求解怎么将消息头去掉
目前还没有答案
已解决问题
未解决问题出处:http://coderbee.net 现状
业务部门反应网站访问特别慢,负责运维监控的同事说MQ消息队列积压了,中间件的说应用服务器内存占用很高,GC 一直回收不了内存,GC 线程占了近 100% 的 CPU,其他的基本上都在等待,数据库很正常,完全没压力。没啥办法,线程、堆 dump 出来后,重启吧,然后应用又正常了。
这种故障之前其实也碰到过了,分析了当时 dump 出来的堆后发现,处理 MQ 消息的线程池的队列长度达百万级别,占用了超过 1.3G
内存,这些内存都是没法回收的。
程序的实现目前是这样的:关联系统把消息推送到 MQ 上,我们再从 MQ 上拉消息下来处理;每种类型的消息都有一个线程负责从 MQ 上拉消息,拉下来后封装成线程池的任务提交给相应的线程池去执行。代码可以简化为:
package net.coderbee.mq.
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
public class MQListener {
public ExecutorService executor = Executors.newFixedThreadPool(8);
public void onMessage(final Object message) {
executor.execute(new Runnable() {
public void run() {
// 耗时且复杂的消息处理逻辑
complicateHanlde(message);
private void complicateHanlde(Object message) {
这个实现就是导致故障的根源,
Executors.newFixedThreadPool(8) 创建的线程池的任务队列是无边界的:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue&Runnable&());
当时是关联系统出故障了,他们恢复后,往 MQ 里狂推消息,我们系统里面的 MQListener 不断地从 MQ 拉消息下来,直接塞进线程池里,由于线程池处理消息的速度远远慢于消息进入的速度,所以线程池的队列不断增长,直到把所有的堆内存都占用了,这时不断引发 FullGC,但每次 FullGC 都没法回收到内存,应用也就挂死在那了。
之前那次故障也是线程池队列积压导致的,引起的原因是消息处理逻辑调用了外部接口,由于外部接口的响应非常慢,严重拖慢了消息的处理进度,改成异步调用之后好了些。但问题的根源并没有解决,就像昨天关联系统狂推消息后,我们的系统还是挂了。
我的思路其实很简单,MQ 是用来系统间解耦的,也是一个缓冲,目前的实现是把处理消息的线程池又用作一个 MQ 了,消息不能不受控地进入线程池的任务队列,所以,要换成使用定长的阻塞队列,队列满了就暂停拉取消息。把线程池替换成:
private int nThreads = 8;
private int MAX_QUEUQ_SIZE = 2000;
private ExecutorService executor = new ThreadPoolExecutor(nThreads,
nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue&Runnable&(MAX_QUEUQ_SIZE),
new ThreadPoolExecutor.CallerRunsPolicy());
把线程池队列满的时候直接让调用者(也就是
MQListener)执行任务,这样即延缓了消息拉取的速度,当
MQListener 再去拉取消息时,发现线程池有空间时可以提交到线程池,让线程池的工作线程去处理,它继续保持拉取速度。
这样既控制了线程池占用的内存,又可以让消息处理线程池处理不过来时多一个线程处理消息。
由于上面的代码采用调用者执行的方式,那么要考虑消息处理的顺序问题,比如一个订单的处理可能有多个步骤,对应多条 MQ 消息,那么要考虑这些步骤如果乱序了是否可以接受,因为第3步骤的处理消息可能被 MQListener 处理了,而第2步的处理消息还积压在线程池里。
相关 [线程 mq 消息] 推荐:
- 码蜂笔记
业务部门反应网站访问特别慢,负责运维监控的同事说MQ消息队列积压了,中间件的说应用服务器内存占用很高,GC 一直回收不了内存,GC 线程占了近 100% 的 CPU,其他的基本上都在等待,数据库很正常,完全没压力. 没啥办法,线程、堆 dump 出来后,重启吧,然后应用又正常了. 这种故障之前其实也碰到过了,分析了当时 dump 出来的堆后发现,处理 MQ 消息的线程池的队列长度达百万级别,占用了超过 1.3G
内存,这些内存都是没法回收的.
- 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献
http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.
- 企业架构 - ITeye博客
目前常用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想做过多的比较. 简单来说,MSMQ内置于微软操作系统之中,在部署上包含一个隐性条件:Server需要是微软操作系统. (对于这点我并去调研过MSMQ是否可以部署在非微软系统,比如:Linux,只是拍脑袋想了想,感觉上是不可以).
- 编程语言 - ITeye博客
本文实例是基于
WebSphere MQ中将消息发送至远程队列的配置的基础上的,且如果要能正常运行并发送、接收消息,还需要在两个队列管理器(QM_ORANGE和QM_APPLE)上做如下配置或修改.
1.创建名称为DC.SVRCONN的服务器连接通道.
2.将队列管理器的通道认证记录设置为“已禁用”.
- 行业应用 - ITeye博客
假设在IBM MQ中定义的队列管理器的名为QueueManager, 端口1414,CCSID 437 ,创建名为LQ1,LQ2的队列分别用于发送和接收消息, 服务器连接通道名为SVRCONN. 确保在项目的Classpath中导入了以下的jar包:. 如果需使用spring的JmsTemplate方式来读写MQ,还需要导入.
- 编程语言 - ITeye博客
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. 3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中. if(size==0){
//队列缓存池没有消息,等待. if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理.
- 藏书人 - 李志官方博客
1,经过深思熟虑,我放弃了十月份做个人小巡演的计划,全心全意投入跨年音乐会的准备工作. 如不出意外,12月31日南京见. 2,如果不出意外,第六张专辑会在十一之前发布. 经过深思熟虑,我决定不做实体,直接放到官网提供下载,能者多劳,愿者给钱. 3,当然对我而言,意外是常态.
- 水御龙神 - 1416 教室
每一个光鲜的封面,都饱含美术编辑的”血泪“和杂志主编的“阴谋”——今天的消息树让我们将掀开封面往里瞅瞅. 最新一期的美国新闻周刊封面,实在让人有点儿难以置信. 优雅的戴安娜王妃突然出现在二十一世纪的街头,旁边是她的儿媳妇Kate,但仔细看,她却不是当年的王妃,变老了,变丑了——这是新闻周刊编辑们想象中的一个五十岁的女人的样子.
坚持分享优质有趣的原创文章,并保留作者信息和版权声明,任何问题请联系:@。jms 是 java 消息队列的标准, activemq 是具体实现,还是不太懂
22:36:17 +08:00 · 1196 次点击
谁能给个例子,或者比喻,原来一直就只会用 rabbitmq 和 rocketmq 这个轮子,结果系统里面别人用的封装的 mq ,看源码有点晕
8 回复 &| &直到
18:02:58 +08:00
& & 22:50:07 +08:00
JMS 在 Java 里面跟 JPA 是同一个级别的概念,是一种与实现无关的规范。
完全遵循 JMS 标准的的 MQ 产品,是可以在几乎不动项目代码的基础上更换 MQ 的。
RabbitMQ 就不属于标准 JMS 实现。但是 MQ 的核心思想都是一样的。用例如 Spring 提供的 JMS API 时,更加能做到与具体 JMS 产品无关。
& & 23:02:19 +08:00
@ 言简意赅,谢了,看来还是要多了解,不能只用轮子
& & 10:19:42 +08:00
@ 标准的好处就是让你很方便的 只用 轮子
& & 10:31:07 +08:00
@ 可是很多实用的轮子并没有遵循框架
& & 10:31:31 +08:00
@ 标准,说错了
& & 10:58:13 +08:00
举个栗子:最近我家里的淋浴的喷头坏掉了,我在想,是不是要换一个同样牌子的喷头才能接的上(但附近不一定能买到同样牌子的),或者要把一整套全换掉(感觉要花很多钱)。到了五金店我才发现多虑了,原来所有的喷头的接口都一样,也就是有一套统一的标准。
JMS 同样是制定的一套标准(代号 JSR 914 ),一系列 Interface 。消息队列的开发者可以根据这个标准来实现,这样一来,跟其他系统的集成,或者切换使用其他的消息队列,就非常方便。
Java 还有很多其他的标准,如 JAX-RS ( RESTful WebService 标准,代号 JSR 311 )、 JAX-WS 、 JDBC 、 JPA 。。。
其实都是类似于 Java 的接口和实现类, List list = new ArrayList(); 如果哪天觉得 ArrayList 性能不好,换成 LinkedList ,不用改变其他代码,多好~
& & 12:52:52 +08:00
activemq/rabbitmq 是 AMPQ 的具体实现——你可以理解成 nginx 、 apache 、 lighthttpd 、 tomcat 等服务器是 http 协议的具体实现
jms 是 java 消息队列的标准——你可以理解成 Servlet Api 是 java web 开发里与 http 请求、响应交互的标准接口
& & 18:02:58 +08:00
@ 你可以在符合标准的轮子里选嘛。那么多呢是吧
& · & 501 人在线 & 最高记录 3541 & · &
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.7.5 · 53ms · UTC 22:06 · PVG 06:06 · LAX 15:06 · JFK 18:06? Do have faith in what you're doing.}

我要回帖

更多关于 java mq队列 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信