Spring Batch Listener 기본개념
- Listener는 배치 흐름 중에 Job , Step , Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스.
- 각 단계별로 로그기록을 남기거나 소요된 시간을 계산하거나 실행상태 정보들을 참조 및 조회 할 수 있다.
- 이벤트를 받기 위해서는 Listener 를 등록해야 하며 등록은 API 설정에서 각 단계별로 지정 할 수 있다.
- Job
- JobExecutionListener – Job 실행 전후
- Step
- StepExecutionListener – Step 실행 전후
ChunkListener – Chunk 실행 전후 (Tasklet 실행 전후) , 오류 시점
ItemReadListener – ItemReader 실행 전후, 오류 시점, item 이 null 일 경우 호출 안됨
ItemProcessListener – ItemProcessor 실행 전후, 오류 시점, item 이 null 일 경우 호출 안됨
ItemWriteListener – ItemWriter 실행 전후, 오류 시점, item 이 null 일 경우 호출 안됨
- StepExecutionListener – Step 실행 전후
- SkipListener – 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item을 추적함
RetryListener – Retry 시작, 종료, 에러 시점
Listener를 등록해서 이벤트를 받는 과정은 간단하다.
위에것이 @Before 아래가 @After로 매칭된다. 많은 과정에서 리스너들을 전후로 등록이 가능하다.
JobExecutionListener / StepExecutionListener
JobExecutionListener
- Job의 성공 여부와 상관없이 호출된다.
- 성공 / 실패 여부는 JobExecution 을 통해 알 수 있다.
StepExecutionListener
- Step의 성공 여부와 상관없고 성공 / 실패 여부는 StepExecution을 통해 알 수 있다.
public class AnnotationCustomStepListener {
@BeforeStep
public void beforeStep(StepExecution stepExecution){
System.out.println("@stepExecution.getStepName() : " + stepExecution.getStepName());
}
@AfterStep
public void afterStep(StepExecution stepExecution){
System.out.println("@stepExecution.getStatus() : " + stepExecution.getStatus());
}
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
})
.listener(new AnnotationCustomStepListener())
.build();
}
annotation 기반으로 간단히 작성하여 원하는 step안에 넣어서 동작하게 할 수 있다.
ChunkListener / ItemReadListener /ItemProcessListener /ItemWriteListener
ChunkListener 를 사용할떄는 @BeforeChunk ,@AfterChunk , @AfterChunkError 이것을 통해 사용
나머지는 interface구현해서 override해서 사용가능
SkipListener & RetryListener
SkipListener
예외가 발생하면 각각 onSkipRead , onSkipInProcess , onSkipInWrite 리스너의 메소드를 통해 사용자가 지정한 작업들을 수행하게 된다.
RetryListener
open = 재시도 전 매번 호출 , false 를 반환할 경우 retry를 시도하지 않음
close = 재시도 후 매번 호출
onError = 재시도 실패시 마다 호출
Spring Batch Test
@SpringBatchTest
- 자동으로 ApplicationContext에 테스트에 필요한 여러 유틸 Bean을 등록해주는 어노테이션
- JobLauncherTestUtils
- launchJob(), launchStep() 과 같은 스프링 배치 테스트에 필요한 유틸성 메소드 지원
- JobRepositoryTestUtils
- JobRepository를 사용해서 JobExecution을 생성 및 삭제 기능 메소드 지원
- StepScopeTestExecutionListener
- @StepScope 컨텍스트를 생성해 주며 해당 컨텍스트를 통해 JobParameter 등을 단위 테스트에서 DI 받을 수 있다.
- JobScopeTestExecutionListener
- @JobScope 컨텍스트를 생성해 주며 해당 컨텍스트를 통해 JobParameter등을 단위 테스트에서 DI 받을 수 있다.
- JobLauncherTestUtils
@SpringBatchTest
@RunWith(SpringRunner.class)
@SpringBootTest(classes={SimpleJobConfiguration.class, TestBatchConfig.class})
public class SimpleJobTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void simple_job_테스트() throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder()
.addString("requestDate", "20020101")
.addLong("date", new Date().getTime())
.toJobParameters();
// when
// JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
JobExecution jobExecution = jobLauncherTestUtils.launchStep("step1");
// then
Assert.assertEquals(jobExecution.getStatus(), BatchStatus.COMPLETED);
StepExecution stepExecution = (StepExecution)((List) jobExecution.getStepExecutions()).get(0);
Assert.assertEquals(stepExecution.getCommitCount(), 11);
Assert.assertEquals(stepExecution.getWriteCount(), 1000);
Assert.assertEquals(stepExecution.getWriteCount(), 1000);
}
@After
public void clear() throws Exception {
jdbcTemplate.execute("delete from customer2");
}
}
Job , Step 을 개별적으로 테스트를 진행시킬 수 있다.
JobExplorer / JobRegistry / JobOperator
JobExplorer
- JobRepository 의 readonly 버전
- 실행중인 Job의 실행정보인 JobExecution 또는 Step의 실행정보인 StepExecution을 조회할 수 있다.
JobRegistry
- 생성된 Job을 자동으로 등록, 추적 및 관리하며 여러 곳에서 Job을 생성한 경우 ApplicationContext에서 Job을 수집해서 사용할 수 있다.
- 기본구현체로 map 기반의 MapJobRegistry 클래스를 제공한다.
- jobName을 Key로 하고 job을 값으로 하여 매핑한다.
- Job등록
- JobRegistryBeanPostProcessor - BeanPostProcessor 단계에서 bean 초기화 시 자동으로 JobRegistry 에 Job을 등록 시켜준다.
Job Operator
- JobExplorer , JobRepository, JobRegistry , JobLauncher 를 포함하고 있으며 배치의 중단 , 재시작 , job 요약 등의 모니터링이 가능하다.
- 기본 구현체로 SimpleJobOperator 클래스를 제공한다.
@RestController
public class JobController {
@Autowired
private JobRegistry jobRegistry;
@Autowired
private JobOperator jobOperator;
@Autowired
private JobExplorer jobExplorer;
@PostMapping(value = "/batch/start")
public String start(@RequestBody JobInfo jobInfo) throws Exception {
for(Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){
SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
System.out.println("job name: " + job.getName());
jobOperator.start(job.getName(), "id=" + jobInfo.getId());
}
return "batch is started";
}
@PostMapping(value = "/batch/restart")
public String restart() throws Exception {
for(Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){
SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
System.out.println("job name: " + job.getName());
JobInstance lastJobInstance = jobExplorer.getLastJobInstance(job.getName());
JobExecution lastJobExecution = jobExplorer.getLastJobExecution(lastJobInstance);
jobOperator.restart(lastJobExecution.getId());
}
return "batch is restarted";
}
@PostMapping(value = "/batch/stop")
public String stop() throws Exception {
for(Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){
SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
System.out.println("job name: " + job.getName());
Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(job.getName());
JobExecution jobExecution = runningJobExecutions.iterator().next();
jobOperator.stop(jobExecution.getId());
}
return "batch is stopped";
}
}
JObRegistry를 통해 기존에 실행했었던 job을 가져오게 되고 JobExplorer를 통해 해당 Job의 상태값들을 가지고 온후에 실행할지 말지 판단을 한후에
JobOperator 를 통해 실제 원하는 실행(stop ,start) 같은 것을 하게 된다.
한번 성공했던 Job안의 step은 다시 또 실행이 안되니까 restart 하더라도 가장 마지막 JobExecution을 가져와서 (왜냐하면 가장 마지막이 실패했다고 가정함 , 또한 그냥 getJobInstance도 있지만getLastJobInstance가 따로 있는것을 보면 대부분 마지막 놈이 실패하는 경우가 많은 것 같음)
step1까지 성공하고 step2를 실행 못시켰을 경우 (step1 실행도중에 stop api를 통해 멈추게 한 경우) step2부터 다시 실행이 되게 된다.
'WEB > Spring Batch' 카테고리의 다른 글
실전! 스프링 배치 어플리케이션 개발 (0) | 2023.04.11 |
---|---|
스프링 배치 멀티 스레드 프로세싱 (0) | 2023.03.30 |
스프링 배치 반복 및 오류 제어 (1) | 2023.03.25 |
스프링 배치 청크 프로세스 활용 - ItemWriter , ItemProcessor (0) | 2023.03.24 |
스프링 배치 청크 프로세스 활용 - ItemReader (0) | 2023.03.23 |