WEB/Spring Batch

스프링 배치 도메인 이해

Tony Lim 2023. 2. 17. 11:18
728x90

JOB

배치 계층 구조에서 가장 상위 개념으로서 하나의 배치작업 자체를 의미함

api 서버의 접속 로그 데이터를 통계 서버로 옮기는 배치 = JOB

Job Configuration 을 통해 생성되는 객체 단위로서 배치작업을 어떻게 구성하고 실행할 것인지 전체적으로 설명하고 명세해 놓은 객체

배치 Job 을 구성하기 위한 최상위 인터페이스를 스프링이 기본적인 구현체를 제공한다.

여러 Step을 포함하고 있는 컨테이너로서 반드시 한 개이상의 step 으로 구성 된다.

 

기본 구현체

SimpleJob = 순차적으로 step 을 실행시키는 job , 모든 job에서 유용하게 쓸수 있는 표준 기술을 가지고 있다.

FlowJob = 특정한 조건과 흐름에 따라 step을 구성하여 실행시키는 job, flow 객체를 실행시켜서 작업을 진행

 

JobLauncher에서 실제 실행을 담당하고 있다.

@RequiredArgsConstructor
@Configuration
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return this.jobBuilderFactory.get("Job")
                .start(step1())
                .next(step2())
                .build();
    }

현재SimpleJob으로 build할시 구현체가 빈으로 등록이 된다.

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
      JobRepository jobRepository, BatchProperties properties) {
   JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository);
   String jobNames = properties.getJob().getNames();
   if (StringUtils.hasText(jobNames)) {
      runner.setJobNames(jobNames);
   }
   return runner;
}

실제 job을 실행시킬 Launcher 가 빈으로 등록된다. 사용자가 직접 등록하지 않으면 boot 자동구성에 의해서 등록된다.

@ConditionalOnMissingBean 이 사용자가 직접 구현한것이랑 겹치는것을 막아준다.

JobLaunchrApplicationRunner#execute method에서 실질적으로 SimpleJob을 실행시킨다.

 


JobInstance

Job이 실행될 생성되는 Job의 논리 실행 단위  객체로서 고유하게 식별가능한 작업 실행을 나타냄

Job 의 설정은 동일하지 Job이 실행되는 시점이 다르기 때문에 Job의 실행을 구분해야함

job과 1:M의 관계이고 

job + jobparameter 를 해싱해 구분해서 기존해 했던것이면 캐시된 instance를 돌려주고 없던거면 새로만들어 낸다.

job을 실행하기전에 생성되는 존재가아니라 job이 실행되면서 상태정보를 db에 저장하기 위해 존재하는 것이다.

BATCH_JOB_INSTANCE 테이블과 맵핑된다.

JobRepository 클래스에서 db에 기존에 있는지 없는지 확인을 한 후에 어떤 instance를 돌려줄것인지 결정한다.

기존 JobInstance를 리턴하고 더 이상 진행하지 않는다. 이미 옛날에 했던것이니까

@Component
public class JobRunner implements ApplicationRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    @Override
    public void run(ApplicationArguments args) throws Exception {

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("name", "user1")
//                .addDate("reqDate", new Date())
                .toJobParameters();

        jobLauncher.run(job,jobParameters);
    }
}

원래는 SpringBoot에서 기본적으로 job을 다 실행시켜버리니까

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    hikari:
      jdbc-url: jdbc:mysql://localhost:3306/springbatch?useUnicode=true&characterEncoding=utf8
      username: root
      password: 1234
      driver-class-name: com.mysql.jdbc.Driver
  batch:
    jdbc:
      initialize-schema: always
    job:
      enabled: false

job.enabled = false 를 통해서 ApplicationRunner의 구현체를 통해 실행시킬 수 있도록한다.

 


JobParameter

1. 기본 개념

Job launcher가 Job을 실행할 때 함께 포함되어 사용되는 파라미터를 가진 도메인 객체

하나의 Job에 존재할 수 있는 여러개의 JobInstance를 구분하기 위한 용도 -> 같은 JobParameter는 db에 저장도 되지 않는다.

JobParameters 와 JobInstance 와 1:1 관계

JobParameter는 4가지 타입을 지니고 있고 db에 저장 될때는 각 타입에 맞는 value들을 들고 있게 된다.

 

 

2. 생성 및 바인딩

앱 실행시 주입 = Java -jar LogBatch.jar requestDate=20210101

코드로 생성 = JobParameterBuilder, DefaultJobParametersConverter

SpEL 이용 = @Value("#{jobParameter[requestDate]}"), @JobScope , @StepScope 선언 필수

 

