当前位置: 首页 > news >正文

正规的佛山网站建设个人网站设计的参考文献

正规的佛山网站建设,个人网站设计的参考文献,登录wordpress,哪个网站建设水善利万物而不争,处众人之所恶,故几于道💦 文章目录 1. Kafka_Sink 2. Kafka_Sink - 自定义序列化器 3. Redis_Sink_String 4. Redis_Sink_list 5. Redis_Sink_set 6. Redis_Sink_hash 7. 有界流数据写入到ES 8. 无界流数据写入到ES 9. 自定…

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

  1. Kafka_Sink
  2. Kafka_Sink - 自定义序列化器
  3. Redis_Sink_String
  4. Redis_Sink_list
  5. Redis_Sink_set
  6. Redis_Sink_hash
  7. 有界流数据写入到ES
  8. 无界流数据写入到ES
  9. 自定义sink - mysql_Sink
  10. Jdbc_Sink

官方文档 - Flink1.13

在这里插入图片描述


1. Kafka_Sink

addSink(new FlinkKafkaProducer< String>(kafka_address,topic,序列化器)

要先添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.6</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);stream.keyBy(WaterSensor::getId).sum("vc").map(JSON::toJSONString).addSink(new FlinkKafkaProducer<String>("hadoop101:9092",  // kafaka地址"flink_sink_kafka",  //要写入的Kafkatopicnew SimpleStringSchema()  // 序列化器));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. Kafka_Sink - 自定义序列化器

  自定义序列化器,new FlinkKafkaProducer()的时候,选择四个参数的构造方法,然后使用new KafkaSerializationSchema序列化器。然后重写serialize方法

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);Properties sinkConfig = new Properties();sinkConfig.setProperty("bootstrap.servers","hadoop101:9092");stream.keyBy(WaterSensor::getId).sum("vc").addSink(new FlinkKafkaProducer<WaterSensor>("defaultTopic",  // 默认发往的topic ,一般用不上new KafkaSerializationSchema<WaterSensor>() {  // 自定义的序列化器@Overridepublic ProducerRecord<byte[], byte[]> serialize(WaterSensor waterSensor,@Nullable Long aLong) {String s = JSON.toJSONString(waterSensor);return new ProducerRecord<>("flink_sink_kafka",s.getBytes(StandardCharsets.UTF_8));}},sinkConfig,  // Kafka的配置FlinkKafkaProducer.Semantic.AT_LEAST_ONCE  // 一致性语义:现在只能传入至少一次));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

3. Redis_Sink_String

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到String结构里面

添加依赖:

<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");/*
往redis里面写字符串,string   命令提示符用set
假设写的key是id,value是整个json格式的字符串
key         value
sensor_1    json格式字符串*/// new一个单机版的配置FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).setMaxTotal(100)  //最大连接数量.setMaxIdle(10)  // 连接池里面的最大空闲.setMinIdle(2)   // 连接池里面的最小空闲.setTimeout(10*1000)  // 超时时间.build();// 写出到redis中result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {// 返回命令描述符:往不同的数据结构写数据用的方法不一样@Overridepublic RedisCommandDescription getCommandDescription() {// 写入到字符串,用setreturn new RedisCommandDescription(RedisCommand.SET);}@Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}@Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. Redis_Sink_list

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到 list 结构里面

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");// key是id,value是处理后的json格式字符串FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).setMaxTotal(100)  //最大连接数量.setMaxIdle(10)  // 连接池里面的最大空闲.setMinIdle(2)   // 连接池里面的最小空闲.setTimeout(10*1000)  // 超时时间.build();result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {@Overridepublic RedisCommandDescription getCommandDescription() {// 写入listreturn new RedisCommandDescription(RedisCommand.RPUSH);}@Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}@Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. Redis_Sink_set

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到 set 结构里面

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).build();result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {@Overridepublic RedisCommandDescription getCommandDescription() {// 数据写入set集合return new RedisCommandDescription(RedisCommand.SADD);}@Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}@Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. Redis_Sink_hash

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到 hash结构里面

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).build();result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {@Overridepublic RedisCommandDescription getCommandDescription() {// 数据写入hashreturn new RedisCommandDescription(RedisCommand.HSET,"a");}@Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}@Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

7. 有界流数据写入到ES中

new ElasticsearchSink.Builder()

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop101", 9200),new HttpHost("hadoop102", 9200),new HttpHost("hadoop103", 9200));ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(hosts,new ElasticsearchSinkFunction<WaterSensor>() {@Overridepublic void process(WaterSensor element,  // 需要写出的元素RuntimeContext runtimeContext, // 运行时上下文   不是context上下文对象RequestIndexer requestIndexer) {  // 把要写出的数据,封装到RequestIndexer里面String msg = JSON.toJSONString(element);IndexRequest ir = Requests.indexRequest("sensor").type("_doc")  // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况.id(element.getId())  // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据.source(msg, XContentType.JSON);requestIndexer.add(ir);  // 把ir存入到indexer, 就会自动的写入到es中}});result.addSink(builder.build());try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

8. 无界流数据写入到ES

  和有界差不多 ,只不过把数据源换成socket,然后因为无界流,它高效不是你来一条就刷出去,所以设置刷新时间、大小、条数,才能看到结果。
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> result = env.socketTextStream("hadoop101",9999).map(line->{String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).sum("vc");List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop101", 9200),new HttpHost("hadoop102", 9200),new HttpHost("hadoop103", 9200));ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(hosts,new ElasticsearchSinkFunction<WaterSensor>() {@Overridepublic void process(WaterSensor element,  // 需要写出的元素RuntimeContext runtimeContext, // 运行时上下文   不是context上下文对象RequestIndexer requestIndexer) {  // 把要写出的数据,封装到RequestIndexer里面String msg = JSON.toJSONString(element);IndexRequest ir = Requests.indexRequest("sensor").type("_doc")  // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况.id(element.getId())  // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据.source(msg, XContentType.JSON);requestIndexer.add(ir);  // 把ir存入到indexer, 就会自动的写入到es中}});// 自动刷新时间builder.setBulkFlushInterval(2000);  // 默认不会根据时间自动刷新builder.setBulkFlushMaxSizeMb(1024);  // 当批次中的数据大于等于这个值刷新builder.setBulkFlushMaxActions(2);   // 每来多少条数据刷新一次// 这三个是或的关系,只要有一个满足就会刷新result.addSink(builder.build());try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

9. 自定义sink - mysql_Sink

  需要写一个类,实现RichSinkFunction,然后实现invoke方法。这里因为是写MySQL所以需要建立连接,那就用Rich版本。

  记得导入MySQL依赖

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");result.addSink(new MySqlSink());try {env.execute();} catch (Exception e) {e.printStackTrace();}}public static class MySqlSink extends RichSinkFunction<WaterSensor> {private Connection connection;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");connection = DriverManager.getConnection("jdbc:mysql://hadoop101:3306/test?useSSL=false", "root", "123456");}@Overridepublic void close() throws Exception {if (connection!=null){connection.close();}}// 调用:每来一条元素,这个方法执行一次@Overridepublic void invoke(WaterSensor value, Context context) throws Exception {// jdbc的方式想MySQL写数据
//            String sql = "insert into sensor(id,ts,vc)values(?,?,?)";//如果主键不重复就新增,主键重复就更新
//            String sql = "insert into sensor(id,ts,vc)values(?,?,?) duplicate key update vc=?";String sql = "replace into sensor(id,ts,vc)values(?,?,?)";// 1. 得到预处理语句PreparedStatement ps = connection.prepareStatement(sql);// 2. 给sql中的占位符进行赋值ps.setString(1,value.getId());ps.setLong(2,value.getTs());ps.setInt(3,value.getVc());
//            ps.setInt(4,value.getVc());// 3. 执行ps.execute();// 4. 提交
//            connection.commit();  MySQL默认自动提交,所以这个地方不用调用// 5. 关闭预处理ps.close();}
}

运行结果:
在这里插入图片描述

10. Jdbc_Sink

addSink(JdbcSink.sink(sql,JdbcStatementBuilder,执行参数,连接参数)

  对于jdbc数据库,我们其实没必要自定义,因为官方给我们了一个JDBC Sink -> 官方JDBC Sink 传送门

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.13.6</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayList<WaterSensor> waterSensors = new ArrayList<>();waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);SingleOutputStreamOperator<WaterSensor> result = stream.keyBy(WaterSensor::getId).sum("vc");result.addSink(JdbcSink.sink("replace into sensor(id,ts,vc)values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement ps,WaterSensor waterSensor) throws SQLException {// 只做一件事:给占位符赋值ps.setString(1,waterSensor.getId());ps.setLong(2,waterSensor.getTs());ps.setInt(3,waterSensor.getVc());}},new JdbcExecutionOptions.Builder()  //设置执行参数.withBatchSize(1024)   // 刷新大小上限.withBatchIntervalMs(2000) //刷新间隔.withMaxRetries(3)  // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop101:3306/test?useSSL=false").withUsername("root").withPassword("123456").build()));try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

http://www.yayakq.cn/news/701362/

相关文章:

  • 东莞网站设计网址深圳惠州网站建设公司
  • 有开源项目做的网站ps做图软件怎么下载网站
  • 雷州市规划建设局网站高端网站建设定制
  • 如何加盟网站建设网页工具
  • 建设银行上海分行招聘网站网站开发常遇到客户问题
  • 英文网站建视频剪辑自学网站
  • 广西城乡和住房建设厅网站计算机网站建设是什么
  • 南昌专业的企业网站开发公司六安网站制作哪家好
  • 页面排版西宁整站优化
  • wordpress怎么做网盘站wordpress调用百度文库
  • 上海网站建设推荐q479185700顶你目前做外贸平台
  • 没有备案的网站 推广平邑哪里有做网站的
  • 电商网站开发设计文档毕节市交通建设集团网站
  • 做网站js是什么wordpress模板最多使用
  • 网站 组成建设网站的预期收益
  • 青海省建设局网站怎样用自己的电脑,做网站
  • 公司网站域名备案北京网站排名制作
  • 如果在工商局网站上做股权质押建网站啦
  • 做网页要花多少钱邢台市seo服务
  • 上海建设网站价格网络营销用什么软件
  • 企业网站排名技巧工业设计效果图
  • 如何上国外购物网站在线a视频网站一级a做片
  • 泸州市建设局网站广州做网站的公司哪家好
  • 长城建设投资有限公司网站上海企业网络营销推广多少钱
  • 有域名如何做网站wordpress里的站点标题是什么意思
  • 山东网站建设app刷单网站搭建
  • 网站开发总结与收获中英文网站建站
  • 网站建设 微盘编辑网页用什么软件
  • 广州网站建设多少钱网站建设个人博客
  • 成都 网站建设 app 开发短视频推广app