实现一个轻量级高可复用的RabbitMQ客户端

前言

    本示例通过对服务订阅的封装、隐藏细节实现、统一配置、自动重连、异常处理等各个方面来打造一个简单易用的 RabbitMQ 工厂;本文适合适合有一定 RabbitMQ 使用经验的读者阅读,如果你还没有实际使用过 RabbitMQ,也没有关系,因为本文的代码都是基于直接运行的实例,通过简单的修改 RabbitMQ 即可运行。

  • 解决方案如下

创建基础连接管理帮助类

    首先,创建一个 .netcore 控制台项目,创建 Helper、Service、Utils 文件夹,分别用于存放通道管理、服务订阅、公共组件。

  • 接下来创建一个 MQConfig 类,用于存放 RabbitMQ 主机配置等信息
    public class MQConfig
    {
        /// <summary>
        ///  访问消息队列的用户名
        /// </summary>
        public string UserName { get; set; }
        /// <summary>
        ///  访问消息队列的密码
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        ///  消息队列的主机地址
        /// </summary>
        public string HostName { get; set; }
        /// <summary>
        ///  消息队列的主机开放的端口
        /// </summary>
        public int Port { get; set; }
    }
  • 创建 RabbitMQ 连接管理类,用于创建连接,关闭连接

  • 创建一个消息体对象 MessageBody,用于解析和传递消息到业务系统中,在接下来的 MQChannel 类中会用到

  public class MessageBody
    {
        public EventingBasicConsumer Consumer { get; set; }
        public BasicDeliverEventArgs BasicDeliver { get; set; }
        /// <summary>
        ///  0成功
        /// </summary>
        public int Code { get; set; }
        public string Content { get; set; }
        public string ErrorMessage { get; set; }
        public bool Error { get; set; }
        public Exception Exception { get; set; }
    }
  • 创建一个通道类,用于订阅、发布消息,同时提供一个关闭通道连接的方法 Stop
 public class MQChannel
    {
        public string ExchangeTypeName { get; set; }
        public string ExchangeName { get; set; }
        public string QueueName { get; set; }
        public string RoutekeyName { get; set; }
        public IConnection Connection { get; set; }
        public EventingBasicConsumer Consumer { get; set; }

        /// <summary>
        ///  外部订阅消费者通知委托
        /// </summary>
        public Action<MessageBody> OnReceivedCallback { get; set; }

        public MQChannel(string exchangeType, string exchange, string queue, string routekey)
        {
            this.ExchangeTypeName = exchangeType;
            this.ExchangeName = exchange;
            this.QueueName = queue;
            this.RoutekeyName = routekey;
        }

        /// <summary>
        ///  向当前队列发送消息
        /// </summary>
        /// <param name="content"></param>
        public void Publish(string content)
        {
            byte[] body = MQConnection.UTF8.GetBytes(content);
            IBasicProperties prop = new BasicProperties();
            prop.DeliveryMode = 1;
            Consumer.Model.BasicPublish(this.ExchangeName, this.RoutekeyName, false, prop, body);
        }

        internal void Receive(object sender, BasicDeliverEventArgs e)
        {
            MessageBody body = new MessageBody();
            try
            {
                string content = MQConnection.UTF8.GetString(e.Body);
                body.Content = content;
                body.Consumer = (EventingBasicConsumer)sender;
                body.BasicDeliver = e;
            }
            catch (Exception ex)
            {
                body.ErrorMessage = $"订阅-出错{ex.Message}";
                body.Exception = ex;
                body.Error = true;
                body.Code = 500;
            }
            OnReceivedCallback?.Invoke(body);
        }

        /// <summary>
        ///  设置消息处理完成标志
        /// </summary>
        /// <param name="consumer"></param>
        /// <param name="deliveryTag"></param>
        /// <param name="multiple"></param>
        public void SetBasicAck(EventingBasicConsumer consumer, ulong deliveryTag, bool multiple)
        {
            consumer.Model.BasicAck(deliveryTag, multiple);
        }

        /// <summary>
        ///  关闭消息队列的连接
        /// </summary>
        public void Stop()
        {
            if (this.Connection != null && this.Connection.IsOpen)
            {
                this.Connection.Close();
                this.Connection.Dispose();
            }
        }
    }
  • 在上面的 MQChannel 类中,首先是在构造函数内对当前通道的属性进行设置,其次提供了 Publish 和 OnReceivedCallback 的委托,
    当通道接收到消息的时候,会进入方法 Receive 中,在 Receive 中,经过封装成 MessageBody 对象,并调用委托 OnReceivedCallback ,将,解析好的
    消息传递到外边订阅者的业务中。最终在 MQChannel 中还提供了消息确认的操作方法 SetBasicAck,供业务系统手动调用。

  • 接着再创建一个 RabbitMQ 通道管理类,用于创建通道,代码非常简单,只有一个公共方法 CreateReceiveChannel,传入相关参数,创建一个 MQChannel 对象

    public class MQChannelManager
    {
        public MQConnection MQConn { get; set; }

        public MQChannelManager(MQConnection conn)
        {
            this.MQConn = conn;
        }

        /// <summary>
        ///  创建消息通道
        /// </summary>
        /// <param name="cfg"></param>
        public MQChannel CreateReceiveChannel(string exchangeType, string exchange, string queue, string routekey)
        {
            IModel model = this.CreateModel(exchangeType, exchange, queue, routekey);
            model.BasicQos(0, 1, false);
            EventingBasicConsumer consumer = this.CreateConsumer(model, queue);
            MQChannel channel = new MQChannel(exchangeType, exchange, queue, routekey)
            {
                Connection = this.MQConn.Connection,
                Consumer = consumer
            };
            consumer.Received += channel.Receive;
            return channel;
        }

        /// <summary>
        ///  创建一个通道,包含交换机/队列/路由,并建立绑定关系
        /// </summary>
        /// <param name="type">交换机类型</param>
        /// <param name="exchange">交换机名称</param>
        /// <param name="queue">队列名称</param>
        /// <param name="routeKey">路由名称</param>
        /// <returns></returns>
        private IModel CreateModel(string type, string exchange, string queue, string routeKey, IDictionary<string, object> arguments = null)
        {
            type = string.IsNullOrEmpty(type) ? "default" : type;
            IModel model = this.MQConn.Connection.CreateModel();
            model.BasicQos(0, 1, false);
            model.QueueDeclare(queue, true, false, false, arguments);
            model.QueueBind(queue, exchange, routeKey);
            return model;
        }

        /// <summary>
        ///  接收消息到队列中
        /// </summary>
        /// <param name="model">消息通道</param>
        /// <param name="queue">队列名称</param>
        /// <param name="callback">订阅消息的回调事件</param>
        /// <returns></returns>
        private EventingBasicConsumer CreateConsumer(IModel model, string queue)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(model);
            model.BasicConsume(queue, false, consumer);

            return consumer;
        }
    }
  • 通道管理类的构造方法
 public MQChannelManager(MQConnection conn)
        {
            this.MQConn = conn;
        }
  • 需要传入一个 MQConnection 对象,仅是一个简单的连接类,代码如下
    public class MQConnection
    {
        private string vhost = string.Empty;
        private IConnection connection = null;
        private MQConfig config = null;

        /// <summary>
        ///  构造无 utf8 标记的编码转换器
        /// </summary>
        public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false);

        public MQConnection(MQConfig config, string vhost)
        {
            this.config = config;
            this.vhost = vhost;
        }

        public IConnection Connection
        {
            get
            {
                if (connection == null)
                {
                    ConnectionFactory factory = new ConnectionFactory
                    {
                        AutomaticRecoveryEnabled = true,
                        UserName = this.config.UserName,
                        Password = this.config.Password,
                        HostName = this.config.HostName,
                        VirtualHost = this.vhost,
                        Port = this.config.Port
                    };
                    connection = factory.CreateConnection();
                }

                return connection;
            }
        }
    }
  • 在上面的代码中,还初始化了一个静态对象 UTF8Encoding ,使用无 utf8 标记的编码转换器来解析消息

