Spring线程池配置模板设计(基于Springboot)

目录

线程池配置模板

springboot给我们提供了一个线程池的实现,它的底层是由线程池ThreadPoolTaskExecutor来实现的。相较与JDK提供的线程池进行了一些功能的增强,比如对线程状态的监听,在我们在使用的时候更加的方便。在这里给各位同学一个配置模板,简单的讲解下Spring线程池的底层原理(在最后的源码章节)。

基础的注解解释

@Configuration:这是 Spring 3.0 添加的一个注解,用来代替 applicationContext.xml 配置文件,所有这个配置文件里面能做到的事情都可以通过这个注解所在类来进行注册。

@Bean:用来代替 XML 配置文件里面的 <bean …> 配置。

常用配置参数

  • corePoolSize :线程池的核心池大小,在创建线程池之后,线程池默认没有任何线程。

线程池创建之后,线程池中的线程数为0,当任务过来就会创建一个线程去执行,直到线程数达到corePoolSize 之后,就会被到达的任务放在队列中。换句更精炼的话:corePoolSize 表示允许线程池中允许同时运行的最大线程数。

如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

  • maximumPoolSize :线程池允许的最大线程数,他表示最大能创建多少个线程。maximumPoolSize肯定是大于等于corePoolSize。
  • keepAliveTime:表示线程没有任务时最多保持多久然后停止。默认情况下,只有线程池中线程数大于corePoolSize 时,keepAliveTime 才会起作用。换句话说,当线程池中的线程数大于corePoolSize,并且一个线程空闲时间达到了keepAliveTime,那么就是shutdown。如果配置了 allowCoreThreadTimeOut=true,那么核心线程池也会参与到超时的计时中。
  • Unit:keepAliveTime 的单位。
  • workQueue :一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能
  • threadFactory :线程工厂,用来创建线程。
  • handler :表示当拒绝处理任务时的策略。
    • AbortPolicy:丢弃任务并抛出RejectedExecutionException
    • CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
    • DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
    • DiscardPolicy:丢弃任务,不做任何处理。
  • allowCoreThreadTimeOut:设置为true则线程池会回收核心线程池的线程,false则只会回收超过核心线程池的线程。默认为false。

配置类设计

这是博主自己写的一个关于Springboot线程池的配置类,参考了一些文章的规范,可以直接使用。

@EnableAsync
@Configuration
public class LogThreadPoolConfig {

  @Bean(name = "logThreadPool")
  public ThreadPoolTaskExecutor LogThreadPoolTask() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    LogThreadPoolProperties properties = this.logThreadPoolProperties();

    executor.setCorePoolSize(properties.getCorePoolSize());
    executor.setMaxPoolSize(properties.getMaxPoolSize());
    executor.setQueueCapacity(properties.getQueueCapacity());
    executor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
    executor.setThreadNamePrefix(properties.getThreadName());
    switch (properties.getRejectedExecutionHandler()) {
      case "abortPolicy":
        executor.setRejectedExecutionHandler(new AbortPolicy());
        break;
      case "callerRunsPolicy":
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        break;
      case "discardOldestPolicy":
        executor.setRejectedExecutionHandler(new DiscardOldestPolicy());
        break;
      case "discardPolicy":
        executor.setRejectedExecutionHandler(new DiscardOldestPolicy());
        break;
      default:
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        break;
    }
    executor.initialize();
    return executor;
  }


  @Bean
  @ConfigurationProperties(prefix = "threadpool.log")
  public LogThreadPoolProperties logThreadPoolProperties() {
    return new LogThreadPoolProperties();
  }


  //@Getter lombok提供的getset方法生成注解
  //@Setter
  @Configuration
  public static class LogThreadPoolProperties {

    /**
     * 线程前缀名
     */
    private String threadName;
    /**
     * 核心线程池大小
     */
    private int corePoolSize;
    /**
     * 最大线程数
     */
    private int maxPoolSize;
    /**
     * 队列大小
     */
    private int queueCapacity;
    /**
     * 线程池维护空闲线程存在时间
     */
    private int keepAliveSeconds;
    /**
     * 拒绝策略
     */
    private String rejectedExecutionHandler;

  }
}

这样就可以在yml文件中配置参数了:

threadpool:
  log:
    threadName: ThreadPool-log- # 线程池前缀名
    corePoolSize: 8             # 核心线程池数:IO型推荐设置为cpu核心数*2;cpu型推荐设置为cpu数+1
    maxPoolSize: 16             # 最大线程池数
    queueCapacity: 1000         # 线程池阻塞队列容量
    keepAliveSeconds: 60        # 允许线程空闲时间
    # 拒绝策略 abortPolicy callerRunsPolicy discardOldestPolicy discardPolicy
    rejectedExecutionHandler: callerRunsPolicy

线程池使用

Spring提供了注解方式来方便我们使用线程池,只需要在要异步处理的方法上加 @Async(“你配置的线程池名字”)就可以了,注意这个类需要被spring扫描并纳入管理,所以要加@Service、@Component等注解。

@Service
public class ServiceImpl implements Service {

  @Override
  @Async("logThreadPool")
  public void addOperationLog(BaseLog baseLog) {
    //你要异步执行的逻辑
  }
}

具体的异步效果可以自测一下

ThreadPoolTaskExecutor源码

