当前位置: 首页 > news >正文

建站公司用的开源系统公司网站建设济南兴田德润地址

建站公司用的开源系统,公司网站建设济南兴田德润地址,wordpress 添加广告窗口,怎么做卖保险的网站一. Source 简介 DataStream是Flink的低级API,用于进行数据的实时处理,Flink编程模型分为Source、Transformation、Sink三个部分,如下图所示。 默认Flink提供了大量的内置Source,常见的Source如下: 基于文件的Sour…
一. Source 简介

DataStream是Flink的低级API,用于进行数据的实时处理,Flink编程模型分为Source、Transformation、Sink三个部分,如下图所示。
在这里插入图片描述

默认Flink提供了大量的内置Source,常见的Source如下:

  • 基于文件的Source
  • 基于Socket的Source
  • 基于集合的Source
  • 基于Kafka消息队列的Source

当以上内置Source不能满足业务需要时,可以实现自定义Source。

Flink中有关Source的接口类的继承关系如下:
在这里插入图片描述

  • SourceFunction:单并行度Source的基类
  • RichSourceFunction:单并行度增强型Source的基类
  • ParallelSourceFunction:多并行度Source的基类
  • RichParallelSourceFunction:多并行度增强型Source的基类
二. 自定义单并行度Source

自定义单并行度的source需要实现SourceFunction接口。

代码实现

MySource.java

package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;public class MySource implements SourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {// "Num"加上0~100的随机数生成一个字符串ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("source_demo");}
}

运行结果

5> Num: 62
6> Num: 91
7> Num: 13
8> Num: 53
三. 自定义多并行度Source

自定义多并行度的source需要实现ParallelSourceFunction接口。

代码实现

MyParallelSource.java

package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;public class MyParallelSource implements ParallelSourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MyParallelSource());source.print();env.execute("source_demo");}
}

运行结果

7> Num: 43
8> Num: 30
1> Num: 92
2> Num: 50
5> Num: 39
6> Num: 6
4> Num: 20
3> Num: 2
四. 自定义单并行度增强型Source

增强型Source额外提供了open和close方法,可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。

在mysql中创建student表,并插入三条数据。

create table student (id int primary key,name varchar(50),age int
);insert into student values(1, "name1", 20),(2, "name2", 30), (3, "name3", 15);

实现代码

Student.java

package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id = id;this.name = name;this.age = age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", age=" + age +'}';}
}

MysqlSource.java

package flink.basic.source;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;public class MysqlSource extends RichSourceFunction<Student> {Connection conn;Statement stmt;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://192.168.47.130:3306/test";String user = "root";String password = "root";conn = DriverManager.getConnection(url,user,password);stmt = conn.createStatement();}@Overridepublic void run(SourceContext<Student> ctx) throws Exception {ResultSet rs = stmt.executeQuery("select * from student");while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");ctx.collect(new Student(id, name, age));}rs.close();}@Overridepublic void cancel() {}@Overridepublic void close() throws Exception {stmt.close();conn.close();}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSource<Student> source = env.addSource(new MysqlSource());source.print();env.execute("source_demo");}
}

运行结果

1> Student{id=3, name='name3', age=15}
8> Student{id=2, name='name2', age=30}
7> Student{id=1, name='name1', age=20}
http://www.yayakq.cn/news/236143/

相关文章:

  • 网站开发流程及详解建设实验教学网站的作用
  • 做网站和推广的公司哪家好电子商务网站开发教程课后答案
  • 网站说明书的详细说明汕头网站建设技术外包
  • 重庆集团网站建设怎么在网上建网站啊
  • 音乐网站开发需要什么语言工具ps如何做网站专题
  • 沈阳网站制作招聘网如何创建一个网站的流程
  • 济南房产信息网站官网查询详情页生成器
  • 衡水做网站报价深圳建设交易工程服务网
  • 做携程网站的技术wordpress安装 centos
  • 药材网技术网站建设网站搭建一般要多少钱
  • 手机建立网站最具口碑的企业网站建设
  • 新乡电子商务网站建设的的平台服务电话
  • 做外国人的生意哪家网站好2o18江苏建设网站施工员模试卷
  • 重庆网站推广营销价格网站充值记账凭证怎么做
  • 装饰工程网站模板下载好的建站平台
  • 网站设计与网站建设a卷酒店微网站建设
  • wordpress 适合做什么网站网站客户留言
  • 公众号购物做网站还是小程序企业组网方案
  • 做网站宿迁环球旅行卡怎么用
  • tp5做企业网站河源市规划建设局网站
  • 网页设计与网站建设选择题玉田住房和建设局网站
  • 怎样做旅游网站设计六安木兰巷
  • 网站设计师主要做什么爱论网
  • 以绿色为主色的网站模版上海短视频拍摄制作公司
  • 网站 流量 不够用二级网站免费建
  • 咸阳做企业网站深圳建设工程交易信息网
  • 刚做的网站怎么才能搜索到如何搜索公司所有的网站
  • 网站建设哪家效益快用ps做网站是用像素还是毫米
  • php网站后台密码忘记网站优化培训中心
  • 响应式网站用什么单位网站建设制作深圳