3. BATCH_JOB_EXECUTION_PARAM 테이블가 매핑 , JOB_EXECUTION 과 1:M 관계

 

@Component
public class JobParameterTest implements ApplicationRunner {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @Override
    public void run(ApplicationArguments args) throws Exception {

        JobParameters jobParameters = new JobParametersBuilder().addString("name", "user1")
                .addLong("seq", 1L)
                .addDate("date", new Date())
                .toJobParameters();

        jobLauncher.run(job, jobParameters);
    }
}
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

                    JobParameters jobParameters = contribution.getStepExecution().getJobParameters();
                    String name = jobParameters.getString("name");
                    long seq = jobParameters.getLong("seq");
                    Date date = jobParameters.getDate("date");

                    System.out.println("===========================");
                    System.out.println("name:" + name);
                    System.out.println("seq: " + seq);
                    System.out.println("date: " + date);
                    System.out.println("===========================");

                    Map<String, Object> jobParameters2 = chunkContext.getStepContext().getJobParameters();
                    String name2 = (String)jobParameters2.get("name");
                    long seq2 = (long)jobParameters2.get("seq");

                    System.out.println("step1 has executed");
                    return RepeatStatus.FINISHED;
                }
            })
            .build();
}

jobLauncher를 통해 job을 실행시켜 하나의 job instance가 생성되기 위해 넣은 parameter를 step 에서 꺼내 쓸 수 있다.

StepContirbution , ChunkContext에서 꺼내 쓸 수 있다.  JobParamters , Map<String ,Object>를 반환하는 차이가 있다.

 


JobExecution

1. 기본개념

JobInstance에 대한 한번의 시도를 의미하는 객체로서 Job 실행중에 발생한 정보들을 저장하고 있는 객체
시작,종료,상태,종료상태의 속성을 가진다.

JobInstance와의 관계

JobExecution 은 FAILED ,COMPLETED 등의 Job의 실행 결과 상태를 가지고 있다.

JobExecution 은 COMPLETED 면 JobInstance 실행이 완료된것으로 간주해서 재 실행이 불가능함

JobExecution 이 FAILED 이면 재실행이 가능하다. JobParameter가 동일할 지라도 계속 실행할 수 있다.

COMPLETED 가 될때까지 하나의 JobInstance에 여러번의 시도가 생길 수 있음

 

2. BATCH_JOB_EXECUTION과 매핑 = JobInstance 와 JobExecution 은 1:M의 관계로서 JobInstance 에 대한 성공,실패의 내역을가지고 있음

기존에 실행했던 경우에는 Status 가 Failed인 경우에만 다시 같은 JobParameter+Job = JobInstance로 재실행 시킬 수 있다.

계속 Failed 하면 계속 JobExecution이 생성되어 db에 쌓이게 된다.

 


Step

1. 기본 개념

Batch job을 구성하는 독립적인 하나의 단계로서 실제 배치 처리를 정의하고 컨트롤하는데 필요한 모든 정보를 가지고 있는 도메인 객체

단순한 단일 태스크 뿐 아니라 입력과 처리 그리고 출력과 관련된 복잡한 비지니스 로직을 포함하는 모든 설정들을 담고 있다.

배치작업을 어떻게 구성하고 실행할 것인지 Job의 세부작업을 Task 기반으로 설정하고 명세해 놓은 객체

모든 Job은 하나 이상의 step으로 구성됨

 

2. 기본 구현체

TaskletStep = 가장 기본이 되는 클래스로 서 Tasklet 타입의 구현체를 제어한다.

PartitionStep = 멀티스레드 방식으로 Step을 여러개로 분리해서 실행한다.

JobStep = Step 내에서 Job을 실행하도록 한다. chain처럼 job -> step -> job -> step 이런식으로 구성이 된다.

FlowStep = Step 내에서 Flow를 실행하도록 한다.

 

Job은 여러 Step으로 구성되고 각 Step은 Tasklet으로 실제 해야할일들이 정의되어 있다.

 


StepExecution

1. 기본 개념

Step 에 대한 한번의 시도를 의미하는 객체로서 Step 실행 중에 발생한 정보들을 저장하고 있는 객체

- 시작 , 종료 ,상태 ,commit count, rollback count등의 속성을 가짐

Step이 매번 시도될 때마다 생성되며 각 Step 별로 생성된다.

Job이 재시작 하더라도 이미 성공적으로 완료된 Step은 재 실행되지 않고 실패한 Step 만 실행된다.

이전단계 Step이 실패해서 현재 Step을 실행하지 않았다면 StepExecution을 생성하지 않는다. Step이 실제로 시작됐을 때만 StepExecution을 생성한다.

JobExecution과의 관계

