当前位置: 首页 > news >正文

网站开发强制开启浏览器极速模式郑州网络建

网站开发强制开启浏览器极速模式,郑州网络建,做app和做网站的区别,免费ppt课件下载网站概述 不同中间件,有各自的使用方法,代码也不一样。 可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间…

概述

不同中间件,有各自的使用方法,代码也不一样。

可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。

本文的操作是在 微服务调用链路追踪 的基础上进行。

环境说明

jdk1.8

maven3.6.3

mysql8

spring cloud2021.0.8

spring boot2.7.12

idea2022

rabbitmq3.12.4

步骤

消息生产者

创建子模块stream_producer

添加依赖

    <dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

刷新依赖

配置application.yml

server:port: 7001
spring:application:name: stream_producerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit

查看Source.class源码

编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中

package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {@Autowiredprivate MessageChannel output;@Overridepublic void run(String... args) throws Exception {//发送消息// messageBuilder 工具类,创建消息output.send(MessageBuilder.withPayload("hello world").build());}public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}

查看rabbitmq web UI

http://localhost:15672/

看到Exchanges中还没有my-default

运行StreamProductApplication

刷新rabbitmq Web UI,看到了my-dafault的exchange

消息消费者

创建子模块stream_consumer

添加依赖

    <dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

配置application.yml

server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit

查看内置通道名称为input

编写消息消费者启动类,在启动类监听接收消息

package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("监听收到:" + message.getPayload());}public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}
}

运行stream_consumer消费者服务,监听消息

运行stream_producer生产者服务,发送消息

查看消费者服务控制台日志,接收到了消息

优化代码

之前把生产和消费的消息都写在启动类中了,代码耦合高。

优化思路是把不同功能的代码分开放。

消息生产者

stream_producer 代码结构如下

package org.example.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(Source.class)
public class MessageSender {@Autowiredprivate MessageChannel output;//通道//发送消息public void send(Object obj){output.send(MessageBuilder.withPayload(obj).build());}
}

修改启动类

package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@SpringBootApplication
public class StreamProductApplication {public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}

pom.xml添加junit依赖

<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope>
</dependency>

刷新依赖

编写测试类

在stream_producerm模块的src/test目录下,新建org.example.stream包,再建出ProducerTest类,代码如下

package org.example.stream;import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {@Autowiredprivate MessageSender messageSender;//注入发送消息工具类@Testpublic void testSend(){messageSender.send("hello world");}
}

 

消息消费者

stream_consumer代码结构如下

添加MessageListener类获取消息

package org.example.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class MessageListener {// 监听binding中的信息@StreamListener(Sink.INPUT)public void input(String message){System.out.println("获取信息:" + message);}
}

修改启动类

package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
public class StreamConsumerApplication {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}}

启动consumer接收消息

执行producer单元测试类ProducerTest的testSend()方法,发送消息

查看consumer控制台输出,接收到信息了

代码解耦后,同样能成功生产消息和消费消息。

自定义消息通道

此前使用默认的消息通道outputinput。

也可以自己定义消息通道,例如:myoutputmyinput

消息生产者

org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产这的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

修改MessageSender

package org.example.stream.producer;import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {@Autowiredprivate MessageChannel myoutput;//通道//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload(obj).build());}
}

修改application.yml

cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地myoutput:destination: custom-output

消息消费者

在stream_consumer服务的org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产者的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

修改MessageListener

package org.example.stream.stream;import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {// 监听binding中的信息@StreamListener(MyProcessor.MYINPUT)public void input(String message){System.out.println("获取信息:" + message);}
}

修改application.yml配置

cloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地myinput:destination: custom-output

测试

启动stream_consumer

运行单元测试的testSend()方法生产消息

查看stream_consumer控制台,能看到生产的消息,如下

获取信息:hello world

消息分组

采用复制配置方式运行两个消费者

启动第一个消费者(端口为7002)

修改端口为7003,copy configuration,再启动另一个消费者

执行生产者单元测试生产消息,看到两个消费者都接收到了信息

说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。

但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。

我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可

重启两个消费者

修改端口号为7002,重新启动第一个消费者

修改端口号为7003,重新启动第二个消费者

生产者生产一条消息

查看消费者接收消息情况,只有一个消费者接收到信息。

消息分区

消息分区就是实现特定消息只往特定机器发送。

修改生产者配置

  cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myoutput:destination: custom-outputproducer:partition-key-expression: payload #分区关键字 可以是对象中的id,或对象partition-count: 2 #分区数量

修改消费者1的application.yml配置

server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 0 #当前消费者的索引

启动消费者1

修改消费者2的配置

server:port: 7003
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 1 #当前消费者的索引

修改端口号为7003,当前消费者的索引instance-index的值修改为1

启动消费者2

生产者发送消息,看到只有Application(2)接收到消息

再用生产者发送一次消息,也是Application(2)接收到消息

说明实现了消息分区

也可以更改发送的数据,看是否能发送到不同消费者

修改生产者,发送数据由hello world变为hello world1,同时发送5次

	public void testSend(){for (int i = 0; i < 5; i++) {messageSender.send("hello world1");}}

看到hello world1全部被Application消费

所以消息分区是根据发送的消息不同,发送到不同消费者中。

完成!enjoy it!

http://www.yayakq.cn/news/13843/

相关文章:

  • 重庆专业的网站建设公司哪家好广州制作app
  • 代码做网站常用单词网站建设及维护费
  • 山东省工程建设信息官方网站公众号运营外包价格
  • 大连 做 企业网站织梦响应式网站
  • 做车贷的网站wordpress id从1开始
  • 网站的收录电子商务企业有哪些
  • 10有免费建网站镇江方圆建设监理咨询有限公司网站
  • 网站两边的悬浮框怎么做零基础网站开发要学多久
  • 多终端响应式网站班级网站制作模板
  • 在哪个网站做ppt模板赚钱网站制作和优化
  • 什么亲子网站可以做一下广告开展网络营销的企业网站有哪些
  • 校园二手市场网站建设方案如何查找网站的死链接
  • 民宿网站建设方案上海网站优化排名公司
  • 网站改版 升级的目的是什么意思杭州知名网站建设
  • 查icp备案是什么网站小型crm系统
  • 中国联通网站备案互联网营销软件
  • 网站免费的不用下载企业年金一般交多少钱
  • 网站设计与优化建网站的哪家好
  • 示范高校建设网站可以自己做主题的软件
  • 上海网站开发网站开发公司上海关键词优化按天计费
  • 福州网站制作案例高性能网站建设 pdf
  • 珠海做网站开发服务公司产品展厅柜设计公司
  • xml网站开发工具衡水建设公司网站
  • 企业网站建设的目的深圳有什么好玩的
  • 旅游网站设计与制作课程设计自适应网站导航是怎么做的
  • 聊城网站建设动态网站后台管理要求
  • 做美食网站增城电子商务网站建设
  • 易班网站建设推广平台排行榜app
  • 刘涛做的儿童购物网站网站点击按钮回到页面顶部怎么做
  • 网站建设问题清单visio网站开发流程图