JUC线程工具包

Author Avatar
kevin
发表:2024-05-11 14:11:00
修改:2024-10-13 21:01:08

JUC就是java.util.concurrent工具包的简称,处理线程的工具包

多线程

程序:源代码,打包封装,应用软件 是静态概念

进程:概念来说,程序运行起来就是一个进程,进程是一个动态的概念。

线程:在一个进程中,需要同时处理多个不同任务,每一个任务由一个线程来执行

串行:多个操作在同一个线程内按顺序执行,是同步的

阻塞:当前操作没有结束时,下一个操作就必须等待。

并行:多个cpu同时执行多个任务,异步的

并发:一个cpu同时执行多个任务,异步的

sleep() 和 wait() 区别

sleep不释放锁

Thread类,只能作用当前线程,静态方法

wait释放锁,

Object类自带方法,且只能在同步器中使用,非静态方法

创建多线程

任何线程都需要通过thread类启动

继承Thread类

重写run方法,调用start方法启动线程

public class CreateThread01Extends {

    public static void main(String[] args) {
        //线程名字
        DemoThread demo = new DemoThread("AAA");
        demo.start();
    }

}

class DemoThread extends Thread {

    public DemoThread(String threadName) {
        super(threadName);
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " thread working ...");
    }
}

实现Runnable接口

重写run方法,使用thread类传进构造器,start启动线程

// 第一步:实现 Runnable 接口
class MyRunnableThread implements Runnable {

    // 第二步:实现 run() 方法
    @Override
    public void run() {

        // 第三步:编写线程中的逻辑代码
        System.out.println(Thread.currentThread().getName() + " is working");
    }
}

使用Callable配合FutureTask

callable并没有直接继承thread

而futureTask实现了RunnableFuture接口

RunnableFuture接口继承了Runnable和Future接口

所以Callable通过FutureTask 类间接运行了线程

 //创建callable接口
        Callable<String> callable01 = ()->{
            System.out.println(Thread.currentThread().getName()+"is work");
            return ":result01";
        };
   //创建futuretask对象
        FutureTask<String> futureTask01 = new FutureTask<>(callable01);
 //运行线程
        new Thread(futureTask01,"futureTask01").start();

线程池

// 1.创建线程池对象
ExecutorService pool = Executors.newFixedThreadPool(5);

// 2.给线程池对象分配任务,每一个任务是一个线程
pool.execute(() -> {
    System.out.println(Thread.currentThread().getName() + " " + new Date());
});

线程生命周期

NEW新建线程对象刚创建
RUNNABLE就绪开启start,等待cpu时间片运行
RUNNING运行得到cpu时间片执行任务
BLOCKED阻塞等待同步锁
WAITING等待等待被唤醒
TIMED_WAITING限时等待在进入等待状态时设定了等待时间
TERMINATED终止线程因为代码执行完成或抛异常而停止执行

多线程-2024-10-13-21-00-47.jpg

虚假唤醒

当多个消费者多个提供者的情况下

