rabbitmq delivery tag 内存不够用怎么办办

使用rabbitmq工作队列实现任务的负载分发 - 推酷
使用rabbitmq工作队列实现任务的负载分发
大部门下面的测试部,也就是子键他们在搞大批量的硬件信息数据抓取,这次不能用那些高端的saltstack ansible了。因为我们要远程的用ipmitool的接口来抓取信息,要是用在client搞的话,还要给他们密码,这个是很不安全的。 so,要搞一套基于自己的一套密码认证及数据抓取的平台。
他们最后决定用gearman,虽然我也用过这东西,但是总感觉缺点啥,用着不顺畅。
其实我个人还是推荐用mq的东西。我用zeromq实现了分布式的任务的派发,性能很是强劲,在可用性上也做了很多的监控。
虽然不能推荐他们用rabbitmq,但我还是把rabbitmq在实际中的小应用,分享出来。希望对大家有些帮助。
rabbitmq redis的对比
rabbitMQ和redis等都可以做队列,但是他们还是有区别的,比如,redis的消息队列,如果在从队列pop出去的时候,worker处理失败的话,数据不会回到队列中,需要从业务中手动把失败的处理数据push到队列中,而rabbmitMQ可以自动处理失败的worker使数据不丢失;rabbitMQ还可以保证数据在传输过程中持久化,在通道和队列中的数据可以设置为持久化。
rabbitmq zeromq的对比
rabbitmq 虽然没有zeromq那样的速度,但是他在一定的程度上提供了更加可靠的mq,有持久化和防止崩溃的处理。
下面还有详细的对比的。
烦人的官方化介绍:
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然
先来理解他的一些个专有的名词 ~
channel:通道,amqp支持一个tcp连接上启用多个mq通信通道,每个通道都可以被作为通信流。
producer:生产者,是消息产生的源头。
exchange:交换机,可以理解为具有路由表的路由规则。
queues:队列,装载消息的缓存容器。
consumer:消费者,连接到队列并取走消息的客户端。
核心思想:在RabbitMQ中,生产者从不直接将消息发送给队列。
事实上,有些生产者甚至不知道消息是否被送到某个队列中去了。生产者只负责将消息送给交换机,而交换机确切地知道什么消息应该送到哪。
bind:绑定,实际上可以理解为交换机的路由规则。每个消息都有一个称为路由键的属性(routing key),就是一个简单的字符串。一个绑定将【交换机,路由键,消息送达队列】三者绑定在一起,形成一条路由规则。
exchange type:交换机类型:
fanout:不处理路由键,转发到所有绑定的队列上
direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发
topic:路由键模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
关于传说中的性能~
下图是一些个大牛做的综合的测试。
对于rabbitmq zeromq,我自己虽然没有专门的测试,但是实际应用中都有应用的。 zeromq的速度不用质疑,确实很快很快,但是他不做数据的存储持久化和可用性,要是client没有开启的话,他照样会把信息pub出去,不管你存不存在。但是rabbitmq就考虑了很多,看下图大家就知道了。
对头,rabbitmq虽然性能不能和zeromq相比,但是在项目中应用还算不错的。
那么,rabbitmq server的安装很是简单
yum -y install rabbitmq-server
yum - y install rabbitmq - server
我们可以看到他所依赖的那些关联包~ 大家应该知道他是erlang写的吧 !
写的一个小demo 发送端【生产者】,可以想成一个老板,把这次要做的事情到分给大家。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import sys
import random
def makepassword(rang = &qwertyupasdfghjkzxcvbnm&, size = 8):
return string.join(random.sample(rang, size)).replace(& &,&&)
parameters = pika.ConnectionParameters(host = 'localhost' )
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue = 'task_queue' , durable = True )
message=makepassword()
channel.basic_publish(exchange = '',
routing_key = 'task_queue' ,
body = message,
properties = pika.BasicProperties(
delivery_mode = 2 , # make message persistent
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import sys
import random
def makepassword ( rang = &qwertyupasdfghjkzxcvbnm& , size = 8 ) :
return string . join ( random . sample ( rang , size ) ) . replace ( & & , && )
parameters = pika . ConnectionParameters ( host = 'localhost' )
connection = pika . BlockingConnection ( parameters )
channel = connection . channel ( )
channel . queue_declare ( queue = 'task_queue' , durable = True )
message = makepassword ( )
channel . basic_publish ( exchange = '' ,
routing_key = 'task_queue' ,
body = message ,
properties = pika . BasicProperties (
delivery_mode = 2 , # make message persistent
接收端【消费者】,可以想成是工人的角色。有几个工人,就几个工人,就几个工人一块干活。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
channel = connection.channel()
channel.queue_declare(queue = 'task_queue' , durable = True )
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print & [x] Received %r& % (body,)
print & [x] Done&
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count = 1 )
channel.basic_consume(callback,
queue = 'task_queue' )
channel.start_consuming()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import time
connection = pika . BlockingConnection ( pika . ConnectionParameters ( host = 'localhost' ) )
channel = connection . channel ( )
channel . queue_declare ( queue = 'task_queue' , durable = True )
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback ( ch , method , properties , body ) :
print & [x] Received %r& % ( body , )
print & [x] Done&
ch . basic_ack ( delivery_tag = method . delivery_tag )
channel . basic_qos ( prefetch_count = 1 )
channel . basic_consume ( callback ,
queue = 'task_queue' )
channel . start_consuming ( )
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
def callback(ch, method, properties, body):
print & [x] Received %r& % (body,)
print “正在搞呀”
print & [x] Done&
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello')
def callback ( ch , method , properties , body ) :
print & [x] Received %r& % ( body , )
print “正在搞呀”
print & [x] Done&
ch . basic_ack ( delivery_tag = method . delivery_tag )
channel . basic_consume ( callback ,
queue = 'hello' )
运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,它将会流失所有的队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
channel.queue_declare(queue='task_queue', durable=True)
channel . queue_declare ( queue = 'task_queue' , durable = True )
这两条就是为啥rabbitmq比zeromq更可靠的原因 !
还有一个就是针对消费者的分发的策略。
实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。
channel.basic_qos(prefetch_count=1)
channel . basic_qos ( prefetch_count = 1 )
注意:要防止如果所有的消费者都在处理中,则队列中的消息会累积的情况。
对于这样的情况,要不就先用redis中转下,要不就多加work工作者来更多的处理任务。
如果不觉得rabbitmq兔子重的话,那么有任务派发的需求下,用rabbitmq不错的选择。&
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致2271人阅读
消息队列(7)
上一篇文章中,一个队列只有一个消费者,其实可以同时有多个消费者从同一队列里面取消息,如何分配有rabbitmq服务器决定;
代码基本上如上文一致,只是有多个consumer在监控着队列,每个consumer独立处理获取的消息;
1:消息的确认机制
& & & &目前的代码,一旦consumer获取到message,那么这个message就立刻从queue里面移除(自动的消息接收确认);但是如果还没有处理该message,worker被kill,那么这个消息就没有被成功处理;此外,一个consumer可能同时收到了多个消息,这些消息也相当于丢失;
& & & &此时,需要使用消息的手动确认机制,处理成功之后,通知rabbitmq服务器将消息删除;如果没有收到确认消息,改消息状态变成unacked,不会删除;如果rabbitmq重启或者当前client链接失效或者当前worker失效,unacked的消息会参与重新分配,有consumer重新处理;
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck =//默认的是true,自动确认
channel.basicConsume(&hello&, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//此时,consumer可能已经从rabbitmq获得和多个消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//delivery.getEnvelope().getDeliveryTag()消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
2:消息的持久化
& & &目前代码,消息仍然有可能丢失;如果rabbitmq服务器挂掉,队列和消息都没有被持久化;
& &&boolean durable =
channel.queueDeclare(&hello&, durable, false, false, null);
//声明该队列需要持久化& &此外消息也需要被持久化,有可能rabbitmq收到了消息但是还没有放入队列,服务器挂了,此时消息仍有可能丢失;
channel.basicPublish(&&, &task_queue&,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
3:公平的消息分发
& & &目前的消息队列,分发消息时没有考虑consumer的具体情况,有可能造成有的consumer负载过重,有的consumer负载太轻;
& & &应该考虑consumer没有确认消息的数量,如果unacked的消息过多,则应该少往此consumer发送消息;
& & &int prefetchCount = ;//maximum number of messages that the server will deliver, 0 if unlimited
channel.basicQos(prefetchCount);//
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:26527次
积分:1169
积分:1169
排名:千里之外
原创:93篇
转载:12篇
(3)(1)(28)(21)(4)(8)(38)(2)1068人阅读
rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message,
Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息,
最近翻阅了基于java的客户端的相关源码,简单做个分析。
编程模型伪代码如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel
以下是基于channel上的两种消费方式。
1、Subscribe订阅方式
boolean autoAck =
channel.basicConsume(queueName, autoAck, &myConsumerTag&,
&&&& new DefaultConsumer(channel) {
&&&&&&&& @Override
&&&&&&&& public void handleDelivery(String consumerTag,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& Envelope envelope,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& AMQP.BasicProperties properties,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& byte[] body)
&&&&&&&&&&&& throws IOException
&&&&&&&& {
&&&&&&&&&&&& String routingKey = envelope.getRoutingKey();
&&&&&&&&&&&& String contentType = properties.contentT
&&&&&&&&&&&& long deliveryTag = envelope.getDeliveryTag();
&&&&&&&&&&&& // (process the message components here ...)
&&&&&&&&&&&& channel.basicAck(deliveryTag, false);
&&&&&&&& }
订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,
这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。
参见ChannelN中的方法
&&& public String basicConsume(String queue, boolean autoAck, String consumerTag,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& boolean noLocal, boolean exclusive, Map&String, Object& arguments,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& final Consumer callback)
&&&&&&& throws IOException
&&& ......
&&&&&&& rpc((Method)
&&&&&&&&&&& new Basic.Consume.Builder()
&&&&&&&&&&&& .queue(queue)
&&&&&&&&&&&& .consumerTag(consumerTag)
&&&&&&&&&&&& .noLocal(noLocal)
&&&&&&&&&&&& .noAck(autoAck)
&&&&&&&&&&&& .exclusive(exclusive)
&&&&&&&&&&&& .arguments(arguments)
&&&&&&&&&&& .build(),
&&&&&&&&&&& k);
&&&&&&& try {
&&&&&&&&&&& return k.getReply();
&&&&&&& } catch(ShutdownSignalException ex) {
&&&&&&&&&&& throw wrap(ex);
Consumer接收消息的过程:
创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,
&&& public void handleFrame(Frame frame) throws IOException {
&&&&&&& AMQCommand command = _
&&&&&&& if (command.handleFrame(frame)) { // 对消息进行协议assemble
&&&&&&&&&&& _command = new AMQCommand(); // prepare for the next one
&&&&&&&&&&& handleCompleteInboundCommand(command);//对消息消费处理
ChannelN.handleCompleteInboundCommand
&&&&&& ---ChannelN.processAsync
&&&&&&&&&& ----dispatcher.handleDelivery
&&&&&&&&&&&&&&& &---QueueingConsumer.handleDelivery
&&&&&&&&&&&&&&&&&&&& ---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中
每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。
接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()
对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效
2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)
这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:821182次
积分:15720
积分:15720
排名:第512名
原创:440篇
转载:1897篇
评论:48条
(12)(46)(6)(4)(34)(6)(9)(13)(9)(1)(3)(1)(12)(40)(27)(70)(184)(125)(127)(118)(73)(122)(295)(341)(154)(76)(161)(202)(30)(16)(8)(7)(1)(17)(26)(1)(1)(2)
http://www.vpser.net/rabbitmq可靠确认模式的java封装及示例 - 简书
<div class="fixed-btn note-fixed-download" data-toggle="popover" data-placement="left" data-html="true" data-trigger="hover" data-content=''>
写了59398字,被175人关注,获得了188个喜欢
rabbitmq可靠确认模式的java封装及示例
rabbitmq.png
最近的一个计费项目,在rpc调用和流式处理之间徘徊了许久,后来选择流式处理。一是可以增加吞吐量,二是事务的控制相比于rpc要容易很多。
确定了流式处理的方式,后续是技术的选型。刚开始倾向于用storm,无奈文档实在太少,折腾起来着实费劲。最终放弃,改用消息队列+微服务的方式实现。
消息队列的选型上,有activemq,rabbitmq,kafka等。最开始倾向于用activemq,因为以前的项目用过,很多代码都是可直接复用的。后来看了不少文章对比,发现rabbitmq对多语言的支持更好一点,同时相比于kafka,牺牲了部分的性能换取了更好的稳定性安全性以及持久化。
最终决定使用rabbitmq。rabbitmq的官网如下:
对rabbitmq的封装,有几个目标:1 提供send接口2 提供consume接口3 保证消息的事务性处理
所谓事务性处理,是指对一个消息的处理必须严格可控,必须满足原子性,只有两种可能的处理结果:(1) 处理成功,从队列中删除消息(2) 处理失败(网络问题,程序问题,服务挂了),将消息重新放回队列为了做到这点,我们使用rabbitmq的手动ack模式,这个后面细说。
1 send接口
public interface MessageSender {
DetailRes send(Object message);
send接口相对简单,我们使用spring的RabbitTemplate来实现,代码如下:
//1 构造template, exchange, routingkey等
//2 设置message序列化方法
//3 设置发送确认
//4 构造sender方法
public MessageSender buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException, TimeoutException {
Connection connection = connectionFactory.createConnection();
buildQueue(exchange, routingKey, queue, connection);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
return new MessageSender() {
public DetailRes send(Object message) {
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException e) {
e.printStackTrace();
("send failed " + e);
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException error) {
error.printStackTrace();
("send failed again " + error);
return new DetailRes(false, error.toString());
return new DetailRes(true, "");
2 consume接口
public interface MessageConsumer {
DetailRes consume();
在consume接口中,会调用用户自己的MessageProcess,接口定义如下:
public interface MessageProcess&T& {
DetailRes process(T message);
consume的实现相对来说复杂一点,代码如下:
//1 创建连接和channel
//2 设置message序列化方法
//3 构造consumer
public &T& MessageConsumer buildMessageConsumer(String exchange, String routingKey,
final String queue, final MessageProcess&T& messageProcess) throws IOException {
final Connection connection = connectionFactory.createConnection();
buildQueue(exchange, routingKey, queue, connection);
final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
final MessageConverter messageConverter = new Jackson2JsonMessageConverter();
return new MessageConsumer() {
consumer = buildQueueConsumer(connection, queue);
//1 通过delivery获取原始数据
//2 将原始数据转换为特定类型的包
//3 处理数据
//4 手动发送ack确认
public DetailRes consume() {
QueueingConsumer.Delivery delivery =
Channel channel = consumer.getChannel();
delivery = consumer.nextDelivery();
Message message = new Message(delivery.getBody(),
messagePropertiesConverter.toMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8"));
@SuppressWarnings("unchecked")
T messageBean = (T) messageConverter.fromMessage(message);
DetailRes detailRes = messageProcess.process(messageBean);
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
return detailR
} catch (InterruptedException e) {
e.printStackTrace();
return new DetailRes(false, "interrupted exception " + e.toString());
} catch (IOException e) {
e.printStackTrace();
retry(delivery, channel);
("io exception : " + e);
return new DetailRes(false, "io exception " + e.toString());
} catch (ShutdownSignalException e) {
e.printStackTrace();
channel.close();
} catch (IOException io) {
io.printStackTrace();
} catch (TimeoutException timeout) {
timeout.printStackTrace();
consumer = buildQueueConsumer(connection, queue);
return new DetailRes(false, "shutdown exception " + e.toString());
} catch (Exception e) {
e.printStackTrace();
("exception : " + e);
retry(delivery, channel);
return new DetailRes(false, "exception " + e.toString());
3 保证消息的事务性处理rabbitmq默认的处理方式为auto ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式。(1) sender的手工确认模式首先将ConnectionFactory的模式设置为publisherConfirms,如下
connectionFactory.setPublisherConfirms(true);
之后设置rabbitTemplate的confirmCallback,如下:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
(2) consume的手工确认模式首先在queue创建中指定模式
channel.exchangeDeclare(exchange, "direct", true, false, null);
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
channel.queueDeclare(queue, true, false, false, null);
只有在消息处理成功后发送ack确认,或失败后发送nack使信息重新投递
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
4 自动重连机制为了保证rabbitmq的高可用性,我们使用rabbitmq Cluster模式,并配合haproxy。这样,在一台机器down掉时或者网络发生抖动时,就会发生当前连接失败的情况,如果不对这种情况做处理,就会造成当前的服务不可用。在spring-rabbitmq中,已实现了connection的自动重连,但是connection重连后,channel的状态并不正确。因此我们需要自己捕捉ShutdownSignalException异常,并重新生成channel。如下:
catch (ShutdownSignalException e) {
e.printStackTrace();
channel.close();
//recreate channel
consumer = buildQueueConsumer(connection, queue);
5 consumer线程池在对消息处理的过程中,我们期望多线程并行执行来增加效率,因此对consumer做了一个线程池的封装。线程池通过builder模式构造,需要准备如下参数:
//线程数量
int threadC
//处理间隔(每个线程处理完成后休息的时间)
long intervalM
//exchange及queue信息
String routingK
//用户自定义处理接口
MessageProcess&T& messageP
核心循环也较为简单,代码如下:
public void run() {
while (!stop) {
DetailRes detailRes = messageConsumer.consume();
if (infoHolder.intervalMils & 0) {
Thread.sleep(infoHolder.intervalMils);
} catch (InterruptedException e) {
e.printStackTrace();
("interrupt " + e);
if (!detailRes.isSuccess()) {
("run error " + detailRes.getErrMsg());
} catch (Exception e) {
e.printStackTrace();
("run exception " + e);
6 使用示例最后,我们还是用一个例子做结。(1) 定义model
//参考lombok
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
(2) rabbitmq配置配置我们使用@Configuration实现,如下:
@Configuration
public class RabbitMQConf {
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true); // enable confirm mode
return connectionF
(3) sender示例
public class SenderExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionF
private MessageSender messageS
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageSender = mqAccessBuilder.buildMessageSender(EXCHANGE, ROUTING, QUEUE);
public DetailRes send(UserMessage userMessage) {
return messageSender.send(userMessage);
(4) MessageProcess(用户自定义处理接口)示例,本例中我们只是简单的将信息打印出来
public class UserMessageProcess implements MessageProcess&UserMessage& {
public DetailRes process(UserMessage userMessage) {
System.out.println(userMessage);
return new DetailRes(true, "");
(5) consumer示例
public class ConsumerExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionF
private MessageConsumer messageC
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageConsumer = mqAccessBuilder.buildMessageConsumer(EXCHANGE, ROUTING, QUEUE, new UserMessageProcess());
public DetailRes consume() {
return messageConsumer.consume();
(6) 线程池consumer示例在main函数中,我们使用一个独立线程发送数据,并使用线程池接收数据。
public class PoolExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionF
private ThreadPoolConsumer&UserMessage& threadPoolC
@PostConstruct
public void init() {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
MessageProcess&UserMessage& messageProcess = new UserMessageProcess();
threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder&UserMessage&()
.setThreadCount(Constants.THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS)
.setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE)
.setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess)
public void start() throws IOException {
threadPoolConsumer.start();
public void stop() {
threadPoolConsumer.stop();
public static void main(String[] args) throws IOException {
ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
PoolExample poolExample = ac.getBean(PoolExample.class);
final SenderExample senderExample = ac.getBean(SenderExample.class);
poolExample.start();
new Thread(new Runnable() {
int id = 0;
public void run() {
while (true) {
senderExample.send(new UserMessage(id++, "" + System.nanoTime()));
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}).start();
7 github地址,路过的帮忙点个星星,谢谢^_^。
附:rabbitmq安装过程:mac版安装可以使用homebrew。brew install就可以,安装好之后通过brew services start rabbitmq启动服务。通过
就可以在页面端看到rabbitmq了,如下:
rabbitmq_manager.png
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
打开微信“扫一扫”,打开网页后点击屏幕右上角分享按钮
被以下专题收入,发现更多相似内容:
如果你是程序员,或者有一颗喜欢写程序的心,喜欢分享技术干货、项目经验、程序员日常囧事等等,欢迎投稿《程序员》专题。
专题主编:小...
· 268426人关注
玩转简书的第一步,从这个专题开始。
想上首页热门榜么?好内容想被更多人看到么?来投稿吧!如果被拒也不要灰心哦~入选文章会进一个队...
· 148628人关注
程序员日常,代码,教程,学习笔记,谢绝推广文,软推文,软广告,blabalabala...
· 6986人关注
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
选择支付方式:}

我要回帖

更多关于 移动流量不够用怎么办 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信