Spring/Batch

Chunk Size and Parallel Spring Batch

YoonJong 2024. 4. 22. 14:29
728x90

-- 결론 -- 

1. Chunk Size 가 클수록 Batch 의 실행시간이 빨라(짧아)지는 것은 아니다.

2. 병렬 처리의 속도가 더 빠르다.

 


 

Case) 1억 건의 파일을 DB에 저장해야 한다

  • Chunk Size 를 최소화해서 여러번 Commit 한다 → Commit 리소스 비용이 크기 떄문에 비효율적 , 커밋을 되도록 줄인다보통 금융권 기준으로 2~3000 으로 설정하는 것 같다고 함.
  • Chunk Size 를 1억으로 설정하면 OOM 이 발생할 수 있다. ( 메모리에 올려두고 작업하기 때문에 )

  • ChunkSize 에 따른 속도 비교 → 약 1만건(10,209) 의 데이터를 가지고 테스트
// Chunk Size Test
@RequiredArgsConstructor
@Configuration
public class JpaPageJob {

    private final EntityManagerFactory entityManager;

    @Bean
    public Job jpaPageJob_batchBuild(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("jpaPageJob", jobRepository)
                .start(jpaPageJob_batchStep1(0, jobRepository, transactionManager))
                .build();
    }

    @Bean
    @JobScope
    public Step jpaPageJob_batchStep1(@Value("#{jobParameters[chunkSize]}") int chunkSize, JobRepository jobRepository, PlatformTransactionManager transactionManager) {

        return new StepBuilder("jpaPageJob1",jobRepository)
                .<Person, PersonTemp>chunk(chunkSize,transactionManager)
                .reader(jpaPageJob_dbItemReader())
                .processor(jpaPageJob_processor())
                .writer(jpaPageJob_dbItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Person> jpaPageJob_dbItemReader() {

        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setName("JpaPageJob1_Reader");
        reader.setEntityManagerFactory(entityManager);
        reader.setQueryString("SELECT p FROM Person p");

        return reader;
    }

    @Bean
    public ItemProcessor<Person, PersonTemp> jpaPageJob_processor() {
        return person -> {
            return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
        };
    }

    @Bean
    public JpaItemWriter<PersonTemp> jpaPageJob_dbItemWriter() {
        JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManager);
        return writer;

    }
}

  1. ChunkSize = 10
--job.name=jpaPageJob v=1 chunkSize=100

// 결과
Step: [jpaPageJob1] executed in 1m14s796ms
  1. ChunkSize=100
--job.name=jpaPageJob v=1 chunkSize=100

// 결과
Step: [jpaPageJob1] executed in 20s614ms
  1. ChunkSize = 1000
--job.name=jpaPageJob v=1 chunkSize=1000

// 결과
Step: [jpaPageJob1] executed in 22s729ms
  1. ChunkSize = 5000
--job.name=jpaPageJob v=1 chunkSize=1000

// 결과
Step: [jpaPageJob1] executed in 34s323ms
  1. ChunkSize = 10000
--job.name=jpaPageJob v=1 chunkSize=1000

// 결과
Step: [jpaPageJob1] executed in 49s711ms

ChunkSize 가 크다고 해서 꼭 batch 의 실행시간이 짧아지는 것은 아니다.

테스트를 통해 효율적인 Size 를 찾아 적용해야 한다.


병렬처리

자동으로 병렬로 처리해주는 명령어는 없다.

따로따로 Step 을 작성해서 flow 로 합쳐주어야 한다.

 

아래 코드는 2개의 Step 을 query 를 통해 id = 5000 기준으로 나누어 병렬로 실행한다.

@Configuration
@Slf4j
@RequiredArgsConstructor
public class JpaPageFlowJob {

    private final EntityManagerFactory entityManager;
    private final int chunkSize = 10;