使用 if判断 的情况下,多个线程可能会逃逸造成程序错误

  //如果食物大于0则停止生产食物
        if(food>0){
            try {
                //大于0线程等待
                //线程可能会到这里,cpu时间片用完,则其他线程抢到锁进来
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //小于0说明需要生产食物
        food++;

​ 使用 if 方式执行线程间通信操作执行条件的判断:当线程从上次进入等待的位置开始继续执行时,没有重新做判断,导致即使条件已经不满足了也不知道,最终造成在不满足条件、不该做操作的时候做了操作。

这种情况,我们称之为:虚假唤醒

解决办法

把 if 关键词换成 while,保证线程被唤醒后能够重新执行条件判断。

这样即使在wait睡着,睡醒之后还是需要重新判断

synchronized

synchronized 底层原理

重点膨胀锁

同步操作(同步操作就是加锁、解锁操作的总称)

对象存储结构

对象在堆内存中保存时的数据结构

images

对象头:

Mark Word

类型指针

数组长度(非数组没有)

对象体:

实例数据。也就是对象中实际包含的属性数据

对齐字节:(不一定存在),刚好8个字节就不存在

对象起始地址必须是 8 字节的整数倍

对于占空间不满足这一要求的对象会进行填充

Mark Word

用于存储对象自身运行时数据,如哈希码、GC分代年龄等

GC分代年龄(每次gc没有被清理则加1)

非固定的数据结构,以便存储更多有效的数据

里面有一个很重要的数据:锁标识位

锁标识偏向含义
010无锁
011偏向锁
00-轻量级锁
10-重量级锁
11-GC

锁膨胀

从 JDK 1.6 开始如果当前申请锁的线程很少、不存在竞争,那么加锁、解锁操作就会非常消耗性能。
那么为了针对这个情况进行优化,诞生了锁膨胀机制。

无锁:锁对象刚创建出来,没有线程来竞争

偏向锁:只有一个线程访问对象,只需要在对象中记录当前偏向的线程的 ID,只要是这个线程来访问对象,则无需获得锁或释放锁,直接可以开始操作

轻量级锁:当存在第二个线程申请同一个锁对象时,偏向锁就会立即升级为轻量级锁

申请锁并不是竞争锁,比如说一前一后地交替执行同步块

重量级锁:当同一时间有多个线程竞争锁时,锁就会被升级成重量级锁,线程之间存在竞争关系

对象监视器

当锁状态膨胀为『重量级锁』时,Mark Word 中有一个指针指向一个特殊的对象——对象监视器

ObjectMonitor 监视器对象

有几个比较重要的数据

 ObjectMonitor() {
      _count        = 0;  // 锁计数器
      _recursions   = 0; // 锁的重入次数
      _owner        = NULL; // 指向持有 ObjectMonitor 对象的线程
      _WaitSet      = NULL; // 处于 wait 状态的线程,会被加入到 _WaitSet(等待队列)
     _EntryList    = NULL ; // 处于等待锁 block 状态的线程,会被加入到该列表(阻塞队列)
 }

加锁和解锁流程

根据锁对象找到对象监视器

判断 _count:

  • 等于0
    • 锁定状态
    • _recursions+1
    • _count+1
    • _owner指向当前线程
  • 不等于0
    • 未锁定
    • 判断 _owner
      • 是当前线程,说明进到了第二个同步代码块
        • _recursions+1, _count+1, _owner指向当前线程
      • 不是当前线程
        • 进入阻塞队列 _EntryList

images

执行同步代码

代码执行完成就要把锁释放掉

_count-1,如果为0则擦除 _owner

_recursions-1

重入性

多个同步代码块嵌套

可重入性就是指一个线程可以直接获得它自己加的锁

而不用等待锁释放才能进入

总结:

images

Lock API

1.需要手动加锁解锁

2.对称执行,有几个加锁就要有几个解锁

3.使用 Lock 对象实现同步锁,要求各个线程使用的是同一个对象。

Lock 接口

ReentrantLock()

方法名功能
lock()加锁
unlock()解锁
boolean tryLock()尝试获得锁
boolean tryLock(long time, TimeUnit unit)等待指定时间之内尝试获得锁
Condition newCondition();用于线程间通信的 Condition 对象
测试尝试获得锁

公平锁是线程轮流执行,非必要,建议采用默认非公平锁

 //默认是非公平锁,构造器参数true表示公平锁
        Lock lock = new ReentrantLock();
        new Thread(()->{
            try {
                //加锁
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"开始工作!");
                //五秒钟后结束工作
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+"结束工作");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //解锁
                lock.unlock();
            }

        },"thread-b").start();
       new Thread(()->{
           boolean flag = false;
           try {
               //默认没有获得锁,延迟3秒
               flag = lock.tryLock(3,TimeUnit.SECONDS);
               if (flag) {
                   TimeUnit.SECONDS.sleep(3);
                   System.out.println(Thread.currentThread().getName() + "获得了锁!");
               } else {
                   System.out.println(Thread.currentThread().getName() + "没有获得锁");
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
               if (flag) {
                   lock.unlock();
               }
           }
       },"thread-a").start();

读写锁

Java 的并发包提供了读写锁 ReentrantReadWriteLock,它表示两个锁

读锁多线程共享读

写锁是独占锁

读写锁条件:

  • 不同线程:
    • 读锁可以共享
    • 读写锁互相排斥
    • 写锁是独占的
  • 同个线程
    • 读锁可重入
    • 写锁可重入
    • 先进写锁可以进读锁
    • 先进读锁不能进写锁

测试读写互相排斥

 private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//读锁    
private ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
//写锁
    private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

 // 只要都是加读锁的操作,各个线程间不需要彼此等待,可以同时并发执行
    public void read() {

        try {

            // 加锁
            readLock.lock();

            System.out.println(Thread.currentThread().getName() + " 开始执行读操作");

            TimeUnit.SECONDS.sleep(5);

            System.out.println(Thread.currentThread().getName() + " 结束执行读操作");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            // 释放锁
            readLock.unlock();

        }

    }

    public void write() {

        try {

            writeLock.lock();

            System.out.println(Thread.currentThread().getName() + " 开始执行写操作");

            TimeUnit.SECONDS.sleep(1);

            System.out.println(Thread.currentThread().getName() + " 结束执行写操作");

            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            writeLock.unlock();

        }
    }
}


 public static void main(String[] args){
     // 场景三:创建 Situation03 对象
Situation03 situation03 = new Situation03();

// 创建一个线程使用读锁
new Thread(()->{
    situation03.read();
}, "thread-read").start();

// 创建三个线程使用写锁
new Thread(()->{ situation03.write(); }, "thread-write 01").start();
 }

