当前位置: 首页 > news >正文

公司网站建设推广方案门户网站内容建设

公司网站建设推广方案,门户网站内容建设,江西建筑人才网,wordpress301插件使用java远程提交flink任务到yarn集群 背景 由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。…

使用java远程提交flink任务到yarn集群

背景

由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。接下来就记录一下实现过程。这里用flink on yarn 的Application模式实现

环境准备

  • 大数据集群,只要有hadoop就行
  • 后端服务器,linux mac都行,windows不行

正式开始

1. 上传flink jar包到hdfs

去flink官网下载你需要的版本,我这里用的是flink-1.18.1,把flink lib目录下的jar包传到hdfs中。

在这里插入图片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven仓库下载。

2. 编写一段flink代码

随便写一段flink代码就行,我们目的是测试

package com.azt;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.concurrent.TimeUnit;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) throws Exception {String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};Random random = new Random();while (true) {ctx.collect(words[random.nextInt(words.length)]);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {}});source.print();env.execute();}
}

3. 打包第二步的代码,上传到hdfs

在这里插入图片描述

4. 拷贝配置文件

  • 拷贝flink conf下的所有文件到java项目的resource中
  • 拷贝hadoop配置文件到到java项目的resource中

具体看截图
在这里插入图片描述

5. 编写java远程提交任务的程序

这一步有个注意的地方就是,如果你跟我一样是windows电脑,那么本地用idea提交会报错;如果你是mac或者linux,那么可以直接在idea中提交任务。

package com.test;import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;/*** @date :2021/5/12 7:16 下午*/
public class Main {public static void main(String[] args) throws Exception {///home/root/flink/lib/libSystem.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";String configurationDirectory = "/export/server/flink-1.18.1/conf";org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//获取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);//设置为application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());//yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");//设置配置,可以设置很多flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);flinkConfiguration.setInteger("parallelism.default", 4);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();//		设置用户jar的参数和主类ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);ClusterClientProvider<ApplicationId> clusterClientProvider = null;try {clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);} catch (ClusterDeploymentException e){e.printStackTrace();}ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();System.out.println(clusterClient.getWebInterfaceURL());ApplicationId applicationId = clusterClient.getClusterId();System.out.println(applicationId);Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();int counts = 30;while (jobStatusMessages.size() == 0 && counts > 0) {Thread.sleep(1000);counts--;jobStatusMessages = clusterClient.listJobs().get();if (jobStatusMessages.size() > 0) {break;}}if (jobStatusMessages.size() > 0) {List<String> jids = new ArrayList<>();for (JobStatusMessage jobStatusMessage : jobStatusMessages) {jids.add(jobStatusMessage.getJobId().toHexString());}System.out.println(String.join(",",jids));}}
}

由于我这里是windows电脑,所以我打包放到服务器上去运行
执行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的话,会打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

复制打印的url连接,就可以打开flink的webui了,在yarn的前端页面中也可以看到flink任务。

http://www.yayakq.cn/news/743130/

相关文章:

  • 中英文 网站win10 电脑做网站服务器吗
  • 建设个定制网站需要多少钱阜宁网站建设
  • 网站幻灯通栏代码淘宝网站怎么做视频教程
  • 全网营销网站建设西安seo优化培训
  • 张家港江阴网站制作国际学校网站如何建设
  • 更合网站制作公司建地方门户网站
  • 郑州搭建网站网站开发工具教程
  • 用手机可以建设一个手机网站吗现在房地产的最新情况
  • 公司做阿里巴巴网站要多少钱做园林景观的网站
  • 不在百度做推广他会把你的网站排名弄掉软件开发培训平台
  • 杭州微网站开发公司电话国际域名的外贸网站
  • 东莞网站建设市场微信朋友圈的广告怎么投放
  • 建筑工程网站模板如何开发一个软件
  • 网站建设不用备案的wordpress统计分析
  • 卫浴洁具网站模板wordpress头部导航栏代码
  • 怎么查网站建设时间dedecms游戏门户网站源码
  • 本地网站开发公司门户类网站
  • 河南官网网站建设wordpress能开发商城网站
  • 如何架设内部网站交互效果好的移动端网站
  • 婺源网站建设深圳龙华区教师招聘
  • 亲子装网站建设东莞做网站 动点官网
  • 长沙做网站咨询公司服务器没有安装wordpress
  • 中国建设银行网站查征信wordpress商品展示模块
  • 网站建设企业名录网易云播放器做网站播放
  • 宝坻区建设路小学网站对手网站分析
  • 大型网站过程动画设计思路怎么写
  • 东红物流网站建设规划书关于小学网站建设的论文
  • 免费建站宝盒制作相册app
  • 做网站引用别人的图片怎么分析网站的外链建设情况
  • 舞钢市城市建设局网站龙岩建筑公司有哪些