RabbitMQ 新闻行列入门

文档 入门

主要的内容:one two three four five six seven

前言

中间件

新闻行列

  • 异步处置,注册完发短信
  • 应用解耦,订单接口挪用扣库存接口,失败了怎么办?
  • 流量削峰,大量请求到达营业接口,这不行!
  • 日志处置,每个营业代码都挪用一下写日志的方式吗?连系AOP头脑,营业程序为什么要体贴写日志的事情?
  • 新闻通讯等,ABC处在谈天室内里,一起谈天?foreach吗?

官网有7个入门教程,过了一遍,做个条记。

正文

HelloWorld

概述

RabbitMQ,是个新闻代理人message broker。它吸收存储转发新闻。

几个常用的术语:

  1. 生产者Producer,生产发送新闻。
  2. 消费者Consumer,吸收新闻。
  3. 行列Queue,只受系统内存和硬盘巨细限制。存储新闻,生产者往行列内里发送,消费者监听读取。

这几个工具可以漫衍在差别的机械。

RabbitMQ 新闻行列入门

使用Client

P和C的角色。maven堆栈包为amqp-clientslf4j-nop

<dependencies>
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.30</version>
    </dependency>
<endencies>

发送

也就是Producer.java

public class Send {
    private static final String QUEUE_NAME = "hello1";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(Charset.forName("utf-8")));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

ConnectionChannel 都实现了ICloseable接口,以是可以使用try(…)接口自动释放资源。Channel是我们要经常使用的API工具。channel.queueDeclare是幂等的,只有在没有的情形下才会建立。然后挪用basicPublish方式,往行列发送字节数组新闻。

吸收

Consumer,Receiver.java

public class Receiver {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println(" [x] Received [" + msg + "]");
        });
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

connectionchannel 工具没有使用try-with-resource自动释放,factory.newConnection()之后程序就会保持运行,挪用basicConsume方式来消费获得的新闻。该方式第二个autoAck参数写了false,这样,新闻就属于未确认的状态,每次启动都市重复收到。

Work Queue 事情行列

新闻发生的速率大于消费的速率,该怎么办?

每个http请求的时间不宜过长,以是可以把内部耗时的方式做成异步,然后用回调callback的方式实现。换个角度说就是consumer内里有对照耗时的义务,可以用thread.sleep()模拟一下。

DeliverCallback deliverCallback = ((consumerTag, message) -> {
    String msg = new String(message.getBody(), "UTF-8");
    int i = new Random().nextInt(5);
    try {
        Thread.sleep(i * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(msg + "休眠了" + i + "秒");
});

不外这不是这节的重点,这里的重点是几个参数。

新闻确认

首先设置一下每次吸收的新闻数,每次一个channel.basicQos(1);。在客户端没有确认之前不会吸收新的新闻。channel.basicConsume方式的第二个参数autoAck示意自动确认。新闻有两种状态,ready和unacked的。新闻发送到queue→ready→consumer消费,但不确认→unacked→确认→竣事,守候下一个。

public class NewTask {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);//一次吸收一个新闻
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            int i = new Random().nextInt(2);
            try {
                Thread.sleep(i * 1000);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);//只确认这个tag对应的新闻
                System.out.println(msg + "执行了" + i + "秒,consumerTag=" + consumerTag + "并发送了确认");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        boolean autoAck = false;//不自动确认
        String str = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
            System.out.println(consumerTag);
        });
    }
}

Message Durablity

Redis类似。

确保新闻不丢。也就是确保未消费的新闻在服务器意外宕机重启之后新闻不丢。RabbitMQ会以一定距离把新闻写入磁盘,但不是实时【以是照样有一个短的时间距离会发生新闻的丢失情形】。为了解决这个问题,需要两个设置

  1. 界说queue的时刻设置durable参数为true。rabbitmq不允许queue name相同其他参数差别的两个行列,以是可以先删以前的。

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    
  2. 发送的时刻设置MessageProperties属性。

    channel.basicPublish("", "hello",
                MessageProperties.PERSISTENT_TEXT_PLAIN,//持久化为文本
                message.getBytes());
    

Fair Dispatch 公正分发

RabbitMQ的默认推送计谋是把第N个新闻推送给第N个客户端,他不会管一个客户端是否另有没确认的新闻,以是可能会导致某个客户端异常的忙。解决方案:

挪用basicOps设置prefetchCount为1,这样一个客户端在没有确认当前新闻之前不会收到下一个新闻。

Publish/Subscribe 公布/订阅

一次性给所有的Consumer发送新闻