线程间通信

  • ReentrantLock 同步锁:将执行操作的代码块设置为同步操作,提供原子性保证

  • Condition 对象

    :对指定线程进行等待、唤醒操作

    • await() 方法:让线程等待
    • signal() 方法:将线程唤醒

普通通信

   // 创建同步锁对象
    private Lock lock = new ReentrantLock();
    // 通过同步锁对象创建控制线程间通信的条件对象
    private Condition condition =  lock.newCondition();
    private int number = 0;
    public void add(){
        try {
            lock.lock();
            while (number==1){
                // 满足条件时,不该当前线程干活,所以进入等待状态
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + " 执行 - 1 操作,data = " + ++number);

            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {

            // 释放锁
            lock.unlock();

        }
    }
    public void sub(){
        try {
            lock.lock();
            while (number==0){
                // 满足条件时,不该当前线程干活,所以进入等待状态
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + " 执行 - 1 操作,data = " + --number);

            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {

            // 释放锁
            lock.unlock();

        }
    }

    public static void main(String[] args) {
        SiutFive demo = new SiutFive();
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                demo.add();
            }
        },"thread-add A").start();
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                demo.sub();
            }
        },"thread-add B").start();
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                demo.add();
            }
        },"thread-add c").start();
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                demo.sub();
            }
        },"thread-add d").start();
    }

定制化通信

  • 线程1:打印连续数字
  • 线程2:打印连续字母
  • 线程3:打印 * 符
  • 线程4:打印 $ 符
  //设置步数
    private int number = 1;
    //同步锁对象
    private Lock lock = new ReentrantLock();
    private Condition math = lock.newCondition();
    private int i = 1;
    private int j = 65;
    private Condition letter = lock.newCondition();
    private Condition star = lock.newCondition();
    private Condition dollar = lock.newCondition();

    //打印数字
    public void printMath(){
        try {
            lock.lock();
            if (number %4 != 1){
                math.await();
            }
            System.out.print(i++);
            letter.signal();
            number++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //打印字母
    public void printLetter(){
        try {
            lock.lock();
            if (number %4 != 2){
                letter.await();
            }

            System.out.print((char)j);
            if ((char)j == 'Z'){
                j = 64;
            }
            j++;
            star.signal();
            number++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //打印星号
    public void printStar(){
        try {
            lock.lock();
            if (number %4 != 3){
                star.await();
            }
            System.out.print("*");
            dollar.signal();
            number++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //打印刀儿
    public void printDollar(){
        try {
            lock.lock();
            if (number %4 != 0){
                dollar.await();
            }
            System.out.println("$");
            math.signal();
            number++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        SiutSex demo = new SiutSex();
        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                demo.printMath();
            }
        }).start();
        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                demo.printLetter();
            }
        }).start();
        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                demo.printStar();
            }
        }).start();
        new Thread(()->{
            for (int i = 0; i < 30; i++) {
                demo.printDollar();
            }
        }).start();
    }