- Step 의 StepExecution이 모두 정상적으로 완료 되어야 JobExecution이 정상적으로 완료된다.

- Step의 StepExecution중 하나라도 실패하면 JobExecution은 실패한다.

 

2. BATCH_STEP_EXECUTION 테이블과 맵핑

JobExecution와 StepExecution 은 1:M 의 관계

하나의 Job에 여러 개의 Step으로 구성했을 경우 각 StepExecution은 하나의 JobExecution을 부모로 가진다.

 

JobExecution의 step 중 하나가 실패하면 JobExecution도 Failed가 된다.

Job + JobParameter = JobInstance 생성

JobInstance 실행 -> JobExecution 생성 -> StepExecution들이 생성된다. 

StepExecution에서 FAILED가 나면 부모인 JobExecution도 FAILED 된다.

FAILED이니 성공할때까지 JobInstance를 실행함 -> 실행할떄마다 JobExecution이 생성이 되는거다. 자식인 StepExecution도 생성이 됨

StepExecution id 3 이 성공했는데도 불구하고 그림상엔 5로 다시 생성되었는데 이것은 틀린것이다. 3으로 그대로 남아있어야함

 


StepContribution

 

1. 기본개념

청크 프로세스의 변경 사항을 버퍼링 한 후 StepExecution 상태를 업데이트하는 도메인 객체

- 여기서 청크 프로세스는 tasklet이라고 생각해도 무방하다. 

청크 커밋 직전에 StepExecution 의 apply 메서드를 호출하여 상태를 업데이트 함

ExitStatus 의 기본 종료코드의 외 사용자 정의 종료코드를 생성해서 적용 할 수 있음

 

stepExecution의 여러 값들을 저장해서 가지고 있다가 그값을 적용시키는 역할을 하고 있다.

 


ExecutionContext

 

1. 기본 개념

프레임워크에서 유지 및 관리하는 키/값으로 된 컬렉션으로 StepExecution 또는 JobExecution 객체의 상태를 저장하는 공유 객체 

DB에 직렬화한 값으로 저장됨 - {"key" :"value"}

공유범위

- Step 범위 = 각 Step 의 StepExecution 에 저장되며 Step 간 서로 공유 안됨

- Job 범위 = 각 Job의 JobExecution에 저장되며 Job 간 서로 공유 안되며 해당 Job의 Step 간 서로 공유됨

Job 재 시작시 이미 처리한 Row 데이터는 건너뛰고 이후로 수행하도록 할 때 상태 정보를 활용한다.

 

@Component
public class ExecutionContextTasklet1 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
        ExecutionContext stepExecutionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();

        String jobName = chunkContext.getStepContext().getStepExecution().getJobExecution().getJobInstance().getJobName();
        String stepName = chunkContext.getStepContext().getStepExecution().getStepName();

        if(jobExecutionContext.get("jobName") == null){
            jobExecutionContext.put("jobName", jobName);
        }
        if(stepExecutionContext.get("stepName") == null) {
            stepExecutionContext.put("stepName", stepName);
        }

        System.out.println("jobName: " + jobExecutionContext.get("jobName"));
        System.out.println("stepName: " + stepExecutionContext.get("stepName"));

        return RepeatStatus.FINISHED;
    }
}
@Component
public class ExecutionContextTasklet2 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
        ExecutionContext stepExecutionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();

        System.out.println("jobName: " + jobExecutionContext.get("jobName"));
        System.out.println("stepName: " + stepExecutionContext.get("stepName"));

        String stepName = chunkContext.getStepContext().getStepExecution().getStepName();
        if(stepExecutionContext.get("stepName") == null) {
            stepExecutionContext.put("stepName", stepName);
        }
        return RepeatStatus.FINISHED;
    }
}

JobExecution에서는 Step까지 공유가 되지만 StepExecution은 step끼리 공유가 되지않는다.

Tasklet1 에서 저장했던 jobname은 꺼내올수가 있지만 stepName은 꺼낼 수가 없다.

 

@Component
public class ExecutionContextTasklet3 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        Object name = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("name");

        if(name == null){
            chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().put("name", "user1");
            throw new RuntimeException("step has failed");
        }

        return RepeatStatus.FINISHED;
    }
}

첫시도때는 tasklet3는 실패하게 된다. name이 없을 테니까 하지만 첫번째 시도에서 JobExecutionContext에 저장을 하기 때문에 db에도 저장이 되게 된다.

두번째 시도에는 name이 null이 아니게 된다.

실패했던 step3부터 재시작 된것을 확인할 수 있다. 같은 JobInstance이지만 2번째 실행이니 다른 JobExecution을 가지고 있다.

 


JobRepository

 

1.기본개념

