CompletableFuture 异步

线程池

顾名思义就是,管理一些线程的资源池,提供限制和管理资源的方式,维护一些基本信息。

Executor框架

Java5后面引进的一种框架,有队列,拒绝策略等东西

由三部分组成:

任务Task:定义做什么Runnable不返回结果、Callable返回结果可抛异常

任务的执行Executor(ThreadPoolExecutor关键类继承于此):谁来做怎么做

异步计算的结果Future:如何获取结果

运行流程:

  1. 先决定用Runnable、Callable哪个接口
  2. 通过调用 ExecutorService 的 submit() 方法来提交任务,具体的执行则由其实现类(如 ThreadPoolExecutor)来完成。
  3. 给一个结果Future,可以主进程干别的
  4. 获取结果Future.get,计算完成直接取走,计算未完成线程堵塞

ThreadPoolExecutor

核心参数

image-20251010161133982
image-20251010161133982

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize:核心线程数:线程池中保持活动状态的线程数量,即使是空闲的(常驻店员)
  • maximumPoolSize:最大线程数:线程池能够容纳同时执行的最大线程数(常驻店员+临时工最大值)只有workQueue和corePoolSize都满了时候才招临时
  • workQueue:用于保存等待执行任务的阻塞队列(等候区)

ThreadPoolExecutor其他常见参数 :

  • unit : keepAliveTime 参数的时间单位。
  • keepAliveTime:非核心线程在没有作业的时候的等待时间
  • threadFactory :用于创建新线程,可以用它来为线程池中的线程命名。
  • handler :拒绝策略:当任务队列已满且线程数达到maximumPoolSize时,用于处理新提交任务的策略。

线程池创建

一、通过ThreadPoolExecutor 构造函数创建

没啥说的,上面参数一个一个写就行了构造函数。

二、通过 Executors 工具类创建(不推荐)

Executors提供了几个常用的静态方法创建线程池:

  • Executors.newFixedThreadPool(int nThreads): 创建一个固定大小的线程池。
  • Executors.newSingleThreadExecutor(): 创建一个只有一个线程的线程池。
  • Executors.newCachedThreadPool(): 创建一个可缓存的线程池,线程数根据任务量动态调整。
  • Executors.newScheduledThreadPool(int corePoolSize): 创建一个可以执行定时任务的线程池。[11]

为什么不推荐用Executors 创建

  • 前两个:使用了无界的LinkedBlockingQueue作为任务队列。如果任务的生产速度远快于消费速度,队列中的任务会持续堆积,最终可能导致服务器内存溢出(OOM)。
  • 后两个:允许创建的线程数量几乎是无限的。当有大量并发请求时,它会不断创建新线程,这可能耗尽CPU和内存资源,导致系统崩溃。

image-20251010164219604
image-20251010164219604

工作过程

image-20251010165637571
image-20251010165637571

线程池拒绝策略

CallerRunsPolicy, 使用线程池的调用者所在的线程去执行被拒绝的任务,除非线程池被停止或者线程池的任务队列已有空缺。

AbortPolicy, 直接抛出干不了的异常

DiscardPolicy, 不做任何处理,静默拒绝提交的任务。

DiscardOldestPolicy, 抛弃最老的任务,然后执行该任务。

还有自定义拒绝方式

CompletableFuture

为什么不用Future本身了呢?虽然他可以异步计算,但是获取结果的时候必须使用Future.get()的方式阻塞调用线程,或者轮询询问是否完成。

我的项目创建CompletableFuture使用的是通过自定义线程池,根据runnable构建执行任务。也就是下面这种

ompletableFuture.runAsync(() -> {
    redisUtil.zset("user_video_history:" + uid, vid);   // 添加到/更新观看历史记录
    videoStatsService.updateStats(vid, "play", true, 1);
}, taskExecutor);

而不是使用了默认内置线程池ForkJoinPool.commonPool(),因为这个线程池的处理线程数量是电脑的CPU核数-1,不如自定义

1.有并行加载数据的,比如同时加载用户信息和视频信息,加快加载速度

2.有异步通知消息的,就是如果有人点赞了异步去通知,作用就是用户不需要等结果,还有就是网络IO发消息这种操作也是比较耗时间的,异步不会堵塞主线程。

3.有异步缓存更新的,符合读写分离这种设计理念

实际使用上,项目在:

  1. 并行加载:聊天系统同时加载聊天记录和用户资料
  2. 异步通知:异步通知消息的,就是如果有人点赞了异步去通知
  3. 异步更新:除了缓存写的时候,还有就是在于更新视频到OSS上