    @Bean
    public Job jpaPageLowJob_batchBuild(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        Flow flow1 = new FlowBuilder<Flow>("flow1")
                .start(jpaPageJob_batchStep1_parallel(jobRepository, transactionManager))
                .build();

        Flow flow2 = new FlowBuilder<Flow>("flow2")
                .start(jpaPageJob_batchStep2_parallel(jobRepository, transactionManager))
                .build();

        Flow parallelStepFlow = new FlowBuilder<Flow>("parallelStepFlow")
                .split(new SimpleAsyncTaskExecutor())
                .add(flow1, flow2)
                .build();

        return new JobBuilder("jpaPageFlowJob", jobRepository)
                .start(parallelStepFlow)
                .build().build();
    }

    @Bean
    public Step jpaPageJob_batchStep1_parallel(JobRepository jobRepository, PlatformTransactionManager transactionManager) {

        return new StepBuilder("jpaPageJob1",jobRepository)
                .<Person, PersonTemp>chunk(chunkSize,transactionManager)
                .reader(jpaPageJob_1_dbItemReader_parallel())
                .processor(jpaPageJob_1_processor_parallel())
                .writer(jpaPageJob_1_dbItemWriter_parallel())
                .build();
    }

    @Bean
    public Step jpaPageJob_batchStep2_parallel(JobRepository jobRepository, PlatformTransactionManager transactionManager) {

        return new StepBuilder("jpaPageJob1",jobRepository)
                .<Person, PersonTemp>chunk(chunkSize,transactionManager)
                .reader(jpaPageJob_2_dbItemReader_parallel())
                .processor(jpaPageJob_2_processor_parallel())
                .writer(jpaPageJob_2_dbItemWriter_parallel())
                .build();
    }


    @Bean
    public JpaPagingItemReader<Person> jpaPageJob_1_dbItemReader_parallel() {

        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setName("JpaPageJob1_Reader");
        reader.setEntityManagerFactory(entityManager);
        reader.setQueryString("SELECT p FROM Person p where p.id <= 5000 order by p.id asc"); // 2개의 병렬처리 중 1번째) id = 5000 이하 인것은 해당 Reader 에서 읽는다.

        return reader;
    }

    @Bean
    public ItemProcessor<Person, PersonTemp> jpaPageJob_1_processor_parallel() {
        return person -> {
            return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
        };
    }

    @Bean
    public JpaItemWriter<PersonTemp> jpaPageJob_1_dbItemWriter_parallel() {
        JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManager);
        return writer;
    }

    @Bean
    public JpaPagingItemReader<Person> jpaPageJob_2_dbItemReader_parallel() {

        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setName("JpaPageJob1_Reader");
        reader.setEntityManagerFactory(entityManager);
        reader.setQueryString("SELECT p FROM Person p where p.id > 5000 order by p.id asc"); // 2개의 병렬처리 중 2번째) id = 5000 초과 인것은 해당 Reader 에서 읽는다.

        return reader;
    }

		//PERSON 테이블에서 PERSON_TEMP 테이블에 INSERT 할때 NEW_ 만 붙여서 추가.
    @Bean
    public ItemProcessor<Person, PersonTemp> jpaPageJob_2_processor_parallel() {
        return person -> {
            return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
        };
    }

    @Bean
    public JpaItemWriter<PersonTemp> jpaPageJob_2_dbItemWriter_parallel() {
        JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManager);
        return writer;
    }
}

프로그램 인수에서 v 버전만 변경해서 테스트 진행

실행 전, person_temp 의 데이터는 모두 삭제해준다.

--job.name=jpaPageFlowJob v=15
  1. batchSize = 10
Step: [jpaPageJob1] executed in 9s261ms
  1. batchSize = 100
Step: [jpaPageJob1] executed in 7s721ms
  1. batchSize = 1000
Step: [jpaPageJob1] executed in 6s535ms
  1. batchSize = 5000
Step: [jpaPageJob1] executed in 6s689ms
  1. batchSize = 10000
Step: [jpaPageJob1] executed in 6s178ms

테스트 결과 size 가 1000 ~ 이상부터는 크게 차이가 나지 않는 것을 확인할 수 있다.

병렬로 실행했기 때문에, 순서대로 DB 에 INSERT 되지 않는다.

 

728x90