当前位置: 首页 > news >正文

设计师网站登录岳阳建设商城网站公司

设计师网站登录,岳阳建设商城网站公司,游戏开发软件手机版,wordpress 右侧空白目录 基础回顾 线程池执行任务流程 简单使用 构造函数 execute方法 execute中出现的ctl属性 execute中出现的addWorker方法 addWorker中出现的addWorkerFailed方法 addWorker中出现的Worker类 Worker类中run方法出现的runWorker方法 runWorker中出现的getTask runWo…

目录

基础回顾

线程池执行任务流程

简单使用

构造函数

execute方法

execute中出现的ctl属性

execute中出现的addWorker方法

addWorker中出现的addWorkerFailed方法

addWorker中出现的Worker类

Worker类中run方法出现的runWorker方法

runWorker中出现的getTask

runWorker中出现的processWorkerExit

项目中如何配置使用线程池

参考


基础回顾

线程池基础不好的还是要先了解线程池大体知识,不要眼高手低❌

ThreadPoolExecutor线程池有关_明天一定.的博客-CSDN博客

我都忘记了我要写线程池源码相关文章了,填个多年前的坑➿

线程池执行任务流程

线程池新增线程过程

简单使用

看源码,当然要先会简单使用:

    public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),new ThreadPoolExecutor.CallerRunsPolicy());executor.execute(() -> {System.out.println("线程池执行");});}

我们创建了一个ThreadPoolExecutor对象,然后调用execute方法

构造函数

源码中共有4中构造方法

 最终都是调用最后一个构造方法,其他构造都是给出了默认配置,比如默认线程工厂,默认拒绝策略。所以我们只看参数最长的构造

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

 可以看出来有两步,第一步判断参数是否合理,第二步给属性赋值。重要字段具体含义不再赘述,不懂的看文章开头那篇文章。比较生疏的是acc这个属性,他是一个三元表达式去赋值,判断检查操作是否有权限执行,如果有权限则拿到权限控制的上下文,源码中属性注释也说了,该属性用来执行finalizer。

execute方法

这个方法分三步:

  1. 如果运行线程比核心线程数少
  2. 如果任务可以成功的放入队列
  3. 如果任务放入队列失败,我们就新增线程,如果失败则执行拒绝策略或是因为线程池关闭了
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 获取当前工作线程数和线程池运行状态if (workerCountOf(c) < corePoolSize) { // 判断工作线程是否比核心线程少if (addWorker(command, true)) // 新增workerreturn;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { // 线程池状态是否是running,是的话往阻塞队列加任务int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能改变状态if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务reject(command); // 拒绝策略触发else if (workerCountOf(recheck) == 0) // wc如果是0时,则新增worker执行queue的任务addWorker(null, false);}else if (!addWorker(command, false))// 阻塞队列已满才会走的逻辑reject(command);}

execute中出现的ctl属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;
    private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

 可以看出来ctl的类型是原子整数,初始值是状态或上工作线程数。cti共同记录了运行状态和工作线程数。ctl的组成前三位是状态,后29位是表示线程数。

善用位运算可以节省空间(复合值),而在这里我认为是想可以保证状态和数量的统一变化。

顺便补一下线程池的状态和生命周期的转换:

 

execute中出现的addWorker方法

该方法主要分为两步:

  1. cas自旋新增线程
  2. 创建线程实例并且执行任务
    private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态不是running;线程池状态为SHUTDOWN,且要执行的任务不为空;线程池状态为SHUTDOWN,且任务队列为空;都返回失败if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) // 线程新增自旋成功break retry; // 退出外层循环c = ctl.get();  // Re-read ctl// 重新获取状态和之前状态对比,若一样则内层循坏,否则外循环if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false; // 工作线程调用start()方法标志boolean workerAdded = false; // 工作线程被添加标志Worker w = null;try {w = new Worker(firstTask); // 创建工作线程实例final Thread t = w.thread; // 获取工作线程持有的线程实例if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 预检查该线程状态throw new IllegalThreadStateException();workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有。方便索引出线程int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;// 工作线程被添加标志置为true}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 调用该线程执行任务 workerStarted = true; // 工作线程调用start()方法标志置为true}}} finally {if (! workerStarted)  // 如果线程启动失败addWorkerFailed(w);}return workerStarted;}

 流程辅助查看

 

addWorker中出现的addWorkerFailed方法

回滚工作线程的创建

private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);// 从hash表移除decrementWorkerCount(); // ctl减1tryTerminate(); // 尝试把线程池状态变为Terminate} finally {mainLock.unlock();}}

addWorker中出现的Worker类

它是ThreadPoolExecutor的内部类,继承了AQS实现了Runnable接口,我们主要关注它的run方法

