activemq删除队列消息 发出的消息为什么没收到

JMS(Java Message Service,Java消息服务)应用程序接口是Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
&&&&&& 在 Java 里有 JMS 的多个实现,ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
JMS 定义了两种消息传递方式:Queue(点对点);Topic(发布/订阅)。
&&&&&& ConnectionFactory 是连接工厂,负责创建Connection。Connection 负责创建 Session。Destination 是消息的目的地。
&&&&&& Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。
ActiveMQ的官方网址:http://activemq.apache.org 。在此可以下载ActiveMQ的最新版本和阅读相关文档。
&&&&&& 下面是使用ActiveMQ发送和接收消息的JAVA实现:
&&&&&& 1、消息发送者
&&&&&& package com.
&&&&&& import javax.jms.BytesM
import javax.jms.C
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.JMSE
import javax.jms.MapM
import javax.jms.MessageP
import javax.jms.ObjectM
import javax.jms.S
import javax.jms.StreamM
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
* 说明: activemq send message
* @author xajava
* @version 创建时间: 下午1:22:40
public class JmsSender {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = &ActiveMQ.Demo&;
private Destination destination =
private Connection conn =
private Session session =
private MessageProducer producer =
private void initialize() throws JMSException, Exception {
// 连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
conn = connectionFactory.createConnection();
// 非事务性会话(第一个参数为 false),自动确认消息
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 消息的目的地(Queue/Topic)
destination = session.createQueue(SUBJECT);
// destination = session.createTopic(SUBJECT);
// 消息的提供者(生产者)
producer = session.createProducer(destination);
// 不持久化消息
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
public void sendMessage(String msgType) throws JMSException, Exception {
initialize();
// 连接到JMS提供者(服务器)
conn.start();
// 发送文本消息
if (&text&.equals(msgType)) {
String textMsg = &ActiveMQ Text Message!&;
TextMessage msg = session.createTextMessage();
// TextMessage msg = session.createTextMessage(textMsg);
msg.setText(textMsg);
producer.send(msg);
// 发送Map消息
if (&map&.equals(msgType)) {
MapMessage msg = session.createMapMessage();
msg.setBoolean(&boolean&, true);
msg.setShort(&short&, (short) 0);
msg.setLong(&long&, 123456);
msg.setString(&MapMessage&, &ActiveMQ Map Message!&);
producer.send(msg);
// 发送流消息
if (&stream&.equals(msgType)) {
String streamValue = &ActiveMQ stream Message!&;
StreamMessage msg = session.createStreamMessage();
msg.writeString(streamValue);
msg.writeBoolean(false);
msg.writeLong();
producer.send(msg);
// 发送对象消息
if (&object&.equals(msgType)) {
JmsObjectMessageBean jmsObject = new JmsObjectMessageBean(&ActiveMQ Object Message&, 18, false);
ObjectMessage msg = session.createObjectMessage();
msg.setObject(jmsObject);
producer.send(msg);
// 发送字节消息
if (&bytes&.equals(msgType)) {
String byteValue = &字节消息&;
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(byteValue.getBytes());
producer.send(msg);
// 关闭连接
public void close() throws JMSException {
if (producer != null)
producer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
2、消息接收者
package com.
import java.util.E
import javax.jms.BytesM
import javax.jms.C
import javax.jms.D
import javax.jms.JMSE
import javax.jms.MapM
import javax.jms.M
import javax.jms.MessageC
import javax.jms.MessageL
import javax.jms.ObjectM
import javax.jms.S
import javax.jms.StreamM
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
* @author xajava
* @version 创建时间: 下午2:06:48
public class JmsReceiver implements MessageListener {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = &ActiveMQ.Demo&;
private Destination dest =
private Connection conn =
private Session session =
private MessageConsumer consumer =
private boolean stop =
private void initialize() throws JMSException, Exception {
& // 连接工厂是用户创建连接的对象.
& ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
& // 连接工厂创建一个jms connection
& conn = connectionFactory.createConnection();
& // 是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
& session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 不支持事务
& // 目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象.
& dest = session.createQueue(SUBJECT);
& // dest = session.createTopic(SUBJECT);
& // 会话创建消息的消费者,从目的地接收消息
& consumer = session.createConsumer(dest);
& * 消费消息
& * @throws JMSException
& * @throws Exception
public void receiveMessage() throws JMSException, Exception {
& initialize();
& conn.start();
& consumer.setMessageListener(this);
& // 等待接收消息
& while (!stop) {
&& Thread.sleep(5000);
@SuppressWarnings(&rawtypes&)
public void onMessage(Message msg) {
&& if (msg instanceof TextMessage) {
&&& TextMessage message = (TextMessage)
&&& System.out.println(&------Received TextMessage------&);
&&& System.out.println(message.getText());
&& } else if (msg instanceof MapMessage) {
&&& MapMessage message = (MapMessage)
&&& System.out.println(&------Received MapMessage------&);
&&& System.out.println(message.getLong(&long&));
&&& System.out.println(message.getBoolean(&boolean&));
&&& System.out.println(message.getShort(&short&));
&&& System.out.println(message.getString(&MapMessage&));
&&& System.out.println(&------Received MapMessage for while------&);
&&& Enumeration enumer = message.getMapNames();
&&& while (enumer.hasMoreElements()) {
&&&& Object obj = enumer.nextElement();
&&&& System.out.println(message.getObject(obj.toString()));
&& } else if (msg instanceof StreamMessage) {
&&& StreamMessage message = (StreamMessage)
&&& System.out.println(&------Received StreamMessage------&);
&&& System.out.println(message.readString());
&&& System.out.println(message.readBoolean());
&&& System.out.println(message.readLong());
&& } else if (msg instanceof ObjectMessage) {
&&& System.out.println(&------Received ObjectMessage------&);
&&& ObjectMessage message = (ObjectMessage)
&&& JmsObjectMessageBean jmsObject = (JmsObjectMessageBean) message.getObject();
&&& System.out.println(jmsObject.getUserName() + &__& + jmsObject.getAge() + &__& + jmsObject.isFlag());
&& } else if (msg instanceof BytesMessage) {
&&& System.out.println(&------Received BytesMessage------&);
&&& BytesMessage message = (BytesMessage)
&&& byte[] byteContent = new byte[1024];
&&& int length = -1;
&&& StringBuffer content = new StringBuffer();
&&& while ((length = message.readBytes(byteContent)) != -1) {
&&&& content.append(new String(byteContent, 0, length));
&&& System.out.println(content.toString());
&& } else {
&&& System.out.println(msg);
& } catch (JMSException e) {
&& e.printStackTrace();
& } finally {
&&& this.close();
&& } catch (JMSException e) {
&&& e.printStackTrace();
// 关闭连接
public void close() throws JMSException {
& System.out.println(&Consumer:-&Closing connection&);
& if (consumer != null)
&& consumer.close();
& if (session != null)
&& session.close();
& if (conn != null)
&& conn.close();
3、对象消息
package com.
import java.io.S
* 说明: JMS 对象消息示例对象
* @author xajava
* @version 创建时间: 下午1:56:07
public class JmsObjectMessageBean implements Serializable {
private static final long serialVersionUID = 5963095L;
private String userN
private int age = 16;
private boolean flag =
public JmsObjectMessageBean(String userName,int age,boolean flag){
& this.setUserName(userName);
& this.setAge(age);
& this.setFlag(flag);
public String getUserName() {
& return userN
public void setUserName(String userName) {
& this.userName = userN
public int getAge() {
public void setAge(int age) {
& this.age =
public boolean isFlag() {
public void setFlag(boolean flag) {
& this.flag =
package com.
import javax.jms.JMSE
* @author& xajava&
* @version 创建时间: 下午4:33:17&
public class Test {
public static void main(String[] args) throws JMSException, Exception {
& JmsSender sender = new JmsSender();
& JmsReceiver receiver = new JmsReceiver();
& sender.sendMessage(&bytes&);
& sender.close();
& receiver.receiveMessage();
& receiver.close();
package com.
import javax.jms.JMSE
* @author& xajava&
* @version 创建时间: 下午4:33:17&
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage(&bytes&);
sender.close();
receiver.receiveMessage();
receiver.close();
package com.
import javax.jms.JMSE
* @author& xajava&
* @version 创建时间: 下午4:33:17&
public class Test {
public static void main(String[] args) throws JMSException, Exception {
& JmsSender sender = new JmsSender();
& JmsReceiver receiver = new JmsReceiver();
& sender.sendMessage(&bytes&);
& sender.close();
& receiver.receiveMessage();
& receiver.close();
---------------------------------------------------------------------------------
http://www.blogjava.net/xajava/archive//390165.html
本文已收录于以下专栏:
相关文章推荐
消息主体包含了消息的核心数据。
JMS 定义了5种消息类型: TextMessage、MapMessage、BytesMessage、
StreamMessage和Obje...
简单发送实例:
package com.xuwei.
import javax.jms.C
import javax.jms.ConnectionFac...
程序员升职加薪指南!还缺一个“证”!
CSDN出品,立即查看!
示例中使用activeMQ-all的jar包实现JMS消息的发送和接收,发送端和接收端不依赖于服务器,如果想了解使用服务器配置的方式请参照之前的JMS和MDB的文章。
注意topic消息...
1、首先,在这里确认一下版本,activemq版本是apache-activemq-5.4.1-bin.zip,activemq安装环境是win7,jdk是1.6版本。下载apache-activem...
ActiveMQ 部署及发送接收消息
下载地址:http://activemq.apache.org/ 我这里使用的版本为当前最新5.8.0。
下载版本有Wind...
之所以将题目 成为ActiveMq小结 是因为 最近我做的一个项目用到了JMS 然后我采用了开源的MQ 这个,但因为时间比较紧,所以 我暂时没有花太多的时间去研究它。不过也看了一些网友的文章,给了我许...
错误信息:
org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'aisleServ...
当前配置是允许接收所有的对象序列化,更多设置详情参考官网:
http://activemq.apache.org/objectmessage.html1、保证当前MQ能正常接收发送消息
2、发送端...
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)君,已阅读到文档的结尾了呢~~
豆丁精品文档: jms activemq activemq spring spring集成activemq activemq apache activemq spring3 activemq spring jms activemq启动 linux安装activemq activemq下载
扫扫二维码,随身浏览文档
手机或平板扫扫即可继续访问
spring_JMS、activemq中消费者收不到生产者发送的消息的原因解析
举报该文档为侵权文档。
举报该文档含有违规或不良信息。
反馈该文档无法正常浏览。
举报该文档为重复文档。
推荐理由:
将文档分享至:
分享完整地址
文档地址:
粘贴到BBS或博客
flash地址:
支持嵌入FLASH地址的网站使用
html代码:
&embed src='/DocinViewer-4.swf' width='100%' height='600' type=application/x-shockwave-flash ALLOWFULLSCREEN='true' ALLOWSCRIPTACCESS='always'&&/embed&
450px*300px480px*400px650px*490px
支持嵌入HTML代码的网站使用
您的内容已经提交成功
您所提交的内容需要审核后才能发布,请您等待!
3秒自动关闭窗口spring整合activeMq监听不到消息是怎么回事、、、【java吧】_百度贴吧
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&签到排名:今日本吧第个签到,本吧因你更精彩,明天继续来努力!
本吧签到人数:0成为超级会员,使用一键签到本月漏签0次!成为超级会员,赠送8张补签卡连续签到:天&&累计签到:天超级会员单次开通12个月以上,赠送连续签到卡3张
关注:637,149贴子:
spring整合activeMq监听不到消息是怎么回事、、、收藏
spring整合 MQ 消费者监听不到消息、、、懂MQ的帮忙看看、、 配置文件如下生产者:&?xml version=&1.0& encoding=&UTF-8&?&&beans xmlns=&&
xmlns:xsi=&& xmlns:jms=&&
xsi:schemaLocation=&
&!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--&
&bean id=&targetConnectionFactory& class=&org.apache.activemq.ActiveMQConnectionFactory&&
&!-- mq服务器地址 --&
&property name=&brokerURL& value=&tcp://localhost:61616&/&
&!-- 用户名 --&
&property name=&userName& value=&admin&/&
&!-- 密码 --&
&property name=&password& value=&admin&/&
&bean id=&pooledConnectionFactory& class=&org.apache.activemq.pool.PooledConnectionFactory&&
&property name=&connectionFactory& ref=&targetConnectionFactory&/&
&property name=&maxConnections& value=&10&/&
&!-- 配置客户端消息工厂 --&
&bean id=&connectionFactory& class=&org.springframework.jms.connection.SingleConnectionFactory&&
&property name=&targetConnectionFactory& ref=&pooledConnectionFactory&/&
&!-- 定义消息队列(Queue) --&
&bean id=&queueDestination& class=&org.mand.ActiveMQQueue&&
&!-- 设置消息队列的名字 --&
&constructor-arg&
&value&testSpringQueue&/value&
&/constructor-arg&
&bean id=&remoteMessageJmsTemplate& class=&com.intcache.god.service.CustomJmsTemplate&&
&property name=&connectionFactory& ref=&connectionFactory&/&
&property name=&explicitQosEnabled& value=&true&/&
&property name=&deliveryMode& value=&1&/&
&property name=&defaultDestination& ref=&queueDestination&/&
&property name=&receiveTimeout& value=&10000& /&
&/bean&&/beans&消费者:&?xml version=&1.0& encoding=&UTF-8&?&&beans xmlns=&&
xmlns:xsi=&& xmlns:amq=&&
xsi:schemaLocation=&
&!-- ActiveMQ 连接工厂 --&
&!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --&
&!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 --&
&amq:connectionFactory id=&amqConnectionFactory&
brokerURL=&tcp://localhost:61616& userName=&admin& password=&admin& /&
&!-- Spring Caching连接工厂 --&
&!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --&
&bean id=&connectionFactory&
class=&org.springframework.jms.connection.CachingConnectionFactory&&
&!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --&
&property name=&targetConnectionFactory& ref=&amqConnectionFactory&&&/property&
&!-- Session缓存数量 --&
&property name=&sessionCacheSize& value=&100& /&
&!--这个是队列目的地 --&
&!-- 定义消息队列(Queue) --&
&bean id=&queueDestination& class=&org.mand.ActiveMQQueue&&
&!-- 设置消息队列的名字 --&
&constructor-arg value=&testSpringQueue&/&
&!-- 消息消费者 start --&
&!-- 消息监听器 --&
&bean id=&queueReceiver1& class=&com.manage.interceptors.ConsumerMessageListener& /&
&!-- 定义 topic 监听器 --&
&!--&jms:listener-container destination-type=&topic& container-type=&default& connection-factory=&connectionFactory& acknowledge=&auto&&--&
&!--&jms:listener destination=&messageTest& ref=&queueReceiver1&/&--&
&!--&/jms:listener-container&--&
&!-- 消息监听容器,配置连接工厂,监听器是上面定义的监听器 --&
&bean id=&jmsContainer& class=&org.springframework.jms.listener.DefaultMessageListenerContainer&&
&property name=&connectionFactory& ref=&connectionFactory& /&
&property name=&destination& ref=&queueDestination& /&
&!--主题(Topic)和队列消息的主要差异体现在JmsTemplate中&pubSubDomain&是否设置为True。如果为True,则是Topic;如果是false或者默认,则是queue--&
&property name=&pubSubDomain& value=&true& /&
&property name=&messageListener& ref=&queueReceiver1& /&
&/bean&&/beans&应该不是代码问题,觉得是配置问题,MQ中可以看到发送的消息 ,但是消费者就是接收不到、、、、
登录百度帐号推荐应用16:35 提问
ActiveMQ 有时候接收不到消息
使用ActiveMQ做个小例子的时候, 有时候消息接收不到,有时候可以,没有被消费的消息,再重启后又能全部接收到,不知道问题出在哪。谢谢各位大神指点
Spring ActiveMQ 配置如下
package com.zym.robot.
import org.apache.activemq.ActiveMQConnectionF
import org.springframework.beans.factory.annotation.A
import org.springframework.context.annotation.B
import org.springframework.context.annotation.C
import org.springframework.context.annotation.L
import org.springframework.jms.connection.CachingConnectionF
import com.zym.robot.constant.C
@Configuration
@Lazy(false)
public class ConnectionFactoryConfig{
@Autowired
private ApplicationUtil applicationU
@Bean(name="activeMQConnectionFactory")
public ActiveMQConnectionFactory activeMQConnectionFactory(){
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(Constant.activemq_brokerurl);
activeMQConnectionFactory.setUserName(Constant.activemq_username);
activeMQConnectionFactory.setPassword(Constant.activemq_password);
activeMQConnectionFactory.setTrustAllPackages(true);
return activeMQConnectionF
@Bean(name="connectionFactory")
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory((ActiveMQConnectionFactory)applicationUtil.getBean("activeMQConnectionFactory"));
cachingConnectionFactory.setSessionCacheSize(10);
return cachingConnectionF
package com.zym.robot.
import javax.jms.D
import org.mand.ActiveMQQ
import org.springframework.beans.factory.annotation.A
import org.springframework.beans.factory.annotation.Q
import org.springframework.context.annotation.B
import org.springframework.context.annotation.C
import org.springframework.context.annotation.I
import org.springframework.context.annotation.L
import org.springframework.jms.connection.CachingConnectionF
import org.springframework.jms.core.JmsT
@Configuration
@Import(ConnectionFactoryConfig.class)
@Lazy(false)
public class MQTemplateFactory {
@Autowired
@Qualifier("connectionFactory")
private CachingConnectionFactory connectionF
@Autowired
private ApplicationUtil applicationU
* @author ZhaoYM
* @description 微信消息队列
@Bean(name="messageQueueTemplate")
public JmsTemplate messageQueueTemplate(){
JmsTemplate template = new JmsTemplate();
template.setPubSubNoLocal(false);
template.setConnectionFactory(connectionFactory);
template.setDefaultDestination((ActiveMQQueue)applicationUtil.getBean("messageQueue"));
* @author ZhaoYM
* @description 登录消息队列
@Bean(name="loginQueueTemplate")
public JmsTemplate loginQueueTemplate(){
JmsTemplate template = new JmsTemplate();
template.setPubSubNoLocal(false);
template.setConnectionFactory(connectionFactory);
template.setDefaultDestination((ActiveMQQueue)applicationUtil.getBean("loginQueue"));
@Bean(name="messageQueue")
public Destination messageQueue(){
ActiveMQQueue activeMQQueue = new ActiveMQQueue();
activeMQQueue.setPhysicalName("messageQueue");
return activeMQQ
@Bean(name="loginQueue")
public Destination loginQueue(){
ActiveMQQueue activeMQQueue = new ActiveMQQueue();
activeMQQueue.setPhysicalName("loginQueue");
return activeMQQ
发送消息代码
package com.zym.robot.
import java.io.S
import org.springframework.jms.core.JmsT
public class SendMessage {
* @author ZhaoYM
* @description 发送消息
* @param object
* @param jmsTemplate
public static void sendMessage(Serializable object, JmsTemplate jmsTemplate){
SimpleMessageCreatorImpl messageCreator = new SimpleMessageCreatorImpl();
messageCreator.setMessage(object);
jmsTemplate.send(messageCreator);
package com.zym.robot.
import java.io.S
import javax.jms.JMSE
import javax.jms.M
import javax.jms.S
import org.springframework.jms.core.MessageC
* @author ZhaoYM
@description message 简单实现
public class SimpleMessageCreatorImpl implements MessageCreator{
public Serializable getMessage() {
public void setMessage(Serializable message) {
this.message =
public Message createMessage(Session session) throws JMSException {
if (message == null) {
return session.createTextMessage("默认消息");
return session.createObjectMessage(getMessage());
按赞数排序
把发消息和收消息放到两个进程启动,对于新手建议用onmessage代替receive
其他相似问题ActiveMQ Broker发送消息给消费者过程详解 - Donald_Draper - ITeye博客
博客分类:
JMS(ActiveMQ) PTP和PUB/SUB模式实例:
ActiveMQ连接工厂、连接详解:
ActiveMQ会话初始化:
ActiveMQ生产者:
ActiveMQ消费者:
ActiveMQ启动过程详解:
ActiveMQ Broker发送消息给消费者过程详解:
Spring与ActiveMQ的集成:
Spring与ActiveMQ的集成详解一:
Spring与ActiveMQ的集成详解二:
引言:
从activemq脚本可以看出,启动ActiveMQ实际是启动bin文件夹下的activemq.jar,
包中有一个类为Main,这就是active的启动入口,Main主要是加载lib目录和ClassPath,初始化
类加载器,委托给ShellCommand,由ShellCommand根据命令描述去执行,如果是Version和HELP,
则打印信息,若是启动命令,则通过XBeanBrokerFactory创建BrokerService,这个过程主要利用的Spring的bean容器机制,然后启动BrokerService,主要启动持久化适配器,JMX连接,上下文关系器,最后启动所有网络连接,及TcpTransport连接TransportConnector,默认使用的是openwire:tcp,所以我们就看一下TcpTransportServer,TcpTransportServer由TcpTransportFactory创建并配置OpenWire协议转换器,启动TcpTransportServer,就是从ServerSocketFactory获取ServerSocket,并绑定ip和port,监听连接,
并设置ServerSocket的监听器org.apache.activemq.transport.nio.SelectorManager.Listener,这个用的是java nio。
前一篇文章中,说过ActiveMQ启动过程,今天看一下TcpTransportServer与ActiveMQConnection如何交互,如何将消息发送给消费者,从TransportConnector启动开始。
//TransportConnector
//启动TCP监听
public void start()
throws Exception
broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
//添加TcpTransportServer的监听器
getServer().setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport)
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run()
if(!brokerService.isStopping())
//创建TransportConnector,并启动
Connection connection = createConnection(transport);
connection.start();
final Transport val$
final _cls1 this$1;
this$1 = _cls1.
transport = transport1;
final TransportConnector this$0;
this$0 = TransportConnector.
//启动TcpTransportServer
getServer().setBrokerInfo(brokerInfo);
getServer().start();
DiscoveryAgent da = getDiscoveryAgent();
if(da != null)
da.registerService(getPublishableConnectString());
da.start();
if(enableStatusMonitor)
statusDector = new TransportStatusDetector(this);
statusDector.start();
//创建TransportConnector连接
protected Connection createConnection(Transport transport)
throws IOException
TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory, brokerService.getTaskRunnerFactory());
boolean statEnabled = getStatistics().isEnabled();
answer.getStatistics().setEnabled(statEnabled);
answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
我们再来看TcpTransportServer的启动
getServer().start();
这个启动有两层含义:一是启动TcpTransportServer线程,二是启动Service,
先来看第一层含义
//TcpTransportServer
public void run()
final ServerSocketChannel chan = serverSocket.getChannel();
if(chan != null)
//如果socket通道存在,则设置通道选择器
chan.configureBlocking(false);
selector = SelectorManager.getInstance().register(chan, new org.apache.activemq.transport.nio.SelectorManager.Listener() {
public void onSelect(SelectorSelection sel)
SocketChannel sc = chan.accept();
if(sc != null)
if(isStopped() || getAcceptListener() == null)
sc.close();
if(useQueueForAccept)
socketQueue.put(sc.socket());
handleSocket(sc.socket());
final ServerSocketChannel val$
final TcpTransportServer this$0;
this$0 = TcpTransportServer.
selector.setInterestOps(16);
selector.enable();
//如果socket通道不存在,则serverSocket接受连接,并处理Socket连接
if(isStopped())
Socket socket =
socket = serverSocket.accept();
if(socket != null)
if(isStopped() || getAcceptListener() == null)
socket.close();
if(useQueueForAccept)
socketQueue.put(socket);
handleSocket(socket);
} while(true);
protected final void handleSocket(Socket socket)
boolean closeSocket =
if(currentTransportCount.get() &= maximumConnections)
throw new ExceededMaximumConnectionsException("Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml)");
HashMap options = new HashMap();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
options.put("trace", Boolean.valueOf(trace));
options.put("soTimeout", Integer.valueOf(soTimeout));
options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
options.put("logWriterName", logWriterName);
options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
options.put("startLogging", Boolean.valueOf(startLogging));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
//创建transport
Transport transport = createTransport(socket, format);
closeSocket =
//将transport添加到ServiceSupport监听器列表中
if(transport instanceof ServiceSupport)
((ServiceSupport)transport).addServiceListener(this);
Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
//TcpTransportServer监听器接受连接transport,
getAcceptListener().onAccept(configuredTransport);
currentTransportCount.incrementAndGet();
protected Transport createTransport(Socket socket, WireFormat format)
throws IOException
return new TcpTransport(format, socket);
TcpTransportServer父类栈
public class TcpTransportServer extends TransportServerThreadSupport
implements ServiceListener
public abstract class TransportServerThreadSupport extends TransportServerSupport
implements Runnable
public abstract class TransportServerSupport extends ServiceSupport
implements TransportServer
再来看Service层的启动
public abstract class ServiceSupport
implements Service
private AtomicB
private AtomicB
private AtomicB
private List serviceL//service监听器
public void start()
throws Exception
if(!pareAndSet(false, true))
break MISSING_BLOCK_LABEL_93;
stopped.set(false);
preStart();
//doStart为抽象函数,待子类扩展
doStart();
started.set(success);
break MISSING_BLOCK_LABEL_54;
started.set(success);
//启动所有Service监听,在TcpServe处理Socket的连接中(handleSocket),
//将transport添加到ServiceSupport监听器列表中
for(Iterator i$ = serviceListeners.iterator(); i$.hasNext(); l.started(this))
l = (ServiceListener)i$.next();
而TcpTransport也是service,来看它的启动,也有两层含义:第一是启动TcpTransport线程,第二是启动Service,
先看第一层
//初始化Socket,及数据输入输出流,前面已经讲过,这里不再讲
protected void doStart()
throws Exception
connect();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
protected void connect()
throws Exception
InetSocketAddress localAddress =
InetSocketAddress remoteAddress =
if(localLocation != null)
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
if(remoteLocation != null)
String host = resolveHostName(remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
trafficClassSet = setTrafficClass(socket);
if(socket != null)
if(localAddress != null)
socket.bind(localAddress);
if(remoteAddress != null)
if(connectionTimeout &= 0)
socket.connect(remoteAddress, connectionTimeout);
socket.connect(remoteAddress);
if(localAddress != null)
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
initialiseSocket(socket);
initializeStreams();
再看TcpTransport线程的启动
public class TcpTransport extends TransportThreadSupport
implements Transport, Service, Runnable
public void run()
LOG.trace((new StringBuilder()).append("TCP consumer thread for ").append(this).append(" starting").toString());
runnerThread = Thread.currentThread();
//如果TcpTransport
for(; !isStopped(); doRun());
((CountDownLatch)stoppedLatch.get()).countDown();
protected void doRun()
throws IOException
//读取命令
Object command = readCommand();
//处理命令
doConsume(command);
}
读取命令
Object command = readCommand();
protected Object readCommand()
throws IOException
//通过wireFormat解析字节流
return wireFormat.unmarshal(dataIn);
}
//OpenWireFormat
public final class OpenWireFormat
implements WireFormat
private DataStreamMarshaller dataMarshallers[];
private boolean stackTraceE
private boolean tcpNoDelayE
private boolean cacheE
private boolean tightEncodingE
private boolean sizePrefixD
private long maxFrameS
private short nextMarshallCacheI
private short nextMarshallCacheEvictionI
private Map marshallCacheM
private DataStructure marshallCache[];//命令字节流发送缓存
private DataStructure unmarshallCache[];//命令字节流解析缓存
private DataByteArrayOutputStream bytesO//数据输入流
private DataByteArrayInputStream bytesIn;//数据输出流
private WireFormatInfo preferedWireFormatI//协议格式信息
public synchronized Object unmarshal(ByteSequence sequence)
throws IOException
//从二进制字节流读取数据到缓存,记录读取位置
bytesIn.restart(sequence);
if(!sizePrefixDisabled)
int size = bytesIn.readInt();
if(sequence.getLength() - 4 == size);
if((long)size & maxFrameSize)
throw new IOException((new StringBuilder()).append("Frame size of ").append(size / 1048576).append(" MB larger than max allowed ").append(maxFrameSize / 1048576L).append(" MB").toString());
//解析输入流,转为为command
Object command = doUnmarshal(bytesIn);
}
//DataByteArrayInputStream
public final class DataByteArrayInputStream extends InputStream
implements DataInput
private byte buf[];
//从二进制字节流读取数据到缓存,记录读取位置
public void restart(ByteSequence sequence)
buf = sequence.getData();
pos = sequence.getOffset();
}
//OpenWireFormat
解析输入流,转换为command
public Object doUnmarshal(DataInput dis)
throws IOException
//获取命令类型
byte dataType = dis.readByte();
if(dataType != 0)
//创建命令字节流对应大小的字节流处理器,DataStreamMarshaller为WireFormatInfoMarshaller
DataStreamMarshaller dsm = dataMarshallers[dataType & 255];
if(dsm == null)
throw new IOException((new StringBuilder()).append("Unknown data type: ").append(dataType).toString());
//创建命令对应的数据结构
Object data = dsm.createObject();
if(tightEncodingEnabled)
BooleanStream bs = new BooleanStream();
bs.unmarshal(dis);
dsm.tightUnmarshal(this, data, dis, bs);
//解析命令字节流
dsm.looseUnmarshal(this, data, dis);
//WireFormatInfoMarshaller
public class WireFormatInfoMarshaller extends BaseDataStreamMarshaller
//创建命令对应的数据结构
public DataStructure createObject()
return new WireFormatInfo();
//设置WireFormat格式下,命令命令对应的魔数,版本信息
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn)
throws IOException
//解析命令字节流,委托给BaseDataStreamMarshaller
super.looseUnmarshal(wireFormat, o, dataIn);
WireFormatInfo info = (WireFormatInfo)o;
//如果命令字节流属性不为null,则初始化命令字节流
info.beforeUnmarshall(wireFormat);
info.setMagic(looseUnmarshalConstByteArray(dataIn, 8));
info.setVersion(dataIn.readInt());
//设置命令字节流属性
info.setMarshalledProperties(looseUnmarshalByteSequence(dataIn));
info.afterUnmarshall(wireFormat);
}
//WireFormatInfo
public class WireFormatInfo
implements Command, MarshallAware
public static final byte DATA_STRUCTURE_TYPE = 1;
private static final int MAX_PROPERTY_SIZE = 4096;
private static final byte MAGIC[] = {
65, 99, 116, 105, 118, 101, 77, 81
protected byte magic[];
protected ByteSequence marshalledP
protected transient M
private transient E
private transient E
public void beforeMarshall(WireFormat wireFormat)
throws IOException
if(marshalledProperties == null && properties != null)
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
MarshallingSupport.marshalPrimitiveMap(properties, os);
os.close();
marshalledProperties = baos.toByteSequence();
//设置命令字节流属性
public void setMarshalledProperties(ByteSequence marshalledProperties)
this.marshalledProperties = marshalledP
}
//BaseDataStreamMarshaller
public abstract class BaseDataStreamMarshaller
implements DataStreamMarshaller
public static final Constructor STACK_TRACE_ELEMENT_CONSTRUCTOR;
Constructor constructor =
constructor = java/lang/StackTraceElement.getConstructor(new Class[] {
java/lang/String, java/lang/String, java/lang/String, Integer.TYPE
catch(Throwable throwable) { }
STACK_TRACE_ELEMENT_CONSTRUCTOR =
public void looseUnmarshal(OpenWireFormat openwireformat, Object obj, DataInput datainput)
throws IOException
回到TcpTransport处理命令
doConsume(command);
处理命令
public void doConsume(Object command)
if(command != null)
if(transportListener != null)
//如果transport监听器不为空,则处理命令
transportListener.onCommand(command);
LOG.error((new StringBuilder()).append("No transportListener available to process inbound command: ").append(command).toString());
public abstract class TransportSupport extends ServiceSupport
implements Transport
TransportListener transportL//transport监听器
public void doConsume(Object command)
if(command != null)
if(transportListener != null)
transportListener.onCommand(command);
LOG.error((new StringBuilder()).append("No transportListener available to process inbound command: ").append(command).toString());
再回到看ActiveMQConnection实现transportListener
public class ActiveMQConnection
implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection
public void onCommand(Object o)
final Command command = (Command)o;
if(!closed.get() && command != null)
command.visit(new CommandVisitorAdapter() {
//分发消息
public Response processMessageDispatch(MessageDispatch md)
throws Exception
waitForTransportInterruptionProcessingToComplete();
//根据分发消息id,获取消费者,然后消费者,消费消息
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)dispatchers.get(md.getConsumerId());
if(dispatcher != null)
Message msg = md.getMessage();
if(msg != null)
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
//分发消息
dispatcher.dispatch(md);
//处理生产者回复消息(ProducerAck)
public Response processProducerAck(ProducerAck pa)
throws Exception
if(pa != null && pa.getProducerId() != null)
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)producers.get(pa.getProducerId());
if(producer != null)
producer.onProducerAck(pa);
//处理broker
public Response processBrokerInfo(BrokerInfo info)
throws Exception
brokerInfo =
brokerInfoReceived.countDown();
optimizeAcknowledge = brokerInfo.isFaultTolerantConfiguration() ? 0 : 1;
getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
//处理连接错误
public Response processConnectionError(final ConnectionError error)
throws Exception
executor.execute(new Runnable() {
public void run()
onAsyncException(error.getException());
final ConnectionError val$
final _cls3 this$1;
this$1 = _cls3.
//处理控制命令
public Response processControlCommand(ControlCommand command)
throws Exception
onControlCommand(command);
//处理连接命令
public Response processConnectionControl(ConnectionControl control)
throws Exception
onConnectionControl((ConnectionControl)command);
//处理消费控制命令
public Response processConsumerControl(ConsumerControl control)
throws Exception
onConsumerControl((ConsumerControl)command);
public Response processWireFormat(WireFormatInfo info)
throws Exception
onWireFormatInfo((WireFormatInfo)command);
final Command val$
final ActiveMQConnection this$0;
this$0 = ActiveMQConnection.
command = command1;
//启动所有连接注册的监听器
TransportL
for(Iterator iter = transportListeners.iterator(); iter.hasNext(); listener.onCommand(command))
listener = (TransportListener)iter.next();
处理消费控制命令
protected void onConsumerControl(ConsumerControl command)
if(command.isClose())
for(Iterator i$ = sessions.iterator(); i$.hasNext(); session.close(command.getConsumerId()))
session = (ActiveMQSession)i$.next();
Iterator i$;
//设置会话消费者抓取数据大小
for(i$ = sessions.iterator(); i$.hasNext(); session.setPrefetchSize(command.getConsumerId(), command.getPrefetch()))
session = (ActiveMQSession)i$.next();
i$ = connectionConsumers.iterator();
if(!i$.hasNext())
ActiveMQConnectionConsumer connectionConsumer = (ActiveMQConnectionConsumer)i$.next();
ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
if(consumerInfo.getConsumerId().equals(command.getConsumerId()))
//设置消费抓取数据大小
consumerInfo.setPrefetchSize(command.getPrefetch());
} while(true);
//启动所有连接注册的监听器
TransportL
for(Iterator iter = transportListeners.iterator(); iter.hasNext(); listener.onCommand(command))
listener = (TransportListener)iter.next();
在ActiveMQConnection构造中有这么一段
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats)
throws Exception
//transport的监听器为ActiveMQConnection
this.transport.setTransportListener(this);
stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
实际上调用的是ActiveMQConnection的onCommand
ResponseCorrelator
public void onCommand(Object o)
Command command =
if(o instanceof Command)
command = (Command)o;
throw new ClassCastException((new StringBuilder()).append("Object cannot be converted to a Command, Object: ").append(o).toString());
if(command.isResponse())
Response response = (Response)
FutureResponse future =
synchronized(requestMap)
future = (FutureResponse)requestMap.remove(Integer.valueOf(response.getCorrelationId()));
if(future != null)
future.set(response);
LOG.debug((new StringBuilder()).append("Received unexpected response: {").append(command).append("}for command id: ").append(response.getCorrelationId()).toString());
getTransportListener().onCommand(command);
回到ActiveMQConnection的onCommand
//分发消息
public Response processMessageDispatch(MessageDispatch md)
throws Exception
waitForTransportInterruptionProcessingToComplete();
//根据分发消息中的消费者id获取对应的消费者(分发器),然后由该消费者消费消息
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)dispatchers.get(md.getConsumerId());
if(dispatcher != null)
Message msg = md.getMessage();
if(msg != null)
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
//设置分发消息连接
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
//分发消息
dispatcher.dispatch(md);
public class ActiveMQConnectionConsumer
implements ConnectionConsumer, ActiveMQDispatcher
private ActiveMQC//连接
private ServerSessionPool sessionP//会话池
private ConsumerInfo consumerI//消费者信息
public void dispatch(MessageDispatch messageDispatch)
ServerSession serverS
messageDispatch.setConsumer(this);
//获取会话
serverSession = sessionPool.getServerSession();
Session s = serverSession.getSession();
if(s instanceof ActiveMQSession)
session = (ActiveMQSession)s;
if(s instanceof ActiveMQTopicSession)
ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
session = (ActiveMQSession)topicSession.getNext();
if(s instanceof ActiveMQQueueSession)
ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
session = (ActiveMQSession)queueSession.getNext();
connection.onClientInternalException(new JMSException((new StringBuilder()).append("Session pool provided an invalid session type: ").append(s.getClass()).toString()));
//会话分发消息,这个前面已说过,就是交由会话执行器执行消息分发
session.dispatch(messageDispatch);
serverSession.start();
catch(JMSException e)
connection.onAsyncException(e);
//ActiveMQSession
public void dispatch(MessageDispatch messageDispatch)
//会话执行,执行消息分发
executor.execute(messageDispatch);
}
//ActiveMQSessionExecutor
public class ActiveMQSessionExecutor
implements Task
private final ActiveMQS
private final MessageDispatchChannel messageQ//未消费消息队列
private boolean dispatchedBySessionP
private volatile TaskRunner taskR
private boolean startedOrWarnedThatNotS
void execute(MessageDispatch message)
throws InterruptedException
if(!startedOrWarnedThatNotStarted)
ActiveMQConnection connection = session.
long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
if(connection.isStarted() || aboutUnstartedConnectionTimeout < 0L)
startedOrWarnedThatNotStarted =
long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
if(elapsedTime > aboutUnstartedConnectionTimeout)
LOG.warn((new StringBuilder()).append("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: ").append(connection).append(" Received: ").append(message).toString());
startedOrWarnedThatNotStarted =
if(!session.isSessionAsyncDispatch() && !dispatchedBySessionPool)
//如果不是异步分发消息,则直接分发消息
dispatch(message);
//将分发消息添加到未分发消息队列
messageQueue.enqueue(message);
}
我们来看同步分发:获取会话的消费者,遍历消费者,由消费者id匹配的消费者消费消息
void dispatch(MessageDispatch message)
Iterator i$ = session.consumers.iterator();
if(!i$.hasNext())
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i$.next();
ConsumerId consumerId = message.getConsumerId();
if(!consumerId.equals(consumer.getConsumerId()))
consumer.dispatch(message);
} while(true);
}
上面分发消息这一段,我们在前几篇有说过,这里就不再讲。
总结:
TransportConnector的启动,主要为添加TcpTransportServer的监听器TransportAcceptListener,监听器主要任务是接受连接,并启动连接TransportConnector,在启动的过程中启动一个TcpTransportServer,并启动TcpTransportServer。TcpTransportServer监听连接请求,如果有连接请求,则创建连接,同时启动连接的Transport。Transport启动过程主要是读取命令,然后交由TransportListener处理,实际为ActiveMQConnection。如果命令为消息分发命令,则由ActiveMQConnectionConsumer根据分发消息获取消费者信息,并从ActiveMQConnectionConsumer获取连接会话,然后由会话来分发消息,最后交由会话执行器分发消息。
Donald_Draper
浏览: 121074 次
taibangtle~~~
xuexile ~~~~
学习了~~~·
xuexile~~~}

我要回帖

更多关于 activemq 消息类型 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信