快盘下载:好资源、好软件、快快下载吧!

快盘排行|快盘最新

当前位置:首页软件教程电脑软件教程 → RocketMq介绍以及集群服务搭建(双主双从同步双写)

RocketMq介绍以及集群服务搭建(双主双从同步双写)

时间:2022-09-28 17:36:00人气:作者:快盘下载我要评论

一、介绍

1、什么是MQ

MQ;Message Queue;消息队列;是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦;异步消息;流量削峰等问题;实现高性能;高可用;可伸缩和最终一致性架构。

RocketMq介绍以及集群服务搭建(双主双从同步双写)

2、简介

RocketMQ是阿里巴巴旗下一款开源的MQ框架;2016年底捐赠给Apache开源基金会成为孵化项目;2017年正式成为了Apache顶级项目;作为一款纯java、分布式、队列模型的开源消息中间件;支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

        


 常见的MQ主要有;ActiveMQ、RabbitMQ、Kafka、RocketMQ

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)

 3、RocketMQ 基本概念

RocketMQ主要有四大核心组成部分;NameServer、Broker、Producer以及Consumer四部分。

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)

 与其余组件说明如下;

Producer; 消息生产者;负责产生消息;一般由业务系统负责产生消息Producer Group;消息生产者组;简单来说就是多个发送同一类消息的生产者称之为一个生产者Consumer;消息消费者;负责消费消息;一般是后台系统负责异步消费Consumer Group;消费者组;和生产者类似;消费同一类消息的多个 Consumer 实例组成一个消费者组Topic;主题;用于将消息按主题做划分;Producer将消息发往指定的Topic;Consumer订阅该Topic就可以收到这条消息Message;消息;每个message必须指定一个topic;Message 还有一个可选的 Tag 设置;以便消费端可以基于 Tag 进行过滤消息Tag;标签;子主题;二级分类;对topic的进一步细化,用于区分同一个主题下的不同业务的消息Broker;Broker是RocketMQ的核心模块;负责接收并存储消息;同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能;可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer  push;推模式; 消息到达消息服务器之后;主动推送给消费者pull;拉模式; 是消费端发起请求;主动向消息服务器(Broker)拉取消息Queue;Topic和Queue是1对多的关系;一个Topic下可以包含多个Queue;主要用于负载均衡;Queue数量设置建议不要比消费者数少。发送消息时;用户只指定Topic;Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时;会根据负载均衡策略决定订阅哪些Queue的消息Offset;RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件;每个Queue都对应一个Offset记录当前Queue中消息条数NameServer;NameServer可以看作是RocketMQ的注册中心;它管理两部分数据;集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信; 各 NameServer 都有完整的路由信息;即无状态。

4、消费模式

        广播模式

        一条消息被多个Consumer消费;即使这些Consumer属于同一个Consumer Group;消息也会被Consumer Group中的每一个Consumer都消费一次。

        集群模式

        一个Consumer Group中的所有Consumer平均分摊消费消息(一个消息只会被一个消费者消费)

5、消息类型

RocketMq消息类型分为三种;普通消息、顺序消息、事务消息

  普通消息;有三种发送方式  单向发送;单向发送是指发送方只负责发送消息;不等待服务器回应;且没有回调函数触发。即只发送请求而不管响应。同步发送;同步发送是指消息发送方发出数据后;会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。异步发送;异步发送是指发送方发出数据后;不等接收方发回响应;接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应;并对响应结果进行处理。顺序消息

        一般情况下;每个主题;topic;都会有多个消息队列;message queue;;假设投递了同一个主题的十条消息;那么这十条消息会分散在不同的队列中。对于消费者而言;每个消息队列是等价的;就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。顺序消息与普通消息同样有三种发送方式。

事务消息

        RocketMQ提供了事务消息;通过事务消息就能达到分布式事务的最终一致;从而实现了可靠消息服务。

        事务消息发送步骤;

发送方将半事务消息发送至RocketMQ服务端。RocketMQ服务端将消息持久化之后;向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息;在未收到生产者对该消息的二次确认前;此消息被标记成“暂不能投递”状态。发送方开始执行本地事务逻辑。发送方根据本地事务执行结果向服务端提交二次确认;Commit 或是 Rollback;;服务端收到Commit 状态则将半事务消息标记为可投递;订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息;订阅方将不会接受该消息。

        事务消息回查机制;

