博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)
阅读量:5899 次
发布时间:2019-06-19

本文共 15828 字,大约阅读时间需要 52 分钟。

一、本文产生原由:  

 之前文章《》已对RabbitMQ的安装、用法都做了详细说明,而本文主要是针对在高并发且单次从RabbitMQ中消费消息时,出现了连接数不足、连接响应较慢、RabbitMQ服务器崩溃等各种性能问题的解方案,之所以会出现我列举的这些问题,究基根源,其实是TCP连接创建与断开太过频繁所致,这与我们使用ADO.NET来访问常规的关系型DB(如:SQL SERVER、MYSQL)有所不同,在访问DB时,我们一般都建议大家使用using包裹,目的是每次创建完DB连接,使用完成后自动释放连接,避免不必要的连接数及资源占用。可能有人会问,为何访问DB,可以每次创建再断开连接,都没有问题,而同样访问MQ(本文所指的MQ均是RabbitMQ),每次创建再断开连接,如果在高并发且创建与断开频率高的时候,会出现性能问题呢?其实如果了解了DB的连接创建与断开以及MQ的连接创建与断开原理就知道其中的区别了。这里我简要说明一下,DB连接与MQ连接 其实底层都是基于TCP连接,创建TCP连接肯定是有资源消耗的,是非常昂贵的,原则上尽可能少的去创建与断开TCP连接,DB创建连接、MQ创建连接可以说是一样的,但在断开销毁连接上就有很大的不同,DB创建连接再断开时,默认情况下是把该连接回收到连接池中,下次如果再有DB连接创建请求,则先判断DB连接池中是否有空闲的连接,若有则直接复用,若没有才创建连接,这样就达到了TCP连接的复用,而MQ创建连接都是新创建的TCP连接,断开时则直接断开TCP连接,简单粗暴,看似资源清理更彻底,但若在高并发高频率每次都重新创建与断开MQ连接,则性能只会越来越差(上面说过TCP连接是非常昂贵的),我在公司项目中就出现了该问题,后面在技术总监的指导下,对MQ的连接创建与断开作了优化,实现了类似DB连接池的概念。

连接池,故名思义,连接的池子,所有的连接作为一种资源集中存放在池中,需要使用时就可以到池中获取空闲连接资源,用完后再放回池中,以此达到连接资源的有效重用,同时也控制了资源的过度消耗与浪费(资源多少取决于池子的容量)

二、源代码奉献(可直接复制应用到大家的项目中) 

