网站开发专业就业培训学校兴化市建设局网站
消息过滤
1 ) 简单消息过滤
/*** 订阅指定topic下tags分别等于 TagA 或 TagC 或 TagD
*/consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
 
- 如以上代码所示,简单消息过滤通过指定多个 Tag 来过滤消息,过滤的动作在服务器进行
 
2 ) 高级消息过滤
 
-  
- Broker 所在的机器会启劢多个 FilterServer 过滤迕程
 
 -  
- Consumer 启劢后,会吐 FilterServer 上传一个过滤的 Java 类
 
 -  
- Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer
 
 - 总结 
- 使用 CPU 资源来换取网卡流量资源
 - FilterServer 与 Broker 部署在同一台机器,数据通过本地回环通信,不走网卡
 - 一台 Broker 部署多个 FilterServer,充分利用 CPU 资源,因为单个 JVM 难以全面利用高配的物理机 CPU 资源
 - 因为过滤代码使用 Java 语言来编写,应用几乎可以做任意形式的服务器端消息过滤,例如通过 Message Header 进行过滤,甚至可以按照 Message Body 进行过滤
 - 使用 Java 语言进行作为过滤表达式是一个双刃剑,方便了应用的过滤操作,但是带来了服务器端的安全风险。需要应用来保证过滤代码安全,例如在过滤程序里尽可能不做申请大内存,创建线程等操作。避免 Broker 服务器发生资源泄漏
 
 
通信组件
- RocketMQ 通信组件使用了 Netty-4.0.9.Final,在之上做了简单的协议封装
 
1 )网络协议

- 大端 4 个字节整数,等于 2、3、4 长度总和
 - 大端 4 个字节整数,等于 3 的长度
 - 使用 json 序列化数据
 - 应用自定义二进制序列化数据
 
- Header 格式
{"code": 0,"language": "JAVA","version": 0,"opaque": 0,"flag": 1,"remark": "hello, I am respponse /127.0.0.1:27603","extFields": {"count": "0","messageTitle": "HelloMessageTitle"} } 
| Header 字段名 | 类型 | Request | Response | 
|---|---|---|---|
| code | 整数 | 请求操作代码,请求接收方 根据不同的代码做不同的操作  | 应答结果代码,0 表示成功, 非 0 表示各种错误代码  | 
| language | 字符串 | 请求发起方实现语言,默认JAVA | 应答接收方实现语言 | 
| version | 整数 | 请求发起方程序版本 | 应答接收方程序版本 | 
| opaque | 整数 | 请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用 | 应答方不做修改,直接返回 | 
| flag | 整数 | 通信局的标志位 | 通信局的标志位 | 
| remark | 字符串 | 传输自定义文本信息 | 错误详细描述信息 | 
| extFields | HashMap<String,String> | 请求自定义字段 | 应答自定义字段 | 
2 )心跳处理
- 通信组件本身不处理心跳,由上局进行心跳处理
 
3 )连接复用
- 同一个网络连接,客户端多个线程可以同时发送请求,应答响应通过 header 中的 opaque 字段来标识
 
4 )超时连接
- 如果某个连接超过特定时间没有活动(无读写事件),则自动关闭此连接
 - 并通知上层业务,清除连接对应的注册信息
 
RocketMQ 服务发现(Name Server)
- Name Server 是专为 RocketMQ 设计的轻量级名称服务,代码小于 1000 行
 - 具有简单、可集群横向扩展、无状态等特点
 - 将要支持的主备自动切换功能会强依赖 Name Server
 