同步代码和lock对比

相同点

  • 都支持独占锁
  • 都支持可重入

Lock 能够覆盖 synchronized 的功能,而且功能更强大

images

synchronized 够用,那就使用 synchronized;如果需要额外附加功能则使用 Lock

其他api

CountDownLatch

指定一个操作步骤数量,每完成一次就减一,可以抑制最后一步,然后一起执行

/**
 * ClassName: SiutSeven
 * Description:
 * date: 2022/1/20 15:47
 *CountdownLatch 帮助韦小宝收集八部四十二章经
 * @author Yee
 * @since JDK 1.8
 */
public class SiutSeven {
    public static void main(String[] args) throws InterruptedException {
        //目标次数
        int number = 8;
        CountDownLatch countDownLatch = new CountDownLatch(number);
        for (int i = 0; i < number; i++) {
            int count = i+1;
            new Thread(()->{
                //收集
                System.out.println("收集了第"+count+"部四十二章经");
                countDownLatch.countDown();
            }).start();

        }
       //抑制最后一步,只有一个收集完成
        countDownLatch.await();
        //如果不抑制,所有线程代码都会执行到这里,多个收集完成
        System.out.println("收集完成");
    }
   
    
}

CyclicBarrier

循环栅栏,可循环利用的屏障,让所有线程都等待完成后才会继续下一步行动

/**
 * ClassName: SiutEight
 * Description:
 * date: 2022/1/20 15:54
 *		测试 CyclicBarrier
 *	支持多线程在执行各自任务的时候,到达某个状态点就等待,等所有线程都到达这个状态点再继续执行后步骤。
 * @author Yee
 * @since JDK 1.8
 */
public class SiutEight {
    private static List<List<String>> lists = new ArrayList<>();

    //初始化
    static {
        lists.add(Arrays.asList("学习java","学习php","标记完成","学习完成"));
        lists.add(Arrays.asList("学习煮菜","学习厨艺","学习杀鱼","标记完成","学习完成"));
        lists.add(Arrays.asList("学习绘画","学习跳舞","学习滑板","学习书法","标记完成","学习完成"));
    }

    public static void main(String[] args) {
        //构造方法,参与线程的个数
        int conut = lists.size();
        CyclicBarrier barrier = new CyclicBarrier(conut);
        // 2.创建3个线程分别执行各自的任务
        //最后统一学习结束
        for (int i = 0; i < conut; i++) {
            int number = i;
            new Thread(()->{
                try {
                    List<String> list = lists.get(number);
                    for (int j = 0;j<list.size();j++) {
                        // 找出每个小集合字段
                        String data = list.get(j);
                       System.out.println(Thread.currentThread().getName()+data);
                        //判断小集合字段
                        if ("标记完成".equals(data)){
                            // 遇到特殊任务标记,就让当前线程等一下
                            //是await不是wait
                            barrier.await();
                        }

                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
            },"thread-"+i).start();
        }
    }
}

Semaphore

信号量, 可以用来控制同时访问特定资源的线程数量,资源抢占模式

通常用于那些资源有明确访问数量限制的场景,常用于限流

/**
 * ClassName: SiutNight
 * Description:
 * date: 2022/1/20 16:27
 *使用 Semaphore 抢车位
 * @author Yee
 * @since JDK 1.8
 */
public class SiutNight {
    public static void main(String[] args) {
        //多个线程抢夺三个资源
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    //申请资源
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"拿到资源");
                    TimeUnit.SECONDS.sleep(3);
                  System.out.println(Thread.currentThread().getName()+"释放资源");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            },"thread-"+i).start();
        }
    }
}

