15 Aralık 2019 Pazar

SpringBatch Kullanımı

Giriş
Tarihçesi şöyle. Spring Batch 2007 yılından beri var, SpringBoot Batch ise 2015 yılından beri var
The Spring Batch framework, on which Spring Boot Batch is based, was first introduced in 2007 as a subproject of the Spring Framework. Spring Batch was designed to provide a simple and powerful framework for building batch-processing applications, and it quickly gained popularity in the Java ecosystem.

In 2014, the first version of Spring Boot was released, which aimed to simplify the configuration and setup of Spring-based applications. Spring Boot provided an opinionated approach to building Spring applications, which meant that developers could get started quickly without having to make many configuration choices.

In 2015, Spring Batch was integrated into Spring Boot, and Spring Boot Batch was born. Spring Boot Batch provided a streamlined approach to building batch-processing applications using Spring Batch, and it quickly became a popular choice for building batch-processing applications in the Java ecosystem.
Kullanım
@EnableBatchProcessing anotasyonu tanımlı olmalıdır.

Şeklen şöyle
Şeklen şöyle


Kavramlar şöyle.
Job: an entity that encapsulates an entire batch process. It is composed of one or more ordered Steps and it has some properties such as restartability.
Step: a domain object that encapsulates an independent, sequential phase of a batch job.
Item: the individual piece of data that it’s been processed.
Chunk: the processing style used by Spring Batch: read and process the item and then aggregate until reach a number of items, called “chunk” that will be finally written.
JobLauncher: the entry point to launch Spring Batch jobs with a given set of JobParameters.
JobRepository: maintains all metadata related to job executions and provides CRUD operations for JobLauncher, Job, and Step implementations.
1. Job içinde Step'ler bulunur. 
Step nesnesi iki şekilde çalışabilir.
chunk : Çok sayıda satırı, nesneyi, girdiyi bir anda işler. Bu durumda her Step içinde ItemReader, ItemWriter, ItemProcessor bulunabilir.
tasklet : Tek bir iş üzerinde çalışır Bu durumda her Step içinde bir Tasklet bulunur. Step'ler birbirlerine next() çağrısı ile bağlanabilir.

2. Job nesnesi JobLauncher tarafından çalıştırılır.
3. Job hakkındaki bilgiler JobRepository içinde saklanır

Flow
Normalde Step'ler sırayla çalıştırılır. Ancak bazen Conditional Flow yapmak gerekir. Veya Split Flow  ile Flow'lar paralel çalıştırılabilir
 
1. İşi Tanımlama - Job + Step + Tasklet
1.1 Step Nesnesi Yaratma
@Configuration olarak işaretli bir sınıfa StepBuilderFactory enjekte edilir. Bu sınıf kullanılarak Step tipinden bir bean yaratır. Step içinde reader ve writer bulunabilir.
Örnek
Şöyle yaparız.
@Bean
public Step step1() throws Exception {
  return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10).reader(reader())
    .writer(writer()).build();
}
Reader nesneleri ItemReader arayüzünden kalıtır. Reader olarak şunlar kullanılabilir.
FlatFileItemReader. CSV dosyaları okur
- JdbcCursorItemReader
JdbcPagingItemReader 
- MongoItemReader. Bu reader periyodik olarak sorgular ve gelen veriyi işler.

Writer nesneleri ItemWriter arayüzünden kalıtır. Writer olarak şunlar kullanılabilir.
- FlatFileItemWriter
- StaxEventItemWriter

Writer nesneleri işlenen satırları güncellemek, silmek için kullanılırlar. Toplu e-posta göndermek örneği burada. Örnekte bir tablodan e-posta gönderilecek satırlar reader ile okunur, processor ile e-post gönderilir ve writer ile bu satır e-posta gönderildi olarak işaretlenerek kaydedilir.

1.2 Tasklet Nesnesi Yaratma
Tasklet nesnesi örneğin bir kaynağı ilklendirmek veya temizlemek için kullanılır.

1.3 Job Nesnesi Yaratma
@Configuration olarak işaretli bir sınıfa JobBuilderFactory enjekte edilir. Bu sınıf kullanılarak bir JobBuilder elde edilir. JobBuilder ile Job tipinden bir bean yaratır.

Örnek
Şöyle yaparız. Her Job'ın bir ismi vardır. Bu işin adımları eğer birden fazla Step'ten oluşuyorsa flow() ile tanımlanır. Eğer tek bir Step'ten oluşuyorsa run() ile tanımlanır.
@Bean
public Job readEmployee() throws Exception {
  return jobBuilderFactory.get("readEmployee").flow(step1()).end().build();
}
2. İş Başlatmak
- JobLauncher arayüzünü run () metodu çağrılır. JobLauncher bir bean olduğu için kolayca erişilebilir. JobLauncher gerçekleştirimini sağlayan sınıf SimpleJobLauncher sınıfıdır.

Şöyle yaparız
@Autowired
JobLauncher jobLauncher;
- Parametre olarak Job (yani adımlar) ve JobParameters verilir. JobParameters nesnesini oluşturmak için JobParametersBuilder kullanılır.

