当前位置: 首页 > news >正文

织梦网站移动化简单 大气 网站模版

织梦网站移动化,简单 大气 网站模版,家装设计软件app免费,手机上能不能制作网站开发Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章 本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 …

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}
}
// 改为
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}}

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}
}
// 改为public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}}SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {Common.setDeployMode(DeployMode.CLIENT);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();SeaTunnelClient seaTunnelClient = createSeaTunnelClient();ClientJobExecutionEnvironment jobExecutionEnv =seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));jobInstanceDao.update(jobInstance);CompletableFuture.runAsync(() -> {waitJobFinish(clientJobProxy,userId,jobInstanceId,Long.toString(clientJobProxy.getJobId()),seaTunnelClient);});} catch (ExecutionException | InterruptedException e) {ExceptionUtils.getMessage(e);throw new RuntimeException(e);}return jobInstanceId;}

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =new TableFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =new TableTransformFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
// 改为
public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();List<Factory> factories;if (path.toFile().exists()) {List<URL> files = FileUtils.searchJarFiles(path);factories =FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
}
// 改为public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();pluginIdentifiers.addAll(SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);List<Factory> factories;if (!pluginJarPaths.isEmpty()) {factories =FactoryUtil.discoverFactories(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!

http://www.yayakq.cn/news/518258/

相关文章:

  • 襄阳南漳县城乡建设局网站做网站常用字体
  • 国内特效网站wordpress主题删除失败
  • 网站备案需要关站wordpress 注册 填写密码
  • 济源新站seo关键词排名推广室内设计女孩子学难吗
  • 本地集团网站建设wordpress 年索引
  • 蓬莱做网站那家好厦门网页设计培训班
  • 建设部网站实名制举报百度业务范围
  • 做网站标配我想自己做的知道网站
  • 网站开发工作程序怎么写国家653建筑工程网
  • 宁波育才建设教育集团网站网络平台举报中心
  • 做网站有什么用出杭州 定制网站
  • 中小学门户网站建设利川网站建设
  • 网上宿迁官方网站邯郸论坛官网
  • 东营区住房和城乡建设局网站wordpress批量修改引用网址
  • 网站只做程序员wordpress 众筹主题
  • 电商网站开发费用产品开发设计流程图
  • dw 做网站图片之间的链接wordpress增加购物车
  • 网站内链少改怎么做怎样开发app软件
  • 商标被注册了做网站网络销售模式 自建网站
  • 一键制作视频的软件天津百度网站排名优化
  • dy刷粉网站推广马上刷免费logo设计一键生成下载
  • 优化网站打开速度深圳网站建设服务哪家有
  • 常熟有做网站的网络公司吗wordpress导入超时
  • 青岛网站建设和优化wordpress云盘视频播放
  • 凌云县 城市建设 网站学校网站建设公司
  • 做网站主题wordpress怎么清空
  • wap网站制作软件访问 wordpress
  • 南通快速建设网站服务十大网站黄页的免费
  • 国外网站做freelancer最近的国际新闻热点
  • 端午节网站建设dede手机网站更新