public void run() {runWorker(this);
}

Worker类中run方法出现的runWorker方法

真正执行任务的方法,通过getTask从队列拿任务

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例w.firstTask = null;w.unlock(); // 允许中断boolean completedAbruptly = true; // 线程意外终止标志try {// 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行while (task != null || (task = getTask()) != null) {w.lock();// 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义Throwable thrown = null;try {task.run(); // 执行线程任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义}} finally {task = null; // 将循环变量task设置为null,表示已处理完成w.completedTasks++; // 当前已完成的任务数+1w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 工作线程退出}}

runWorker流程图如下

runWorker中出现的getTask

根据配置,对任务进行阻塞或超时等待。

    private Runnable getTask() {boolean timedOut = false; // // 通过timeOut变量表示拿出的线程是否超时了for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查线程池状态以及阻塞队列的大小if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 当前线程是否允许超时销毁的标志// 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))// 且(当前线程数大于1 或 阻塞队列为空)// 则减少worker计数并返回nullif ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

runWorker中出现的processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果中断,调整减少worker的数量decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w); // 从工作线程集合中移除该工作线程} finally {mainLock.unlock();}tryTerminate(); // 尝试把线程状态变为terminateint c = ctl.get();// 如果是RUNNING 或 SHUTDOWN状态if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {// 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false); // 重新创建一个worker来代替被销毁的线程}}

项目中如何配置使用线程池

用过线程池的都知道,那些核心参数是真的不好确定,需要做大量修改、发布、验证这套工作。市面上常说的是分io密集型和cpu密集型,但是具体参数不是那么好确定的,比如线程池设置为 2*CPU 核心数,有点像是把任务都当做 IO 密集型去处理了。而且一个项目里面一般来说不止一个自定义线程池吧?比如有专门处理数据上送的线程池,有专门处理查询请求的线程池,这样去做一个简单的线程隔离。但是如果都用这样的参数配置的话,显然是不合理的。

所以我们需要一个可动态配置的线程池,可以自己写一个模块,也可以用已经开源的dynamic-tp或者hippo4j。具体实现还是利用ThreadPoolExecutor中的一些set方法

关于队列的动态调整:美团他们有一个名字为 ResizableCapacityLinkedBlockIngQueue 的队列:很明显,这是一个自定义队列了。我们也可以按照这个思路自定义一个队列,让其可以对 Capacity 参数进行修改即可。把 LinkedBlockingQueue 粘贴一份出来,修改个名字,然后把 Capacity 参数的 final 修饰符去掉,并提供其对应的 get/set 方法。然后在程序里面把原来的队列换掉即可。

不过比较好的是开源项目自带监控告警和配置文件配置会比较全面一点。具体配置还是得做大量修改、发布、验证。不过也可以从网上常说的理论值入手去尝试。

参考

【超详细】Java线程池源码解析 - 掘金 (juejin.cn) 

Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)

http://www.yayakq.cn/news/283250/

相关文章:

  • 把自己做的网站传到网上新公司网站建设要求
  • 深圳积分商城网站设计最美情侣免费观看视频在线
  • 永久打开本网站的wordpress notfound
  • 长沙网站制作案例建筑工程招投标网站
  • 室内设计网站资源网站备案初审时间
  • 外贸网站设计航达建设网站
  • 免费网站制作平台推荐卖手表的网站
  • wordpress建的大型网站吗自己开设计工作室
  • 网站建设的描述网站开发涉及内容
  • 建设部网站统计西安网站托管专业公司
  • 做婚恋网站的思路运营一个网站的成本
  • 营销型网站图片国家世界新闻
  • 大连做外贸建站的专业公司ios微信上的pdf乱码
  • 高性能网站建设指南 京东新开传奇网站刚开一秒第一区
  • 东莞市建设工程检测中心网站重庆公司注册网站
  • 建设网站要多少费用网站规划的原则有哪些
  • 建站技术布局方式重庆城乡规划和建设局网站
  • 电脑哪里做模板下载网站个人小公司怎么注册
  • 东莞腾宇科技网站建设网站如何在百度
  • 马鞍山网站制作公司刷移动关键词优化
  • 工信部网站备案查询步骤详解驻马店做网站建设的公司
  • 机关网站建设引导语做网站前端的软件
  • 专门做mod的网站西部域名网
  • 大连建站系统模板大蒜做营销型网站
  • 此网站不支持下载视频怎么办沈阳制作网站
  • 淘宝客自己做网站深州市住房保障和城乡建设局网站
  • 网站制作语言学建设网站
  • 网站底部版权信息模板电子商务公司
  • 多软件网站下载安装电子商务网站建设完整详细流程图
  • joomla功能型网站建设做网站类型的营业执照证明如何填写