当前位置: 首页 > news >正文

建设电影网站选服务器怎么选做网站服务器需要自己提供吗

建设电影网站选服务器怎么选,做网站服务器需要自己提供吗,公司网站需要备案吗,玉田住房与城乡建设局网站背景 最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。 原因是多线程…

背景

最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。

原因是多线程同时处理时导致的内存混乱。

官方文档已经解释的很全面了:https://www.rabbitmq.com/dotnet-api-guide.html

一个简易的单线程消费者

注意如下代码,这只是一个简易的单线程同步的消费者;
每次消费1条消息,消息消费完进行手动ack;

Task.Run(() =>{AutoResetEvent autoResetEvent = new AutoResetEvent(false);ConnectionFactory factory = new ConnectionFactory();// "guest"/"guest" by default, limited to localhost connectionsfactory.UserName = user;factory.Password = pass;factory.VirtualHost = vhost;factory.HostName = hostName;// this name will be shared by all connections instantiated by// this factoryfactory.ClientProvidedName = "app:audit component:event-consumer";IConnection conn = factory.CreateConnection();using (IModel channel = conn .CreateModel()){channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);channel.QueueDeclare(queueName, false, false, false, null);channel.QueueBind(queueName, exchangeName, routingKey, null);consumer.Received += (ch, ea) =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);}ConsoleUtil.WriteLine("mq started");autoResetEvent.WaitOne();ConsoleUtil.WriteLine("mq shutdown");}
});

批量消费

好的,那么我现在想要同时消费5条消息,想要达到并行的效果,需要如何改代码呢?看下面的改动:

改动1:

先看两个概念

  1. prefetchCount(预取计数):

    • prefetchCount 是一个用来限制每个消费者一次性从队列中获取的消息数量的参数。
    • 当你有多个消费者同时连接到同一个队列时,RabbitMQ 可以将消息均匀地分发给这些消费者。
    • 通过设置 prefetchCount,你可以告诉 RabbitMQ 每个消费者一次最多获取多少条消息。
    • 这个参数的目的是确保消息在被消费者处理之前不会全部放到内存中,从而提高系统的稳定性和性能。它有助于避免 一个消费者获取了太多消息而导致其他消费者无法获取任何消息的情况。
  2. concurrentConsumers(并发消费者):

    • concurrentConsumers 是指在同一队列上允许多少个并发消费者。
    • 每个并发消费者都会独立地处理消息,这有助于提高系统的处理能力和吞吐量。
    • 通过增加 concurrentConsumers 数量,你可以增加并发处理消息的能力。
    • 注意,这个参数不同于 prefetchCount,它控制的是同时运行的消费者的数量,而不是单个消费者一次性获取的消息数量。
// 参数1:prefetchSize:可接收消息的大小,如果设置为0,那么表示对消息本身的大小不限制
// 参数2:prefetchCount:处理消息最大的数量。相当于消费者能一次接受的队列大小
// 参数3:global:是不是针对整个 Connection 的,因为一个 Connection 可以有多个 Channel
// global=false:针对的是这个 Channel
// global=ture: 针对的是这个 Connection
channel.BasicQos(0, 5, false);
factory.ConsumerDispatchConcurrency = 5;

好的,这时候我配置了同时处理5条消息,看起来没问题了,但是官网文档有这样一句话:

IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.

This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.

If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself:

大概意思就是应该避免多个线程同时使用IModel实例,也就是channel对象,如果这么做的后果就是高负载情况下导致内存混乱,有可能你的线程1消费到了线程5本该消费的消息,这听起来后果是很严重的,那么我们应该怎么改动呢?官网也给方案了,就是给channel对象加锁,看下面的代码改动:

改动2

consumer.Received += (ch, ea) =>{	var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}
};lock (channel){channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);
}

异步支持

新增一个配置:

factory.DispatchConsumersAsync = true;

然后修改消费者:

var consumer = new AsyncEventingBasicConsumer(channel);consumer.Received += async (model, ea) =>
{await Task.Run(() =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}});
};
http://www.yayakq.cn/news/41769/

相关文章:

  • 成都做网站设计公司价格定制化网站开发
  • 专业做设计师品牌网站搜索wordpress博客
  • 网站页面设计方案怎么写关键词自动优化
  • 建设一个收入支出持平的网站做印尼购物网站如何发货
  • 做易拉宝的素材网站wordpress 底部代码
  • 网站定制二次开发百度广告位价格
  • 友情链接的网站图片wordpress博客好吗
  • 收费网站开发做汽配外贸哪个网站
  • 怀化举报网站句容网站设计公司
  • 做网站如何规避法律风险哪个网站域名便宜
  • 企业网站如何做网警备案企业微信网站建设
  • 国内做视频的网站企业公众号以及网站建设
  • 计算机专业网站开发开题报告怎么做电脑网站后台
  • 无锡新吴区住房和建设交通局网站百度站长平台账号购买
  • 杭州钱塘区网站建设网站建设框架注意事项
  • 企业门户网站怎么做广州网站建设公司品牌
  • 怎么建设国外网站wordpress动态主题
  • 怎么做网站广告卖钱做响应式网站的价格
  • 安徽省城乡住房建设厅网站wordpress 安装 权限设置
  • 营销网站建设价格湛江网站设计服务
  • 懒人学做网站微信不能分享wordpress
  • 网站开发摊销年限蒙山县网站建设
  • 济南商城网站建设公司品牌设计广告公司
  • 宁波建设安全协会网站商城app制作
  • 有网站源码怎么建站宁波公司有哪些
  • 只做网站的有专门做序列图的网站
  • 马鞍山做网站的公司78网络营销的主要手段和策略
  • 网站建设、微信小程序、网站如何做百度推广方案
  • 深圳营销型网站建设哪家好wordpress 403
  • 取消网站验证码外贸局合并到哪个局