springboot给我们提供了一个线程池的实现,它的底层是由我们传统线程池ThreadPoolExecutor来实现的。

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
  private final Object poolSizeMonitor = new Object();
  private int corePoolSize = 1;
  private int maxPoolSize = 2147483647;
  private int keepAliveSeconds = 60;
  private int queueCapacity = 2147483647;
  private boolean allowCoreThreadTimeOut = false;
  private TaskDecorator taskDecorator;
  /**
   * 在这可以看到,其底层封装了我们熟悉的threadPoolExecutor,这是JDK提供给我们的线程池实现
   */
  private ThreadPoolExecutor threadPoolExecutor;

  public ThreadPoolTaskExecutor() {
  }

  /**
   * 这些都是些get/set
   */
  public void setCorePoolSize(int corePoolSize) {
    Object var2 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      this.corePoolSize = corePoolSize;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setCorePoolSize(corePoolSize);
      }

    }
  }
  public int getCorePoolSize() {
    Object var1 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      return this.corePoolSize;
    }
  }
  public void setMaxPoolSize(int maxPoolSize) {
    Object var2 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      this.maxPoolSize = maxPoolSize;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
      }
    }
  }
  public int getMaxPoolSize() {
    Object var1 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      return this.maxPoolSize;
    }
  }
  public void setKeepAliveSeconds(int keepAliveSeconds) {
    Object var2 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      this.keepAliveSeconds = keepAliveSeconds;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setKeepAliveTime((long)keepAliveSeconds, TimeUnit.SECONDS);
      }
    }
  }
  public int getKeepAliveSeconds() {
    Object var1 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      return this.keepAliveSeconds;
    }
  }
  public void setQueueCapacity(int queueCapacity) {
    this.queueCapacity = queueCapacity;
  }

  public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
    this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
  }

  public void setTaskDecorator(TaskDecorator taskDecorator) {
    this.taskDecorator = taskDecorator;
  }

  /**
   * 这是初始化方法,可以在这里把JDK提供的ThreadPoolExecutor初始化了
   */
  protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
    ThreadPoolExecutor executor;
    if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
        public void execute(Runnable command) {
          super.execute(ThreadPoolTaskExecutor.this.taskDecorator.decorate(command));
        }
      };
    } else {
      executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
    }

    if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
    }

    this.threadPoolExecutor = executor;
    return executor;
  }

  protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
    return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
  }

  public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
    Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
    return this.threadPoolExecutor;
  }

  public int getPoolSize() {
    return this.threadPoolExecutor == null ? this.corePoolSize : this.threadPoolExecutor.getPoolSize();
  }

  public int getActiveCount() {
    return this.threadPoolExecutor == null ? 0 : this.threadPoolExecutor.getActiveCount();
  }

  public void execute(Runnable task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      executor.execute(task);
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public void execute(Runnable task, long startTimeout) {
    this.execute(task);
  }

  public Future<?> submit(Runnable task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      return executor.submit(task);
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public <T> Future<T> submit(Callable<T> task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      return executor.submit(task);
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  //这些都是些Spring对线程池的功能增强,一般用不到
  public ListenableFuture<?> submitListenable(Runnable task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      ListenableFutureTask<Object> future = new ListenableFutureTask(task, (Object)null);
      executor.execute(future);
      return future;
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      ListenableFutureTask<T> future = new ListenableFutureTask(task);
      executor.execute(future);
      return future;
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public boolean prefersShortLivedTasks() {
    return true;
  }
}

ThreadPoolTaskExecutor 继承了 ExecutorConfigurationSupport,其实它主要是完成线程池的初始化的:

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
  protected final Log logger = LogFactory.getLog(this.getClass());
  private ThreadFactory threadFactory = this;
  private boolean threadNamePrefixSet = false;
  private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();
  private boolean waitForTasksToCompleteOnShutdown = false;
  private int awaitTerminationSeconds = 0;
  private String beanName;
  private ExecutorService executor;

  public ExecutorConfigurationSupport() {
  }

  public void setThreadFactory(ThreadFactory threadFactory) {
    this.threadFactory = (ThreadFactory)(threadFactory != null ? threadFactory : this);
  }

  public void setThreadNamePrefix(String threadNamePrefix) {
    super.setThreadNamePrefix(threadNamePrefix);
    this.threadNamePrefixSet = true;
  }

  public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
    this.rejectedExecutionHandler = (RejectedExecutionHandler)(rejectedExecutionHandler != null ? rejectedExecutionHandler : new AbortPolicy());
  }

  public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
    this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
  }

  public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
    this.awaitTerminationSeconds = awaitTerminationSeconds;
  }

  public void setBeanName(String name) {
    this.beanName = name;
  }

  /**
  * 这里就是在bean初始化完后调用线程池的初始化方法生成线程池实例
  * 并被Spring容器管理
  */
  public void afterPropertiesSet() {
    this.initialize();
  }

  public void initialize() {
    if (this.logger.isInfoEnabled()) {
      this.logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }

    if (!this.threadNamePrefixSet && this.beanName != null) {
      this.setThreadNamePrefix(this.beanName + "-");
    }

    this.executor = this.initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
  }

  protected abstract ExecutorService initializeExecutor(ThreadFactory var1, RejectedExecutionHandler var2);

  public void destroy() {
    this.shutdown();
  }

  public void shutdown() {
    if (this.logger.isInfoEnabled()) {
      this.logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }

    if (this.waitForTasksToCompleteOnShutdown) {
      this.executor.shutdown();
    } else {
      this.executor.shutdownNow();
    }

    this.awaitTerminationIfNecessary();
  }

  private void awaitTerminationIfNecessary() {
    if (this.awaitTerminationSeconds > 0) {
      try {
        if (!this.executor.awaitTermination((long)this.awaitTerminationSeconds, TimeUnit.SECONDS) && this.logger.isWarnEnabled()) {
          this.logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
        }
      } catch (InterruptedException var2) {
        if (this.logger.isWarnEnabled()) {
          this.logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
        }

        Thread.currentThread().interrupt();
      }
    }
  }
}

上述好多的参数其实都是JDK线程池需要的,具体他们的功能可以看线程池源码来了解它的作用。线程池源码解析