博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[九]RabbitMQ-客户端源码之Consumer
阅读量:5951 次
发布时间:2019-06-19

本文共 3470 字,大约阅读时间需要 11 分钟。

在中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。

这里先来看下消费者客户端的关键代码:

QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicQos(32);        channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [X] Received '" + message + "'");            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        }

可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。

在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:

if (method instanceof Basic.Deliver) {    processDelivery(command, (Basic.Deliver) method);    return true;}

之后调用processDelivery方法:

protected void processDelivery(Command command, Basic.Deliver method) {    Basic.Deliver m = method;    Consumer callback = _consumers.get(m.getConsumerTag());    if (callback == null) {        if (defaultConsumer == null) {            throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");        }        else {            callback = defaultConsumer;        }    }    Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());    try {        this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());    } catch (Throwable ex) {        getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");    }}

这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。

我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:

@Override public void handleDelivery(String consumerTag,                           Envelope envelope,                           AMQP.BasicProperties properties,                           byte[] body)    throws IOException{    checkShutdown();    this._queue.add(new Delivery(envelope, properties, body));}

这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:

public Delivery nextDelivery()    throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{    return handle(_queue.take());}private Delivery handle(Delivery delivery) {    if (delivery == POISON ||        delivery == null && (_shutdown != null || _cancelled != null)) {        if (delivery == POISON) {            _queue.add(POISON);            if (_shutdown == null && _cancelled == null) {                throw new IllegalStateException(                    "POISON in queue, but null _shutdown and null _cancelled. " +                    "This should never happen, please report as a BUG");            }        }        if (null != _shutdown)            throw Utility.fixStackTrace(_shutdown);        if (null != _cancelled)            throw Utility.fixStackTrace(_cancelled);    }    return delivery;}

这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。


附:本系列全集

转载地址:http://yjpxx.baihongyu.com/

你可能感兴趣的文章
正则表达式验证身份证格式是否正确
查看>>
Firebird(全功能的,免维护的数据库,能够管理多个独立的数据库) V2.1.3 英文特别版...
查看>>
xml格式文件解析
查看>>
ios百度地图-路径规划
查看>>
Python高效编程技巧
查看>>
配置Eclipse使用maven构建项目默认JDK为1.8
查看>>
jsp内置对象以及jsp动作
查看>>
Struts上路_09-数据类型转换
查看>>
CMake与动态链接库(dll, so, dylib)
查看>>
myeclipse(eclipse)乱码处理
查看>>
SpringBoot 过滤器, 拦截器, 监听器 对比及使用场景
查看>>
数据库索引探索
查看>>
struts2使用json需要注意的问题
查看>>
gitlab runner 优化
查看>>
快速添加百度网盘文件到Aria2 猴油脚本
查看>>
mac 无法登录mysql的解决办法
查看>>
Shiro权限判断异常之命名导致的subject.isPermitted 异常
查看>>
Hello world travels in cpp - 字符串(2)
查看>>
struts2自定义拦截器
查看>>
Eclipse安装adt插件后之后看不到andorid manger
查看>>