博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
NET Core 使用RabbitMQ
阅读量:5265 次
发布时间:2019-06-14

本文共 8426 字,大约阅读时间需要 28 分钟。

原文:

                   
                                                                                                   

640?wx_fmt=png&wxfrom=5&wx_lazy=1

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yun install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  passwordrabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

0?wx_fmt=png

.NET Core 使用RabbitMQ

定义生产者
//创建连接工厂 ConnectionFactory factory = new ConnectionFactory{    UserName = "admin",//用户名    Password = "admin",//密码    HostName = "192.168.157.130"//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel();//声明一个队列channel.QueueDeclare("hello", false, false, false, null);Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!"); string input; do{    input = Console.ReadLine();     var sendBytes = Encoding.UTF8.GetBytes(input);      //发布消息    channel.BasicPublish("", "hello", null, sendBytes);} while (input.Trim().ToLower()!="exit");channel.Close();connection.Close();
定义消费者
           //创建连接工厂            ConnectionFactory factory = new ConnectionFactory            {                UserName = "admin",//用户名                Password = "admin",//密码                HostName = "192.168.157.130"//rabbitmq ip            };                       //创建连接            var connection = factory.CreateConnection();                //创建通道            var channel = connection.CreateModel();                      //事件基本消费者            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);                    //接收到消息事件            consumer.Received += (ch, ea) =>            {                var message = Encoding.UTF8.GetString(ea.Body);                Console.WriteLine($"收到消息: {message}");                     //确认该消息已被消费                channel.BasicAck(ea.DeliveryTag, false);            };            //启动消费者 设置为手动应答消息            channel.BasicConsume("hello", false, consumer);            Console.WriteLine("消费者已启动");            Console.ReadKey();            channel.Dispose();            connection.Close();
运行

0?wx_fmt=gif

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件 consumer.Received += (ch, ea) =>{     var message = Encoding.UTF8.GetString(ea.Body);    Console.WriteLine($"收到消息: {message}");    Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");    Thread.Sleep(10000);    //确认该消息已被消费    channel.BasicAck(ea.DeliveryTag, false);    Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");};

演示:

0?wx_fmt=gif

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

0?wx_fmt=png

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。

由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

Direct Exchange

0?wx_fmt=png

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

//创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定义一个队列 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null);

运行:

0?wx_fmt=gif

Fanout Exchange

0?wx_fmt=png

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args){    string exchangeName = "TestFanoutChange";     string queueName1 = "hello1";      string queueName2 = "hello2";        string routeKey = "";    //创建连接工厂    ConnectionFactory factory = new ConnectionFactory    {        UserName = "admin",//用户名        Password = "admin",//密码        HostName = "192.168.157.130"//rabbitmq ip    };    //创建连接    var connection = factory.CreateConnection();    //创建通道    var channel = connection.CreateModel();    //定义一个Direct类型交换机    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);    //定义队列1    channel.QueueDeclare(queueName1, false, false, false, null);       //定义队列2    channel.QueueDeclare(queueName2, false, false, false, null);        //将队列绑定到交换机    channel.QueueBind(queueName1, exchangeName, routeKey, null);    channel.QueueBind(queueName2, exchangeName, routeKey, null);       //生成两个队列的消费者    ConsumerGenerator(queueName1);    ConsumerGenerator(queueName2);    Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");    string input;    do    {        input = Console.ReadLine();         var sendBytes = Encoding.UTF8.GetBytes(input);            //发布消息        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);    } while (input.Trim().ToLower() != "exit");    channel.Close();    connection.Close();}/// /// 根据队列名称生成消费者/// /// static void ConsumerGenerator(string queueName){    //创建连接工厂    ConnectionFactory factory = new ConnectionFactory    {        UserName = "admin",//用户名        Password = "admin",//密码        HostName = "192.168.157.130"//rabbitmq ip    };    //创建连接    var connection = factory.CreateConnection();    //创建通道    var channel = connection.CreateModel();    //事件基本消费者    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //接收到消息事件    consumer.Received += (ch, ea) =>    {        var message = Encoding.UTF8.GetString(ea.Body);        Console.WriteLine($"Queue:{queueName}收到消息: {message}");        //确认该消息已被消费        channel.BasicAck(ea.DeliveryTag, false);    };    //启动消费者 设置为手动应答消息    channel.BasicConsume(queueName, false, consumer);    Console.WriteLine($"Queue:{queueName},消费者已启动");}

运行:

0?wx_fmt=gif

Topic Exchange

0?wx_fmt=png

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

string exchangeName = "TestTopicChange"; string queueName = "hello"; string routeKey = "TestRouteKey.*"; //创建连接工厂 ConnectionFactory factory = new ConnectionFactory{    UserName = "admin",//用户名    Password = "admin",//密码    HostName = "192.168.157.130"//rabbitmq ip}; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定义队列1 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null);Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!"); string input;do{    input = Console.ReadLine();    var sendBytes = Encoding.UTF8.GetBytes(input);    //发布消息    channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);} while (input.Trim().ToLower() != "exit");channel.Close();connection.Close();

运行

0?wx_fmt=gif

Demo下载:DotNetCore.RabbitMQ

相关文章: 

原文地址:http://www.cnblogs.com/stulzq/p/7551819.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

640?wx_fmt=jpeg

               
             

再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!

posted on
2019-01-18 15:07 阅读(
...) 评论(
...)

转载于:https://www.cnblogs.com/lonelyxmas/p/10287708.html

你可能感兴趣的文章
eclipse中的.project 和 .classpath文件的具体作用是什么?
查看>>
Unity调用Windows窗口句柄,选择文件和目录
查看>>
字节流缓存
查看>>
HashMap循环遍历方式
查看>>
python面试题(二)顺时针打印二维数组,快速排序
查看>>
第一次作业,三班王泽04
查看>>
本地浏览器Websql数据库操作
查看>>
boost.log要点笔记
查看>>
React Native 入门 调试项目
查看>>
php 简单markdown app 标记语言
查看>>
URL中参数为数组
查看>>
Hacking Grub for fun and profit
查看>>
MySQL数据库 基本操作
查看>>
请大家规范电子邮件用法养成好的邮件习惯
查看>>
微信游戏和微信公众号小说如何有效做好域名防封,给大家分享我的有效经验...
查看>>
前端跨域知识总结
查看>>
C# 通过 Quartz .NET 实现 schedule job 的处理
查看>>
关于java之socket输入流输出流可否放在不同的线程里进行处理
查看>>
vue-购物车
查看>>
有趣的setTimeout
查看>>