回首一下前面的例子,基本的代码流程是

  1. 建立ConnectionFactory→设置参数→建立Connection→建立Channel
  2. Producer声明QueueName,往Exchanges=”“发送新闻
  3. Consumer指定相同的QueueName,设置新闻处置函数,读取数据,发送确认。

Exchanges

RabbitMQ中有Exchange的观点。新闻实际上不会直接发送给Queue,而是给Exchange,然后通过exchange转发给queue,然后给Consumer消费。exchange为空字符示意系统内部默认的exchange。

RabbitMQ 新闻行列入门

[root@test]~# rabbitmqctl list_exchanges -p test_host1
Listing exchanges for vhost test_host1 ...
name	type
amq.fanout	fanout
amq.direct	direct
amq.match	headers
amq.rabbitmq.trace	topic direct
amq.headers	headers
amq.topic	topic

amp.* 为系统自带的exchange。

治理界面查看

RabbitMQ 新闻行列入门

ExchangeType

Type决议一个Exchange怎么处置吸收到的新闻,广播到所有行列或者推送到特定的行列或者直接抛弃新闻。

内置的ExchangeType枚举

public enum BuiltinExchangeType {
    DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
  	//...省略其他
}

fanout

顾名思义,是一种广播的处置方式,会发送到所有的queue。看个demo。

先看Send

//Send.java
public class Send {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");

        try (final Connection connection = factory.newConnection();
             final Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//Exchange声明
            final String msg = String.valueOf(LocalDateTime.now());
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(Charset.defaultCharset()));//第二个routingKey留空待定
            System.out.println("发送" + msg);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

界说一个FANOUT类型的Exchange,没有界说Queue。挪用basicPublish发送新闻。

再看Receiver.java

//Receiver.java
public class Receiver {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        final String queueName = channel.queueDeclare().getQueue();//获取自动天生的queue,
        channel.queueBind(queueName, EXCHANGE_NAME, "");//绑定,最后一个参数待定

        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            final String s = new String(message.getBody(), Charset.defaultCharset());
            System.out.println("收到" + s);
        }, consumerTag -> { });

    }
}

界说一个和Send相同的Exchange。获取建立的channel对应的系统自动天生的queue(竣事之后会自动删除,制止系统有太多行列)。绑定queue和exchange。RabbitMQ会抛弃新闻若是这个exchange下面没有绑定queue的话。

RabbitMQ 新闻行列入门

运行多个Receiver实例。由于ExchangeType是fanout,以是,每个实例都市收到广播的新闻。

对比前面例子中的默认Exchange,一个新闻,发送到一个Exchange(默认的空字符串),由于queuename指定了是同一个,以是,只会有一个client收到新闻。

而这个例子中,queue是自动天生的,以是会有多个自动删除的queue,一个queue对应一个client。ExchangeType是fanout,以是,每个client都市收到。

Routing

有选择性的吸收新闻

前面例子使用了fanout广播的方式来公布新闻,一条新闻会被推送到所有的行列,又由于行列是自动天生的,一个行列对应一个consumer,以是所有的consumer都市收到所有的新闻。这无法实现某个consumer只体贴某种类型的新闻的需求。以是,这里引入exchangetype=direct的例子。

name 相同,type差别的exchange不合法,可以先在rabbitmq的治理平台界面删除原先的exchange。

并发工具——CyclicBarrier

Binding

回首前面的代码。publish和queue绑定的时刻都留空了routingKey参数。

Send.java

RabbitMQ 新闻行列入门

Consumer.java

RabbitMQ 新闻行列入门

Consumer的queueBind 和 Producer的basicPublish中routingKey需要匹配。fanout类型的exchange会忽略routingKey参数,以是我们直接留空。

direct Exchange

fanout的新闻分发不太天真,以是这里使用direct的Exchange。看下图,若是Producer发生的routingKey为orange,那么只会发送给Q1,那么只有C1会收到新闻。若是routingKey为black或者green,那么C2会收到新闻。

RabbitMQ 新闻行列入门

Multiple Bindings

多个行列绑定同一个routingKey也是合理。下面的例子Q1和Q2都市收到black的新闻,这种绑定本质上就退化成了一种前面的fanout Exchange。

RabbitMQ 新闻行列入门

Demo

场景:Producer发生3种routingKey的message,Info,Error,Fault。界说两个Consumer,C1吸收Info的message,C2吸收Error和Fault。

