MLXY 2020-05-25
一、什么是消息队列?
1、消息就是数据。
2、队列有队尾和队头,队列有入队和出队,队列先进先出。
3、生产者
存数据入口
4、消费者
取数据入口
二、推模型--发布订阅模型--阻塞
主动把消息推给订阅者。
数据实时要求高,用推。
三、拉模型--生产者消费者模型--非阻塞
消费者自己去拉取数据。
数据实时要求不高,用拉。
四、它有哪些优势?为什么使用它?
可以解决一些分布式场景,如:异步场景,应用解耦,流量削峰,今天讲讲解决异步场景。
五、同步场景
客户端发起一个请求,创建订单,创建完订单需要增加积分,然后发送短信,假设创建订单花费1s,增加积分花费1s,发送短信花费1s,实则花费了3s。
缺陷:耗时时间太长。
六、异步场景
如果在订单服务开启1个异步线程去处理发送短信服务,这样做会有下面的缺陷。
缺陷:1个客户端请求就是1个线程,在订单服务端开启的异步线程一多,就会导致订单服务端的线程数减少,间接会导致订单服务并发量降低。
七、消息队列下的异步场景
积分服务和短信服务订阅消息队列,消息队列推送消息到积分服务,短信服务,这样创建订完单响应给客户端只需要花费1s,但是又不会影响订单服务的并发量。
八、封装Redis消息队列,解决消息队列下的异步场景
1、封装Redis消息队列
namespace MyRedisUnitty { /// <summary> /// 封装Redis消息队列 /// </summary> public class RedisMsgQueueHelper : IDisposable { /// <summary> /// Redis客户端 /// </summary> public RedisClient redisClient { get; } public RedisMsgQueueHelper(string redisHost) { redisClient = new RedisClient(redisHost); } /// <summary> /// 入队 /// </summary> /// <param name="qKeys">入队key</param> /// <param name="qMsg">入队消息</param> /// <returns></returns> public long EnQueue(string qKey, string qMsg) { //1、编码字符串 byte[] bytes = System.Text.Encoding.UTF8.GetBytes(qMsg); //2、Redis消息队列入队 long count = redisClient.LPush(qKey, bytes); return count; } /// <summary> /// 出队(非阻塞) === 拉 /// </summary> /// <param name="qKey">出队key</param> /// <returns></returns> public string DeQueue(string qKey) { //1、redis消息出队 byte[] bytes = redisClient.RPop(qKey); string qMsg = null; //2、字节转string if (bytes == null) { Console.WriteLine($"{qKey}队列中数据为空"); } else { qMsg = System.Text.Encoding.UTF8.GetString(bytes); } return qMsg; } /// <summary> /// 出队(阻塞) === 推 /// </summary> /// <param name="qKey">出队key</param> /// <param name="timespan">阻塞超时时间</param> /// <returns></returns> public string DeQueueBlock(string qKey, TimeSpan? timespan) { // 1、Redis消息出队 string qMsg = redisClient.BlockingPopItemFromList(qKey, timespan); return qMsg; } /// <summary> /// 获取队列数量 /// </summary> /// <param name="qKey">队列key</param> /// <returns></returns> public long GetQueueCount(string qKey) { return redisClient.GetListCount(qKey); } /// <summary> /// 关闭Redis /// </summary> public void Dispose() { redisClient.Dispose(); } } }
2、订单服务发送积分消息和短信消息
namespace MyReidsMsgQueue.Async { class OrderService { public string CreateOrder() { //统计时间 Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); //1、订单号生成 string orderNo = GetOrderGenrator(); //1.1、模拟存储到数据库,需要花费1s时间 Thread.Sleep(1000); Console.WriteLine($"订单:{orderNo}保存成功"); //2、添加积分 //Console.WriteLine($"*******************开始调用积分服务*******************"); //PointsService pointsService = new PointsService(); //pointsService.AddPoints(orderNo); //Console.WriteLine($"*******************积分服务调用完成*******************"); ////3、发送短信 //Console.WriteLine($"*******************开始调用短信服务*******************"); //SmsService smsService = new SmsService(); //smsService.SendSms(orderNo); //Console.WriteLine($"*******************短信服务调用完成*******************"); //Redis优化 using (var msgQueue = new RedisMsgQueueHelper("localhost:6379")) { // 1、发送积分消息 msgQueue.EnQueue("My_Points", orderNo); // 2、发送短信消息 msgQueue.EnQueue("My_Sms", orderNo); } stopwatch.Stop(); Console.WriteLine($"订单完成耗时:{stopwatch.ElapsedMilliseconds}ms"); return orderNo; } /// <summary> /// 订单号生成器 /// </summary> /// <returns></returns> private string GetOrderGenrator() { Random ran = new Random(); return "O-" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ran.Next(1000, 9999).ToString(); } } }
3、消费积分消息
namespace MyRedisPoints { class Program { static void Main(string[] args) { //Redis优化 using (var msgQueue = new RedisMsgQueueHelper("localhost:6379")) { Console.WriteLine("积分消息......"); //1、获取积分消息--反复消费 while (true) { string msgPoints = msgQueue.DeQueueBlock("My_Points", TimeSpan.FromSeconds(60)); if (msgPoints != null) { //2、添加积分 PointsService pointsService = new PointsService(); pointsService.AddPoints(msgPoints); } } } } } }
amespace MyRedisPoints.Async { public class PointsService { public void AddPoints(string orderNo) { //1、模拟积分添加到数据库,需要花费1s时间 Thread.Sleep(1000); Console.WriteLine($"增加积分:orderNo:{orderNo}成功"); } } }
4、消费短信消息
namespace MyRedisSms { class Program { static void Main(string[] args) { //Redis优化 using (var msgQueue = new RedisMsgQueueHelper("localhost:6379")) { Console.WriteLine("短信消息......"); //1、获取短信消息--反复消费 while (true) { string msgSms = msgQueue.DeQueueBlock("My_Sms", TimeSpan.FromSeconds(60)); if (msgSms != null) { //2、发送短信 SmsService smsService = new SmsService(); smsService.SendSms(msgSms); } } } } } }
namespace MyRedisSms.Async { public class SmsService { public void SendSms(string orderNo) { //1、模拟调用第三方短信接口发送短信,需要花费1s时间 Thread.Sleep(1000); Console.WriteLine($"发送短信:orderNo:{orderNo}成功"); } } }
十、客户端调用订单服务
namespace MyReidsMsgQueue { class Program { static void Main(string[] args) { #region 异步处理 { OrderService orderService = new OrderService(); orderService.CreateOrder(); } #endregion Console.ReadKey(); } } }
十一、运行效果
十二、项目结构
MyReidsMsgQueue里的PointsService.cs和SmsService.cs是为了演示同步场景订单服务消耗的时间
思考:从队列中取出积分消息,去添加积分失败了怎么办?也就是消费消息失败了怎么办?