作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
https://github.com/sunshinelyz/mykit-delay
PS: 欢迎各位Star源码,也可以pr你牛逼哄哄的代码。
最近小伙伴们的要求越来越高,学完设计模式学高并发,学完高并发又想学Java8新特性,学完Java8新特性又要学Spring,这不又让我整理一篇关于分布式事务的文章,而且还提出了要求:要实战型的!那好吧,安排上!关于分布式事务,写些啥呢?想来想去,还是按照小伙伴们留言说的:先写一篇关于实战型的文章吧!也是纯技术文章,熬夜两周多整理出来的!在此,也要感谢下在公众号方面给予我支持和帮助的前辈们!
假如某人有5个女朋友(有点复杂),每天晚上都会给他的女朋友打电话说晚安,那么每给一个女朋友打电话,其他女朋友都要进入等待状态。一个一个打下去。。。等打到最后一个已经是凌晨了,对方都睡了。那么有什么办法可以解决呢?此时这个人可以利用微信公众号将自己甜言蜜语放进公众号中,让他女朋友订阅公众号,则这些女朋友不用依次等待又可收到消息。此原理就是消息队列。
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用,不能直接给客户带来价值的软件统称为中间件。
关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。
分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点上
1、负载均衡:预防单点故障,提升键壮性 2、缓存:各种缓存,功盖天下 3、异步:提高并发,提升用户体验 4、读写分离:积微成著,提高吞吐量
1、解耦:传统模式的缺点:系统间耦合性太强;中间件模式的的优点:将消息写入消息队列,需要消息的系统自己从消息队列中订阅
2、异步:传统模式的缺点:一些非必要的业务逻辑以同步的方式运行,太耗费时间;中间件模式的的优点:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
3、横向扩展 4、安全可靠 5、顺序保证
这里我只演示前两者,目前只会前两种(比较流行),至于Kafka是基于日志形式,严格意义上他作于消息中间件角色不是很突出。
1、什么是JMS规范
Java消息服务(Java Message Service)即JMS,是一个Java平台中面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送/接收消息,进行异步通信。
2、JMS相关概念
1)、消费者/订阅者:接收并处理消息的客户端
2)、消息:应用程序之间传递的数据内容
3)、消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
3、ActiveMQ安装
Window下安装ActiveMQ
1)、下载:ActiveMQ下载
2)、进入bin/win64目录
3)、启动(演示方式一)
方式一:以管理员身份运行activemq.bat(直接可以运行)
方式二:installService.bat是以服务方式启动的,用超级管理员身份运行后还需到服务中启动该服务
4)、根据客户端显示的地址用浏览器进行访问:127.0.0.1:8161
5)、利用默认用户密码为admin admin即可进入管理平台
Linux下载
1)、下载
2)、运行
3)、防火墙开启端口号
4)、浏览器访问管理平台:activemq的端口号为8161
默认用户名为admin 密码为admin 访问域名:http://Ip:8161
5)、关闭服务
4、实战入门
队列模式
运行代码,访问http://192.168.174.133:8161/admin/ ,点击queues,此时可以看出生产了100条消息,0个被消费
此时运行消费者,可以消费消息,在管理平台也可以看到被消费的消息通知
主题模式
5、ActiveMQ集成Spring
ConnectionFactory:用于管理连接的连接工厂,一个Spring为我们提供的连接池,JmsTemplate每次发送消息都会重新创建连接,会话和productor。Spring中提供了SingleConnectionFactory和CachingConnectionFactory。
JmsTemplate:用于发送和接收消息的模板类,是Spring提供的,只需向Spring容器内注册这个类就可以使用JmsTemplate方便的操作Jms,JmsTemplate类是线程安全的,可以在整个应用范围使用。
MessageListerner:消息监听器,实现一个onMessage方法,该方法只接收一个Message参数。
2.1) 先引入依赖
2.2) 生产者
2.2.1) 定义接口
2.2.2) 定义实现类
2.2.3) 定义调用者
2.2.4) 定义公共配置文件
此文件因为生产者和消费者都需要一些公共配置,最后将公共的部分抽取出来
2.2.5) 定义生产者配置文件
2.3) 消费者
2.3.1) 定义监听者类
2.3.2) 定义调用者
2.3.3) 定义消费者配置文件
2.4) 扩展:订阅模式
生产者采用订阅模式
消费者配置文件中引入订阅模式
6、ActiveMQ集群
实现高可用,以排除单点故障引起的服务中断
实现负载均衡,以提升效率为更多客户提供服务
客户端模式:让多个消费者消费同一个队列
Broker clusters:多个Broker之间同步消息
Master Slave:实现高可用
允许当其中一台消息服务器宕机,客户端在传输层上重新连接到其他消息服务器
transportOptions参数说明
randomize默认true,表示在URI列表中选择URI连接时是否采用随机策略
initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
maxReconnetDelay:默认30000,单位毫秒,最长重连的时间间隔
网络连接器主要配置ActiveMQ服务器与服务器之间的网路通讯方式,用于服务器传递消息
网络连接器分为静态连接器和动态连接器
静态连接器
动态连接器
Shared storage master/slave 共享存储
Replicated LevelDB Store 基于复制的LevelDB Store
7、实战
7.1构建集群环境
7.2创建一个共享目录用来做master、slave集群使用的共享存储文件夹
7.3配置节点
a节点:
(1) 先找到网络提供服务的地方,a节点默认使用61616端口,其他端口也不需要,所以可以直接注释掉
(2) 给a节点添加网络连接器的配置项
(3) 给a节点配置后端提供管理地址的jetty服务器端口,注意a节点使用的是默认配置,所以这里无须改动,直接退出
a节点配置完毕
b节点
(1) 注释网络服务
(2) 给b节点添加网络连接器的配置项
(3) 因为b节点和c节点需要配置成master和slave,我们采用的是共享文件夹的方式,所以需要配置节点存储方案,位置配置指定的自定义存储的目录
(4) 修改对外提供服务的端口号,改为61617
(5) 为b节点配置jetty提供的服务地址,也就是管理端口
c节点
(1) 因为b节点和c节点配置文件差不多,可以讲b配置文件复制给c
(2) 编辑c配置文件
(3) 为c节点配置管理端口
7.4一次启动三台ActiveMQ
测试(这里利用之前queue代码)
生产者
注意url需要配置状态转移failover,此作用是当前一个配置的节点挂失后直接转到第二个配置的节点
消费者
消费者需要配置三台域名
7.5浏览器输入域名访问管理平台
运行生成者,查看队列中的消息
注意此时b节点为slave,他不对外提供服务
http://192.168.174.133:8161/admin/
http://192.168.174.133:8163/admin
此时是在c节点生成的消息,挂失c节点,模拟c机器故障
运行消费者
访问域名b
http://192.168.174.133:8162/admin/
查看各个节点连接情况以及消费情况,注意防火墙需要开通8162和8163端口号
1、RabbitMQ安装(Linux)
1)安装GCC GCC-C++ Openssl等模块(安装过请忽略)
4.1) 下载rabbitmq-server的jar包
4.2) 下载好之后:
4.3) 随后移动至/usr/local/下 改名rabbitmq
4.4) 这种下载的方式解压后直接可以使用,无需再编译安装
4.4) 这种下载的方式解压后直接可以使用,无需再编译安装
4.5) 开启防火墙端口号15672
4.6) 添加用户
方式一:通过代码形式
方式二:通过管理平台的方式
2、RabbitMQ入门之Hello篇
3、work模式
将消息发给两个消费者(将消息发送至消息队列,由两个消费者接收),但是每次只能一个消费者接收到,一种集群模式
消费者1:
消费者2
启动顺序:先启动两个消费者,再启动生产者
1)、值得注意的是:发送者发送100条消息,两个消费者接受,虽然两个消费者都能接收到消息,但是接收的是不同内容,可能是消费者1接受的是偶数,消费2接收的是奇数等。这里模拟了两台机器处理事务效率不同(利用睡眠进行耗时操作),针对于耗时多的机器,说明其处理事务性能不强,可以利用channel.basicQos(1);设置给他分配少点的事务处理
2)、work模式用于集群环境下,如电商下订单处理,可以由多个服务器接受业务,只要能接收到处理事务就行
4、Publish_Subscribe发布订阅
生产者将消息不再直接发送到队列,而是发送到交换机,此时有不同的队列注册到交换机上,不同消费者注册到不同的队列上。此时生产者发送的消息,所有在交换机上注册的消费者都能收的到
第一次启动如果报交换机不存在,说明得先启动生产者注册交换机,再启动两个消费者,启动生成者发送消息应用:用于给多个机器发送消息,需要多方收到消息
5、路由模式
配置交换机通过路由模式指定发送消息到不同的队列
原理:发送者绑定端口号,如这里绑定Key1,会去匹配哪个消费者绑定了该键,如果绑定就可以收到消息,这里消费者1和2都绑定了,所以都可收的到
6、topic模式
Router模式弊端:生成者必须根据消费绑定的端口进行选择,不能凭空铸造,Topic模式满足匹配某类或者某几类操作,就如同通配符模式,“#”可以匹配一个或者多个词,“*”匹配不多不少一个词
生成者测试数据:key.1 或者key.1.2 或者 abc.1.2
7、rabbitmq与spring整合(自动确认消息接收)
8、spring与rabbitmq整合(手动确认消息接收)
9、关于消息持久化
当服务停掉时再次重启,消费者还是能接收到重启前的消息
业务需求:每次我们shopping时,都会或多或少涉及到优惠券的事。在接口调用订单系统创建订单后,需要锁定优惠券,此时订单系统需要调用优惠券接口,要保证优惠券和订单的数据都能对的上,一个订单只能使用一张优惠券,优惠券只能被一个订单使用
1、环境部署之数据库表
注意:订单和优惠券为不同的项目工程
订单表:
优惠券:
2、未引入分布式事务实战
当接口调用失败时,系统事务回滚,提示用户操作失败
代码如下
订单:
2.1)引入依赖
2**.**2) spring之dao层配置文件
2.3) spring之service层配置文件
2.4) 配置连接数据库配置文件
2.5) 配置log4j
2.6) 配置SpringMvc配置文件
2.7) 针对web.xml进行修改配置spring配置文件和springMV配置文件的加载
2.8) Controller层
2.9) 业务层
优惠券
优惠券环境和订单差不多
此时模拟分布式事务环境操纵优惠券时网络卡顿情况:打开优惠券controller层被注释的代码
因为在订单业务层定义了工具类(一下代码),其工具类中定义了请求调用优惠券时间不能超过2秒,而优惠券睡眠了3秒肯定会请求超时,此时应该会导致事务执行失败,事务回滚
运行:http://localhost:8080/Order/order/create.action?userId=jiaxianseng&orderContent=huagua&couponId=c00001
根据发送的请求会调用订单中的createOrder方法,在该方法中又会调用service业务层,进而通过RestTemplate接口远程调用优惠券方法。
运行条件前提:保证优惠券表中有c0000优惠券信息(且优惠券状态是null)
结果:订单虽然未创建,但是优惠券被锁了(被使用了)
分析原因:当接口调用失败时,订单系统事务回滚,提示用户操作失败
误区:接口出错的情况下,是能够实现的。但是在接口调用超时的情况下,会出现“优惠券系统处理成功,订单系统处理失败”的情况
3、分布式事务实战--基于RabbitMQ消息队列实现
3.1) 订单和优惠券引入关于RabbitMQ的依赖jar包
订单:
3.2) 定义支持分布式事务的订单service
3.3) 订单的spring配置文件之service层引入该类的注解
3.4) 在订单controller中,重新调用支持事务的service,并且运行代码,此时订单创建成功,但是优惠券没有使用,其操纵放在了消息队列中了,等待消费者消费
优惠券
3.5) 编写优惠券业务层
3.6) 编写优惠券的spring之service层引入该类配置文件
关键点1:如果发送消息至rabbitmq失败?怎么处理
rabbitmq提供消息确认机制confim(保证消息投递到消息队列),用于响应消息发送情况,但是其前提是将它持久化(持久化是防止中途宕加挂失导致消息随机器挂失而丢失,持久化是保存到消息队列中)
关键点2:Queue中的消息被正确的消费
ACK机制:消费者开启手动ack的模式
通知mq,消息正确消费
通知mq,消息处理异常,需要再次消费
通知mq,消息处理异常,丢弃掉
问题:怎么保证消费者消费了
详细代码如下:
订单业务层
订单controller层
优惠券业务层
运行:http://localhost:8080/Order/order/create.action?userId=jiaxianseng&orderContent=huagua&couponId=c00001
此时优惠券没有controller层,因为它要手动方式启动优惠券项目,项目启动后就会自动加载MQConsumer中init方法,从而消费消息队列中的消息,订单被创建,优惠券处于被锁状态
1、什么是CAP
CAP定理是指分布式WEB服务无法同时满足以下3个属性
数据一致性:如果系统对一个写操作返回成功,那么之后的读请求都必须读到这个新数据;如果返回失败,那么所有读操作都不能读到这个数据,对调用者而言数据具有强一致性
服务可用性:所有读写请求在一定时间内得到响应,可终止、不会一直等待
分区容错性:在网络分区的情况下,被分隔的及诶单仍然正常对外服务。
分析:一个使用了MQ的项目,如果连这个问题都没有考虑过,就把MQ引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!回答:回答也很容易,从以下两个个角度来答:
系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低。
系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大
分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。
回答:先来说一下为什么会造成重复消费?
其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点:
(1)比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
本文地址:http://www.tpjde.com/quote/1447.html 推平第 http://www.tpjde.com/ , 查看更多