在断网或者是应用重启的特殊情况下;上述步骤4提交的二次确认最终未到达服务端;经过固定时间后服务端将对该消息发起消息回查。发送方收到消息回查后;需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认;服务端仍按照步骤4对半事务消息进行操作。

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)


二、集群搭建

1、集群模式

单个Master

这种方式风险较大;一旦Broker重启或者宕机时;会导致整个服务不可用;不建议线上环境使用。

多Master模式

一个集群无Slave;全是Master;例如2个Master或者3个Master
单台机器宕机期间;这台机器上未被消费的消息在机器恢复之前不可订阅;消息实时性会受到影响。

多Master多Slave模式;异步复制

每个Master配置一个Slave;有多对Master-Slave;HA采用异步复制方式;主备有短暂消息延迟;毫秒级。
优点;即使磁盘损坏;消息丢失的非常少;且消息实时性不会受影响;因为Master宕机后;消费者仍然可以从Slave消费;此过程对应用透明;不需要人工干预。性能同多Master模式几乎一样。
缺点;Master宕机;磁盘损坏情况;会丢失少量消息。

多Master多Slave模式;同步双写

每个Master配置一个Slave;有多对Master-Slave;HA采用同步双写方式;主备都写成功;向应用返回成功。
优点;数据与服务都无单点;Master宕机情况下;消息无延迟;服务可用性与数据可用性都非常高。
缺点;性能比异步复制模式略低;大约低10%左右。

**此次我们选择双主双从集群搭建~

2、搭建双主双从同步双写模式

服务器环境

1192.168.116.128nameserver、brokerserverMaster1、Slave22192.168.116.132nameserver、brokerserverMaster2、Slave1

两台服务器各启动一个nameserver;各启动两个broker~

步骤一;两台服务器;;上传文件

上传rocketmq压缩包并解压

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)

解压后移动至/usr/local/rocketmq/下;没有则创建上级目录

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)

环境变量配置;加入如下;

vim /etc/profile
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.5.1-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
source /etc/profile

步骤二;128服务器;;broker配置

① master1

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字;注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master;>0 表示 Slave
brokerId=0
#nameServer地址;分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时;自动创建服务器不存在的topic;默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic;建议线下开启;线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点;默认凌晨 4点
deleteWhen=04
#文件保留时间;默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条;根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/master
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/master/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

② slave2

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字;注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master;>0 表示 Slave
brokerId=1
#nameServer地址;分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时;自动创建服务器不存在的topic;默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic;建议线下开启;线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点;默认凌晨 4点
deleteWhen=04
#文件保留时间;默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条;根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/slave
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

步骤三;132服务器;;broker配置

① master2

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字;注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master;>0 表示 Slave
brokerId=0
#nameServer地址;分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时;自动创建服务器不存在的topic;默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic;建议线下开启;线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点;默认凌晨 4点
deleteWhen=04
#文件保留时间;默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条;根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/master
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/master/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

② slave1

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字;注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master;>0 表示 Slave
brokerId=1
#nameServer地址;分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时;自动创建服务器不存在的topic;默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic;建议线下开启;线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组;建议线下开启;线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点;默认凌晨 4点
deleteWhen=04
#文件保留时间;默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条;根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/slave
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

步骤四;两台服务器;;修改启动脚本

vi /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runbroker.sh

找到如下修改;

JAVA_OPT=;${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m;
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runserver.sh

找到如下修改;

JAVA_OPT=;${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m;

步骤五;两台服务器;;启动nameserver

切换到rocketmq目录bin目录下执行;

nohup sh mqnamesrv &

查看日志;

tail -f ~/logs/rocketmqlogs/namesrv.log

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)

 步骤六;启动broker

① 128服务器

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

② 132服务器

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

启动完成;通过jps查看java进程

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)

 关闭命令;

关闭namesrv服务;sh bin/mqshutdown namesrv
关闭broker服务 ;sh bin/mqshutdown broker

三、安装可视化平台RocketMq-Dashboard


--------------------------待续



序号IP角色架构模式

网友评论

快盘下载暂未开通留言功能。

关于我们| 广告联络| 联系我们| 网站帮助| 免责声明| 软件发布

Copyright 2019-2029 【快快下载吧】 版权所有 快快下载吧 | 豫ICP备10006759号公安备案:41010502004165

声明: 快快下载吧上的所有软件和资料来源于互联网,仅供学习和研究使用,请测试后自行销毁,如有侵犯你版权的,请来信指出,本站将立即改正。