定义和实现服务契约

    设想一下,有这样的一个业务场景,通道管理和服务管理都是相同的操作,如果这些基础操作都在一个
地方定义,且有一个默认的实现,那么后来者就不需要去关注这些技术细节,直接继承基础类后,传入相应的消息配置即可完成
消息订阅和发布操作。

  • 有了想法,接下来就先定义契约接口 IService,此接口包含创建通道、开启/停止订阅,一个服务可能承载多个通道,所以还需要包含通道列表
public interface IService
    {
        /// <summary>
        ///  创建通道
        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="routeKey">路由名称</param>
        /// <param name="exchangeType">交换机类型</param>
        /// <returns></returns>
        MQChannel CreateChannel(string queue, string routeKey, string exchangeType);

        /// <summary>
        ///  开启订阅
        /// </summary>
        void Start();

        /// <summary>
        ///  停止订阅
        /// </summary>
        void Stop();

        /// <summary>
        ///  通道列表
        /// </summary>
        List<MQChannel> Channels { get; set; }

        /// <summary>
        ///  消息队列中定义的虚拟机
        /// </summary>
        string vHost { get; }

        /// <summary>
        ///  消息队列中定义的交换机
        /// </summary>
        string Exchange { get; }
    }
  • 接下来创建一个抽象类来实现该接口,将实现细节进行封装,方便后面的业务服务继承调用
  public abstract class MQServiceBase : IService
    {
        internal bool started = false;
        internal MQServiceBase(MQConfig config)
        {
            this.Config = config;
        }

        public MQChannel CreateChannel(string queue, string routeKey, string exchangeType)
        {
            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager cm = new MQChannelManager(conn);
            MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey);
            return channel;
        }

        /// <summary>
        ///  启动订阅
        /// </summary>
        public void Start()
        {
            if (started)
            {
                return;
            }

            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager manager = new MQChannelManager(conn);
            foreach (var item in this.Queues)
            {
                MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
                channel.OnReceivedCallback = item.OnReceived;
                this.Channels.Add(channel);
            }
            started = true;
        }

        /// <summary>
        ///  停止订阅
        /// </summary>
        public void Stop()
        {
            foreach (var c in this.Channels)
            {
                c.Stop();
            }
            this.Channels.Clear();
            started = false;
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="message"></param>
        public abstract void OnReceived(MessageBody message);

        public List<MQChannel> Channels { get; set; } = new List<MQChannel>();

        /// <summary>
        ///  消息队列配置
        /// </summary>
        public MQConfig Config { get; set; }

        /// <summary>
        ///  消息队列中定义的虚拟机
        /// </summary>
        public abstract string vHost { get; }

        /// <summary>
        ///  消息队列中定义的交换机
        /// </summary>
        public abstract string Exchange { get; }

        /// <summary>
        ///  定义的队列列表
        /// </summary>
        public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
    }

    上面的抽象类,原封不动的实现接口契约,代码非常简单,在 Start 方法中,创建通道和启动消息订阅;同时,将
