Spring/Batch
Chunk Size and Parallel Spring Batch
YoonJong
2024. 4. 22. 14:29
728x90
-- 결론 --
1. Chunk Size 가 클수록 Batch 의 실행시간이 빨라(짧아)지는 것은 아니다.
2. 병렬 처리의 속도가 더 빠르다.
Case) 1억 건의 파일을 DB에 저장해야 한다
- Chunk Size 를 최소화해서 여러번 Commit 한다 → Commit 리소스 비용이 크기 떄문에 비효율적 , 커밋을 되도록 줄인다보통 금융권 기준으로 2~3000 으로 설정하는 것 같다고 함.
- Chunk Size 를 1억으로 설정하면 OOM 이 발생할 수 있다. ( 메모리에 올려두고 작업하기 때문에 )
- ChunkSize 에 따른 속도 비교 → 약 1만건(10,209) 의 데이터를 가지고 테스트
// Chunk Size Test
@RequiredArgsConstructor
@Configuration
public class JpaPageJob {
private final EntityManagerFactory entityManager;
@Bean
public Job jpaPageJob_batchBuild(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("jpaPageJob", jobRepository)
.start(jpaPageJob_batchStep1(0, jobRepository, transactionManager))
.build();
}
@Bean
@JobScope
public Step jpaPageJob_batchStep1(@Value("#{jobParameters[chunkSize]}") int chunkSize, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("jpaPageJob1",jobRepository)
.<Person, PersonTemp>chunk(chunkSize,transactionManager)
.reader(jpaPageJob_dbItemReader())
.processor(jpaPageJob_processor())
.writer(jpaPageJob_dbItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Person> jpaPageJob_dbItemReader() {
JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
reader.setName("JpaPageJob1_Reader");
reader.setEntityManagerFactory(entityManager);
reader.setQueryString("SELECT p FROM Person p");
return reader;
}
@Bean
public ItemProcessor<Person, PersonTemp> jpaPageJob_processor() {
return person -> {
return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
};
}
@Bean
public JpaItemWriter<PersonTemp> jpaPageJob_dbItemWriter() {
JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManager);
return writer;
}
}
- ChunkSize = 10
--job.name=jpaPageJob v=1 chunkSize=100
// 결과
Step: [jpaPageJob1] executed in 1m14s796ms
- ChunkSize=100
--job.name=jpaPageJob v=1 chunkSize=100
// 결과
Step: [jpaPageJob1] executed in 20s614ms
- ChunkSize = 1000
--job.name=jpaPageJob v=1 chunkSize=1000
// 결과
Step: [jpaPageJob1] executed in 22s729ms
- ChunkSize = 5000
--job.name=jpaPageJob v=1 chunkSize=1000
// 결과
Step: [jpaPageJob1] executed in 34s323ms
- ChunkSize = 10000
--job.name=jpaPageJob v=1 chunkSize=1000
// 결과
Step: [jpaPageJob1] executed in 49s711ms
ChunkSize 가 크다고 해서 꼭 batch 의 실행시간이 짧아지는 것은 아니다.
테스트를 통해 효율적인 Size 를 찾아 적용해야 한다.
병렬처리
자동으로 병렬로 처리해주는 명령어는 없다.
따로따로 Step 을 작성해서 flow 로 합쳐주어야 한다.
아래 코드는 2개의 Step 을 query 를 통해 id = 5000 기준으로 나누어 병렬로 실행한다.
@Configuration
@Slf4j
@RequiredArgsConstructor
public class JpaPageFlowJob {
private final EntityManagerFactory entityManager;
private final int chunkSize = 10;
@Bean
public Job jpaPageLowJob_batchBuild(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
Flow flow1 = new FlowBuilder<Flow>("flow1")
.start(jpaPageJob_batchStep1_parallel(jobRepository, transactionManager))
.build();
Flow flow2 = new FlowBuilder<Flow>("flow2")
.start(jpaPageJob_batchStep2_parallel(jobRepository, transactionManager))
.build();
Flow parallelStepFlow = new FlowBuilder<Flow>("parallelStepFlow")
.split(new SimpleAsyncTaskExecutor())
.add(flow1, flow2)
.build();
return new JobBuilder("jpaPageFlowJob", jobRepository)
.start(parallelStepFlow)
.build().build();
}
@Bean
public Step jpaPageJob_batchStep1_parallel(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("jpaPageJob1",jobRepository)
.<Person, PersonTemp>chunk(chunkSize,transactionManager)
.reader(jpaPageJob_1_dbItemReader_parallel())
.processor(jpaPageJob_1_processor_parallel())
.writer(jpaPageJob_1_dbItemWriter_parallel())
.build();
}
@Bean
public Step jpaPageJob_batchStep2_parallel(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("jpaPageJob1",jobRepository)
.<Person, PersonTemp>chunk(chunkSize,transactionManager)
.reader(jpaPageJob_2_dbItemReader_parallel())
.processor(jpaPageJob_2_processor_parallel())
.writer(jpaPageJob_2_dbItemWriter_parallel())
.build();
}
@Bean
public JpaPagingItemReader<Person> jpaPageJob_1_dbItemReader_parallel() {
JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
reader.setName("JpaPageJob1_Reader");
reader.setEntityManagerFactory(entityManager);
reader.setQueryString("SELECT p FROM Person p where p.id <= 5000 order by p.id asc"); // 2개의 병렬처리 중 1번째) id = 5000 이하 인것은 해당 Reader 에서 읽는다.
return reader;
}
@Bean
public ItemProcessor<Person, PersonTemp> jpaPageJob_1_processor_parallel() {
return person -> {
return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
};
}
@Bean
public JpaItemWriter<PersonTemp> jpaPageJob_1_dbItemWriter_parallel() {
JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManager);
return writer;
}
@Bean
public JpaPagingItemReader<Person> jpaPageJob_2_dbItemReader_parallel() {
JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
reader.setName("JpaPageJob1_Reader");
reader.setEntityManagerFactory(entityManager);
reader.setQueryString("SELECT p FROM Person p where p.id > 5000 order by p.id asc"); // 2개의 병렬처리 중 2번째) id = 5000 초과 인것은 해당 Reader 에서 읽는다.
return reader;
}
//PERSON 테이블에서 PERSON_TEMP 테이블에 INSERT 할때 NEW_ 만 붙여서 추가.
@Bean
public ItemProcessor<Person, PersonTemp> jpaPageJob_2_processor_parallel() {
return person -> {
return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
};
}
@Bean
public JpaItemWriter<PersonTemp> jpaPageJob_2_dbItemWriter_parallel() {
JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManager);
return writer;
}
}
프로그램 인수에서 v 버전만 변경해서 테스트 진행
실행 전, person_temp 의 데이터는 모두 삭제해준다.
--job.name=jpaPageFlowJob v=15
- batchSize = 10
Step: [jpaPageJob1] executed in 9s261ms
- batchSize = 100
Step: [jpaPageJob1] executed in 7s721ms
- batchSize = 1000
Step: [jpaPageJob1] executed in 6s535ms
- batchSize = 5000
Step: [jpaPageJob1] executed in 6s689ms
- batchSize = 10000
Step: [jpaPageJob1] executed in 6s178ms
테스트 결과 size 가 1000 ~ 이상부터는 크게 차이가 나지 않는 것을 확인할 수 있다.
병렬로 실행했기 때문에, 순서대로 DB 에 INSERT 되지 않는다.
728x90