当前位置:首页 >休闲 >Spring任务调度&异步任务&Web异步请求三者如何配置线程池? 则会通过如下方式创建一个

Spring任务调度&异步任务&Web异步请求三者如何配置线程池? 则会通过如下方式创建一个

2024-06-30 17:52:35 [百科] 来源:避面尹邢网

Spring任务调度&异步任务&Web异步请求三者如何配置线程池?

作者:Spring全家桶实战案例 开发 前端 首先在容器中通过类型查找TaskScheduler Bean,异步任务如果没有则抛出NoSuchBeanDefinitionException异常。任务在这一步中,调度如果找到多个,步请那么会在通过beanName=taskScheduler在容器中查找。何配

一、置线任务调度

注解类:@Scheduled

Spring任务调度&异步任务&Web异步请求三者如何配置线程池? 则会通过如下方式创建一个

核心处理类:ScheduledAnnotationBeanPostProcessor

Spring任务调度&异步任务&Web异步请求三者如何配置线程池? 则会通过如下方式创建一个

使用的程池线程池:从容器中查询TaskScheduler。

Spring任务调度&异步任务&Web异步请求三者如何配置线程池? 则会通过如下方式创建一个

  • 首先在容器中通过类型查找TaskScheduler Bean,异步任务如果没有则抛出NoSuchBeanDefinitionException异常。任务
  • 在这一步中,调度如果找到多个,步请那么会在通过beanName=taskScheduler在容器中查找
  • 在上一步中抛出异常后会继续查找java.util.concurrent.ScheduledExecutorService 类型的何配Bean。
  • 在这一步中,置线如果找到多个,程池那么会在通过beanName=taskScheduler在容器中查找
  • 在上一步中还是异步任务没有则结束(程序并不会报错)

如果上面流程都没有找到,则会通过如下方式创建一个。

this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);

在Springboot中有个自动配置类会配置一个TaskSchedulingAutoConfiguration。

public class TaskSchedulingAutoConfiguration {    @Bean   @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)   @ConditionalOnMissingBean({  SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })   public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {      return builder.build();   }      @Bean   @ConditionalOnMissingBean   public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,       ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {      TaskSchedulerBuilder builder = new TaskSchedulerBuilder();     builder = builder.poolSize(properties.getPool().getSize());     Shutdown shutdown = properties.getShutdown();     builder = builder.awaitTermination(shutdown.isAwaitTermination());     builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());     builder = builder.threadNamePrefix(properties.getThreadNamePrefix());     builder = builder.customizers(taskSchedulerCustomizers);     return builder;   } }

二、异步任务

  • 注解类:Async。
  • 核心处理类:AsyncAnnotationBeanPostProcessor。

通过ProxyAsyncConfiguration配置,该类继承AbstractAsyncConfiguration。

在父类中会初始化,下面两个成员变量:

@Configuration(proxyBeanMethods = false) public abstract class AbstractAsyncConfiguration implements ImportAware {    @Nullable   protected Supplier<Executor> executor;   @Nullable   protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;   // 在容器中查找AsyncConfigurer Bean 且只能有一个   @Autowired   void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {      Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {        List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());       if (CollectionUtils.isEmpty(candidates)) {          return null;       }       if (candidates.size() > 1) {          throw new IllegalStateException("Only one AsyncConfigurer may exist");       }       return candidates.get(0);     });     this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);     this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);   }    private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {      return () -> {        AsyncConfigurer configurer = supplier.get();       return (configurer != null ? provider.apply(configurer) : null);     };   } }

使用的线程池:

