Skip to content

Instantly share code, notes, and snippets.

@y0ngb1n
Last active June 26, 2023 10:00
Show Gist options
  • Save y0ngb1n/09880f5397c19fe10d4ecdb2f4425ddb to your computer and use it in GitHub Desktop.
Save y0ngb1n/09880f5397c19fe10d4ecdb2f4425ddb to your computer and use it in GitHub Desktop.
Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,通过名字你也能看出来,它强调的是 Executor,而不是一般意义上的池化资源。

线程池的使用和监控

使用自定义线程池组件,主要思路如下:

  • 使用有界队列的固定数量线程池;
  • 拒绝策略是将任务丢弃,但需要记录错误日志;
  • 使用一个调度线程池对业务线程池进行监控。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.stereotype.Component;

/**
 * 自定义线程池
 *
 * @author yangbin
 */
@Slf4j
@Component
public class CustomThreadPoolExecutor implements AutoCloseable {

  private static final int DEFAULT_QUEUE_SIZE = 1024;
  private static final int DEFAULT_POOL_SIZE = 9 + 1;

  @Getter
  private int queueSize = DEFAULT_QUEUE_SIZE;

  @Getter
  private int poolSize = DEFAULT_POOL_SIZE;

  /**
   * 用于周期性监控业务线程池的运行状态
   */
  private final ScheduledExecutorService scheduledExecutorService = Executors
    .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("biz-thread-executor-monitor").build());

  /**
   * 自定义异步线程池,把线程池类比为一个项目组,而线程就是项目组的成员
   * 1. 任务队列使用有界队列
   * 2. 自定义拒绝策略
   */
  private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    // 线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
    poolSize,
    // 线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。
    // 当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
    poolSize,
    // 如果一个线程空闲了 keepAliveTime & unit 这么久,而且线程池的线程数大于 corePoolSize,那么这个空闲的线程就要被回收了。
    0, TimeUnit.MILLISECONDS,
    // LinkedBlockingQueue 为无界队列,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。
    // 任务队列强烈建议使用有界队列
    new ArrayBlockingQueue<>(queueSize),
    // 自定义如何创建线程,例如可以给线程指定一个有意义的名字。
    new BasicThreadFactory.Builder().namingPattern("biz-thread-%d").build(),
    // 自定义拒绝策略
    ((r, executor) -> log.error("The async executor pool is full!"))
  );

  private final ExecutorService executorService = threadPoolExecutor;

  @PostConstruct
  public void initialize() {
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      // 线程池需要执行的任务数
      final long taskCount = threadPoolExecutor.getTaskCount();
      // 线程池在运行过程中已完成的任务数
      final long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
      // 曾经创建过的最大线程数
      final int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
      // 线程池里的线程数量
      final int poolSize = threadPoolExecutor.getPoolSize();
      // 线程池里活跃的线程数量
      final int activeCount = threadPoolExecutor.getActiveCount();

      log.info("biz-thread-executor-monitor: taskCount={}, completedTaskCount={}, largestPoolSize={}, poolSize={}, activeCount={}",
        taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);
    }, 0, 10, TimeUnit.MINUTES);
  }

  public void execute(Runnable task) {
    executorService.execute(task);
  }

  @Override
  public void close() {
    executorService.shutdown();
    scheduledExecutorService.shutdown();
  }
}

定时监控线程池

  • 利用 Spring 注册线程池实例
  • 统一注入并定时监控线程池的运行情况
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;

/**
 * 线程池配置
 *
 * @author yangbin
 */
@Slf4j
public class ThreadPoolConfig {

  /** 核心线程数 */
  public static final Integer CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();

  /** 最大线程数 */
  public static final Integer MAX_POOL_SIZE = (int) (CORE_POOL_SIZE / (1 - 0.9));

  /** 定时任务线程池,定时打印各线程池的运行情况 */
  @Bean(destroyMethod = "shutdown")
  public ExecutorService scheduledLogStatsThreadPool(
      Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
    scheduledThreadPool.scheduleAtFixedRate(
        () -> {
          threadPoolExecutorMap.forEach(
              ((name, executor) ->
                  log.info(
                      "[定时监控线程池] {}, 核心线程数:{}, 最大线程数:{}, 当前线程数:{}, 活跃线程数:{}, 同时存在最大线程数:{}, 线程池任务总量:{}, 队列类型:{}, 队列容量:{}, 队列元素个数:{}, 队列剩余个数:{}",
                      name,
                      executor.getCorePoolSize(),
                      executor.getMaximumPoolSize(),
                      executor.getPoolSize(),
                      executor.getActiveCount(),
                      executor.getLargestPoolSize(),
                      executor.getCompletedTaskCount(),
                      executor.getQueue().getClass().getSimpleName(),
                      executor.getQueue().size() + executor.getQueue().remainingCapacity(),
                      executor.getQueue().size(),
                      executor.getQueue().remainingCapacity())));
        },
        1,
        10,
        TimeUnit.MINUTES);
    return scheduledThreadPool;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment