手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

原文地址:手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

为什么使用 MQ?

在这里我就不多说了,无非就是削峰、解耦和异步。这里没有许多关于 MQ 的理论和观点,只想手把手带你一起学习 RabbitMQ 的六大使用模式!

一、通俗行列

我们发送新闻和吸收新闻时,只需要直接指定行列的名字即可。这是最简朴的一种使用场景。

生产者:使用 channel 发送新闻时,直接指定 queueName。

public class Send {

    private static final String queueName = "hyf.hello.queue";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()){

            // 是否持久化(默认保留在内存,可以持久化到磁盘)
            boolean durable = false;
            // 是否独占(此 Connection 独占,通过其他 Connection 确立的 channel 无法访问此行列)
            boolean exclusive = false;
            // 是否自动删除行列(行列没有消费者时,删除)
            boolean autoDelete = false;
            channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);

            String message = "Hello world3!";
            // 第一个参数是交换器名字,第二个参数是 routingKey(不使用交换器时,为行列名称),第三个参数是新闻属性(AMQP.BasicProperties),第四个参数是新闻
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("公布乐成");
        }
    }
}

注重:使用 try-with-resources ,在程序结束时,我们不用显式挪用 close() 方式来关闭资源。

消费者:也是用 channel 指定 queueName,然后绑定一个交付回调。

public class Receive {

    private static final String queueName = "hyf.hello.queue";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, false, false, false, null);
        // 回调(吸收 RabbitMQ 服务器发送过来的新闻)
        DeliverCallback deliverCallback =  (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(message);
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

注重:这里我们可以不用 try-with-resource,由于消费者需要一直运行着。

关于通俗行列,人人可以理解为下图:
手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

二、事情模式(work queues)

通俗行列中,都是一个消费者去消费行列,而在 work 模式中,是多个消费者同时去消费同一个行列。

生产者和消费者我们照样可以用回上面的代码。

1、循环轮询

默认情况下,RabbitMQ 将按顺序将每个新闻发送给下一个使用者。平均而言,每个消费者都市收到相同数目的新闻。这种分发新闻的方式称为循环。

这样会导致一个问题,纵然其中一个消费者消费速率很快,已经消费完 RabbitMQ 新闻,而且行列中另有未消费新闻(已经分配给其他消费者),那么他也将在白白守候,RabbitMQ 而不会说将分配的新闻接纳重新分配给空闲的消费者。

2、自动提交新闻 ack

默认情况下,消费者会不准时自动提交 ack,不管新闻是否消费乐成,而当 RabbitMQ 吸收到消费者的 ack 新闻后,会将新闻添加删除标识来标识新闻已被消费乐成。然则这个自动 ack 机制会导致新闻丢失和新闻重复消费问题。

  • 客户端还没消费某条新闻,就自动提交了 ack,若是此时客户端宕机了,那么会导致这条新闻消费失败;而 RabbitMQ 在吸收到 ack 时,也将这条新闻标记为已消费,那么也无法重新消费了。
  • 客户端已经消费某条新闻,然则还没自动提交 ack 就宕机了,此时就会导致新闻重复消费,由于 RabbitMQ 没收到 ack 新闻,那么这条新闻没有被设置为删除标识,以是消费者还可以消费此条新闻。

3、手动 ack 解决空闲消费者、新闻丢失、新闻重复消费

消费者:

a. 限制每次读取新闻数目:

我们行使 basicQos() 方式来设置 prefetchCount(预期计数) 为1,即 限制客户端每次都只读取一个新闻,只有当这个新闻消费完了,才气继续读取下一个新闻。

b. 手动 ack:

接着我们需要关闭自动提交 ack,而且在消费完新闻后,手动提交 ack。只有当 RabbitMQ 收到 ack 新闻后,才会认定这个新闻已经消费完了,继续给消费者推送下一条新新闻。

最后看看代码:

public class Receive1 {

    private static final String queueName = "hyf.work.queue";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, false, false, false, null);
        // 每次只读取一条新闻
        channel.basicQos(1);
        DeliverCallback deliverCallback =  (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            ThreadUtil.sleep(2, TimeUnit.SECONDS);
            System.out.println(message);
            // 是否批量提交
            boolean multiple = false;
            // 手动 ack
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),multiple);
        };
        // 作废自动 ack
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
    }
}

总结:只有当我们使用了手动ack 和 prefetchCount = 1 ,事情模式才算乐成启动。
手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

4、扩展点:若何保证新闻不丢失

当发送者发送新闻到 RabbitMQ 后,RabbitMQ 会将新闻缓存在内存中,而若是此时 RabbitMQ 宕机了,默认情况下,内存中的 queue 和 message 都市所有丢失。

而若是我们需要保证新闻不丢失,那么需要告诉 RabbitMQ 若何做;此时我们需要做的是:将 queue 和 message 都设置为持久化。

queue 持久化:

private static final String queueName = "hyf.work.queue";

boolean durable = true;
channel.queueDeclare(queueName, durable, false, false, null);

注重:若是一开始 queue 已经界说为不持久化,那么我们不能重界说为持久化;当 RabbitMQ 检测到 queue 被重界说了,那么会返回一个错误来提醒我们。

message 持久化:

private static final String queueName = "hyf.work.queue";

channel.basicPublish("", queueName,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

三、公布订阅模式(Publish/Subscribe)

上面的 work queue,每一个新闻只能被一个消费者消费。而有些场景,我们需要一个新闻可以被多个消费者消费;例如:用户下了订单,短信通知模块需要给用户发送一个短信通知,库存模块需要凭据用户下单信息减去商品的库存等等,此时我们需要使用公布订阅模式。

1、交换器 exchange

要做公布订阅模式,我们首先需要使用到交换器,生产者不再直接行使 channel
往 queue 发送新闻,而是将新闻发送到交换器,让交换器来决议发送到哪些 queue 中。

RabbitMQ 提供了几个类型的交换器:directtopicheadersfanout

使用公布订阅模式,我们只需要使用 fanout 类型的交换器,fanout 类型的交换器,会将新闻发送到所有绑定到此交换器的 queue。

2、生产者发送新闻:

行使 channel 声明交换器:

// 声明交换器名字和类型
channel.exchangeDeclare(exchangeName,"fanout");

接着我们就可以直接指定交换器举行新闻公布:

// 第二个参数是 queueName/routingKey
channel.basicPublish(exchangeName , "", null, message.getBytes())

完整代码:

public class Send {

    private static final String exchangeName = "hyf.ps.exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()){
            // 声明 fanout 类型的交换器
            channel.exchangeDeclare(exchangeName,"fanout");
            for (int i = 0; i <= 10; i++){
                String message = "新闻"+i;
                // 直接指定交换器举行新闻公布
                channel.basicPublish(exchangeName,"", null, message.getBytes());
            }
        }
    }
}

我们可以发现,我们不再需要指定 queueName,而是直接指定 exchangeName,将新闻发送到交换器,由交换器决议公布到哪些 queue。

3、消费者:queue 与 exchange 确立绑定关系

确立绑定前,我们照样需要先声明 fanout 类型的交换器,而且命名要和生产者声明时的名字一致:

channel.exchangeDeclare(exchangeName, "fanout");

接着,将 queue 和 fanout 类型的交换器确立绑定新闻,交换器会将新闻发送到和它有绑定关系的 queue。

channel.queueBind(queueName, exchangeName, "");

此时,行列已经和交换器乐成确立绑定关系,交换器吸收到新闻时,会发送到与交换器绑定的所有行列中。

如何科学地完成一场 AR 发布会?全在这份超细节活动策划 Xmind 里了

最后,我们再挪用 channel.basicConsume() 举行行列监听和 绑定回调,借此来吸收和消费新闻:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

完整代码:

public class Receive1 {

    private static final String exchangeName = "hyf.ps.exchange";
    private static final String queueName = "hyf.ps.queue1";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName,"fanout");
        channel.queueDeclare(queueName,false, false, false, null);
        channel.queueBind(queueName, exchangeName,"");

        DeliverCallback callback = (s, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };

        channel.basicQos(1);
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, callback, consumerTag -> {});
    }
}

关于公布订阅模式,我们可以理解为下图:
手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

4、公布丁订阅模式中使用事情模式

公布订阅模式中,我们照样可以继续使用上面的事情模式(多个消费者订阅同一个行列)。由于在分布式系统中,一个服务往往有多个实例,例如库存模块可以有多个实例,我们行使手动 ack 和 prefetchCount = 1,照样可以让 fanout 类型交换器的其中一个 queue 进入事情模式。

四、路由模式(routing)

上面的公布订阅模式,只要是与 fanout 类型交换器绑定的 queue,都市吸收到交换器公布的新闻。而我们现在的场景需要加倍天真新闻分配机制。例如:error 行列只会吸收到 error 类型的信息,info 行列只会吸收都 info 类型的信息等等。

那么我们需要是使用天真的路由模式,而这种模式照样需要由交换器来完成,然则此时需要使用 direct 类型的交换器来替换 fanout 类型的交换器。

bindingKey 和 routingKey

做到路由模式,不只要使用 direct 类型的交换器,还需要行使 bindingKeyroutingKey 来完成。bindingKey 是消费者端的观点,而 routingKey 是生产者端的观点。

1、bingdingKey

公布订阅模式的消费者代码中,我们可以发现:将 queue 与交换器确立绑定关系的 queueBind() 方式中,第三个参数是空的,实在这就是设置 bindingKey 的地方。固然了,纵然第三个参数不为空,fanout 类型的交换器照样会直接忽略掉的。

