当前位置: 首页 > news >正文

如何做网站发布商品长沙市在建工程项目

如何做网站发布商品,长沙市在建工程项目,深圳排名前十的跨境电商公司,网站备案符号文章目录 系列文章索引一、快速上手1、导包2、求词频demo(1)要读取的数据(2)demo1:批处理(离线处理)(3)demo2 - lambda优化:批处理(离线处理&…

文章目录

  • 系列文章索引
  • 一、快速上手
    • 1、导包
    • 2、求词频demo
      • (1)要读取的数据
      • (2)demo1:批处理(离线处理)
      • (3)demo2 - lambda优化:批处理(离线处理)
      • (4)demo3:流处理(实时处理)
      • (5)总结:实时vs离线
      • (6)demo4:批流一体
      • (7)对接Socket
  • 二、Flink部署
    • 1、Flink架构
    • 2、Standalone部署
    • 3、自运行flink-web
    • 4、通过参数传递
    • 5、通过webui提交job
    • 6、停止作业
    • 7、常用命令
    • 8、集群
  • 参考资料

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

一、快速上手

1、导包

<!-- fink 相关依赖 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version>
</dependency>

2、求词频demo

注意!自Flink 1.18以来,所有Flink DataSet api都已弃用,并将在未来的Flink主版本中删除。您仍然可以在DataSet中构建应用程序,但是您应该转向DataStream和/或Table API。

(1)要读取的数据

定义data内容:

pk,pk,pk
ruoze,ruoze
hello

(2)demo1:批处理(离线处理)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 使用Flink进行批处理,并统计wc*** 结果:* (bye,2)* (hello,3)* (hi,1)*/
public class BatchWordCountApp {public static void main(String[] args) throws Exception {// step0: Spark中有上下文,Flink中也有上下文,MR中也有ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// step1: 读取文件内容  ==> 一行一行的字符串而已DataSource<String> source = env.readTextFile("data/wc.data");// step2: 每一行的内容按照指定的分隔符进行拆分  1:Nsource.flatMap(new FlatMapFunction<String, String>() {/**** @param value 读取到的每一行数据* @param out 输出的集合*/@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {// 使用,进行分割String[] splits = value.split(",");for(String split : splits) {out.collect(split.toLowerCase().trim());}}}).map(new MapFunction<String, Tuple2<String,Integer>>() {/**** @param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)*/@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).groupBy(0)  // step4: 按照单词进行分组  groupBy是离线的api,传下标.sum(1)  // ==> 求词频 sum,传下标.print(); // 打印}
}

(3)demo2 - lambda优化:批处理(离线处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** lambda表达式优化*/
public class BatchWordCountAppV2 {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> source = env.readTextFile("data/wc.data");/*** lambda语法: (参数1,参数2,参数3...) -> {函数体}*/
//        source.map(String::toUpperCase).print();// 使用了Java泛型,由于泛型擦除的原因,需要显示的声明类型信息source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0).sum(1).print();}
}

(4)demo3:流处理(实时处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 流式处理* 结果:* 8> (hi,1)* 6> (hello,1)* 5> (bye,1)* 6> (hello,2)* 6> (hello,3)* 5> (bye,2)*/
public class StreamWCApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/wc.data");source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握!流式的并没有groupBy,而是keyBy!根据第一个值进行sum.sum(1).print();// 需要手动开启env.execute("作业名字");}
}

(5)总结:实时vs离线

离线:结果是一次性出来的。
实时:来一个数据处理一次,数据是带状态的。

(6)demo4:批流一体

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 采用批流一体的方式进行处理*/
public class FlinkWordCountApp {public static void main(String[] args) throws Exception {// 统一使用StreamExecutionEnvironment这个执行上下文环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动DataStreamSource<String> source = env.readTextFile("data/wc.data");source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握.sum(1).print();// 执行env.execute("作业名字");}
}

(7)对接Socket

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 使用Flink对接Socket的数据并进行词频统计** 大数据处理的三段论: 输入  处理  输出**/
public class FlinkSocket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 数据源:可以通过多种不同的数据源接入数据:socket  kafka  text** 官网上描述的是 env.addSource(...)** socket的方式对应的并行度是1,因为它来自于SourceFunction的实现*/DataStreamSource<String> source = env.socketTextStream("localhost", 9527);System.out.println(source.getParallelism());// 处理source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握.sum(1)// 数据输出.print();  // 输出到外部系统中去env.execute("作业名字");}
}

二、Flink部署

1、Flink架构

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/
Flink是一个分布式的带有状态管理的计算框架,可以运行在常用/常见的集群资源管理器上(YARN、K8S)。

一个JobManager(协调/分配),一个或多个TaskManager(工作)。
在这里插入图片描述
在这里插入图片描述

2、Standalone部署

按照官网下载执行即可:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/

可以根据官网来安装,需要下载、解压、安装。
也可以使用docker安装。

启动之后,localhost:8081就可以访问管控台了。

3、自运行flink-web

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>1.18.0</version>
</dependency>
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082); // 指定web端口,开启webUI,不写的话默认8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 新版本可以直接使用getExecutionEnvironment(conf)

以上亲测并不好使……具体原因未知,设置为flink1.16版本或许就好用了。

4、通过参数传递

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 通过参数传递进来Flink引用程序所需要的参数,flink自带的工具类
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");DataStreamSource<String> source = env.socketTextStream(host, port);
System.out.println(source.getParallelism());

可以通过命令行参数:–host localhost --port 8765

5、通过webui提交job

在这里插入图片描述
在这里插入图片描述

6、停止作业

在这里插入图片描述

7、常用命令

# 查看作业列表
flink list -a  # 所有
flink list -r  # 正在运行的
# 停止作业
flink cancel <jobid># 提交job
# -c,--class <classname> 指定main方法
# -C,--classpath <url> 指定classpath
# -p,--parallelism <paralle> 指定并行度
flink run -c com.demo.FlinkDemo FlinkTest.jar 

8、集群

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/

单机部署Session Mode和Application Mode:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/

k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/

YARN:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/

参考资料

https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/

http://www.yayakq.cn/news/715007/

相关文章:

  • 推广普通话绘画作品大连网络营销seo
  • discuz视频网站模板搜索引擎广告名词解释
  • 苏州正规做网站公司金山专业做网站
  • 网站建设虚拟金融投资公司网站建设论文
  • 网站维护运营主要是做什么的xampp wordpress安装
  • 建设银行上海分行招聘网站杭州的网站建设
  • php在网站后台建设中的优势wordpress 登录用户
  • 网站建设要什么零食天堂专做零食推荐的网站
  • 机械设计网站有哪些做盗版小说网站赚钱嘛
  • node 网站开发 视频教程微信如何绑定网站
  • 心悦免做卡领取网站江苏网站建设公司排名
  • 制作网站费用怎么做分录做电商网站的流程
  • 怎么介绍自己做的网站效果图wordpress 搜索频率
  • 杭州做网站公司有哪些外贸常用网站
  • 电商网站做导购义乌 网站建设
  • 怎么搜索网站建一个网站迈年
  • 专业网站制作网站公司重庆招聘网站建设
  • 国内精美网站界面网址织梦dedecms电影网站模板
  • 网站建设 学生作业如皋市建设局网站在哪里
  • 电子商务网站建设参考书永久免费自助建站源代码
  • 网站建设计算机人员招聘代运营公司前十名
  • 网络空间的竞争归根结底是百度seo关键词排名优化
  • 电子商务网站建设视频教程网页导航菜单设计
  • 鹤城建设集团网站群晖wordpress主机
  • 网站制作的重要性怎么建单位的网站
  • 响应式网站怎么做pc端的wordpress 上传rar
  • 云盘做网站空间建设网站公司那里好相关的热搜问题解决方案
  • 黄山网站建设哪家强网站建设实验总结百科
  • 组服务器做网站产品推广建议
  • 什么是网站开发设计与实现网站建设项目实训报告