WEB/Spring Batch

스프링 배치 멀티 스레드 프로세싱

Tony Lim 2023. 3. 30. 19:46
728x90

기본 개념

RestTempate가 실제로 실행할 로직이 들어있는 RepeatCallback 각 스레드마다 실행시키게 된다.

RepeatCallback안에서 일어나는 Chunk process도 각 스레드마다 일어나게 된다.

 

스프링 배치 스레드 모델

  • 기본적으로는 단일 스레드 방식으로 작업
  • 성능 향상과 대규모 데이터 작업을 위한 비동기 처리 및 Scale out 기능을 제공한다.
  • Local 과 Remote 처리를 지원한다.

AsyncItem Processor / Writer  = 별도의 스레드가 할당되어 작업을 처리하는 방식

Multi-threaded step = Step 내 Chunk 구조인 ItemReader, Processor, Writer 마다 여러 스레드가 할당되어 실행하는 방법

Remote Chunking = 분산환경 처럼 Step 처리가 여러 프로세스로 분할되어 외부의 다른 서버로 전송되어 처리하는 방식

Parallel Steps = Step 마다 스레드가 할당되어 여러개의 Step을 병렬로 실행하는 방법

Partitinoing = Master / Slave 방식으로서 Master 가 데이터를 파티셔닝 한 다음 각 파티션에게 스레드를 할당하여 Slave 가 독립적으로 작동하는 방식


AsyncItemProcessor / AsyncItemWriter

기본개념

Step안의 ItemProcessor가 비동기적으로 동작하는 구조

AsyncItemProcessor 로부터 AsyncItemWriter 가 받는 최종 결과값은 List<Future<T>> 타입이며 비동기 실행이 완료될 떄까지 대기한다.

Future#get은 블록킹 메소드이기 때문에 ItemProcessor의 모든 비동기 작업이 끝나야 write를 할수 있게 된다.

    @Bean
    public AsyncItemProcessor asyncItemProcessor() throws Exception {
        AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor();

        asyncItemProcessor.setDelegate(customItemProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
//        asyncItemProcessor.setTaskExecutor(taskExecutor());
        asyncItemProcessor.afterPropertiesSet();

        return asyncItemProcessor;
    }
@Bean
public AsyncItemWriter asyncItemWriter() throws Exception {
    AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();

    asyncItemWriter.setDelegate(customItemWriter());
    asyncItemWriter.afterPropertiesSet();

    return asyncItemWriter;
}

일반적인 ItemProcessor 와 ItemWriter를 Bean으로 등록한 뒤에 setDelegate를 통해 전달하면 해당 로직들이 비동기적으로 동작하게 된다.


Multi-threaded Step

기본개념

  • Step 내에서 멀티 스레드로 Chunk 기반 처리가 이루어지는 구조
  • TaskExecutorRepeatTemplate 이 반복자로 사용되며 설정한 개수 (throttleLimit) 만큼의 스레드를 생성하여 수행함

각각의 스레드가 ItemReader -> Processor -> Write를 순차적으로 독립적으로 수행하게 된다.

ItemReader가 같은것을 읽어오거나 읽을 때 충돌이 나지 않기 위해 thread safe한 Reader를 구현하거나 사용해야한다.
JDBCPagingItemReader , JPAPagingItemReader가 thread safe한 상태로 제공이 된다.

thread safe한 ItemReader를 사용하지 않으면 중복된것을 읽어오거나 동시성 오류가 나게 된다.

 


Parallel Steps

기본개념

  • SplitState 를 사용해서 여러 개의 Flow들을 병렬적으로 실행하는 구조
  • 실행이 다 완료된 후 FlowExecutionStatus 결과들을 취합해서 다음 단계 결정을 한다.

SplitState에서 여러개의 Flow들을 들고 있고 TaskExecutor에서 각 스레드에서 각 Flow들을 실행 시키게 된다.

@Bean
public Job job() {
    return jobBuilderFactory.get("batchJob")
            .incrementer(new RunIdIncrementer())
            .start(flow1())
            .split(taskExecutor()).add(flow2(),flow3())
            .next(flow4())
            .end()
            .listener(new StopWatchJobListener())
            .build();
}

split에서 병렬적으로 실행될 flow2 ,flow3들을 추가하고 이것들이 다 끝나고 나면
main thread에서 next에 있는 flow4를 실행시키게 된다.


Partitioning 

기본개념

  • MasterStep 이 SlaveStep 을 실행시키는 구조
  • SlaveStep은 각 스레드에 의해 독립적으로 실행이 됨 , 독립적인 StepExecution 파라미터 환경을 구성함
  • SlaveStep은 ItemReader/ ItemProcessor / ItemWriter 등을 가지고 동작하며 작업을 독립적으로 병렬 처리한다.
  • MasterStep은 PartitionStep이며 SlaveStep은 TaskletStep , FlowStep등이 올 수 있다.

MasterStep은 논리적인의미이지 실제로는 그냥 Step이다.

 

PartitionStep (MasterStep 역할을 함) 

  • 파티셔닝 기능을 수행하는 Step 구현체
  • 파티셔닝을 수행후 StepExecutionAggregator를 사용해서 StepExecution의 정보를 최종 집계한다.

 

PartitionHandler

  • ParitionStep에 의해 호출되며 스레드를 생성해서 WorkStep 을 병렬로 실행한다.
  • WorkStep에서 사용할 StepExecution생성은 StepExecutionSplitter 와 Partitioner 에게 위임한다.
  • WorkStep을 병렬로 실행 후 최종결과를 담은 StepExecution을 PartitionStep에 반환한다.

 

StepExecutionSplitter

  • WorkStep에서 사용할 StepExecution을 gridSize만큼 생성한다.
  • Partitioner를 통해 ExecutionContext를 얻어서 StepExecution에 매핑한다.

 

Partitioner

  • StepExecution에 매핑할 ExecutionContext를 gridSize만큼 생성한다.
  • 각 ExecutionContext에 저장된 정보는 WorkStep을 실행하는 스레드마다 독립적으로 참조 및 활용이 가능하다.

결국 Step을 실행시키기위해서 필요한 StepExecution , ExecutionContext를 만드는 과정이다.

 

각 스레드마다 StepExecution , ExecutionContext를 지니고 있다.

@JobScope 한것처럼 Proxy객체가 생성되고 런타임에 실제 요청을 처리할시에 하위의 ItemReader,processor, writer 빈들이 생성이 된다.

이들은 각 스레드마다 생성이 되기에 공유가 되지않아서 thread safe하다.


SynchronizedItemStreamReader

기본개념

  • Thread-safe 하지 않은 ItemReader 를 Thread-safe하게 처리하도록 하는 역할
  • Spring Batch 4.0 부터 지원한다.

synchronized로 동기화 처리하여 기다리게 해서 동시성 이슈를 해결한다.

실제 데이터처리는 JdbcCursorItemReader에게 위임한다.

@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> customItemReader() {
    JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
            .fetchSize(60)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
            .sql("select id, firstName, lastName, birthdate from customer")
            .name("SafetyReader")
            .build();

    return new SynchronizedItemStreamReaderBuilder<Customer>()
            .delegate(notSafetyReader)
            .build();
}

랩핑해줘서 동기화를 시키고 실제 item 을 읽어오는 itemReader는 JdbcCursorItemReader를 통해서 읽어오는 것이다.

 

 

 

 

 

 

728x90