channel.queueBind(queueName, exchangeName, "");

例如现在我们的消费者要监听 error 类型的信息,我们需要声明 direct 类型的交换器,而且给 queue 绑定值为 error 的 bindingKey 。

public class ErrorReceive {

    private static final String exchangeName = "hyf.routing.exchange";
    private static final String queueName = "hyf.routing.error.queue";
    private static final String bindingKey = "error";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 exchange 和 queue
        channel.exchangeDeclare(exchangeName, "direct");
        channel.queueDeclare(queueName, false, false, false, null);

        // 举行绑定
        channel.queueBind(queueName, exchangeName, bindingKey);

        DeliverCallback callback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(),"utf-8");
            System.out.println("ErrorReceive 吸收到" + delivery.getEnvelope().getRoutingKey() + "新闻:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };

        channel.basicQos(1);
        channel.basicConsume(queueName, false, callback, consumerTag -> {});
    }
}

例如现在我们的消费者2要监听 info 类型的信息,这也是异常简朴,同样是上面的代码,只需要修改 queueName 和 bindingKey 即可。

// ... 省略

private static final String queueName = "hyf.info.queue";
private static final String bindingKey = "info";

// ... 省略

2、queue 绑定多个 bindingKey

上面的 hyf.error.queue 行列,只绑定了值为 error 的 bindingKey,若是现在我们不只需要吸收 error 类型的信息,还需要 info 类型的信息,那么我们可以为 hyf.error.queue 再绑定多一个值为 info 的 bindingKey。

private static final String bindingKey = "error";
private static final String bindingKey2 = "info";

// 举行绑定
channel.queueBind(queueName, exchangeName, bindingKey);
channel.queueBind(queueName, exchangeName, bindingKey2);

此时,hyf.error.queue 行列同时绑定了 error 和 info 这两个 bindingKey,那么它就能同时吸收到 error 类型和 info 类型的信息。

3、routingKey

在公布订阅模式中。我们可以看到公布新闻的 basicPublish() 方式的第二参数是空的,而第二个参数实在就是 routingKey。

channel.basicPublish( exchangeName, "", null, message.getBytes());

我们可以发现,在通俗行列和事情模式中,我们都是指定 queueName 去发送新闻,而 queueName 在 basicPublish 也是第二个位置。以是,在我们不使用交换器时,routingKey 指定的就是 queueName。而当我们使用交换器时,那么 routingKey 就有更厚实的寄义了,它不再只是简朴直接的 queueName,而是林林总总的路由寄义。

要使得上面绑定了 bindingKey 为 error 和 info 的 hyf.error.queue 行列吸收到新闻,那么需要新闻发送者指定 routingKey 为 error 或 info ,然后使用 direct 类型的交换器公布新闻。

private static final String exchangeName = "hyf.log.exchange";
private static final String routingKey = "error";
private static final String routingKey2 = "info";

channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());

当执行上面代码,hyf.error.queue 行列能收到两条新闻,而 hyf.info.queue 只能收到 routingKey 为 info 的新闻。

即当 queue 绑定的 bindingKey 和发送新闻时的 routingKey 完全一致,那么 queue 就能吸收到交换器发送的新闻,我们可以理解为下图:
手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

五、主题模式(topic)

上面的路由模式虽然能让我们凭据营业加倍天真的去吸收指定(多种)类型的新闻;然则我们可以发现,若是现在我们想让消费者吸收所有类型的信息,例如 error、info、debug、fail 等新闻所有都要吸收,那么就要挪用多次 queueBind() 方式给 queue 绑定多个 bindingKey,这就显得有点麻烦了。

此时我们可以使用主题模式,纵然用 topic 类型的交换器,然后行使 *# 这两个符号来搞定上面的需求。

1、* 和 # 的使用

“*” 示意匹配一个字符,”#” 示意匹配0个或多个字符

2、场景

我们现在有多个 routingKey 的新闻,例如用户上岸信息 user.login.info,订单信息 order.detail.info,用户的注册信息 user.register.info,库存信息stock.detail.info 等等。

3、消费者

假设消费者1想读取到所有关于用户的信息,例如上岸信息和注册时心,那么我们可以使用 topic 类型的交换器,而且将 bindingKey 设置为 user.#

public class UserReceive {
    private static final String exchangeName = "hyf.topic.exchange";
    private static final String bindingKey = "user.#";
    private static final String queueName = "hyf.topic.user.queue";

    @SneakyThrows
    public static void main(String[] args){

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "topic");
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, bindingKey);

        DeliverCallback callBack = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "utf-8");
            System.out.println("吸收到一条user新闻:"+msg);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicQos(1);
        channel.basicConsume(queueName, false, callBack, consumerTag -> {});
    }
}

假设消费者2 要吸收所有上面关于信息的新闻,那么他的 bindingKey 可以设置为 *.*.info