  • 首先在容器中通过类型查找AsyncConfigurer Bean。
  • 如果没有则设置默认的AsyncConfigurer::getAsyncExecutor 该方法是接口中默认方法,返回的是null。
  • 在上一步中如果容器中没有AsyncConfigurer,那么设置到AsyncAnnotationBeanPostProcessor中也将就是null。
  • 初始化AsyncAnnotationBeanPostProcessor。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {    @Nullable   private Supplier<Executor> executor;   @Nullable   private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;   @Override   public void setBeanFactory(BeanFactory beanFactory) {      super.setBeanFactory(beanFactory);     // 构建切面Advisor     AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);     if (this.asyncAnnotationType != null) {        advisor.setAsyncAnnotationType(this.asyncAnnotationType);     }     advisor.setBeanFactory(beanFactory);     this.advisor = advisor;   } } public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {    public AsyncAnnotationAdvisor(       @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {       Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);     asyncAnnotationTypes.add(Async.class);     try {        asyncAnnotationTypes.add((Class<? extends Annotation>)           ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));     }     // 构建通知类     this.advice = buildAdvice(executor, exceptionHandler);     this.pointcut = buildPointcut(asyncAnnotationTypes);   }   protected Advice buildAdvice(       @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {       AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);     // 调用父类方法     interceptor.configure(executor, exceptionHandler);     return interceptor;   } } public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {     public void configure(@Nullable Supplier<Executor> defaultExecutor,       @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {       // 如果defaultExecutor则调用getDefaultExecutor方法,该方法在子类重写了     this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));     this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);   }   protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {      if (beanFactory != null) {        try {          // 容器中查找TaskExecutor类型的Bean         return beanFactory.getBean(TaskExecutor.class);       } catch (NoUniqueBeanDefinitionException ex) {          try {            return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);         } catch (NoSuchBeanDefinitionException ex2) {          }       } catch (NoSuchBeanDefinitionException ex) {          try {            return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);         } catch (NoSuchBeanDefinitionException ex2) {          }         // Giving up -> either using local default executor or none at all...       }     }     return null;   } } public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {      // 先通过父类获取     Executor defaultExecutor = super.getDefaultExecutor(beanFactory);     // 如果父类获取不到,则创建默认的SimpleAsyncTaskExecutor     return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());   } }

总结:

  • 从容器中查找TaskExecutor Bean。
  • 上一步没有找到,则查找beanName=taskExecutor,类型为java.util.concurrent.Executor的Bean。
  • 如果上一步还是没有找到,那么最终创建默认的SimpleAsyncTaskExecutor 这是个没有上限的线程池,来一个任务创建新线程。

如果执行的异步任务很多且线程池,线程有限则多的任务会等待。

三、Web异步接口

RequestMappingHandlerAdapter。

public class RequestMappingHandlerAdapter {    // 默认是一个没有上限的线程池   private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");   protected ModelAndView invokeHandlerMethod(     HttpServletRequest request,     HttpServletResponse response,      HandlerMethod handlerMethod) throws Exception {      // ...     WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);     asyncManager.setTaskExecutor(this.taskExecutor);     // ...   } }

创建RequestMappingHandlerAdapter Bean对象。

继承关系。

public static class EnableWebMvcConfiguration extends DelegatingWebMvcConfiguration{ } public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {    private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();   // WebMvcConfigurer默认实现   @Autowired(required = false)   public void setConfigurers(List<WebMvcConfigurer> configurers) {      if (!CollectionUtils.isEmpty(configurers)) {        this.configurers.addWebMvcConfigurers(configurers);     }   }   @Override   protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {      this.configurers.configureAsyncSupport(configurer);   } } public class WebMvcConfigurationSupport implements ApplicationContextAware, ServletContextAware {    private AsyncSupportConfigurer asyncSupportConfigurer;   @Bean   public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {      // ...     AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();     if (configurer.getTaskExecutor() != null) {        adapter.setTaskExecutor(configurer.getTaskExecutor());     }     // ...   }   protected AsyncSupportConfigurer getAsyncSupportConfigurer() {      if (this.asyncSupportConfigurer == null) {        this.asyncSupportConfigurer = new AsyncSupportConfigurer();       configureAsyncSupport(this.asyncSupportConfigurer);     }     return this.asyncSupportConfigurer;   } } public class WebMvcAutoConfiguration {    public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer, ServletContextAware {      public void configureAsyncSupport(AsyncSupportConfigurer configurer) {        // 判断容器中是否有beanName = applicationTaskExecutor 的Bean       if (this.beanFactory.containsBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)) {          // 通过beanName = applicationTaskExecutor获取bean对象         Object taskExecutor = this.beanFactory.getBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);         // 判断是否AsyncTaskExecutor对象         if (taskExecutor instanceof AsyncTaskExecutor) {            configurer.setTaskExecutor(((AsyncTaskExecutor) taskExecutor));         }       }       Duration timeout = this.mvcProperties.getAsync().getRequestTimeout();       if (timeout != null) {          configurer.setDefaultTimeout(timeout.toMillis());       }     }   } }

总结:

  • 默认使用SimpleAsyncTaskExecutor。
  • 如果容器中存在以beanName = applicationTaskExecutor 且 类型是 AsyncTaskExecutor, 则使用该bean。

到这你应该知道了这三者在线程池方面该如何正确配置及使用了。

责任编辑:姜华 来源: 今日头条 Spring任务调度

(责任编辑:百科)

    推荐文章
    热点阅读