RocketMQ系列(一)基本概念

RocketMQ是阿里出品的一款开源的新闻中间件,让其声名大噪的就是它的事务新闻的功效。在企业中,新闻中间件选择使用RocketMQ的照样挺多的,这一系列的文章都是针对RocketMQ的,咱们先从RocketMQ的一些基本概念和环境的搭建最先聊起。

RocketMQ由4部门组成,划分是:名称服务(Name Server)、新闻行列(Brokers)、生产者(producer)和消费者(consumer)。这4部门都可以举行水平扩展,从而制止单点故障,如下图,

RocketMQ系列(一)基本概念

这是RocketMQ官网上的一张图,异常清晰的列出了4个部门,而且都是集群模式。下面我们就划分说一说这4部门。

名称服务(NameServer)

Name Server饰演的角色是一个注册中央,和Zookeeper的作用差不多。它的主要功效有两个,如下:

  • broker的治理:broker集群将自己的信息注册到NameServer,NameServer提供心跳机制检测每一个broker是否正常。
  • 路由治理:每一个NameServer都有整个broker集群和行列的信息,以便客户端(生产者和消费者)查询。

NameServer协调着分布式系统中的每一个组件,而且治理着每一个Topic的路由信息。

Broker

Broker主要是存储新闻,而且提供Topic的机制。它提供推和拉两种模式,另有一些容灾的措施,好比可以设置新闻副本。下面我们看一看Brokcer的主从机制。

Broker的角色分为“异步主”、“同步主”和“从”三个角色。若是你不能容忍新闻的丢失,你可以设置一个“同步主”和“从”两个Broker,若是你以为新闻丢失也无所谓,只要行列可用就ok的话,你可以设置“异步主”和“从”两个broker。若是你只是想简朴的搭建,只设置一个“异步主”,不设置“从”也是可以的。

上面提到的是broker之间的备份,broker里的信息也是可以保留到磁盘的,保留到磁盘的方式也有两种,推荐的方式是异步保留磁盘,同步保留磁盘是异常消耗性能的。

一文说通Dotnet Core的中间件

生产者

生产者支持集群部署,它们向broker集群发送新闻,而且支持多种负载平衡的方式。

当生产者向broker发送新闻时,会获得发送效果,发送效果中有一个发送状态。假设我们的设置中,新闻的设置isWaitStoreMsgOK = true,这个设置默认也是true,若是你设置为false,在发送新闻的过程中,只要不发生异常,发送效果都是SEND_OK。当isWaitStoreMsgOK = true,发送效果有以下几种,

  • FLUSH_DISK_TIMEOUT:保留磁盘超时,当保留磁盘的方式设置为SYNC_FLUSH(同步),而且在syncFlushTimeout设置的时间内(默认5s),没有完成保留磁盘的动作,将会获得这个状态。
  • FLUSH_SLAVE_TIMEOUT:同步“从”超时,当broker的角色设置为“同步主”时,然则在设置的同步时间内,默以为5s,没有完成主从之间的同步,就会获得这个状态。
  • SLAVE_NOT_AVAILABLE:“从”不可用,当我们设置“同步主”,然则没有设置“从”broker时,会返回这个状态。
  • SEND_OK:新闻发送乐成。

再来看看新闻重复与新闻丢失,当你发现你的新闻丢失时,通常有两个选择,一个是丢就丢吧,这样新闻就真的丢了;另一个选择是新闻重新发送,这样有可能引起新闻重复。通常情况下,照样推荐重新发送的,我们在消费新闻的时刻要去除掉重复的新闻。

发送message的巨细一样平常不跨越512k,默认的发送新闻的方式是同步的,发送方法会一直壅闭,直到等到返回的响应。若是你对照在意性能,也可以用send(msg, callback)异步的方式发送新闻。

消费者

多个消费者可以组成消费者组(consumer group),差别的消费者组可以订阅相同的Topic,也可以自力的消费Topic,每一个消费者组都有自己的消费偏移量。

新闻的消费方式一样平常有两种,顺序消费和并发消费。

  • 顺序消费:消费者将锁住新闻行列,确保新闻根据顺序一个一个的被消费掉,顺序消费会引起一部门性能损失。在消费新闻的时刻,若是出现异常,不建议直接抛出,而是应该返回SUSPEND_CURRENT_QUEUE_A_MOMENT 这个状态,它将告诉消费者过一段时间后,会重新消费这个新闻。
  • 并发消费:消费者将并发的消费新闻,这种方式的性能异常好,也是推荐的消费方式。在消费的过程中,若是出现异常,不建议直接抛出,而是返回RECONSUME_LATER 状态,它告诉消费者现在不能准确的消费它,过一段时间后,会再次消费它。

在消费者内部,是使用ThreadPoolExecutor作为线程池的,我们可以通过setConsumeThreadMin setConsumeThreadMax 设置最小消费线程和最大消费线程。

当一个新的消费者组确立以后,它要决议是否消费之前的历史新闻,CONSUME_FROM_LAST_OFFSET将忽略历史新闻,消费新的新闻。CONSUME_FROM_FIRST_OFFSET将消费行列中的每一个新闻,之前的历史新闻也会再消费一遍。CONSUME_FROM_TIMESTAMP可以指定消费新闻的时间,指定时间以后的新闻会被消费。

若是你的应用不能容忍重复消费,那么在消费新闻的过程中,要做好新闻的校验。

好了,今天就到这里吧,下一篇我们将先容RocketMQ的环境搭建。

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/13383.html