public class InfoReceive {

    private static final String exchangeName = "hyf.topic.exchange";
    private static final String bindingKey = "*.*.info";
    private static final String queueName = "hyf.topic.info.queue";

    @SneakyThrows
    public static void main(String[] args){

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "topic");
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, bindingKey);

        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "utf-8");
            System.out.println("吸收到一条info新闻:"+msg);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        channel.basicQos(1);
        channel.basicConsume(queueName, false, callback, consumerTag -> {});
    }
}

4、生产者

生产者也需要使用 topic 类型的交换器发送新闻。

public class Send {

    private static final String exchangeName = "hyf.topic.exchange";
    private static final String routingkeyByLogin = "user.login.info";
    private static final String routingkeyByRegister = "user.register.info";
    private static final String routingkeyByOrder = "order.detail.info";
    private static final String routingkeyByStock = "stock.detail.info";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()){
            channel.exchangeDeclare(exchangeName, "topic");

            String msg1 = "用户张三上岸了";
            String msg2 = "新用户李四注册了";
            String msg3 = "张三买了一台iphone12";
            String msg4 = "iphone12库存减一";

            channel.basicPublish(exchangeName, routingkeyByLogin, null, msg1.getBytes());
            channel.basicPublish(exchangeName, routingkeyByRegister, null, msg2.getBytes());
            channel.basicPublish(exchangeName, routingkeyByOrder, null, msg3.getBytes());
            channel.basicPublish(exchangeName, routingkeyByStock, null, msg4.getBytes());
        }
    }
}

经由上面的代码公布新闻,消费者1就能读取到新闻 msg1、msg2;而消费者2可以读取到所有的新闻。

关于主题模式,人人可以理解为下图:
手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

六、RPC 模式

正常用 MQ 都是用来做异步化,然则有些场景却需要同步。即当我们使用 channel 发送新闻后,我们需要同步守候消费者对新闻消费后的效果。

RPC 模式主要是行使 replyQueue 和 correlationId 来完成。

1、客户端

客户端往 requestQueue 发送新闻时需要设置 replyQueue,之后我们需要给 replyQueue 绑定一个 DeliverCallback。

为了保证客户端是同步壅闭守候效果,以是我们在 DeliverCallback 的 handle 方式内里,将效果放进壅闭行列(例如 ArrayBlockingQueue);在代码的最后挪用壅闭行列的 take() 方式在获取效果。

public class Client {

    private static final String replyQueueName = "hyf.rpc.reply.queue";
    private static final String requestQueueName = "hyf.rpc.request.queue";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(replyQueueName, false, false, false, null);
        // 壅闭行列
        final BlockingQueue<String> responseQueue = new ArrayBlockingQueue<>(1);

        final String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .replyTo(replyQueueName)
                .correlationId(corrId)
                .build();
        String msg = "客户端新闻";
        channel.basicPublish("", requestQueueName, properties, msg.getBytes());


        String ctag = channel.basicConsume(replyQueueName, true, (consumeTag,delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                responseQueue.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumeTag -> {});

        String result = responseQueue.take();
        System.out.println(result);
        // 作废订阅
        channel.basicCancel(ctag);
    }
}

通过上面代码,我们应该可以留意到 correlationId 的意义是什么。行使 correlationId ,我们可以判断当前从 replyQueue 获取的响应新闻是否是我们发出的新闻消费后的效果,若是不是我们可以直接忽略掉,保证只会获取 correlationId 一致的效果。

2、服务端

服务端在 DeilverCallback 的 handle() 方式里读取 requestQueue 内里的新闻消费后,在手动 ack(关闭了自动 ack)前,需要先拿到新闻的 replyQueue,然后往 replyQueue 内里发送新闻消费后的效果,固然了,还要记得设置回新闻的 correlatinId,最后记得手动 ack。

public class Server {

    private static final String requestQueueName = "hyf.rpc.request.queue";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(requestQueueName, false, false, false, null);
        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), "utf-8");
            // 处置新闻
            String reponse = handleMsg(msg);
            // 将新闻的 correlationId 传回去
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();
            channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, reponse.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        channel.basicQos(1);
        channel.basicConsume(requestQueueName, false, callback, consumeTag -> {});

    }

    private static String handleMsg(String msg){
        return msg + "已经被处置了";
    }
}

关于 RPC 模式,人人可以理解为下图:
手把手一起入门 RabbitMQ 的六大使用模式(Java 客户端)

七、总结

到此,关于 RabbitMQ 的六大使用模式已经先容完毕。固然了,这些都是入门级其余 demo,若是人人照样有啥不明白的,可以到我的 github 上去看看,完整的代码都放在:MQ Demo。后续,我将会继续深入学习 RabbitMQ 的 Java Client,学习若何优化客户端的使用性能。

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/20790.html