WEB/Spring Batch

스프링 배치 이벤트 리스너 & 스프링 배치 테스트 및 운영

Tony Lim 2023. 4. 4. 19:33

Spring Batch Listener 기본개념

  • Listener는 배치 흐름 중에 Job , Step , Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스.
  • 각 단계별로 로그기록을 남기거나 소요된 시간을 계산하거나 실행상태 정보들을 참조 및 조회 할 수 있다.
  • 이벤트를 받기 위해서는 Listener 를 등록해야 하며 등록은 API 설정에서 각 단계별로 지정 할 수 있다.


  • Job
    • JobExecutionListener – Job 실행 전후
  • Step
    • StepExecutionListener – Step 실행 전후
      ChunkListener – Chunk 실행 전후 (Tasklet 실행 전후) , 오류 시점
      ItemReadListener – ItemReader 실행 전후, 오류 시점, item 이 null 일 경우 호출 안됨
      ItemProcessListener – ItemProcessor 실행 전후, 오류 시점, item 이 null 일 경우 호출 안됨
      ItemWriteListener – ItemWriter 실행 전후, 오류 시점, item 이 null 일 경우 호출 안됨
  • SkipListener – 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item을 추적함
    RetryListener – Retry 시작, 종료, 에러 시점

Listener를 등록해서 이벤트를 받는 과정은 간단하다.

위에것이 @Before 아래가 @After로 매칭된다. 많은 과정에서 리스너들을 전후로 등록이 가능하다.


JobExecutionListener / StepExecutionListener

JobExecutionListener

  • Job의 성공 여부와 상관없이 호출된다.
  • 성공 / 실패 여부는 JobExecution 을 통해 알 수 있다.

 

StepExecutionListener

  • Step의 성공 여부와 상관없고 성공 / 실패 여부는 StepExecution을 통해 알 수 있다.
public class AnnotationCustomStepListener {

    @BeforeStep
    public void beforeStep(StepExecution stepExecution){
        System.out.println("@stepExecution.getStepName() : " + stepExecution.getStepName());
    }

    @AfterStep
    public void afterStep(StepExecution stepExecution){
        System.out.println("@stepExecution.getStatus() : " + stepExecution.getStatus());
    }
}
@Bean
public Step step2() {
    return stepBuilderFactory.get("step2")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                    return RepeatStatus.FINISHED;
                }
            })
            .listener(new AnnotationCustomStepListener())
            .build();
}

annotation 기반으로 간단히 작성하여 원하는 step안에 넣어서 동작하게 할 수 있다.


ChunkListener / ItemReadListener /ItemProcessListener /ItemWriteListener

ChunkListener 를 사용할떄는 @BeforeChunk ,@AfterChunk , @AfterChunkError 이것을 통해 사용

나머지는 interface구현해서 override해서 사용가능


SkipListener & RetryListener

SkipListener 

예외가 발생하면 각각 onSkipRead , onSkipInProcess , onSkipInWrite 리스너의 메소드를 통해 사용자가 지정한 작업들을 수행하게 된다.

 

RetryListener

open = 재시도 전 매번 호출 , false 를 반환할 경우 retry를 시도하지 않음

close = 재시도 후 매번 호출

onError = 재시도 실패시 마다 호출

 


Spring Batch Test

@SpringBatchTest

  • 자동으로 ApplicationContext에 테스트에 필요한 여러 유틸 Bean을 등록해주는 어노테이션
    • JobLauncherTestUtils
      • launchJob(), launchStep() 과 같은 스프링 배치 테스트에 필요한 유틸성 메소드 지원
    • JobRepositoryTestUtils
      • JobRepository를 사용해서 JobExecution을 생성 및 삭제 기능 메소드 지원
    • StepScopeTestExecutionListener
      • @StepScope 컨텍스트를 생성해 주며 해당 컨텍스트를 통해 JobParameter 등을 단위 테스트에서 DI 받을 수 있다.
    • JobScopeTestExecutionListener
      • @JobScope 컨텍스트를 생성해 주며 해당 컨텍스트를 통해 JobParameter등을 단위 테스트에서 DI 받을 수 있다.