배치작업 중의 정보를 저장하는 저장소 역할

Job이 언제 수행되었고 , 언제 끝났으며 , 몇 번이 실행되었고 실행에 대한 결과등의 배치작업의 수행과 관련된 모든 meta data를 저장함

JobLauncher , Job ,Step 구현체 내부에서 CRUD기능을 처리함

 

JobRepository 설정

@EnableBatchProcessing 어노테이션만 선언하면 JobRepository 가 자동으로 빈으로 생성됨

BatchConfigurer 인터페이스를 구현하거나 BasicBatchConfigurer를 상속해 JobRepository 설정을 커스터마이징 할 수 있다.

 

JDBC 방식으로 설정 - JobRepositoryFactoryBean

- 내부적으로 AOP기술을 통해 트랜잭션 처리를 해주고 있음
- 트랜잭션 isolation의 기본값은 serializable 로 최고 수준 , 다른 레벨로 지정할 수 있음
- 메타테이블의 Table Prefix 를 변경할 수 있음, 기본 값은 "BATCH_" 임

In Memory 방식으로 설정 - MapJobRepositoryFactoryBean

- 성능 등의 이유로 도메인 오브젝트를 굳이 데이터베이스에 저장하고 싶지 않을 경우
- 보통 Test나 프로토타입의 빠른 개발이 필요할 때 사용

public class SimpleJobRepository implements JobRepository {

   private static final Log logger = LogFactory.getLog(SimpleJobRepository.class);

   private JobInstanceDao jobInstanceDao;

   private JobExecutionDao jobExecutionDao;

   private StepExecutionDao stepExecutionDao;

여러 Dao를 사용해서 실제 db 에 정보들을 저장하고 조작하게 된다.

 

@Configuration
public class CustomBatchConfigurer extends BasicBatchConfigurer {

    private final DataSource dataSource;

    protected CustomBatchConfigurer(BatchProperties properties, DataSource dataSource, TransactionManagerCustomizers transactionManagerCustomizers) {
        super(properties, dataSource, transactionManagerCustomizers);
        this.dataSource = dataSource;
    }

    @Override
    protected JobRepository createJobRepository() throws Exception {

        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(getTransactionManager());
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE"); // isolation 수준, 기본값은 “ISOLATION_SERIALIZABLE”
        factory.setTablePrefix("BATCH_");  // 테이블 Prefix, 기본값은 “BATCH_”,
                                            // BATCH_JOB_EXECUTION 가 SYSTEM_JOB_EXECUTION 으로 변경됨
                                            // 실제 테이블명이 변경되는 것은 아니다
        factory.setMaxVarCharLength(1000); // varchar 최대 길이(기본값 2500)

        return factory.getObject(); // Proxy 객체가 생성됨 (트랜잭션 Advice 적용 등을 위해 AOP 기술 적용)

    }
}

spring batch에서 기본적으로 쓰는  Repository layer를 custom하게 수정할 수 있다.

 


JobLauncher

 

1.기본개념

배치 Job을 실행시키는 역할을 한다.

Job과 Job Parameters 를 인자로 받으며 요청된 배치 작업을 수행한 후 최종 client에게 JobExecution을 반환함

스프링부트 배치가 구동이 되면 JobLauncher 빈이 자동생성이 된다.

 

Job 실행

- JobLauncher.run(Job, JobParameters)

- 스프링 부트 배치에서는 JobLauncherApplicationRunner 가 자동적으로 JobLauncher를 실행시킨다.

- 동기적 실행

ㅡ taskExecutor를 SyncTaskExecutor로 설정할 경우 (기본값은 SyncTaskExecutor)

ㅡ JobExecution을 획득하고 배치 처리를 완료한 이후 Client에게 JobExecution을 반환

ㅡ 스케줄러에 의해 배치처리에 적합 함 = 배치처리시간이 길어도 상관없는 경우

- 비 동기적 실행

ㅡ taskExecutor 가 SimpleAsyncTaskExecutor 로 설정할 경우

ㅡ JobExecution 을 획득한 후 Client 에게 바로 JobExecution 을 반환하고 배치처리를 완료한다.

ㅡ Http 요청에 의한 배치처리에 적합함 = 배치처리 시간이 길 경우 응답이 늦어지지 않도록 함

 

 

 

728x90

'WEB > Spring Batch' 카테고리의 다른 글

스프링 배치 실행 - Flow  (0) 2023.03.03
스프링 배치 실행 - Step  (0) 2023.02.28
스프링 배치 실행 - Job  (0) 2023.02.23
스프링 배치 소개 + 시작  (0) 2023.02.16
java brains) Spring Batch  (0) 2021.05.29