通道加入属性 Channels 中,方便后面的自检服务使用;在 Start 方法中

 /// <summary>
        ///  启动订阅
        /// </summary>
        public void Start()
        {
            if (started)
            {
                return;
            }

            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager manager = new MQChannelManager(conn);
            foreach (var item in this.Queues)
            {
                MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
                channel.OnReceivedCallback = item.OnReceived;
                this.Channels.Add(channel);
            }
            started = true;
        }

    使用 MQChannelManager 创建了一个通道,并将通道的回调委托 OnReceivedCallback 设置为 item.OnReceived 方法,该方法
将有子类实现;在将当前订阅服务通道创建完成后,标记服务状态 started 为 true,防止重复启动;
同时,在该抽象类中,不实现契约的 OnReceived(MessageBody message);强制基础业务服务类去自我实现,因为各种业务的特殊性,这块
对消息的处理不能再基础服务中完成

  • 接下来要介绍的是服务监控管理类,该类内部定义一个简单的定时器功能,不间断的对 RabbitMQ 的通讯进行侦听,一旦发现
    有断开的连接,就自动创建一个新的通道,并移除旧的通道;同时,提供 Start/Stop 两个方法,以供程序 启动/停止 的时候对
    RabbitMQ 的连接和通道进行清理;代码如下
public class MQServcieManager
    {
        public int Timer_tick { get; set; } = 10 * 1000;
        private Timer timer = null;

        public Action<MessageLevel, string, Exception> OnAction = null;
        public MQServcieManager()
        {
            timer = new Timer(OnInterval, "", Timer_tick, Timer_tick);
        }


        /// <summary>
        ///  自检,配合 RabbitMQ 内部自动重连机制
        /// </summary>
        /// <param name="sender"></param>
        private void OnInterval(object sender)
        {
            int error = 0, reconnect = 0;
            OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 正在执行自检", null);
            foreach (var item in this.Services)
            {
                for (int i = 0; i < item.Channels.Count; i++)
                {
                    var c = item.Channels[i];
                    if (c.Connection == null || !c.Connection.IsOpen)
                    {
                        error++;
                        OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 重新创建订阅", null);
                        try
                        {
                            c.Stop();
                            var channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName);
                            item.Channels.Remove(c);
                            item.Channels.Add(channel);

                            OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 重新创建完成", null);
                            reconnect++;
                        }
                        catch (Exception ex)
                        {
                            OnAction?.Invoke(MessageLevel.Information, ex.Message, ex);
                        }
                    }
                }
            }
            OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 自检完成,错误数:{error},重连成功数:{reconnect}", null);
        }

        public void Start()
        {
            foreach (var item in this.Services)
            {
                try
                {
                    item.Start();
                }
                catch (Exception e)
                {
                    OnAction?.Invoke(MessageLevel.Error, $"启动服务出错 | {e.Message}", e);
                }
            }
        }

        public void Stop()
        {
            try
            {
                foreach (var item in this.Services)
                {
                    item.Stop();
                }
                Services.Clear();
                timer.Dispose();
            }
            catch (Exception e)
            {
                OnAction?.Invoke(MessageLevel.Error, $"停止服务出错 | {e.Message}", e);
            }
        }

        public void AddService(IService service)
        {
            Services.Add(service);
        }
        public List<IService> Services { get; set; } = new List<IService>();
    }
  • 代码比较简单,就不在一一介绍,为了将异常等内部信息传递到外边,方便使用第三方组件进行日志记录等需求,MQServcieManager 还
    使用了 MessageLevel 这个定义,方便业务根据不同的消息级别对消息进行处理
    public enum MessageLevel
    {
        Trace = 0,
        Debug = 1,
        Information = 2,
        Warning = 3,
        Error = 4,
        Critical = 5,
        None = 6
    }

