当前位置: 首页 > news >正文

白菜网站建设国字型网站建设布局

白菜网站建设,国字型网站建设布局,山东省建设厅官方网站怎么样,广州十大游戏公司说明 最大的好处是方便。 其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。 总结…

说明

最大的好处是方便。

其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。

总结一下:源头是定时轮询,中间过程是事件传递。

本次使用APS搭建本地定时任务的目的是为了简化实验性质的定时任务,通过在git项目下进行编辑任务脚本和执行任务清单,而运行容器本身会周期性的自动拉取代码,然后按照任务清单执行。

执行过程采用多线程方式,任务的负载通常都不高。整体设计上,复杂和繁重的任务会包在微服务中,定时任务主要是向这些微服务发起触发动作。通常,微服务收到触发元信息后进行自动的任务/数据拉取处理,处理完毕后通过webhook将结果持久化,或进一步发起其他的触发动作。

另外,具有共性的任务将会被提取出来,之后会交给celery以分布&协程方式执行,这些任务包括:

  • 1 数据库IO。例如从队列里取数,存到数据库中。
  • 2 网络数据获取IO。爬取网页、或者通过接口,获取数据。
  • 3 接口化标准操作。按url, json input这样的标准web请求,这种灵活性很强。表面上是一个IO动作,但背后可能触发密集计算,但是又不需要celery集群承担。(可能是ray集群、dask集群、基于显卡计算的集群)

内容

1 读取任务列表

主要为了简单的读入任务(脚本),同时可以方便的进行注释

# 用于将代表任务列表的数据读入
# 去掉换行和空格
# 如果以# 号开头表示注释
def read_all_lines_clean(fpath):with open(fpath, 'r') as f:lines = f.readlines()lines1 = [x.replace('\n','').strip() for x in lines]lines2 = [x for x in lines1 if len(x) and not x.startswith('#')]return lines2

任务文件如下task_list.txt

task_01_probably_git_pull.py
task_02_del_event_null_recs.py
# task_03_sync_xs_backup.py
#task_04_rotate_data.py
# task_05_sync_milvus.py
#task_06_rotate_mysql_time.py

读入后

In [4]: a = read_all_lines_clean('task_list.txt')In [5]: a
Out[5]: ['task_01_probably_git_pull.py', 'task_02_del_event_null_recs.py']

这些就是之后要定时调度的任务

2 并行执行

为了使得每一次定时任务都可以执行,且保证效率,需要用一些简单的调度(容错问题均在脚本内解决)。调度器可以保证每30秒起来一次。

线程的并行执行:

def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)

每一次执行os_system_python

import subprocessdef os_system_python(some_path=None, timeout=30):try:result = subprocess.run(['python3', some_path], timeout=timeout)return resultexcept subprocess.TimeoutExpired:print(f"Task {some_path} timed out after {timeout} seconds.")return None'''
代码说明
subprocess.run:这是 subprocess 模块的高级 API,用于运行命令并等待其完成。它支持 timeout 参数,如果命令在指定时间内未完成,会抛出 TimeoutExpired 异常。timeout 参数:你设置了默认超时时间为 30 秒,这是一个合理的默认值。如果任务在 30 秒内未完成,subprocess.run 会抛出 TimeoutExpired 异常。异常处理:捕获 TimeoutExpired 异常后,打印超时信息并返回 None。这样可以避免程序因超时而崩溃,同时提供清晰的日志信息。
'''

3 自动更新

更新git项目,作为一个任务脚本被周期执行。由于代码更新并不是高频事件,所以一般概率上保证5分钟会更新一次代码。

(base) root@76a14afa199b:/workspace/local_aps_v2/base# python3 task_01_probably_git_pull.py
2000-01-01 08:00:00
2000-01-01 08:00:00
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
task_01_probably_git_pull running
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
Git pull executed successfully for branch 'master':
Already up to date.2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
(base) root@76a14afa199b:/workspace/local_aps_v2/base#

4 定时调度

调度器在每分钟的0/30秒执行,我把30秒定为一拍(pace),一分钟定位一时隙(slot)。绝大部分任务都应该在30秒内完成。

# 执行本地脚本
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerfrom base_config import base_config
from Basefuncs import * 
def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &if __name__ == '__main__':# 创建调度器sche1 = BlockingScheduler()# 添加任务,使用 cron 表达式每分钟的第 0 秒和第 30 秒执行sche1.add_job(exe_tasks_threads,'cron',second='0,30',  # 每分钟的第 0 秒和第 30 秒kwargs={},coalesce=True,max_instances=1)print('[S] Starting scheduler with cron (0s and 30s of every minute)...')try:sche1.start()  # 启动调度器except (KeyboardInterrupt, SystemExit):print('[S] Scheduler stopped.')

5 Docker运行

为了保证执行的稳定性,使用docker执行

docker run -d --name=local_aps_v2 \--restart=always \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \-w /workspace/local_aps_v2/base \YOURIMAGE  \sh -c "git pull && python3 aps.py"

只有环境改变时才需要修改镜像重发布,大部分时候只要调试和修改代码,然后推送就可以了。

http://www.yayakq.cn/news/334923/

相关文章:

  • 好的建站平台内衣网站建立
  • 长春网站制作平台wordpress微擎
  • 江苏城乡建设厅网站门窗网站模板
  • 刚出来的新产品怎么推荆州seo技术厂家
  • 共享网站的详细规划企业建网站一般要多少钱
  • 英文建设网站wordpress 会员管理系统
  • 微信公众号 网站开发外包网站开发多少钱
  • 网站开发从哪里学起长沙网站优化效果
  • 演示网站网站建设中通知
  • 多域名指向同一网站东莞市网上注册公司流程
  • 羽毛球赛事2023赛程seo综合
  • 城乡建设部网站甘红刚快速搭建网站2020
  • 做网站前台步骤wordpress插件汉化不全
  • 东营区住房和城乡建设局网站久久建筑网官网平台
  • 网站分站开发计划书wordpress集成dplayer
  • 高端科技产品网站建设服务app开发的公司
  • 有没有给人做简历的网站wordpress 纯代码seo
  • 江苏省品牌专业建设网站wordpress菜单定制
  • 网站英文地图怎么做网站开发快速盈利
  • 网站的设计特点有哪些洛客设计平台
  • 淘宝联盟怎么自己做网站推广建设一个网站需要学习什么
  • 加强校园网站建设网站建设案例步骤
  • 做网站需要哪些流程天美传媒传媒官网免费下载
  • 网站建设技术培训有口皆碑的域名备案加急
  • 网站首页适合vue做吗工业产品设计结构图
  • 百度一下图片识别网站优化 ppt
  • 企业门户网站建设渠道做的图怎么上传到网站
  • 怎么用手机做网站教程兰州网站搜索排名
  • 厦门物流网站建设郑州网络建站公司
  • 山东建设银行招聘网站m3u8 wordpress插件