27 Ağustos 2020 Perşembe

SpringAsync ThreadPoolTaskExecutor Sınıfı

Giriş
TaskExecutor arayüzünü gerçekleştirir.

Bu sınıfın kardeşi SimpleAsyncTaskExecutor sınıfı.

Kapatma
Spring kapanırken bu bean'in shutdown() metodunu çağırır. Açıklaması şöyle.
Per official Spring documentation, when using annotation-based configuration, for destroyMethod field of @Bean, Spring's default behavior is to automatically invoke public, no-arg methods named close or shutdown when the application context is being closed.
shutdown() metodu ise bean'in waitForTasksToCompleteOnShutdown() ayarına göre altta kullanılan java.util.concurrent.ExecutorService nesnesinin shutdown() veya shutdownNow() metodunu çağırır.

Tanımlama
Bu sınıfı tanımlamak için 3 tane yol var.
1. Hiçbir işlem yapmayız. Bu durumda SpringBoot bizim için TaskExecutionAutoConfiguration sınıfını kullanarak bir ThreadPoolTaskExecutor nesnesi yaratır. Bu nesnedeki thread'lerin isimleri "task-1" şeklindedir. 

SpringAsync application.properties Ayarları yazısında gösterildiği gibi ayarlar verilebilir.

2. ThreadPoolTaskExecutor bean'i kendim yaratırım.

3. AsyncConfigurerSupport ile Executor yaratılır. Bu durumda nesnedeki tüm thread'lerin isimleri "ThreadPoolTaskExecutor-1" şeklindedir.

4. AsyncConfigurer arayüzü ile Executor yaratılır.

İkinci yöntem ile birden fazla ThreadPoolTaskExecutor yaratma imkanı var. Böylece farklı işler farklı ThreadPoolTaskExecutor nesnelerine gönderilebilir.

Birinci ve üçüncü yöntemde ise uygulama için default executor yaratılır. Sadece @Async olarak işaretli - yani thread pool ismi verilmeyen - işler bu global Executor'a gönderilir.

İkinci Yöntem - Kendim Yaratırım
Örnek
Şöyle yaparız. Burada Project Loom kullanılıyor.
@Bean
public AsyncTaskExecutor applicationTaskExecutor() {
  ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
  return new TaskExecutorAdapter(executorService::execute);
}
Örnek
Şöyle yaparız. Burada initialize() metodu çağrılıyor
@Bean(name = "myThreadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
  ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  threadPoolTaskExecutor.setCorePoolSize(...);
  threadPoolTaskExecutor.setMaxPoolSize(...);
  threadPoolTaskExecutor.setQueueCapacity(...);
  threadPoolTaskExecutor.initialize();
  return threadPoolTaskExecutor;
}
Kullanmak için şöyle yaparız.
@Autowired
@Qualifier("myThreadPoolTaskExecutor")
public ThreadPoolTaskExecutor executor;
Örnek
Şöyle yaparız
//two custom asyncExecutors
@Configuration
public class AsyncConfig {

  private static final int MAX_POOL_SIZE = 50;
  public static final int CORE_POOL_SIZE = 20;

  //custom async Executor threadpool.
  @Bean("asyncExecutor")
  public AsyncTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor async = new ThreadPoolTaskExecutor();

    async.setMaxPoolSize(MAX_POOL_SIZE);
    async.setCorePoolSize(CORE_POOL_SIZE);
    async.setThreadNamePrefix("async-threads-");
    async.setWaitForTasksToCompleteOnShutdown(false);
    async.setQueueCapacity(100);

    async.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return async;
  }
  @Bean("fixedThreadPool")
  public ExecutorService myFixedThreadPool() {
    int processors = Runtime.getRuntime().availableProcessors();
    return Executors.newFixedThreadPool(processors * 5);
  }
}
@Slf4j
@Service
public class AsyncService {

  @Async
  public void step1(){
    log.info("step 1");
  }

  @Async("asyncExecutor")
  public void step2(){
    log.info("step 2");
  }

