Giriş
Tarihçesi şöyle. Spring Batch 2007 yılından beri var, SpringBoot Batch ise 2015 yılından beri var
Tarihçesi şöyle. Spring Batch 2007 yılından beri var, SpringBoot Batch ise 2015 yılından beri var
The Spring Batch framework, on which Spring Boot Batch is based, was first introduced in 2007 as a subproject of the Spring Framework. Spring Batch was designed to provide a simple and powerful framework for building batch-processing applications, and it quickly gained popularity in the Java ecosystem.In 2014, the first version of Spring Boot was released, which aimed to simplify the configuration and setup of Spring-based applications. Spring Boot provided an opinionated approach to building Spring applications, which meant that developers could get started quickly without having to make many configuration choices.In 2015, Spring Batch was integrated into Spring Boot, and Spring Boot Batch was born. Spring Boot Batch provided a streamlined approach to building batch-processing applications using Spring Batch, and it quickly became a popular choice for building batch-processing applications in the Java ecosystem.
Kullanım
@EnableBatchProcessing anotasyonu tanımlı olmalıdır.
Şeklen şöyle
Şeklen şöyleKavramlar şöyle.
Job: an entity that encapsulates an entire batch process. It is composed of one or more ordered Steps and it has some properties such as restartability.1. Job içinde Step'ler bulunur.
Step: a domain object that encapsulates an independent, sequential phase of a batch job.
Item: the individual piece of data that it’s been processed.
Chunk: the processing style used by Spring Batch: read and process the item and then aggregate until reach a number of items, called “chunk” that will be finally written.
JobLauncher: the entry point to launch Spring Batch jobs with a given set of JobParameters.
JobRepository: maintains all metadata related to job executions and provides CRUD operations for JobLauncher, Job, and Step implementations.
Step nesnesi iki şekilde çalışabilir.
chunk : Çok sayıda satırı, nesneyi, girdiyi bir anda işler. Bu durumda her Step içinde ItemReader, ItemWriter, ItemProcessor bulunabilir.
tasklet : Tek bir iş üzerinde çalışır Bu durumda her Step içinde bir Tasklet bulunur. Step'ler birbirlerine next() çağrısı ile bağlanabilir.
2. Job nesnesi JobLauncher tarafından çalıştırılır.
3. Job hakkındaki bilgiler JobRepository içinde saklanırFlow
Normalde Step'ler sırayla çalıştırılır. Ancak bazen Conditional Flow yapmak gerekir. Veya Split Flow ile Flow'lar paralel çalıştırılabilir
1. İşi Tanımlama - Job + Step + Tasklet
Örnek
Şöyle yaparız. Her Job'ın bir ismi vardır. Bu işin adımları eğer birden fazla Step'ten oluşuyorsa flow() ile tanımlanır. Eğer tek bir Step'ten oluşuyorsa run() ile tanımlanır.
- JobLauncher arayüzünü run () metodu çağrılır. JobLauncher bir bean olduğu için kolayca erişilebilir. JobLauncher gerçekleştirimini sağlayan sınıf SimpleJobLauncher sınıfıdır.
Şöyle yaparız
Örnek
Şöyle yaparız
3. Veri tabanı Tabloları
Tablolar şöyle
application.properties dosyasında şöyle yaparız
1. İşi Tanımlama - Job + Step + Tasklet
1.1 Step Nesnesi Yaratma
@Configuration olarak işaretli bir sınıfa StepBuilderFactory enjekte edilir. Bu sınıf kullanılarak Step tipinden bir bean yaratır. Step içinde reader ve writer bulunabilir.
Örnek
Şöyle yaparız.
- FlatFileItemReader. CSV dosyaları okur
- JdbcCursorItemReader
- JdbcPagingItemReader
- MongoItemReader. Bu reader periyodik olarak sorgular ve gelen veriyi işler.
Writer nesneleri ItemWriter arayüzünden kalıtır. Writer olarak şunlar kullanılabilir.
- FlatFileItemWriter
- StaxEventItemWriter
Örnek
Şöyle yaparız.
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10).reader(reader())
.writer(writer()).build();
}
Reader nesneleri ItemReader arayüzünden kalıtır. Reader olarak şunlar kullanılabilir.- FlatFileItemReader. CSV dosyaları okur
- JdbcCursorItemReader
- JdbcPagingItemReader
- MongoItemReader. Bu reader periyodik olarak sorgular ve gelen veriyi işler.
Writer nesneleri ItemWriter arayüzünden kalıtır. Writer olarak şunlar kullanılabilir.
- FlatFileItemWriter
- StaxEventItemWriter
Writer nesneleri işlenen satırları güncellemek, silmek için kullanılırlar. Toplu e-posta göndermek örneği burada. Örnekte bir tablodan e-posta gönderilecek satırlar reader ile okunur, processor ile e-post gönderilir ve writer ile bu satır e-posta gönderildi olarak işaretlenerek kaydedilir.
1.2 Tasklet Nesnesi Yaratma
Tasklet nesnesi örneğin bir kaynağı ilklendirmek veya temizlemek için kullanılır.
1.2 Tasklet Nesnesi Yaratma
Tasklet nesnesi örneğin bir kaynağı ilklendirmek veya temizlemek için kullanılır.
1.3 Job Nesnesi Yaratma
@Configuration olarak işaretli bir sınıfa JobBuilderFactory enjekte edilir. Bu sınıf kullanılarak bir JobBuilder elde edilir. JobBuilder ile Job tipinden bir bean yaratır.
@Configuration olarak işaretli bir sınıfa JobBuilderFactory enjekte edilir. Bu sınıf kullanılarak bir JobBuilder elde edilir. JobBuilder ile Job tipinden bir bean yaratır.
Örnek
Şöyle yaparız. Her Job'ın bir ismi vardır. Bu işin adımları eğer birden fazla Step'ten oluşuyorsa flow() ile tanımlanır. Eğer tek bir Step'ten oluşuyorsa run() ile tanımlanır.
@Bean
public Job readEmployee() throws Exception {
return jobBuilderFactory.get("readEmployee").flow(step1()).end().build();
}
2. İş Başlatmak
Şöyle yaparız
@Autowired
JobLauncher jobLauncher;
- Parametre olarak Job (yani adımlar) ve JobParameters verilir. JobParameters nesnesini oluşturmak için JobParametersBuilder kullanılır.
Örnek
Şöyle yaparız
Job job = ...;
JobParameters jobParameters = ...;
try {
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
...
}
Örnek
Şöyle yaparız
@Autowired
JobBuilderFactory jobBuilderFactory;
@Bean(name = "ExcelFileProcessingJob")
public Job job () throws Exception {
return this.jobBuilderFactory.get("ExcelFileProcessingJob")
.start(fileProcessingStep())
.build();
}
--------
@Autowired
@Qualifier("ExcelFileProcessingJob")
Job excelFileProcessingJob;
JobParameters jobParameters = new JobParametersBuilder()
.addString("filename", filename)
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(excelFileProcessingJob, jobParameters);
Tablolar şöyle
batch_job_execution
batch_job_execution_context
batch_job_execution_params
batch_job_execution_seq
batch_job_instance
batch_job_seq
batch_step_execution
batch_step_execution_context
batch_step_execution_seq
Tablolar şöyle
create table if not exists batch_job_instance
(
job_instance_id bigint not null
primary key,
version bigint,
job_name varchar(100) not null,
job_key varchar(32) not null,
constraint job_inst_un
unique (job_name, job_key)
);
alter table batch_job_instance
owner to migration_user;
create table if not exists batch_job_execution
(
job_execution_id bigint not null
primary key,
version bigint,
job_instance_id bigint not null
constraint job_inst_exec_fk
references batch_job_instance,
create_time timestamp not null,
start_time timestamp,
end_time timestamp,
status varchar(10),
exit_code varchar(2500),
exit_message varchar(2500),
last_updated timestamp,
job_configuration_location varchar(2500)
);
alter table batch_job_execution
owner to migration_user;
create table batch_job_execution_params
(
job_execution_id bigint not null
constraint job_exec_params_fk
references batch_job_execution,
type_cd varchar(6) not null,
key_name varchar(100) not null,
string_val varchar(250),
date_val timestamp,
long_val bigint,
double_val double precision,
identifying char not null
);
alter table batch_job_execution_params
owner to migration_user;
create table batch_step_execution
(
step_execution_id bigint not null
primary key,
version bigint not null,
step_name varchar(100) not null,
job_execution_id bigint not null
constraint job_exec_step_fk
references batch_job_execution,
start_time timestamp not null,
end_time timestamp,
status varchar(10),
commit_count bigint,
read_count bigint,
filter_count bigint,
write_count bigint,
read_skip_count bigint,
write_skip_count bigint,
process_skip_count bigint,
rollback_count bigint,
exit_code varchar(2500),
exit_message varchar(2500),
last_updated timestamp
);
alter table batch_step_execution
owner to migration_user;
create table batch_step_execution_context
(
step_execution_id bigint not null
primary key
constraint step_exec_ctx_fk
references batch_step_execution,
short_context varchar(2500) not null,
serialized_context text
);
alter table batch_step_execution_context
owner to migration_user;
create table batch_job_execution_context
(
job_execution_id bigint not null
primary key
constraint job_exec_ctx_fk
references batch_job_execution,
short_context varchar(2500) not null,
serialized_context text
);
alter table batch_job_execution_context
owner to migration_user;
/*Sequence generate*/
create sequence if not exists batch_step_execution_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_step_execution_seq owner to migration_user;
create sequence if not exists batch_job_execution_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_job_execution_seq owner to migration_user;
create sequence if not exists batch_job_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_job_seq owner to migration_user;
spring.batch.initialize-schema=ALWAYSAçıklaması şöyle
If we don't use this property and start the application, the application will complain Table batch_job_instance doesn't exist.To avoid this error, we are basically telling to create batch job-related metadata during startup. This property will create additional database tables in your database like batch_job_execution, batch_job_execution_context, batch_job_execution_params, batch_job_instance etc.
Job'ları listelemek için şöyle yaparız
@SuppressWarnings("unchecked") public List<?> jobs() { var sql = "select * from batch_step_execution order by step_execution_id"; var resultList = (List<Tuple>) entityManager.createNativeQuery(sql, Tuple.class) .getResultList(); return resultList.stream().map(tuple -> { var map = new HashMap<>(); map.put("step_execution_id", tuple.get("step_execution_id")); map.put("step_name", tuple.get("step_name")); map.put("read_count", tuple.get("read_count")); map.put("write_count", tuple.get("write_count")); map.put("start_time", tuple.get("start_time")); map.put("end_time", tuple.get("end_time")); map.put("status", tuple.get("status")); map.put("commit_count", tuple.get("commit_count")); map.put("exit_message", tuple.get("exit_message")); map.put("exit_code", tuple.get("exit_code")); return map; }).collect(Collectors.toList()); }
Hiç yorum yok:
Yorum Gönder