애플리케이션 예제
Job-1
- 기능
- 파일로부터 데이터를 읽어서 DB에 적재한다.
- 세부내용
- 파일은 매일 새롭게 생성된다.
- 매일 정해진 시간에 파일을 로드하고 데이터를 DB에 업데이트 한다.
- 이미 처리한 파일은 다시 읽지 않도록 한다.
Job-2
- 기능
- DB 로부터 데이터를 일어서 API 서버와 통신한다
- 내용
- Partitioning 기능을 통한 멀티 스레드 구조로 Chunk 기반 프로세스를 구현한다
- 제품의 유형에 따라서 서로 다른 API 통신을 하도록 구성한다(ClassifierCompositerItemWriter)
- API 서버는 3개로 구성하여 요청을 처리한다
- 제품내용과 API 통신 결과를 각 파일로 저장한다. (FlatFileWriter 상속)
Scheduler
- 기능
- 시간을 설정하면 프로그램을 가동시킨다.
- 내용
- 정해진 시간에 주기적으로 Job-1 과 Job-2 를 실행시킨다
- Quatz 오픈 소스를 활용한다
job1은 단순히 파일을 일겅서 db에 적재하는 과정이고
job2는 멀티 스레드로 각각의 db data를 chunk로 읽어들어와서 itemWriter를 각 파일의 유형에 따라서 api 서버들을 호출하고 처리한 후에 File에 적게된다.
어플리케이션 예제 (2)
@Configuration
@RequiredArgsConstructor
public class FileJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job fileJob() {
return jobBuilderFactory.get("fileJob")
.start(fileStep1())
.build();
}
@Bean
public Step fileStep1() {
return stepBuilderFactory.get("fileStep1")
.<ProductVO, Product>chunk(10)
.reader(fileItemReader(null))
.processor(fileItemProcessor())
.writer(fileItemWriter())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<ProductVO> fileItemReader(@Value("#{jobParameters['requestDate']}") String requestDate) {
return new FlatFileItemReaderBuilder<ProductVO>()
.name("flatFile")
.resource(new ClassPathResource("product_" + requestDate +".csv"))
.fieldSetMapper(new BeanWrapperFieldSetMapper<>())
.targetType(ProductVO.class)
.linesToSkip(1)
.delimited().delimiter(",")
.names("id","name","price","type")
.build();
}
@Bean
public ItemProcessor<ProductVO, Product> fileItemProcessor() {
return new FileItemProcessor();
}
@Bean
public JpaItemWriter<Product> fileItemWriter() {
return new JpaItemWriterBuilder<Product>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
}
}
@StepScope을 이용해서 런타임에 resource의 parameter 이름을 바인딩 하게 함
(program argument 로 requestDate=20210101) 이런식으로
csv를 읽을꺼니 delimited로 읽게 함
ItemPrcoessor에서는 ProductVo -> Product로 맵핑하는 역할 만 수행함
어플리케이션 예제 (3)
@Configuration
@RequiredArgsConstructor
public class SendJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final ApiStartTasklet apiStartTasklet;
private final ApiEndTasklet apiEndTasklet;
private final Step jobStep;
@Bean
public Job apiJob() throws Exception {
return jobBuilderFactory.get("apiJob")
.incrementer(new RunIdIncrementer())
.listener(new JobListener())
.start(apiStep1())
.next(jobStep)
.next(apiStep2())
.build();
}
@Bean
public Step apiStep1() throws Exception {
return stepBuilderFactory.get("apiStep")
.tasklet(apiStartTasklet)
.build();
}
@Bean
public Step apiStep2() throws Exception {
return stepBuilderFactory.get("apiStep2")
.tasklet(apiEndTasklet)
.build();
}
}
위그림에서 job2에 해당하는 job중에 부모에 해당하는 job이다. API서버와의 통신의 시작과 끝을 로깅하는 용도
next에 주입받은 jobStep을 실행시키게된다.
@Configuration
@RequiredArgsConstructor
public class SendChildJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final Step apiMasterStep;
private final JobLauncher jobLauncher;
@Bean
public Step jobStep() throws Exception {
return stepBuilderFactory.get("jobStep")
.job(childJob())
.launcher(jobLauncher)
.build();
}
@Bean
public Job childJob() throws Exception {
return jobBuilderFactory.get("childJob")
.start(apiMasterStep)
.build();
}
}
해당 jobStep은 step안에 job이 있는 형태 (job -> step ->job)
partition을 사용하기위해 apiMasterStep을 호출하여 apichildStep을 각 스레드에게 분배하게 된다.
어플리케이션 (4)
@Configuration
@RequiredArgsConstructor
public class ApiStepConfiguration {
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private int chunkSize = 10;
@Bean
public Step apiMasterStep() throws Exception {
ProductVO[] productList = QueryGenerator.getProductList(dataSource);
return stepBuilderFactory.get("apiMasterStep")
.partitioner(apiSlaveStep().getName(), partitioner())
.step(apiSlaveStep())
.gridSize(productList.length)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(6);
taskExecutor.setThreadNamePrefix("api-thread-");
return taskExecutor;
}
@Bean
public Step apiSlaveStep() throws Exception {
return stepBuilderFactory.get("apiSlaveStep")
.<ProductVO, ApiRequestVO>chunk(chunkSize)
.reader(itemReader(null))
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public ProductPartitioner partitioner() {
ProductPartitioner productPartitioner = new ProductPartitioner();
productPartitioner.setDataSource(dataSource);
return productPartitioner;
}
@Bean
@StepScope
public ItemReader<ProductVO> itemReader(@Value("#{stepExecutionContext['product']}") ProductVO productVO) throws Exception {
JdbcPagingItemReader<ProductVO> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setPageSize(chunkSize);
reader.setRowMapper(new BeanPropertyRowMapper<>(ProductVO.class));
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, name, price, type");
queryProvider.setFromClause("from product");
queryProvider.setWhereClause("where type = :type");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setParameterValues(QueryGenerator.getParameterForQuery("type", productVO.getType()));
reader.setQueryProvider(queryProvider);
reader.afterPropertiesSet();
return reader;
}
@Bean
public ItemProcessor<ProductVO, ApiRequestVO> itemProcessor() {
ClassifierCompositeItemProcessor<ProductVO, ApiRequestVO> processor = new ClassifierCompositeItemProcessor<>();
ProcessorClassifier<ProductVO, ItemProcessor<?, ? extends ApiRequestVO>> classifier = new ProcessorClassifier();
Map<String, ItemProcessor<ProductVO, ApiRequestVO>> processorMap = new HashMap<>();
processorMap.put("1", new ApiItemProcessor1());
processorMap.put("2", new ApiItemProcessor2());
processorMap.put("3", new ApiItemProcessor3());
classifier.setProcessorMap(processorMap);
processor.setClassifier(classifier);
return processor;
}
@Bean
public ItemWriter<ApiRequestVO> itemWriter() {
ClassifierCompositeItemWriter<ApiRequestVO> writer = new ClassifierCompositeItemWriter<>();
WriterClassifier<ApiRequestVO, ItemWriter<? super ApiRequestVO>> classifier = new WriterClassifier();
Map<String, ItemWriter<ApiRequestVO>> writerMap = new HashMap<>();
writerMap.put("1", new ApiItemWriter1(new ApiService1()));
writerMap.put("2", new ApiItemWriter2(new ApiService2()));
writerMap.put("3", new ApiItemWriter3(new ApiService3()));
classifier.setWriterMap(writerMap);
writer.setClassifier(classifier);
return writer;
}
}
itemReader에서 부터 taskExecutor가 제공해주는다른 스레드에서 동작하게 된다.
Partition에서 알맞은 크기로 쪼개서 읽어오게된다
ProductVO type 필드에 따라 다른 ApiItemPrcoessor 가 적용이 된다.
writer도 동일하게 ApiRequestVO type필드의 값에 따라 ApiItemWriter가 달라진다.
어플리케이션 예제 (6)
스케줄러 Quartz
API호출 예제
@Component
public class ApiJobRunner extends JobRunner {
@Autowired
private Scheduler scheduler;
@Override
protected void doRun(ApplicationArguments args) {
JobDetail jobDetail = buildJobDetail(ApiSchJob.class, "apiJob", "batch", new HashMap());
Trigger trigger = buildJobTrigger("0/30 * * * * ?");
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
@Component
@Slf4j
public class ApiSchJob extends QuartzJobBean{
@Autowired
private Job apiJob;
@Autowired
private JobLauncher jobLauncher;
@SneakyThrows
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("id", new Date().getTime())
.toJobParameters();
jobLauncher.run(apiJob, jobParameters);
}
}
어플리케이션 예제 (7)
File에 다시 write하는 예제 + Quartz
@Component
public class FileJobRunner extends JobRunner {
@Autowired
private Scheduler scheduler;
@Override
protected void doRun(ApplicationArguments args) {
String[] sourceArgs = args.getSourceArgs();
JobDetail jobDetail = buildJobDetail(FileSchJob.class, "fileJob", "batch", new HashMap());
Trigger trigger = buildJobTrigger("0/50 * * * * ?");
jobDetail.getJobDataMap().put("requestDate", sourceArgs[0]);
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
@Component
@Slf4j
public class FileSchJob extends QuartzJobBean{
@Autowired
private Job fileJob;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobExplorer jobExplorer;
@SneakyThrows
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
String requestDate = (String)context.getJobDetail().getJobDataMap().get("requestDate");
JobParameters jobParameters = new JobParametersBuilder()
.addLong("id", new Date().getTime())
.addString("requestDate", requestDate)
.toJobParameters();
int jobInstanceCount = jobExplorer.getJobInstanceCount(fileJob.getName());
List<JobInstance> jobInstances = jobExplorer.getJobInstances(fileJob.getName(), 0, jobInstanceCount);
if(jobInstances.size() > 0) {
for(JobInstance jobInstance : jobInstances){
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
List<JobExecution> jobExecutionList = jobExecutions.stream().filter(jobExecution ->
jobExecution.getJobParameters().getString("requestDate").equals(requestDate))
.collect(Collectors.toList());
if (jobExecutionList.size() > 0) {
throw new JobExecutionException(requestDate + " already exists");
}
}
}
jobLauncher.run(fileJob, jobParameters);
}
}
jobinstance에서 이미 실행되었던 jobexecution을 requestDate를 통해서 구분을 하게되고 만약 존재한다면 launcher를 실행시키지 않는다.
'WEB > Spring Batch' 카테고리의 다른 글
스프링 배치 이벤트 리스너 & 스프링 배치 테스트 및 운영 (0) | 2023.04.04 |
---|---|
스프링 배치 멀티 스레드 프로세싱 (0) | 2023.03.30 |
스프링 배치 반복 및 오류 제어 (1) | 2023.03.25 |
스프링 배치 청크 프로세스 활용 - ItemWriter , ItemProcessor (0) | 2023.03.24 |
스프링 배치 청크 프로세스 활용 - ItemReader (0) | 2023.03.23 |