当前位置: 首页 > news >正文

怎么进行网站推广云指官网

怎么进行网站推广,云指官网,中信建设有限责任公司投资部执行总监张鹏,阿里云 做购物网站网站1.需要阿里云开通商业版RocketMQ 普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组 2.结构目录 3.引入依赖 <!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</…

1.需要阿里云开通商业版RocketMQ

普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组

2.结构目录

在这里插入图片描述

3.引入依赖

<!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.8.5.Final</version></dependency>

4.延迟消息配置

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** 延迟消息配置类*/
@Configuration
public class BatchConsumerClient {@Autowiredprivate MqConfig mqConfig;@Autowiredprivate BatchDemoMessageListener messageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public BatchConsumerBean buildBatchConsumer() {BatchConsumerBean batchConsumerBean = new BatchConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getDelayGroupId());//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");batchConsumerBean.setProperties(properties);//订阅关系Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<Subscription, BatchMessageListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getDelayTopic());subscription.setExpression(mqConfig.getDelayTag());subscriptionTable.put(subscription, messageListener);//订阅多个topic如上面设置batchConsumerBean.setSubscriptionTable(subscriptionTable);return batchConsumerBean;}}
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** 延迟消息消费者*/
@Slf4j
@Component
public class BatchDemoMessageListener implements BatchMessageListener {@Overridepublic Action consume(final List<Message> messages, final ConsumeContext context) {log.info("消费者收到消息大小:"+messages.size());for (Message message : messages) {byte[] body = message.getBody();String s = new String(body);Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String formatTime = sdf.format(date);System.out.println("接收到消息时间:"+formatTime);log.info("接收到消息内容:"+s);}try {//do something..return Action.CommitMessage;} catch (Exception e) {//消费失败return Action.ReconsumeLater;}}
}

5.MQ配置类


import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.Properties;@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {
    // Holds the values bound from configuration keys under the "rocketmq" prefix.

    // Credentials and name-server address shared by every producer/consumer.
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;

    // Normal messages.
    private String topic;
    private String groupId;
    private String tag;

    // Ordered messages.
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;

    // Delayed messages.
    private String delayTopic;
    private String delayGroupId;
    private String delayTag;

    /**
     * Creates a fresh {@link Properties} object holding the access key, secret key and
     * name-server address. A new instance is returned on every call, so callers may add
     * their own entries (for example a group id) without affecting each other.
     *
     * @return connection properties for the ONS client
     * @throws IllegalStateException if one of the three required values has not been configured
     */
    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, requireConfigured("accessKey", this.accessKey));
        properties.setProperty(PropertyKeyConst.SecretKey, requireConfigured("secretKey", this.secretKey));
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, requireConfigured("nameSrvAddr", this.nameSrvAddr));
        return properties;
    }

    /**
     * Returns {@code value}, or fails with a message naming the missing key.
     * Properties.setProperty rejects null values with a NullPointerException that carries no context.
     */
    private static String requireConfigured(String name, String value) {
        if (value == null) {
            throw new IllegalStateException("Missing required configuration value: rocketmq." + name);
        }
        return value;
    }
}

6.YML配置

## Alibaba Cloud RocketMQ configuration
rocketmq:
  accessKey: laskdfjlaksdjflaksjdflaksdjflakdjf
  secretKey: asdfasdlfkasjdlfkasjdlfkajsdlkfjkalksdfj
  nameSrvAddr: rmq..rmq.acs.com:8080
  topic: topic_lsdjf_test
  groupId: Glskdfjalsdkfjalksdjflaksdfj_push
  tag: "*"
  orderTopic: XXX
  orderGroupId: XXX
  orderTag: "*"
  delayTopic: topic_alskdjfalksdjflksdjfkla_delay
  delayGroupId: GIlaskdjflkasdjflkajsdkf_delay
  delayTag: "*"

