注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

网易杭研后台技术中心的博客

 
 
 
 
 

日志

 
 

rabbitmq简明使用系列3(队列模型,客户端使用)  

来自genww   2014-04-15 10:55:07|  分类: 默认分类 |举报 |字号 订阅

  下载LOFTER 我的照片书  |
前面讲了服务器端的配置及使用,现在讲讲客户端这边(java方向)

基本操作

创建与rabbitmq的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("fs-0.photo.163.org");
factory.setPort(5672);
factory.setUsername("test");//可选
factory.setPassword("test");//可选
Connection connection = factory.newConnection();
//创建channel,channel是线程安全的,一般一个应用一个channel一个线程就好,如果多个线程,最好用多个channel(因为其方法都是序列化的)
Channel channel = connection.createChannel();
//定义队列类型,这是一个幂等的操作,它只有在该queue不存在的时候才起作用。无论在生产和消费都要定义,而且生产和消费的定义需要一致
//参数1队列名,参数2是否支持持久化,参数3是否为excluse队列(仅连接者可见且一旦断开就自动删除),参数4是否自动删除(没有任何消费者的话便队列便删除),参数5其他属性
channel.queueDeclare("test1", false, false, false, null);


插入队列
String message = "Hello World!";
//参数1指定exchange,参数2指定routingKey,这里可以理解为队列名,参数3消息类型
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

扩展:
消息类型说明:
MessageProperties.PERSISTENT_TEXT_PLAIN 消息需要持久化(注意队列类型要支持可持久化的)


取队列消息
// 可选:指定预取个数。默认rabbit将队列中的消息均分成n份分发给n个consumer,而不管consumer有多少消息是否被ack。指定预取为1了表示rabbit一次只需要推一条消息,直到该消息ack了一个再推下一条。注释1
channel.basicQos(1);
// QueueingConsumer里面会创建一个缓存队列用于保存服务器推过来的消息,这样可以异步的实现推送和消费,当然你可以在构造函数中自己传入用于buffer的queue.
QueueingConsumer consumer = new QueueingConsumer(channel);
// 要设置自动ack为false,注释2
channel.basicConsume("test1", false, consumer);

while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(message);
      // 处理完消息后要返回ack,表示已成功处理,否则服务器将不会删除该消息,内存很快被吃掉,注释3
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      Thread.sleep(1000);
}

注意:实践中需要通过设置注释1,2,3来确定,如果不设置,当服务器有消息就会存入到客户端缓存中,一旦缓存数据没有及时处理,则会抛oom异常。

其他说明:
对于手动ack的队列,如果有多个消费者同时处理时会怎么样呢?
一般情况下是依次循环(round-robin)的往各个消费者放数据的,比如两个消费者,则a分别放1,3,5而b分别放2,4,6...
那假如消费者a先启动,消费者b后启动呢,由或者queue中已经有大量数据了呢?
当消费者a读到一定量的数据后(只是放在缓存中,并没有处理消息),消费者b启动后先看看mq中是否还剩数据(剩余=原数据 - 已ack数据 - 其他consumer拿到并缓存但还未ack),有则取之,没有等待。若此时消费者a挂了,消费者b能拿到剩余未ack的消息。由此看来,一个消费者拿到的数据不一定是顺序的。

如果我想拒绝收到的某个消息呢?
//参数2:为true表示该消息会发送给下一个consumer,为false则直接删除。
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);


发布,订阅,路由消息

前面是基本的队列模型,下面我们讨论一下rabbitmq提供的其他消息模型
这里要说说rabbitmq的真实队列模型:它先将消息交给exchange,然后交给queue,最后由consumer来获取。
exchange实际上只做这些事情,发给哪个对应的queue或者多个queue,是否抛弃等
exchange类型有多个:
direct:消息直接按routingkey指定到对应的queue中,只有绑定了routingKey的queue且publish时指定的routingKey与之相同时queue才会收到数据,否则会被忽略。
topic:和direct类似,但是不是确定的routingkey,而是类似abc.*.#表达式(由word和.组成)。*匹配一个word,而#匹配0到多个word。当publish指定routingkey满足表达式要求时即可发送到指定的queue
headers
fanout:消息直接展开给每个对应的queue

//当在发送消息的时候,可以指定对应的exchange是什么:
channel.basicPublish("logs", "", null, message.getBytes());
参数1为指定exchange,若用""表示发送到默认的exchange,rabbit中的默认exchange类型是direct,表示消息将通过routingkey直接发送到对应的queue中
在这里我们指定exchange为logs,所以在之前需要自己创建一个新的exchange。参数2其实不需要,可以为""
//参数1:名称,参数2:类型
channel.exchangeDeclare("logs", "fanout");

//作为消费者(订阅者)它可能只需要临时的一个queue,一旦连接消失,则该queue消失。
创建临时queue
queueName = channel.queueDeclare().getQueue();

我们需要将exchange绑定到指定的queue上,这里可以理解为queue对logs感兴趣
//参数1queue名,参数2exchange名,参数3路由key。该key的具体作用由exchange决定。在fanout中被忽略
channel.queueBind(QUEUE_NAME, "logs", "");
注意绑定支持一个queue绑定多个routingKey,或者一个routingKey绑定到多个queue

rpc

在发送消息的时候也能够带上一些属性,以支持类似rpc的功能。在rabbitmq文档例子里,实现rpc功能实际上就是client发送带属性的消息,属性中包含回调tmp_queue名和用于标示该消息的correlationId。server端收到消息后从属性中取得tmp_queue,将处理的结果放在该queue中,并将拿到的correlationId作为结果消息的属性一并发送。client端有个while循环一直阻塞并获取tmp_queue消息,直到拿到响应消息并匹配correlationId一致后退出循环,表示拿到了rpc的响应。采用此方式实现rpc的好处是一旦一个server太慢,可以简单的启动一个新的server去读取queue并响应即可。
//创建消息属性(replayTo属性常用于指定回调queue,correlationId属性常用于指定在大量回调queue中的消息哪个是属于该发送消息的回复。在rabbitmq中这些属性没有特殊作用)
BasicProperties props = new BasicProperties().builder().correlationId("id1").replyTo("temp_queue").build();
//发送时带上消息属性(AMQP协议有14种属性之多,但是实际上很少有用)
channel.basicPublish("", QUEUE_NAME, props, message.getBytes());

//接收消息时获取属性:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();




包装publisher为transaction或confirm

前面虽然可以持久化消息,但是并不能完全保证消息不回丢失(不会每次接收到消息都fsync(2)),要保证消息不丢失,可以采用该方法
//开启事务
channel.txSelect();
//发送内容
channel.basicPublish("topic", "name", null, message.getBytes());
...
//提交事务
channel.txCommit();

//使用confirm实现轻量级的确认(即被routed的消息和丢弃的消息将被,持久化的消息将被确认。不支持回滚)
channel.confirmSelect();
channel.basicPublish("topic", "name", null, message.getBytes());
channel.waitForConfirmsOrDie();

  评论这张
 
阅读(3721)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017