WEB/Spring Batch

실전! 스프링 배치 어플리케이션 개발

Tony Lim 2023. 4. 11. 19:23

애플리케이션 예제

Job-1

  • 기능
    • 파일로부터 데이터를 읽어서 DB에 적재한다.
  • 세부내용
    • 파일은 매일 새롭게 생성된다.
    • 매일 정해진 시간에 파일을 로드하고 데이터를 DB에 업데이트 한다.
    • 이미 처리한 파일은 다시 읽지 않도록 한다.

Job-2

  • 기능
    • DB 로부터 데이터를 일어서 API 서버와 통신한다
  • 내용
    • Partitioning 기능을 통한 멀티 스레드 구조로 Chunk 기반 프로세스를 구현한다
    • 제품의 유형에 따라서 서로 다른 API 통신을 하도록 구성한다(ClassifierCompositerItemWriter)
    • API 서버는 3개로 구성하여 요청을 처리한다
    • 제품내용과 API 통신 결과를 각 파일로 저장한다. (FlatFileWriter 상속)

Scheduler

  • 기능
    • 시간을 설정하면 프로그램을 가동시킨다.
  • 내용
    • 정해진 시간에 주기적으로 Job-1 과 Job-2 를 실행시킨다
    • Quatz 오픈 소스를 활용한다

job1은 단순히 파일을 일겅서 db에 적재하는 과정이고 

job2는 멀티 스레드로 각각의 db data를 chunk로 읽어들어와서 itemWriter를 각 파일의 유형에 따라서 api 서버들을 호출하고 처리한 후에 File에 적게된다.


어플리케이션 예제 (2)

@Configuration
@RequiredArgsConstructor
public class FileJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job fileJob() {
        return jobBuilderFactory.get("fileJob")
                .start(fileStep1())
                .build();
    }

    @Bean
    public Step fileStep1() {
        return stepBuilderFactory.get("fileStep1")
                .<ProductVO, Product>chunk(10)
                .reader(fileItemReader(null))
                .processor(fileItemProcessor())
                .writer(fileItemWriter())
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<ProductVO> fileItemReader(@Value("#{jobParameters['requestDate']}") String requestDate) {
        return new FlatFileItemReaderBuilder<ProductVO>()
                .name("flatFile")
                .resource(new ClassPathResource("product_" + requestDate +".csv"))
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
                .targetType(ProductVO.class)
                .linesToSkip(1)
                .delimited().delimiter(",")
                .names("id","name","price","type")
                .build();
    }

    @Bean
    public ItemProcessor<ProductVO, Product> fileItemProcessor() {
        return new FileItemProcessor();
    }

    @Bean
    public JpaItemWriter<Product> fileItemWriter() {
        return new JpaItemWriterBuilder<Product>()
                .entityManagerFactory(entityManagerFactory)
                .usePersist(true)
                .build();
    }
}

@StepScope을 이용해서 런타임에 resource의 parameter 이름을 바인딩 하게 함
(program argument 로 requestDate=20210101) 이런식으로 
csv를 읽을꺼니 delimited로 읽게 함

ItemPrcoessor에서는 ProductVo -> Product로 맵핑하는 역할 만 수행함


어플리케이션 예제 (3)

@Configuration
@RequiredArgsConstructor
public class SendJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final ApiStartTasklet apiStartTasklet;
    private final ApiEndTasklet apiEndTasklet;
    private final Step jobStep;

    @Bean
    public Job apiJob() throws Exception {

        return jobBuilderFactory.get("apiJob")
                .incrementer(new RunIdIncrementer())
                .listener(new JobListener())
                .start(apiStep1())
                .next(jobStep)
                .next(apiStep2())
                .build();
    }

    @Bean
    public Step apiStep1() throws Exception {
        return stepBuilderFactory.get("apiStep")
                .tasklet(apiStartTasklet)
                .build();
    }

    @Bean
    public Step apiStep2() throws Exception {
        return stepBuilderFactory.get("apiStep2")
                .tasklet(apiEndTasklet)
                .build();
    }
}

위그림에서 job2에 해당하는 job중에 부모에 해당하는 job이다. API서버와의 통신의 시작과 끝을 로깅하는 용도

next에 주입받은 jobStep을 실행시키게된다.

@Configuration
@RequiredArgsConstructor
public class SendChildJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final Step apiMasterStep;
    private final JobLauncher jobLauncher;

    @Bean
    public Step jobStep() throws Exception {
        return stepBuilderFactory.get("jobStep")
                .job(childJob())
                .launcher(jobLauncher)
                .build();
    }

    @Bean
    public Job childJob() throws Exception {
        return jobBuilderFactory.get("childJob")
                .start(apiMasterStep)
                .build();
    }
}

해당 jobStep은 step안에 job이 있는 형태 (job -> step ->job)

partition을 사용하기위해 apiMasterStep을 호출하여 apichildStep을 각 스레드에게 분배하게 된다.

 


어플리케이션 (4)

@Configuration
@RequiredArgsConstructor
public class ApiStepConfiguration {

    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    private int chunkSize = 10;

    @Bean
    public Step apiMasterStep() throws Exception {

        ProductVO[] productList = QueryGenerator.getProductList(dataSource);

        return stepBuilderFactory.get("apiMasterStep")
                .partitioner(apiSlaveStep().getName(), partitioner())
                .step(apiSlaveStep())
                .gridSize(productList.length)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(3);
        taskExecutor.setMaxPoolSize(6);
        taskExecutor.setThreadNamePrefix("api-thread-");

        return taskExecutor;
    }