下面就先贴出实现MQHelper(含连接池)的源代码:

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using RabbitMQ.Util;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Web.Caching;using System.Web;using System.Configuration;using System.IO;using System.Collections.Concurrent;using System.Threading;using System.Runtime.CompilerServices;namespace Zuowj.Core{    public class MQHelper    {        private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";        private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";        private readonly static ConcurrentQueue
FreeConnectionQueue;//空闲连接对象队列 private readonly static ConcurrentDictionary
BusyConnectionDic;//使用中(忙)连接对象集合 private readonly static ConcurrentDictionary
MQConnectionPoolUsingDicNew;//连接池使用率 private readonly static Semaphore MQConnectionPoolSemaphore; private readonly static object freeConnLock = new object(), addConnLock = new object(); private static int connCount = 0; public const int DefaultMaxConnectionCount = 30;//默认最大保持可用连接数 public const int DefaultMaxConnectionUsingCount = 10000;//默认最大连接可访问次数 private static int MaxConnectionCount { get { if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null) { return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]); } else { int mqMaxConnectionCount = 0; string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount]; if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) { mqMaxConnectionCount = DefaultMaxConnectionCount; } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath)); return mqMaxConnectionCount; } } } ///
/// 建立连接 /// ///
服务器地址 ///
登录账号 ///
登录密码 ///
private static ConnectionFactory CrateFactory() { var mqConnectionSetting = GetMQConnectionSetting(); var connectionfactory = new ConnectionFactory(); connectionfactory.HostName = mqConnectionSetting[0]; connectionfactory.UserName = mqConnectionSetting[1]; connectionfactory.Password = mqConnectionSetting[2]; if (mqConnectionSetting.Length > 3) //增加端口号 { connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]); } return connectionfactory; } private static string[] GetMQConnectionSetting() { string[] mqConnectionSetting = null; if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null) { //MQConnectionSetting=Host IP|;userid;|;password string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting]; if (!string.IsNullOrWhiteSpace(mqConnSettingStr)) { mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//解密MQ连接字符串,若项目中无此需求可移除,EncryptUtility是一个AES的加解密工具类,大家网上可自行查找 if (mqConnSettingStr.Contains(";|;")) { mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries); } } if (mqConnectionSetting == null || mqConnectionSetting.Length < 3) { throw new Exception("MQConnectionSetting未配置或配置不正确"); } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath)); } else { mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[]; } return mqConnectionSetting; } public static IConnection CreateMQConnection() { var factory = CrateFactory(); factory.AutomaticRecoveryEnabled = true;//自动重连 var connection = factory.CreateConnection(); connection.AutoClose = false; return connection; } static MQHelper() { FreeConnectionQueue = new ConcurrentQueue
(); BusyConnectionDic = new ConcurrentDictionary
(); MQConnectionPoolUsingDicNew = new ConcurrentDictionary
();//连接池使用率 MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信号量,控制同时并发可用线程数 } public static IConnection CreateMQConnectionInPoolNew() { SelectMQConnectionLine: MQConnectionPoolSemaphore.WaitOne();//当
DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空闲连接,判断是否使用次数是否超过最大限制,超过则释放连接并重新创建 { mqConnection.Close(); mqConnection.Dispose(); // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); } BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中 MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1 // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } private static void ResetMQConnectionToFree(IConnection connection) { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出 { // BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } else { // BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因为高并发出现极少概率的>MaxConnectionCount,则直接释放该连接 { connection.Close(); connection.Dispose(); } else { FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供连接服务 } MQConnectionPoolSemaphore.Release();//释放一个空闲连接信号 //Interlocked.Decrement(ref connCount); //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount); } } ///
/// 发送消息 /// ///
消息队列连接对象 ///
消息类型
///
队列名称 ///
是否持久化 ///
消息 ///
public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true) { try { using (var channel = connection.CreateModel())//建立通讯信道 { // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其他参数 channel.QueueDeclare(queueName, durable, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("", queueName, properties, body); } return string.Empty; } catch (Exception ex) { return ex.ToString(); } finally { ResetMQConnectionToFree(connection); } } ///
/// 消费消息 /// ///
消息队列连接对象 ///
队列名称 ///
是否持久化 ///
消息处理函数 ///
保存日志方法,可选 public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func
dealMessage, Action
saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 var consumer = new QueueingBasicConsumer(channel); //建立消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(queueName, false, consumer); while (true) //如果队列中有消息 { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { ResetMQConnectionToFree(connection); } } ///
/// 依次获取单个消息 /// ///
消息队列连接对象 ///
队列名称 ///
持久化 ///
处理消息委托 public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func
dealMessage) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //建立消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(QueueName, false, consumer); ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { throw ex; } finally { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } else { dealMessage(string.Empty); } } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } } ///
/// 获取队列消息数 /// ///
///
///
public static int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //获取队列 msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } return msgCount; } } public enum ConsumeAction { ACCEPT, // 消费成功 RETRY, // 消费失败,可以放回队列重新消费 REJECT, // 消费失败,直接丢弃 }}

现在对上述代码的核心点作一个简要的说明:

先说一下静态构造函数:

FreeConnectionQueue 用于存放空闲连接对象队列,为何使用Queue,因为当我从中取出1个空闲连接后,空闲连接数就应该少1个,这个Queue很好满足这个需求,而且这个Queue是并发安全的Queue哦(ConcurrentQueue)

BusyConnectionDic 忙(使用中)连接对象集合,为何这里使用字典对象呢,因为当我用完后,需要能够快速的找出使用中的连接对象,并能快速移出,同时重新放入到空闲队列FreeConnectionQueue ,达到连接复用

MQConnectionPoolUsingDicNew 连接使用次数记录集合,这个只是辅助记录连接使用次数,以便可以计算一个连接的已使用次数,当达到最大使用次数时,则应断开重新创建

MQConnectionPoolSemaphore 这个是信号量,这是控制并发连接的重要手段,连接池的容量等同于这个信号量的最大可并行数,保证同时使用的连接数不超过连接池的容量,若超过则会等待;

具体步骤说明:

1.MaxConnectionCount:最大保持可用连接数(可以理解为连接池的容量),可以通过CONFIG配置,默认为30; 

2.DefaultMaxConnectionUsingCount:默认最大连接可访问次数,我这里没有使用配置,而是直接使用常量固定为1000,大家若有需要可以改成从CONFIG配置,参考MaxConnectionCount的属性设置(采取了依赖缓存)

3.CreateMQConnectionInPoolNew:从连接池中创建MQ连接对象,这个是核心方法,是实现连接池的地方,代码中已注释了重要的步骤逻辑,这里说一下实现思路:

  3.1 通过MQConnectionPoolSemaphore.WaitOne() 利用信号量的并行等待方法,如果当前并发超过信号量的最大并行度(也就是作为连接池的最大容量),则需要等待空闲连接池,防止连接数超过池的容量,如果并发没有超过池的容量,则可以进入获取连接的逻辑;

  3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount,如果空闲连接队列+忙连接集合的总数小于连接池的容量,则可以直接创建新的MQ连接,否则FreeConnectionQueue.TryDequeue(out mqConnection) 尝试从空闲连接队列中获取一个可用的空闲连接使用,若空闲连接都没有,则需要返回到方法首行,重新等待空闲连接;

  3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen 如果取到空闲连接,则先判断使用次数是否超过最大限制,超过则释放连接或空闲连接已断开连接也需要重新创建,否则该连接可用;

  3.4BusyConnectionDic[mqConnection] = true;加入到忙连接集合中,MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; 使用次数加1,确保每使用一次连接,连接次数能记录

4.ResetMQConnectionToFree:重置释放连接对象,这个是保证MQ连接用完后能够回收到空闲连接队列中(即:回到连接池中),而不是直接断开连接,这个方法很简单就不作作过多说明。

 好了,都说明了如何实现含连接池的MQHelper,现在再来举几个例子来说明如何用:

三、实际应用(简单易上手)

获取并消费一个消息:

public string GetMessage(string queueName)        {            string message = null;            try            {                var connection = MQHelper.CreateMQConnectionInPoolNew();                MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) =>                {                    message = msg;                    return ConsumeAction.ACCEPT;                });            }            catch (Exception ex)            {                BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex);                message = "ERROR:" + ex.Message;            }            //BaseUtil.Logger.InfoFormat("第{0}次请求,从消息队列(队列名称:{1})中获取消息值为:{2}", Interlocked.Increment(ref requestCount), queueName, message);            return message;        }

 发送一个消息:

public string SendMessage(string queueName, string msg)        {            string result = null;            try            {                var connection = MQHelper.CreateMQConnectionInPoolNew();                result = MQHelper.SendMsg(connection, queueName, msg);            }            catch (Exception ex)            {                BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex);                result = ex.Message;            }            return result;        }

 获取消息队列消息数:

public int GetMessageCount(string queueName)        {            int result = -1;            try            {                var connection = MQHelper.CreateMQConnectionInPoolNew();                result = MQHelper.GetMessageCount(connection, queueName);            }            catch (Exception ex)            {                BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex);                result = -1;            }            return result;        }

 这里说一下:BaseUtil.Logger 是Log4Net的实例对象,另外上面没有针对持续订阅消费消息(ConsumeMsg)作说明,因为这个其实可以不用连接池也不会有问题,因为它是一个持久订阅并持久消费的过程,不会出现频繁创建连接对象的情况。

 最后要说的是,虽说代码贴出来,大家一看就觉得很简单,好像没有什么技术含量,但如果没有完整的思路也还是需要花费一些时间和精力的,代码中核心是如何简单高效的解决并发及连接复用的的问题,该MQHelper有经过压力测试并顺利在我司项目中使用,完美解决了之前的问题,由于这个方案是我在公司通宵实现的,可能有一些方面的不足,大家可以相互交流或完善后入到自己的项目中。

 