7.普通消息配置

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** 普通消息配置类*/
@Configuration
public class ConsumerClient {@Autowiredprivate MqConfig mqConfig;@Autowiredprivate DemoMessageListener messageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);//订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getTopic());subscription.setExpression(mqConfig.getTag());subscriptionTable.put(subscription, messageListener);//订阅多个topic如上面设置consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}}

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** 普通主题消费者*/
@Component
@Slf4j
public class DemoMessageListener implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {log.info("接收到消息: " + message);try {byte[] body = message.getBody();String s = new String(body);log.info("接收到消息字符串:"+s);//Action.CommitMessag 进行消息的确认return Action.CommitMessage;} catch (Exception e) {//消费失败return Action.ReconsumeLater;}}
}
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 普通消息生产者配置类*/
/**
 * Spring configuration that declares the producer bean for normal messages.
 */
@Configuration
public class ProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds a {@link ProducerBean} configured with the connection properties from
     * {@link MqConfig}; {@code start}/{@code shutdown} are registered as the Spring
     * init/destroy callbacks through the {@code @Bean} attributes.
     *
     * @return the configured producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        final ProducerBean bean = new ProducerBean();
        bean.setProperties(mqConfig.getMqPropertie());
        return bean;
    }
}
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.stereotype.Component;/*** 普通消息生产者***/
/**
 * Static facade for sending normal messages through the Spring-managed {@link ProducerBean}.
 *
 * <p>The collaborators live in static fields so that {@link #producerMsg} can be called
 * without an instance; they are populated when Spring constructs this component.
 */
@Component
public class RocketMessageProducer {

    private static ProducerBean producer;
    private static MqConfig mqConfig;

    /**
     * Stores the injected collaborators in the static fields used by {@link #producerMsg}.
     *
     * @param producer the producer bean used to send messages
     * @param mqConfig the configuration supplying the target topic
     */
    public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) {
        // Assign through the class name: these are static fields, not instance state.
        RocketMessageProducer.producer = producer;
        RocketMessageProducer.mqConfig = mqConfig;
    }

    /**
     * Sends a normal message to the configured topic and prints the outcome to standard output.
     *
     * @param tag  message tag
     * @param key  message key
     * @param body message payload; encoded as UTF-8
     * @throws IllegalStateException if called before Spring has constructed this component
     * @author LiRen
     */
    public static void producerMsg(String tag, String key, String body) {
        if (producer == null || mqConfig == null) {
            // Without this guard the call fails with a NullPointerException that has no context.
            throw new IllegalStateException("RocketMessageProducer has not been initialised by Spring yet");
        }
        // Encode with an explicit charset rather than the platform default.
        Message msg = new Message(mqConfig.getTopic(), tag, key,
                body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        long time = System.currentTimeMillis();
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            System.out.println(time
                    + " Send mq message success.Topic is:" + msg.getTopic()
                    + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()
                    + " msgId is:" + sendResult.getMessageId());
        } catch (ONSClientException e) {
            e.printStackTrace();
            System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }
}
import com.aliyun.openservices.ons.api.*;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;import java.util.Properties;/*** 普通消息消费者*/
//效果和 DemoMessageListener 一致
//@Component
public class RocketMQConsumer {@Autowiredprivate MqConfig rocketMQConfig;/*** 1、普通订阅** @param*/@Bean //不加@Bean Spring启动时没有注册该方法,就无法被调用public void normalSubscribe( ) {Properties properties = rocketMQConfig.getMqPropertie();properties.put(PropertyKeyConst.GROUP_ID,rocketMQConfig.getGroupId());Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(rocketMQConfig.getTopic(), rocketMQConfig.getTag(), new MessageListener() {@Overridepublic Action consume(Message message, ConsumeContext context) {System.out.println("Receive: " + new String(message.getBody()));//把消息转化为java对象//JSONObject jsonObject=JSONObject.parseObject(jsonString);//Book book= jsonObject.toJavaObject(Book.class);return Action.CommitMessage;}});consumer.start();}
}

7.顺序消息(order)相关配置,本项目未用到