//Send.java
public class Send {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");//省略其他设置
        try (final Connection connection = factory.newConnection();
             final Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//指定exchange的类型
            String messageType = args[0];//传入routingKey
            String msg = messageType + " message" + LocalDateTime.now();
            channel.basicPublish(EXCHANGE_NAME, messageType, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送了" + msg);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//Receiver.java
public class Receive {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        final String queueName = channel.queueDeclare().getQueue();
        for (String arg : args) {//遍历所有的routingKey,绑定所有当前queue
            System.out.println("绑定routingKey:" + arg);
            channel.queueBind(queueName, EXCHANGE_NAME, arg);
        }

        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            final String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("收到" + msg);
        }), consumerTag -> {
        });
    }
}

建立5个启动设置,3个为Send,划分发送Info,Fault,Error新闻;2个Receiv,第一个吸收Info,第二个吸收Error和Fault。

RabbitMQ 新闻行列入门

RabbitMQ 新闻行列入门

最终效果

RabbitMQ 新闻行列入门

RabbitMQ 新闻行列入门

RabbitMQ 新闻行列入门

Topic

通过pattern模式来指定吸收的新闻。

前面例子使用的ExchangeType为direct,相对于fanout,是天真了一些,然则照样有一些瑕玷,好比无法组合条件。好比有个consumer体贴所有的error新闻以及和a相关的info新闻。这里就可以使用Topic的Exchange。然后都是通过routingKey参数来指定。

通配符

* 星号,代表一个词

# 井号,代表0个或多个词(包罗一个)

以点分开,组成routingKey。好比*.a.b.#

若是设置BuiltinExchangeType.TOPIC的exchangeType,然则没有使用通配符,那么就和BuiltinExchangeType.DIRECT是一样的。

未匹配任何模式的新闻会被抛弃。

要害代码

声明exchange

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

发送新闻

if (args[0].equals("info")) {
    s = "a's info message";
    channel.basicPublish(EXCHANGE_NAME, "a.info", null, s.getBytes(StandardCharsets.UTF_8));
} else {
    s = "xxx.yyy's error message";
    channel.basicPublish(EXCHANGE_NAME, "xxx.yyy.error", null, s.getBytes(StandardCharsets.UTF_8));
}

使用通配符吸收新闻

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
final String queueName = channel.queueDeclare().getQueue();//暂且的queue
String routingKey = args[0];//传入的参数 好比*.info 或 #.error来匹配
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("绑定" + routingKey + "的行列");
channel.basicConsume(queueName, true, ((consumerTag, message) -> {
    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("收到新闻" + msg);
}), consumerTag -> {});

RPC 远程挪用

远程历程挪用。

Client 挪用 Server的服务。Client发送新闻,Server消费新闻。Server盘算效果,公布一个新闻到对应的行列。Client消费行列内里的新闻。这一个历程Client和Server都是双重身份。这个是和其他最主要的区别。

RabbitMQ 新闻行列入门

关于RPC

RPC是一种常见的模式,但也存在一些争议,这主要体现在若是开发者有意或无意的不去注重这是一个内陆的方式照样对照耗时的远程方式。RPC也增加了系统的调试复杂度。

开发RPC的几个建议:

  1. 确保方式容易辨识是远程照样内陆
  2. 做好文档
  3. 处置挪用时刻的异常

回调行列

Client需要Server的盘算效果,以是需要在新闻内里带上CallbackQueueName。凭据AMQP 0-9-1协议,界说了14个属性,除了4个对照常用,其他都很少用。

  • deliveryMode 设置新闻的持久化,第二个例子中用过。
  • contentType 设置内容的mime-type ,建议application/json
  • replyTo 回复行列名
  • correlationId 关联id 由于新闻是异步的,以是可以给每个新闻带上个id,用来关联发送的新闻。
final AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                .correlationId("uuid")
                .replyTo("xxx")
                .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

简易版Client

public class Client {
    private static final String RPC_QUEUE_NAME = "rpc_queue";//rpc挪用的queue,往内里发rpc挪用参数

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.xx.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        String msg = String.valueOf(3);//模拟挪用参数

        final String replyQueueName = channel.queueDeclare().getQueue();
        String corrId = UUID.randomUUID().toString();
        final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .replyTo(replyQueueName)//回复的行列
                .correlationId(corrId)//当前新闻的uuid
                .build();
        channel.basicPublish("",
                RPC_QUEUE_NAME,
                properties,
                msg.getBytes(StandardCharsets.UTF_8));//广播的方式往rpc queue公布新闻
        System.out.println("发送盘算[" + msg + "]的新闻");