  @Async("fixedThreadPool")
  public void step3(){
    log.info("step 3");
  }
}
Örnek
Şöyle yaparız
@Bean
public TaskExecutor threadPoolTaskExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(100);
  executor.setMaxPoolSize(1000);
  executor.setThreadNamePrefix("default_task_executor_thread");
  executor.setThreadFactory(new CustomThreadFactory());
  executor.initialize();
  return executor;
}
Örnek
SpringBoot kullanıyorsak application.properties dosyasında şöyle yaparız ve bu değerleri @Value ile okuyarak @Bean anotasyonu ile ThreadPoolTaskExecutor yaratırız.
threadpool.corepoolsize=5
threadpool.maxpoolsize=10

Sınıfın Metodlar
constructor
Şöyle yaparız.
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
execute metodu
Örnek ver

getPoolSize metodu - Thread Sayısı
Şöyle yaparız.
LOGGER.debug("QueueSize = {}", executor.getThreadPoolExecutor().getQueue().size());
LOGGER.debug("PoolSize = {}", executor.getPoolSize());     
getThreadPoolExecutor metodu - Kuyruk Bilgisi
Kuyruğun büyüklüğünü öğrenmek için şöyle yaparız.
LOGGER.debug("QueueSize = {}", executor.getThreadPoolExecutor().getQueue().size());
LOGGER.debug("PoolSize = {}", executor.getPoolSize());     
Kuyruğa daha kaç nesne eklenebileceğini öğrenmek için şöyle yaparız.
LOGGER.debug("Remaining Capacity = {}", executor.getThreadPoolExecutor().getQueue()
  .remaningCapacity());
setAwaitTerminationSeconds metodu
Şöyle yaparız.
executor.setAwaitTerminationSeconds(30);
setCorePoolSize metodu
Şöyle yaparız.
executor.setCorePoolSize(5);
setMaxPoolSize metodu
Şöyle yaparız.
executor.setMaxPoolSize(10);
setQueueCapacity metodu
Şöyle yaparız.
taskExecutor.setQueueCapacity(40);
setRejectedExecutionHandler metodu
Şöyle yaparız.
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
setTaskDecorator metodu
Bir örnek burada

setThreadFactory metodu
Şöyle yaparız. Yeni bir thread yaratılınca kendi arzumuza göre kod çalıştırmak için kullanılabilir.
// ThreadPoolTaskExecutor configuration
@Bean
public TaskExecutor threadPoolTaskExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  ...
  executor.setThreadFactory(new CustomThreadFactory());
  executor.initialize();
  return executor;
}

// Custom thread factory
public class CustomThreadFactory implements ThreadFactory{

  @Override
  public Thread newThread(Runnable r) {
    return new Thread(new CustomRunnable(r));
  }

  public static class CustomRunnable implements Runnable {

    private Runnable runnable;

    protected CustomRunnable(Runnable runnable) {
      super();
      this.runnable = runnable;
    }

    @Override
    public void run() {
      ...
    }

  }
}
setThreadNamePrefix metodu
Havuzdaki thread isimlerine ön ek vermeyi sağlar.
Örnek
Şöyle yaparız.
@Bean
public TaskExecutor requestQueue() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(5);
  executor.setMaxPoolSize(5);
  executor.setThreadNamePrefix("request_queue");
  executor.initialize();
  return executor;
}
Örnek
Şöyle yaparız
@Bean
public Executor taskExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(2);
  executor.setMaxPoolSize(2);
  executor.setQueueCapacity(500);
  executor.setThreadNamePrefix("KafkaMsgExecutor-");
  executor.initialize();
  return executor;
}
setWaitForTasksToCompleteOnShutdown metodu
Eğer true verirse executor.Shutdown() metodu çağrılır. Açıklaması şöyle.
Spring will call executor.shutdown instead of executor.shutdownNow
Örnek
Şöyle yaparız.
executor.setWaitForTasksToCompleteOnShutdown(true);     
submit metodu
Future nesnesi döner.
Örnek
Elimizde şöyle bir bean olsun
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
Şöyle yaparız
Future<Foo> future = threadPoolTaskExecutor.submit(...);
Foo foo = future.get();


Hiç yorum yok:

Yorum Gönder