import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.OrderConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
public class OrderConsumerClient {@Autowiredprivate MqConfig mqConfig;@Autowiredprivate OrderDemoMessageListener messageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public OrderConsumerBean buildOrderConsumer() {OrderConsumerBean orderConsumerBean = new OrderConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getOrderGroupId());orderConsumerBean.setProperties(properties);//订阅关系Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getOrderTopic());subscription.setExpression(mqConfig.getOrderTag());subscriptionTable.put(subscription, messageListener);//订阅多个topic如上面设置orderConsumerBean.setSubscriptionTable(subscriptionTable);return orderConsumerBean;}}

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class OrderDemoMessageListener implements MessageOrderListener {@Overridepublic OrderAction consume(final Message message, final ConsumeOrderContext context) {log.info("接收到消息: " + message);try {//do something..return OrderAction.Success;} catch (Exception e) {//消费失败,挂起当前队列return OrderAction.Suspend;}}
}
import com.aliyun.openservices.ons.api.bean.OrderProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 生产者配置类*/
/**
 * Spring configuration that declares the producer bean for ordered messages.
 */
@Configuration
public class OrderProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds an {@link OrderProducerBean} configured with the connection properties from
     * {@link MqConfig}; {@code start}/{@code shutdown} are registered as the Spring
     * init/destroy callbacks through the {@code @Bean} attributes.
     *
     * @return the configured ordered-message producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public OrderProducerBean buildOrderProducer() {
        final OrderProducerBean bean = new OrderProducerBean();
        bean.setProperties(mqConfig.getMqPropertie());
        return bean;
    }
}

8.事务消息相关配置,本项目未用到

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** 事务消息*/
@Slf4j
@Component
public class DemoLocalTransactionChecker implements LocalTransactionChecker {@Overridepublic TransactionStatus check(Message msg) {log.info("开始回查本地事务状态");return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus}
}
import com.aliyun.openservices.ons.api.bean.TransactionProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 事务消息配置类*/
/**
 * Spring configuration that declares the producer bean for transactional messages.
 */
@Configuration
public class TransactionProducerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoLocalTransactionChecker localTransactionChecker;

    /**
     * Builds a {@link TransactionProducerBean} with the connection properties from
     * {@link MqConfig} and the injected local-transaction checker; {@code start}/{@code shutdown}
     * are registered as the Spring init/destroy callbacks through the {@code @Bean} attributes.
     *
     * @return the configured transactional producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public TransactionProducerBean buildTransactionProducer() {
        final TransactionProducerBean bean = new TransactionProducerBean();
        bean.setProperties(mqConfig.getMqPropertie());
        bean.setLocalTransactionChecker(localTransactionChecker);
        return bean;
    }
}

9.测试类


import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import com.atkj.devicewx.config.MqConfig;
import com.atkj.devicewx.normal.RocketMessageProducer;
import com.atkj.devicewx.service.TestService;
import com.atkj.devicewx.vo.MetabolicVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;/*** @Author: albc* @Date: 2024/07/12/10:22* @Description: good good study,day day up*/
@RequestMapping("/api/v1/mq/test")
@RestController
public class TestController {@Autowiredprivate TestService testService;@Autowiredprivate MqConfig mqConfig;@RequestMapping("/one")public String testOne(){Integer count = testService.testOne();return "发送成功:"+count;}/*** 普通消息测试* @return*/@RequestMapping("/useRocketMQ")public String useRocketMQ() {MetabolicVo metabolicVo = new MetabolicVo();metabolicVo.setAge(123);metabolicVo.setName("测试名字");metabolicVo.setWeight(75);RocketMessageProducer.producerMsg("123","666", JSON.toJSONString(metabolicVo));return "请求成功!";}/*** 发送延迟消息测试* @return*/@RequestMapping("/delayMqMsg")public String delayMqMsg() {Properties producerProperties = new Properties();producerProperties.setProperty(PropertyKeyConst.AccessKey, mqConfig.getAccessKey());producerProperties.setProperty(PropertyKeyConst.SecretKey, mqConfig.getSecretKey());producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, mqConfig.getNameSrvAddr());//注意!!!如果访问阿里云RocketMQ 5.0系列实例,不要设置PropertyKeyConst.INSTANCE_ID,否则会导致收发失败Producer producer = ONSFactory.createProducer(producerProperties);producer.start();System.out.println("生产者启动..........");Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String formatTime = sdf.format(date);String meg = formatTime + "发送延迟消息测试";Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), meg.getBytes());// 延时时间单位为毫秒(ms),指定一个时刻,在这个时刻之后才能被消费,这个例子表示 3秒 后才能被消费long delayTime = 3000;message.setStartDeliverTime(System.currentTimeMillis() + delayTime);try {SendResult sendResult = producer.send(message);assert sendResult != null;System.out.println(new Date() + "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId());} catch (ONSClientException e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理System.out.println(new Date() + "重试发送mq消息主题:" + mqConfig.getDelayTopic());e.printStackTrace();}return "请求成功!";}}

优化部分

每次发送消息都要创建生产者,效率低下
使用单例优化

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.Properties;/*** 生产者单例* @Author: albc* @Date: 2024/07/15/15:49* @Description: good good study,day day up*/
@Component
@Slf4j
public class ProducerSingleton {private volatile static Producer producer;private static String accessKey;private static String secretKey;private static String nameSrvAddr;private ProducerSingleton() {}@Value("${rocketmq.accessKey}")private void setAccessKey(String accessKey) {ProducerSingleton.accessKey = accessKey;}@Value("${rocketmq.secretKey}")private void setSecretKey(String secretKey) {ProducerSingleton.secretKey = secretKey;}@Value("${rocketmq.nameSrvAddr}")private void setNameSrvAddr(String nameSrvAddr) {ProducerSingleton.nameSrvAddr = nameSrvAddr;}/*** 创建生产者* @return*/public static Producer getProducer(){if (producer == null){synchronized(ProducerSingleton.class){if (producer == null){Properties producerProperties = new Properties();producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);//注意!!!如果访问阿里云RocketMQ 5.0系列实例,不要设置PropertyKeyConst.INSTANCE_ID,否则会导致收发失败producer = ONSFactory.createProducer(producerProperties);producer.start();log.info("生产者启动........");}}}return producer;}}

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.atkj.devicewx.level.config.MqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 延迟消息生产者** @Author: albc* @Date: 2024/07/15/14:11* @Description: good good study,day day up*/
@Slf4j
@Component
public class BatchMessageProducer {@Autowiredprivate MqConfig mqConfig;/*** 发送消息* @param msg 发送消息内容* @param delayTime 延迟时间,毫秒*/public void sendDelayMeg(String msg,Long delayTime) {Producer producer = ProducerSingleton.getProducer();Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), msg.getBytes());message.setStartDeliverTime(System.currentTimeMillis() + delayTime);try {SendResult sendResult = producer.send(message);assert sendResult != null;log.info( "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId());} catch (ONSClientException e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理log.error("重试发送mq消息主题:" + mqConfig.getDelayTopic());e.printStackTrace();}finally {message = null;}}}

其他不变

http://www.yayakq.cn/news/920869/

相关文章:

  • 网站建站公司费用淞南网站建设
  • 哪里有网站开发定制wordpress模特主题
  • 做电池网站的引导页网站建设搭建专业网站平台公司
  • 企业做网站需注意什么爱站工具包官网下载
  • 哪些网站建设公司网站制作建设建议兴田德润
  • python在线网站网站开发必备流程
  • 网站建设制作设计公司佛山网站域名注册商
  • 怎么销售网站创建公众号的步骤
  • 广东有做阿里网站的吗网站建设需要那些人才
  • 富拉尔基网站建设免费网页申请注册
  • 网站数据库建设access如何自己做游戏软件
  • 微网站开发多少钱个人网站建设发布信息
  • wordpress 多个域名网站怎么做优化百度能搜索到
  • 做一个网站的流程模板免费下载
  • 云平台网站优化品牌商城网站建设公司
  • 商丘网站建设有限公司天津优化网站哪家好用
  • 三亚婚纱摄影 织梦网站源码两耳清风怎么做网站
  • 现在建网站赚钱吗求个网站能用的
  • 交互式网站开发技术asp建设银行插u盾网站上不去
  • 企业门户网站免费模板互动平台上市公司
  • 高端制作网站哪家专业wordpress会员
  • 网站备案管理wordpress最好用的企业主题
  • 东莞做网站公司首选!小学网站模板下载
  • 做国厂家的网站技术支持凯里网站建设
  • 有哪些网站结构是不合理的网站展示型和营销型有什么区别
  • 网站开发技术 下载php整站最新版本下载
  • 烟台汽车网站建设个人一般注册什么类型的公司
  • 给公司做网站销售怎样啦wordpress搭建官网视频
  • 文化馆网站建设意义wordpress主题图片替换
  • 新手做网站需要哪些软件淘宝客导购网站 丢单