Fork Join

将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果

  • Fork:拆分:把大任务拆分成小任务。
  • Join:合并:把小任务执行的结果合并到一起。

测试案例,1-100的累加

/**
 * ClassName: Mytassk
 * Description:
 * date: 2022/1/20 17:01
 *  泛型是要返回的结果数据类型
 *  100以内的累加
 * @author Yee
 * @since JDK 1.8
 */
public class Mytassk extends RecursiveTask<Integer> {
    //开始区间
    private int begin;
    //结束区间
    private int end;

    // 保存当前任务的结果
    private int result = 0;

    public Mytassk(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        //任务拆分10以内
        if (end - begin <=10){
            for (int i = begin; i <= end; i++) {
                result+=i;
            }
        }else{
            int leftBegin = begin;
            int leftEnd = (end + begin ) / 2;
            int rightBegin = leftEnd+1;
            int rightEnd = end;
            Mytassk leftTask = new Mytassk(leftBegin, leftEnd);
            Mytassk rightTask = new Mytassk(rightBegin, rightEnd);
            //任务拆分
            leftTask.fork();
            rightTask.fork();
            //任务合并
            Integer leftResult = leftTask.join();
            Integer rightResult = rightTask.join();
            result = leftResult+rightResult;
        }
        return result;
    }
}

  public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        Mytassk mytassk = new Mytassk(1, 100);
        ForkJoinTask<Integer> result = pool.submit(mytassk);
        System.out.println("结果:"+result.get());
    }

CompletableFuture

Callable + FutureTask 组合的『超级强化版

静态方法:

无返回值的异步任务,传入一个Runnable接口

/**
 * ClassName: RunAsyncTest
 * Description:
 * date: 2022/1/21 22:44
 *无返回值的异步任务
 * @author Yee
 * @since JDK 1.8
 */
public class RunAsyncTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread().getName() + " working");
        });
        Void taskResult = future.get();
        System.out.println("taskResult = " + taskResult);

    }
}

有返回值的异步任务,需要传入一个Supplier接口

/**
 * ClassName: SupplyAsyncTest
 * Description:
 * date: 2022/1/21 22:47
 *有返回值的异步任务
 * @author Yee
 * @since JDK 1.8
 */
public class SupplyAsyncTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String hello = "hello!";
        String world = ",World";
        
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            return hello+world;
        });
        String result = future.get();
        System.out.println(result);

    }
}

常用非静态方法:

采用链式调用

thenApply()方法,传入一个Function接口

新任务特点1:能够接收上一个任务的结果。
新任务特点2:它自己也是一个有返回值的任务。

thenAccept方法,传入一个Consumer接口

新任务特点1:能够接收上一个任务的结果。
新任务特点2:它自己没有返回值。

综合使用,采用方法引用的方式

/**
 * ClassName: ThenApplyTest
 * Description:
 * date: 2022/1/21 23:10
 *线程依赖
 * @author Yee
 * @since JDK 1.8
 */
public class ThenApplyTest {
    public static void main(String[] args) throws Exception {
        //字符串转数字
        String str = "13579";
         CompletableFuture.supplyAsync(str::toString).thenApply(Integer::parseInt).thenAccept(System.out::println);
    }
}

Lock 底层原理

JMM

Java Memory Model

Java 内存模型

JVM 的内存结构中堆内存也是线程共享的

栈内存是线程私有的

Java 内存模型(JMM)设计出来就是为了解决缓存一致性问题的

  • 原子性
    • 一个操作是不可分割的,那么我们就可以说这个操作是原子操作
    • 使用同步技术(sychronized)或者锁(Lock)来让它变成一个原子操作
  • 可见性
    • 每个线程操作自己的本地内存,对其他线程是不可见的
  • 有序性
    • CPU 执行程序指令和 JVM 编译源程序之后,都会对指令做一定的重排序
    • 重新排序后代码执行的结果和不重排执行的结果必须一样