        //守候新闻回复
        channel.basicConsume(replyQueueName, true, (consumerTag, message) -> {
            String revCorrId = message.getProperties().getCorrelationId();
            if (corrId.equals(revCorrId)) {//拿到了回复
                final String result = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("发送" + msg + "获得回复" + result);
            } else {
                System.out.println("收到correlationId:" + revCorrId);
            }
        }, consumerTag -> {
        });
    }
}

channel.queueDeclare()用来声明一个暂且行列,为吸收返回效果的行列。代码中只公布了一个盘算请求,以是basicConsume中corrId判断实在没有必要。正常情形下可以在当前暂且行列公布多个盘算请求,每个的盘算效果都传入到当前的暂且行列,以是需要判断corrId的匹配情形。

简易版Server

public class Server {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("hello from server");
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        factory.setPort(5672);
        factory.setUsername("full_access");
        factory.setPassword("111111");
        factory.setVirtualHost("test_host1");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);//声明非排他的行列,用来消费rpc_queue内里的盘算请求。
        channel.basicConsume(RPC_QUEUE_NAME,
                true,//自动回复,后面就不需要手动ack
                (consumerTag, message) -> {
                    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                    String replyMsg = new String(msg + "Result");//简朴模拟盘算效果。
                    System.out.println("收到" + msg + "最先盘算,盘算完成效果为:[" + replyMsg + "]");
                  	//拿到需要回复properties
                    String replyQueueName = message.getProperties().getReplyTo();
                    String correlationId = message.getProperties().getCorrelationId();
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(correlationId)//correlationId返回去。
                            .build();
                    // 把盘算效果发回去
                    channel.basicPublish("", replyQueueName, replyProps, replyMsg.getBytes(StandardCharsets.UTF_8));
                }, consumerTag -> {

                });
    }
}

消费RPC_QUEUE_NAME的盘算请求,然后凭据新闻内里带的getReplyTo()的值返回给客户端。

Publisher Confirm 公布确认

可靠公布

启用生产者确认

凭据AMQP 0.9.1协议,这个确认默认是没有启用的,可以通过confirmSelect方式启用。

Channel channel = connection.createChannel();
channel.confirmSelect();

这个方式是针对channel的,不是针对每个新闻,以是,只要 在开启channel之后挪用一次就好。

确认每个新闻

先是一个简朴的例子,每发完一个新闻,都让系统确认守候一下。

while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    // 5秒超时
    channel.waitForConfirmsOrDie(5_000);
}

每次发完一个新闻,都守候最多5秒钟的一遍确认。这个很显著会极大的影响系统的吞吐率。

批量确认

发送一个确认一个显著会对照low,以是这里引入一种批量确认的方式。不外这只是一种自己营业代码的确认机制,不是rabbitmq提供的。

int batchSize = 100;//
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;//发送一个加1
    if (outstandingMessageCount == batchSize) {
        ch.waitForConfirmsOrDie(5_000);//到达batchSize之后确认
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0) {
    ch.waitForConfirmsOrDie(5_000);//确认剩下的
}

这种确认吞吐量是上来了,不外最大的问题是当confirm出问题了之后是无法定位到详细哪个有问题。

ConcurrentSkipListMap 和 channel.getNextPublishSeqNo()

channel.getNextPublishSeqNo可以获取公布的新闻的下一个序号,有序递增。ConcurrentSkipListMap有一个heapMap方式,可以返回key小于即是param的map子集。在公布新闻之前先获取序号,作为key放到map内里。

map.put(nextPublishSeqNo, byteMsg);
channel.basicPublish("", queueName, null, msgStr.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));

异步确认

Producer只管发新闻,然后注册一个异步回调函数。rabbitmq提供了两个回调函数。一个是发送乐成的回调,一个是发送失败的回调。两个函数的参数是一样的,两个。

  • sequence number 序号。示意乐成/失败的新闻编号
  • multiple 布尔值。false示意只有一个被确认。true示意小于即是当前序号的新闻发送乐成/失败
channel.confirmSelect();//启用新闻确认
channel.addConfirmListener(
        (deliveryTag, multiple) -> {
            if (multiple) {
                System.out.println("序号" + deliveryTag + "的信息发送乐成");
                map.remove(deliveryTag);
            } else {
                System.out.println("序号小于" + deliveryTag + "的信息发送乐成");
                final ConcurrentNavigableMap<Long, Byte> confirmed = map.headMap(deliveryTag, true);
                confirmed.clear();
            }
        },
        (deliveryTag, multiple) -> {
            if (!multiple) {
                System.out.println("发送失败的信息sequence number:" + deliveryTag);
            } else {
                System.out.println("序号小于" + deliveryTag + "的新闻发送失败");
            }
        });

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/5547.html