开始使用

    终于来到了这一步,我们将要开始使用这个基础服务;首先,创建一个 DemoService 继承自 MQServiceBase ;
同时,实现 MQServiceBase 的抽象方法 OnReceived(MessageBody message)

 public class DemoService : MQServiceBase
    {
        public Action<MessageLevel, string, Exception> OnAction = null;
        public DemoService(MQConfig config) : base(config)
        {
            base.Queues.Add(new QueueInfo()
            {
                ExchangeType = ExchangeType.Direct,
                Queue = "login-message",
                RouterKey = "pk",
                OnReceived = this.OnReceived
            });
        }

        public override string vHost { get { return "gpush"; } }
        public override string Exchange { get { return "user"; } }


        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="message"></param>
        public override void OnReceived(MessageBody message)
        {
            try
            {
                Console.WriteLine(message.Content);
            }
            catch (Exception ex)
            {
                OnAction?.Invoke(MessageLevel.Error, ex.Message, ex);
            }
            message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, true);

        }
    }
  • 以上的代码非常简单,几乎不需要业务开发者做更多的其它工作,开发者只需要在构造方法内部传入一个 QueueInfo 对象,如果有多个,可一并传入
    public partial class QueueInfo
    {
        /// <summary>
        ///  队列名称
        /// </summary>
        public string Queue { get; set; }
        /// <summary>
        ///  路由名称
        /// </summary>
        public string RouterKey { get; set; }
        /// <summary>
        ///  交换机类型
        /// </summary>
        public string ExchangeType { get; set; }
        /// <summary>
        ///  接受消息委托
        /// </summary>
        public Action<MessageBody> OnReceived { get; set; }
        /// <summary>
        ///  输出信息到客户端
        /// </summary>
        public Action<MQChannel, MessageLevel, string> OnAction { get; set; }
    }
  • 并设置 vHost 和 Exchange 的值,然后剩下的就是在 OnReceived(MessageBody message) 方法中专心的处理自己的业务了;
    在这里,我们仅输出接收到的消息,并设置 ack 为已成功处理。

测试代码

  • 在 Program,我们执行该测试
   class Program
    {
        static void Main(string[] args)
        {
            Test();
        }

        static void Test()
        {
            MQConfig config = new MQConfig()
            {
                HostName = "127.0.0.1",
                Password = "123456",
                Port = 5672,
                UserName = "dotnet"
            };

            MQServcieManager manager = new MQServcieManager();
            manager.AddService(new DemoService(config));
            manager.OnAction = OnActionOutput;
            manager.Start();

            Console.WriteLine("服务已启动");
            Console.ReadKey();

            manager.Stop();
            Console.WriteLine("服务已停止,按任意键退出...");
            Console.ReadKey();
        }

        static void OnActionOutput(MessageLevel level, string message, Exception ex)
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine("{0} | {1} | {2}", level, message, ex?.StackTrace);
            Console.ForegroundColor = ConsoleColor.Gray;
        }
    }
  • 利用 MQServcieManager 对象,完成了对所有消息订阅者的管理和监控,
  • 首先我们到 RabbitMQ 的 web 控制台发布一条消息到队列 login-message 中

  • 然后查看输出结果

  • 消息已经接收并处理,为了查看监控效果,我还手动将网络进行中断,然后监控服务检测到无法连接,尝试重建通道,并将
    消息输出
  • 图中步骤说明
  • 0:服务启动
  • 1:自检启动
  • 2:服务报错,尝试重建,重建失败,继续监测
  • 3:RabbitMQ 内部监控自动重连,监控程序检测到已恢复,收到消息并处理
  • 4:后续监控服务继续进行监控

结语

    在文章中,我们建立了 RabbitMQ 的通道管理、基础服务管理、契约实现等操作,让业务开发人员
通过简单的继承实现去快速的处理业务系统的逻辑,后续如果有增加消费者的情况下,只需要通过 MQServcieManager.AddService 进行简单的调用操作即可,无需对底层技术细节进行过多的改动。

  • 文中如有疏漏之处,欢迎指正。