Spring/Batch

Spring Batch 5.x DB 사용하기

YoonJong 2024. 4. 20. 10:42
728x90

 

기존 Tasklet 방식에 이어 Chunk 방식을 이용한 간단한 예제를 작성해 보려고 한다.

 

스프링 배치 아키텍처이다.

Tasklet 에서는Item 과 관련된(노란색) 부분은 사용하지 않았다.

Chunk 방식에서는 Item 을 사용하려고 한다.

 

DB 에서 데이터를 가져올 case 

1. ItemReader -> DB 에서 데이터 조회

2. ItemProcessor -> 데이터 가공

3. ItemWriter -> DB 에 다시 INSERT

 

이렇게 순서로 진행할 예정이다.

 

.

 

MySQL DB 에 BATCH 테이블이 정상적으로 생성되어있다고 가정.

JPA 를 이용해서 Entity 2개를 만들어주었다.

Person 테이블에서 가공 후 PersonTemp 테이블에 넣어줄 예정.

 

@Entity
@NoArgsConstructor
@Getter
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String username;

    private int age;

    public Person(String username, int age) {
        this.username = username;
        this.age = age;
    }
}

 

@Entity
@NoArgsConstructor
@AllArgsConstructor
public class PersonTemp {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String username;

    private int age;

    public PersonTemp(String username, int age) {
        this.username = username;
        this.age = age;
    }
}

 

Repository 도 따로 만들어준다.

public interface PersonRepository extends JpaRepository<Person, Long> {
}
public interface PersonTempRepository extends JpaRepository<PersonTemp, Long> {
}

 

Person 테이블에 데이터를 생성해준다

12 개를 생성해주었다 ( chunkSize 를 5개로 설정해서 테스트하면서 로그를 볼 것 )

insert into person (age, username) VALUES (1, 'USER1');
insert into person (age, username) VALUES (2, 'USER2');
insert into person (age, username) VALUES (3, 'USER3');
insert into person (age, username) VALUES (4, 'USER4');
insert into person (age, username) VALUES (5, 'USER5');
insert into person (age, username) VALUES (6, 'USER6');
insert into person (age, username) VALUES (7, 'USER7');
insert into person (age, username) VALUES (8, 'USER8');
insert into person (age, username) VALUES (9, 'USER9');
insert into person (age, username) VALUES (10, 'USER10');
insert into person (age, username) VALUES (11, 'USER11');
insert into person (age, username) VALUES (12, 'USER12');

 

이제 Job 클래스를 생성한다.

@RequiredArgsConstructor
@Configuration
public class MyJobChunkConfig {

    private final EntityManagerFactory entityManager;
    private final int chunkSize = 5;

    @Bean
    public Job myChunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("myChunkJob2", jobRepository) // myChunkJob2 이름 설정
                .start(myChunkStep(jobRepository, transactionManager)) // 시작할 Step 메서드
                .build();
    }

    @Bean
    public Step myChunkStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {

        return new StepBuilder("myChunkStep2",jobRepository)
                .<Person, PersonTemp>chunk(chunkSize,transactionManager) // chunkSize 설정 (5)
                .reader(personJpaPagingItemReader())
                .processor(personJpaPagingProcessor())
                .writer(personJpaPagingWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Person> personJpaPagingItemReader() {

        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setName("personJpaPagingItemReader");
        reader.setPageSize(chunkSize);
        reader.setEntityManagerFactory(entityManager);
        reader.setQueryString("SELECT p FROM Person p"); // Person 테이블에서 데이터 조회 

        return reader;
    }

    @Bean
    public ItemProcessor<Person, PersonTemp> personJpaPagingProcessor() {
        return person -> {
            return new PersonTemp(person.getId(), "NEW_" + person.getUsername(), person.getAge());
        }; // PersonTemp 테이블에 NEW_ 가 붙는 과정을 진행하고나서 저장
    }

    @Bean
    public JpaItemWriter<PersonTemp> personJpaPagingWriter() {
        JpaItemWriter<PersonTemp> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManager);
        return writer;

    }
}

 

 

 

프로젝트 실행

프로그램 인수에 아래와 같이 넣어준다.

--job.name=myChunkJob2

 

콘솔을 확인해보면 아래와 같이 5개씩 묶어서 조회하는 것을 볼 수 있다.

12개 이므로 총 3번을 select 해서 가져오는 것을 확인할 수 있다.

만약 chunkSize 가 12 이상이면 1번에 조회해서 가져올 것.

2024-04-20T10:26:47.465+09:00  INFO 2036 --- [           main] c.e.s.SpringBatchTutorialApplication     : Started SpringBatchTutorialApplication in 2.703 seconds (process running for 3.21)
2024-04-20T10:26:47.468+09:00  INFO 2036 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2024-04-20T10:26:47.537+09:00  INFO 2036 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myChunkJob2]] launched with the following parameters: [{}]
2024-04-20T10:26:47.580+09:00  INFO 2036 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [myChunkStep2]
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person p1_0 limit ?,?
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person p1_0 limit ?,?
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person p1_0 limit ?,?
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
Hibernate: select p1_0.id,p1_0.age,p1_0.username from person_temp p1_0 where p1_0.id=?
Hibernate: insert into person_temp (age,username) values (?,?)
2024-04-20T10:26:47.870+09:00  INFO 2036 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [myChunkStep2] executed in 289ms
2024-04-20T10:26:47.888+09:00  INFO 2036 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myChunkJob2]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 328ms

 

 

 

728x90