@SpringBatchTest
@RunWith(SpringRunner.class)
@SpringBootTest(classes={SimpleJobConfiguration.class, TestBatchConfig.class})
public class SimpleJobTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Test
    public void simple_job_테스트() throws Exception {

        // given
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("requestDate", "20020101")
                .addLong("date", new Date().getTime())
                .toJobParameters();

        // when
//        JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
        JobExecution jobExecution = jobLauncherTestUtils.launchStep("step1");

        // then
        Assert.assertEquals(jobExecution.getStatus(), BatchStatus.COMPLETED);
        StepExecution stepExecution = (StepExecution)((List) jobExecution.getStepExecutions()).get(0);

        Assert.assertEquals(stepExecution.getCommitCount(), 11);
        Assert.assertEquals(stepExecution.getWriteCount(), 1000);
        Assert.assertEquals(stepExecution.getWriteCount(), 1000);
    }

    @After
    public void clear() throws Exception {
        jdbcTemplate.execute("delete from customer2");
    }
}

Job , Step 을 개별적으로 테스트를 진행시킬 수 있다.


JobExplorer / JobRegistry / JobOperator

JobExplorer

  • JobRepository 의 readonly 버전
  • 실행중인 Job의 실행정보인 JobExecution 또는 Step의 실행정보인 StepExecution을 조회할 수 있다.

JobRegistry

  • 생성된 Job을 자동으로 등록, 추적 및 관리하며 여러 곳에서 Job을 생성한 경우 ApplicationContext에서 Job을 수집해서 사용할 수 있다.
  • 기본구현체로 map 기반의 MapJobRegistry 클래스를 제공한다.
    • jobName을 Key로 하고 job을 값으로 하여 매핑한다.
  • Job등록
    • JobRegistryBeanPostProcessor - BeanPostProcessor 단계에서 bean 초기화 시 자동으로 JobRegistry 에 Job을 등록 시켜준다.

Job Operator

  • JobExplorer , JobRepository, JobRegistry , JobLauncher 를 포함하고 있으며 배치의 중단 , 재시작 , job 요약 등의 모니터링이 가능하다. 
  • 기본 구현체로 SimpleJobOperator 클래스를 제공한다.
@RestController
public class JobController {

   @Autowired
   private JobRegistry jobRegistry;

   @Autowired
   private JobOperator jobOperator;

   @Autowired
   private JobExplorer jobExplorer;

   @PostMapping(value = "/batch/start")
   public String start(@RequestBody JobInfo jobInfo) throws Exception {

      for(Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){

         SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
         System.out.println("job name: " + job.getName());

         jobOperator.start(job.getName(), "id=" + jobInfo.getId());
      }

      return "batch is started";
   }

   @PostMapping(value = "/batch/restart")
   public String restart() throws Exception {

      for(Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){

         SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
         System.out.println("job name: " + job.getName());

         JobInstance lastJobInstance = jobExplorer.getLastJobInstance(job.getName());
         JobExecution lastJobExecution = jobExplorer.getLastJobExecution(lastJobInstance);
         jobOperator.restart(lastJobExecution.getId());

      }

      return "batch is restarted";
   }

   @PostMapping(value = "/batch/stop")
   public String stop() throws Exception {

      for(Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){

         SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
         System.out.println("job name: " + job.getName());

         Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(job.getName());
         JobExecution jobExecution = runningJobExecutions.iterator().next();

         jobOperator.stop(jobExecution.getId());
      }

      return "batch is stopped";
   }
}

JObRegistry를 통해 기존에 실행했었던 job을 가져오게 되고 JobExplorer를 통해 해당 Job의 상태값들을 가지고 온후에 실행할지 말지 판단을 한후에
JobOperator 를 통해 실제 원하는 실행(stop ,start) 같은 것을 하게 된다.

한번 성공했던 Job안의 step은 다시 또 실행이 안되니까 restart 하더라도 가장 마지막 JobExecution을 가져와서 (왜냐하면 가장 마지막이 실패했다고 가정함 , 또한 그냥 getJobInstance도 있지만getLastJobInstance가 따로 있는것을 보면 대부분 마지막 놈이 실패하는 경우가 많은 것 같음)

step1까지 성공하고 step2를 실행 못시켰을 경우 (step1 실행도중에 stop api를 통해 멈추게 한 경우) step2부터 다시 실행이 되게 된다.