    @Bean
    public Step apiSlaveStep() throws Exception {

        return stepBuilderFactory.get("apiSlaveStep")
                .<ProductVO, ApiRequestVO>chunk(chunkSize)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ProductPartitioner partitioner() {
        ProductPartitioner productPartitioner = new ProductPartitioner();
        productPartitioner.setDataSource(dataSource);
        return productPartitioner;
    }

    @Bean
    @StepScope
    public ItemReader<ProductVO> itemReader(@Value("#{stepExecutionContext['product']}") ProductVO productVO) throws Exception {

        JdbcPagingItemReader<ProductVO> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(dataSource);
        reader.setPageSize(chunkSize);
        reader.setRowMapper(new BeanPropertyRowMapper<>(ProductVO.class));

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, name, price, type");
        queryProvider.setFromClause("from product");
        queryProvider.setWhereClause("where type = :type");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.DESCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setParameterValues(QueryGenerator.getParameterForQuery("type", productVO.getType()));
        reader.setQueryProvider(queryProvider);
        reader.afterPropertiesSet();

        return reader;
    }

    @Bean
    public ItemProcessor<ProductVO, ApiRequestVO> itemProcessor() {

        ClassifierCompositeItemProcessor<ProductVO, ApiRequestVO> processor = new ClassifierCompositeItemProcessor<>();

        ProcessorClassifier<ProductVO, ItemProcessor<?, ? extends ApiRequestVO>> classifier = new ProcessorClassifier();

        Map<String, ItemProcessor<ProductVO, ApiRequestVO>> processorMap = new HashMap<>();
        processorMap.put("1", new ApiItemProcessor1());
        processorMap.put("2", new ApiItemProcessor2());
        processorMap.put("3", new ApiItemProcessor3());

        classifier.setProcessorMap(processorMap);

        processor.setClassifier(classifier);

        return processor;
    }

    @Bean
    public ItemWriter<ApiRequestVO> itemWriter() {

        ClassifierCompositeItemWriter<ApiRequestVO> writer = new ClassifierCompositeItemWriter<>();

        WriterClassifier<ApiRequestVO, ItemWriter<? super ApiRequestVO>> classifier = new WriterClassifier();

        Map<String, ItemWriter<ApiRequestVO>> writerMap = new HashMap<>();
        writerMap.put("1", new ApiItemWriter1(new ApiService1()));
        writerMap.put("2", new ApiItemWriter2(new ApiService2()));
        writerMap.put("3", new ApiItemWriter3(new ApiService3()));

        classifier.setWriterMap(writerMap);

        writer.setClassifier(classifier);

        return writer;
    }
}

itemReader에서 부터 taskExecutor가 제공해주는다른 스레드에서 동작하게 된다.
Partition에서 알맞은 크기로 쪼개서 읽어오게된다

ProductVO type 필드에 따라 다른 ApiItemPrcoessor 가 적용이 된다.

writer도 동일하게 ApiRequestVO type필드의 값에 따라 ApiItemWriter가 달라진다.


어플리케이션 예제 (6)

스케줄러 Quartz 

API호출 예제

@Component
public class ApiJobRunner extends JobRunner {

    @Autowired
    private Scheduler scheduler;

    @Override
    protected void doRun(ApplicationArguments args) {

        JobDetail jobDetail = buildJobDetail(ApiSchJob.class, "apiJob", "batch", new HashMap());
        Trigger trigger = buildJobTrigger("0/30 * * * * ?");

        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}
@Component
@Slf4j
public class ApiSchJob extends QuartzJobBean{

   @Autowired
   private Job apiJob;

   @Autowired
   private JobLauncher jobLauncher;

   @SneakyThrows
   @Override
   protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

      JobParameters jobParameters = new JobParametersBuilder()
                           .addLong("id", new Date().getTime())
                           .toJobParameters();
      jobLauncher.run(apiJob, jobParameters);
   }
}

어플리케이션 예제 (7)

File에 다시 write하는 예제 + Quartz

@Component
public class FileJobRunner extends JobRunner {

    @Autowired
    private Scheduler scheduler;

    @Override
    protected void doRun(ApplicationArguments args) {

        String[] sourceArgs = args.getSourceArgs();
        JobDetail jobDetail = buildJobDetail(FileSchJob.class, "fileJob", "batch", new HashMap());
        Trigger trigger = buildJobTrigger("0/50 * * * * ?");
        jobDetail.getJobDataMap().put("requestDate", sourceArgs[0]);

        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}
@Component
@Slf4j
public class FileSchJob extends QuartzJobBean{

   @Autowired
   private Job fileJob;

   @Autowired
   private JobLauncher jobLauncher;

   @Autowired
   private JobExplorer jobExplorer;

   @SneakyThrows
   @Override
   protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

      String requestDate = (String)context.getJobDetail().getJobDataMap().get("requestDate");

      JobParameters jobParameters = new JobParametersBuilder()
                           .addLong("id", new Date().getTime())
                           .addString("requestDate", requestDate)
                           .toJobParameters();

      int jobInstanceCount = jobExplorer.getJobInstanceCount(fileJob.getName());
      List<JobInstance> jobInstances = jobExplorer.getJobInstances(fileJob.getName(), 0, jobInstanceCount);

      if(jobInstances.size() > 0) {
         for(JobInstance jobInstance : jobInstances){
            List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
            List<JobExecution> jobExecutionList = jobExecutions.stream().filter(jobExecution ->
                        jobExecution.getJobParameters().getString("requestDate").equals(requestDate))
                  .collect(Collectors.toList());
            if (jobExecutionList.size() > 0) {
               throw new JobExecutionException(requestDate + " already exists");
            }
         }
      }
      jobLauncher.run(fileJob, jobParameters);
   }
}

jobinstance에서 이미 실행되었던 jobexecution을 requestDate를 통해서 구분을 하게되고 만약 존재한다면 launcher를 실행시키지 않는다.