转载地址:http://nohsx.baihongyu.com/

你可能感兴趣的文章
Chapter 3:Code Style in Django
查看>>
FIDO联盟拥抱政府监管,全面打造安全可信网络
查看>>
BlackNurse新型DoS攻击 15M流量就可以打瘫思科防火墙 思科做出了回应
查看>>
Interop 2015:思科为其SDN架构做好安全防护
查看>>
第19届亚太反病毒研究者联盟(AVAR)国际大会开幕在即
查看>>
挖掘数据金矿 领军协同创新 曙光荣膺“2016大数据创新应用领袖企业”称号
查看>>
Fast通道获得Win10 Mobile Build 14977更新
查看>>
如何处理IT事件管理以避免混乱
查看>>
《BackTrack 5 Cookbook中文版——渗透测试实用技巧荟萃》—第3章3.6节识别操作系统...
查看>>
linux系统防火墙iptables命令规则及配置的示例
查看>>
10 个顶尖的 Linux 开源人工智能工具
查看>>
Firefox 跟踪保护技术将页面加载时间减少 44%
查看>>
聚合(根)、实体、值对象精炼思考总结
查看>>
java解析虾米音乐
查看>>
rails将类常量重构到数据库对应的表中之三
查看>>
error C4430: missing type specifier - int assumed. Note: C++ does not support default-int
查看>>
mysql 多行合并函数
查看>>
【案例】RAID卡写策略改变引发的问题
查看>>
[Django学习]如何得到一个App
查看>>
第四十八讲:tapestry 与 淘宝kissy editor编辑器带图片上传
查看>>