Örnek
Şöyle yaparız
Job job =  ...;
JobParameters jobParameters = ...;
try {
  JobExecution jobExecution = jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException
   | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
  ...
}
Örnek
Şöyle yaparız
@Autowired
JobBuilderFactory jobBuilderFactory;

@Bean(name = "ExcelFileProcessingJob")
public Job job () throws Exception {
return this.jobBuilderFactory.get("ExcelFileProcessingJob")
  .start(fileProcessingStep())
  .build();
}

--------
@Autowired
@Qualifier("ExcelFileProcessingJob")
Job excelFileProcessingJob;

JobParameters jobParameters = new JobParametersBuilder()
  .addString("filename", filename)
  .toJobParameters();
JobExecution jobExecution = jobLauncher.run(excelFileProcessingJob, jobParameters);
3. Veri tabanı Tabloları

Tablolar şöyle
batch_job_execution
batch_job_execution_context
batch_job_execution_params
batch_job_execution_seq
batch_job_instance
batch_job_seq
batch_step_execution
batch_step_execution_context
batch_step_execution_seq
Tablolar şöyle
create  table  if not exists batch_job_instance
(
    job_instance_id bigint       not null
        primary key,
    version         bigint,
    job_name        varchar(100) not null,
    job_key         varchar(32)  not null,
    constraint job_inst_un
        unique (job_name, job_key)
);

alter table batch_job_instance
    owner to migration_user;

create table if not exists batch_job_execution
(
    job_execution_id           bigint    not null
        primary key,
    version                    bigint,
    job_instance_id            bigint    not null
        constraint job_inst_exec_fk
            references batch_job_instance,
    create_time                timestamp not null,
    start_time                 timestamp,
    end_time                   timestamp,
    status                     varchar(10),
    exit_code                  varchar(2500),
    exit_message               varchar(2500),
    last_updated               timestamp,
    job_configuration_location varchar(2500)
);

alter table batch_job_execution
    owner to migration_user;



create table batch_job_execution_params
(
    job_execution_id bigint       not null
        constraint job_exec_params_fk
            references batch_job_execution,
    type_cd          varchar(6)   not null,
    key_name         varchar(100) not null,
    string_val       varchar(250),
    date_val         timestamp,
    long_val         bigint,
    double_val       double precision,
    identifying      char         not null
);

alter table batch_job_execution_params
    owner to migration_user;

create table batch_step_execution
(
    step_execution_id  bigint       not null
        primary key,
    version            bigint       not null,
    step_name          varchar(100) not null,
    job_execution_id   bigint       not null
        constraint job_exec_step_fk
            references batch_job_execution,
    start_time         timestamp    not null,
    end_time           timestamp,
    status             varchar(10),
    commit_count       bigint,
    read_count         bigint,
    filter_count       bigint,
    write_count        bigint,
    read_skip_count    bigint,
    write_skip_count   bigint,
    process_skip_count bigint,
    rollback_count     bigint,
    exit_code          varchar(2500),
    exit_message       varchar(2500),
    last_updated       timestamp
);

alter table batch_step_execution
    owner to migration_user;

create table batch_step_execution_context
(
    step_execution_id  bigint        not null
        primary key
        constraint step_exec_ctx_fk
            references batch_step_execution,
    short_context      varchar(2500) not null,
    serialized_context text
);

alter table batch_step_execution_context
    owner to migration_user;

create table batch_job_execution_context
(
    job_execution_id   bigint        not null
        primary key
        constraint job_exec_ctx_fk
            references batch_job_execution,
    short_context      varchar(2500) not null,
    serialized_context text
);

alter table batch_job_execution_context
    owner to migration_user;


/*Sequence generate*/
create sequence if not exists batch_step_execution_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_step_execution_seq owner to migration_user;

create sequence if not exists batch_job_execution_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_job_execution_seq owner to migration_user;

create sequence if not exists batch_job_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_job_seq owner to migration_user;
application.properties dosyasında şöyle yaparız
spring.batch.initialize-schema=ALWAYS
Açıklaması şöyle
If we don't use this property and start the application, the application will complain Table batch_job_instance doesn't exist.

To avoid this error, we are basically telling to create batch job-related metadata during startup. This property will create additional database tables in your database like batch_job_execution, batch_job_execution_context, batch_job_execution_params, batch_job_instance etc.
Job'ları listelemek için şöyle yaparız
@SuppressWarnings("unchecked")
public List<?> jobs() {
  var sql = "select * from batch_step_execution order by step_execution_id";

  var resultList = (List<Tuple>) entityManager.createNativeQuery(sql, Tuple.class)
    .getResultList();
  return resultList.stream().map(tuple -> {
    var map = new HashMap<>();
    map.put("step_execution_id", tuple.get("step_execution_id"));
    map.put("step_name", tuple.get("step_name"));
    map.put("read_count", tuple.get("read_count"));
    map.put("write_count", tuple.get("write_count"));
    map.put("start_time", tuple.get("start_time"));
    map.put("end_time", tuple.get("end_time"));
    map.put("status", tuple.get("status"));
    map.put("commit_count", tuple.get("commit_count"));
    map.put("exit_message", tuple.get("exit_message"));
    map.put("exit_code", tuple.get("exit_code"));
    return map;
  }).collect(Collectors.toList());
}


Hiç yorum yok:

Yorum Gönder