当前位置: 首页 > news >正文

网站紧急维护网站制作公司智能 乐云践新

网站紧急维护,网站制作公司智能 乐云践新,开家网站设计公司,wechat官方下载一、同步消息 1、生产者 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调…

一、同步消息

1、生产者

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

二、异步消息

1、生产者

异步消息有两种:

1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

三、顺序消息

以订单为例,

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

http://www.yayakq.cn/news/870194/

相关文章:

  • 连云建网站公司化妆品网站制作
  • 如何建立自己的网站免费seo什么意思中文意思
  • 做网站设计哪里有软文网站推荐
  • 什么是交互式网站wordpress自定义段
  • php网站源码模板ps做的图片能做直接做网站吗
  • 邯郸网站设计联系电话泸州百拓网站建设
  • 安庆专业网站建设公如何在本地运行WordPress
  • 泉州市建设工程质量监督站网站软件工程培训班多少钱
  • 最便宜的网站建设免费招商信息发布平台
  • 招聘代做网站中职学校专业建设方案
  • 有自己网站好处适合代码新手做的网站
  • 福田皇岗社区做网站黄浦区网站建设公司
  • 青岛海诚互联做网站好吗做心悦腾龙光环的网站是什么
  • 网站备案最快几天工信部 网站备案规定
  • 苏州网站建设哪里好福建网站建设公司
  • 免费的创建个人网站自己做链接网站
  • 网站建设流程百科柳州网站制作推荐
  • 辽宁建设厅新网站青岛网络优化
  • 网站的规划大型茶叶网站建设
  • 做植物网站浦东做网站的公司
  • 重庆快速网站建设wordpress文章无法访问
  • 建设网站的计划表app运营模式
  • 企业网站建设遵循的原则wordpress网站加密方式
  • 广州网络建站唐山seo排名
  • 郑州树标网站建设网站开发中定义路由的作用
  • 盐步网站制作智慧团建pc版官网
  • 番禺网站开发价格企业营销型网站建设哪家公司好
  • 滨州市住房和城乡建设局网站网络营销推广的目标与策略
  • 做网站的预算表网站建设服务费计入什么科目
  • 个人网站对主机有什么要求网站建设岗位内容