volatile 关键字

原子性:volatile 不能提供原子性保证。
可见性:volatile 修饰一个成员变量后,能够保证这个成员变量能够在各个线程间可见。
有序性:volatile 会影响底层指令的执行,通过添加内存屏障,禁止一定范围内的指令重排。

结论:volatile 最重要的价值就是为我们提供可见性保证。

CAS 机制

Compare And Swap 比较并交换

比较:使用修改者提供的 expect 和被修改数据的实际值相比较。
交换:如果 expect 和实际值相等,则可以使用新值替换当前实际值(旧值)。
自旋:如果 expect 和实际值不等,则读取当前实际值,再次尝试执行 CAS。

CAS 和乐观锁有什么区别

乐观锁机制中根据版本号来判断是否可以修改,CAS 机制中没有版本号。

AQS

AbstractQueuedSynchronizer 抽象的队列式同步器

多线程访问共享资源的同步器框架

AQS 的底层原理:volatile + CAS + 线程对象的双向链表结构

volatile 修饰的 state:表示当前锁释放被占用,类似 _count
CAS:修改 state(加锁+1,解锁 -1) 的方式是 CAS

双向链表:申请锁但是没有成功获取到的线程对象

线程池

阻塞队列

  • 当队列了的时候进行入队列操作
  • 当队列了的时候进行出队列操作

BlockingQueue接口

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。(常用)
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
  • LinkedTransferQueue:由链表组成的无界阻塞队列。
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。

创建线程池

ThreadPoolExecutor 类

7个重要参数(重要)

  • corePoolSize:线程池中的常驻核心线程数
  • maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
  • keepAliveTime:多余的空闲线程的存活时间。当前池中线程数量超过 corePoolSize 时,当空闲时间达到 keepAliveTime 的线程会被销毁,直到剩余线程数量等于 corePoolSize
  • unit:keepAliveTime 的时间单位
  • workQueue:任务队列,被提交但尚未被执行的任务
  • threadFactory:表示生成线程池中工作线程的工厂, 用于创建线程,一般默认的即可
  • handler:拒绝策略处理器。当任务队列已满,工作线程也达到了 maximumPoolSize,新增的工作任务将按照某个既定的拒绝策略被拒绝执行。
/**
 * ClassName: ThreadPoolDemo
 * Description:
 * date: 2022/1/22 18:20
 *自定义线程池创建对象、分配任务、测试四种拒绝策略。
 * @author Yee
 * @since JDK 1.8
 */
public class ThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        //核心线程数
        int core = 3;
        //空闲线程
        int maximum = 5;
        //空闲存活时间
        Long liveTime = 3L;
        //时间单位
        TimeUnit unit =TimeUnit.SECONDS;
        //阻塞队列
        BlockingQueue workQueue = new ArrayBlockingQueue(3);
        //线程工厂
        ThreadFactory factory = Executors.defaultThreadFactory();
        //拒绝策略
//        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
//        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
//        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(core, maximum, liveTime, unit, workQueue, factory, handler);
        //布置任务

        while (true){
            executor.execute(()->{
                for (int j = 0; j < 5; j++) {
                    System.out.println(Thread.currentThread().getName()+
                    "执行线程任务");
                }
            });
        }
    }
}

工作机制

重要

images

刚创建线程池时,线程池中的线程数为零

执行任务的时候才会创建线程

当核心线程运行满时,新的任务放到阻塞队列而不是开启新的线程运行任务

当阻塞队列满了之后,开启新的线程运行任务,

运行的任务是最新放进来的任务,且运行线程不会超过设置的最大值线程数

阻塞队列和最大线程数量已满,则会启动饱和拒绝策略来执行。

线程空闲,核心线程会保留,非核心线程会销毁

拒绝策略

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常,阻止接收新的任务。
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。(貌似这个常用)
  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
评论