WEB/Spring Batch

스프링 배치 청크 프로세스 이해

Tony Lim 2023. 3. 22. 20:35
728x90

Chunk

기본개념

  • Chunk란 여러 개의 아이템을 묶은 하나의 덩어리 , 블록을 의미함
  • 한번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리함 , 즉 Chunk 단위의 Commit 과 Rollback 이 이루어짐
  • 일반적으로 대용량 데이터를  한번에 처리하는 것이 아닌 청크 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용됨

Chunk<I> = ItemReader 로 읽은 하나의 아이템을 Chunk에서 정한 개수만큼 반복해서 저장하는 타입
Chunk<O> = ItemReader로부터 전달받은 Chunk<I> 를 참조해서 ItemProcessor에서 가공 , 필터링 한후에 ItemWriter에게 전달하는 타입

하나의 chunk가 읽고 ,가공,쓰기가 한트랜잭션에서 일어나게 된다.

 


ChunkOrientedTasklet - 개념 및 API 소개

기본개념

  • ChunkOrientedTasklet 은 스프링 배치에서 제공하는 Tasklet의 구현체로서 Chunk 지향 프로세싱을 담당하는 도메인 객체
  • ItemReader, ItemWriter , ItemProcessor 를 사용해 Chunk 기반의 데이터 입출력 처리를 담당한다.
  • TaskletStep 에 의해서 반복적으로 실행되며 ChunkOrientedTasklet 이 실행될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어 진다.
  • exception이 발생할 경우 , 해당 Chunk는 롤백 되며 이전에 커밋한 Chunk는 완료된 상태가 유지된다.
  • 내부적으로 ItemReader 를 핸들링 하는 ChunkProvider 와 ItemProcessor , ItemWriter 를 핸들링하는 ChunkProcessor 타입의 구현체를 가진다.

public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

   @SuppressWarnings("unchecked")
   Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
   if (inputs == null) {
      inputs = chunkProvider.provide(contribution);
      if (buffering) {
         chunkContext.setAttribute(INPUTS_KEY, inputs);
      }
   }

   chunkProcessor.process(contribution, inputs);
   chunkProvider.postProcess(contribution, inputs);

   // Allow a message coming back from the processor to say that we
   // are not done yet
   if (inputs.isBusy()) {
      logger.debug("Inputs still busy");
      return RepeatStatus.CONTINUABLE;
   }

   chunkContext.removeAttribute(INPUTS_KEY);
   chunkContext.setComplete();

   if (logger.isDebugEnabled()) {
      logger.debug("Inputs not busy, ended: " + inputs.isEnd());
   }
   return RepeatStatus.continueIf(!inputs.isEnd());
}
  1. Chunk 처리중 예외가 발생하여 재시도 할 경우 다시 데이터를 읽지 않고 버퍼 (ChunkContext)
    에 담겨 놓았던 데이터를 가지고 옮
  2. chunkProvider.provide를 통해서 Item을 Chunk size만큼 반복해서 읽은 다음 Chunk<I>에 저장하고 반환
  3. Chunk를 캐싱하기 위해 ChunkContext 버퍼에 담음
  4. ChunkProvider로 부터 받은 Chunk<I>의 아이템 개수 만큼 데이터를 가공하고 저장
  5. Chunk 단위 입출력이 완료되면 버퍼에 저장한 Chunk 데이터 삭제
  6. 읽을 Item 이 더 존재하는지 체크해서 존재하면 Chunk 프로세스 반복하고 null 일 경우 RepeatStauts.FINISHED 반환하고 프로세스 종료함

chunk(10) 이면 10개씩 저장한다 -> commit interval 10개씩 commit한다.

readerTransactionalQueue = 원래는 외부 message queue까지 포함해서 트랜잭션 처리를 하지 않는데 (default = false) 이것을 가능하게 해준다?

총 3가지 를 앞 단에서 학습한것이고 이제 chunk를 파고들 차례이다.

 


ChunkOrientedTasklet - ChunkProvider / ChunkProcessor

 

ChunkProvider 

기본개념

  • ItemReader를 사용해서 소스로부터 아이템을 Chunk size만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체
  • Chunk<I> 를 만들고 내부적으로 반복문을 사용해서 ItemReader.read()를 계속 호출하면서 item을 Chunk에 쌓는다.
  • 외부로부터 ChunkProvider가 호출될 때마다 항상 새로운 Chunk가 생성된다.

반목문 종료 시점

  • chunk size만큼 item을 읽으면 반목문이 종료가 되고 ChunkProcessor로 넘어감
  • ItemReader가 읽은 item 이 null 일 경우 반목문 종료 및 해당 Step 반복문까지 종료

기본구현체로서 SimpleChunkProvider 와 FaultTolerantChunkProvider가 있다.

public Chunk<I> provide(final StepContribution contribution) throws Exception {

   final Chunk<I> inputs = new Chunk<>();
   repeatOperations.iterate(new RepeatCallback() {

      @Override
      public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
         I item = null;
         Timer.Sample sample = Timer.start(Metrics.globalRegistry);
         String status = BatchMetrics.STATUS_SUCCESS;
         try {
            item = read(contribution, inputs);
         }
         catch (SkipOverflowException e) {
            // read() tells us about an excess of skips by throwing an
            // exception
            status = BatchMetrics.STATUS_FAILURE;
            return RepeatStatus.FINISHED;
         }
         finally {
            stopTimer(sample, contribution.getStepExecution(), status);
         }
         if (item == null) {
            inputs.setEnd();
            return RepeatStatus.FINISHED;
         }
         inputs.add(item);
         contribution.incrementReadCount();
         return RepeatStatus.CONTINUABLE;
      }

   });

   return inputs;
}
  1. Item 을 담을 Chunk 생성(inputs) , provide() 호출마다 새롭게 생성함
  2. RepeatCallback 을 통해서 Chunk size만큼 반복문 실행하면서 read() 호출
  3. read 메소드에서 ItemReader가 Item 한개씩 읽어서 리턴 
  4. 반복하다가 if 문에서 item== null 이면 Chunk 프로세스 종료
  5. ItemReader로 부터 받은 Item 을 Chunksize 만큼 Chunk(inputs) 에 저장

 

ChunkProcessor

기본 개념

  • ItemProcessor 를 사용해서 Item 을 변형 , 가공 , 필터링 하고 ItemWriter 를 사용해서 Chunk 데이터를 저장, 출력한다.
  • Chunk<O> 를 만들고 앞에서 넘어온 Chunk<I>의 item 을 한건씩 처리한 후 Chunk<O> 에 저장한다.
  • 외부로 부터 ChunkProcessor 가 호출될 때마다 항상 새로운 Chunk 가 생성된다.
  • ItemProcessor 는 설정 시 선택사항으로서 만약 객체가 존재하지 않을 경우 ItemReader에서 읽은 item 그대로가 Chunk<O> 에 저장된다.
  • ItemProcessor 처리가 완료되면 Chunk<O> 에 있는 List<Item> 을 ItemWriter에게 전달한다.
  • ItemWriter 처리가 완료되면 Chunk트랜잭션은 종료하게 되고 Step 반복문에서 ChunkOrientedTasklet가 새롭게 실행된다.
  • ItemWriter는 Chunk size만큼 데이터를 Commit 처리하기 때문에 Chunk size는 곧 Commit interval 이 된다.
  • 기본 구현체로서 SimpleChunkProcessor 와 FalutTolerantChunkProcessor가 있다.
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

   // Allow temporary state to be stored in the user data field
   initializeUserData(inputs);

   // If there is no input we don't have to do anything more
   if (isComplete(inputs)) {
      return;
   }

   // Make the transformation, calling remove() on the inputs iterator if
   // any items are filtered. Might throw exception and cause rollback.
   Chunk<O> outputs = transform(contribution, inputs);

   // Adjust the filter count based on available data
   contribution.incrementFilterCount(getFilterCount(inputs, outputs));

   // Adjust the outputs if necessary for housekeeping purposes, and then
   // write them out...
   write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

}
  • transform 은 ItemProcessor에서 가공처리 된 아이템을 담은 Chunk<O> outputs 에 반환한다.
  • 인자로 들어가는 inputs 는 chunkProvider.provide() 에서 받은 Chunk<I>
  • contribution.incrementFilterCount = chunk 에 10개중 9개만 ItemProcessor가 처리했으면 이 숫자를 저장하게 된다.
  • write 메소드는 가공처리된 Chunk<O> 를 Itemwriter에게 전달한다.

ItemReader / ItemWriter / ItemProcessor 이해

ItemReader 

기본개념

  • 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
  • 플랫파일 - csv, txt (고정 위치로 정의된 데이터 필드나 특수문자로 구별된 데이터의 행)
  • XML, Json ,Database , JMS ,Message queue
  • Custom Reader = thread safe 하게 구현할 필요가 있음
  • ChunkOrientedTasklet 실행시 필수적 요소임

다수의 구현체들이 ItemReader와 ItemStream 인터페이스를 동시에 구현하고 있음

  • 파일의 스트림을 열거나 종료 , DB 커넥션을 열거나 종료, 입력장치 초기화등의 작업
  • Execution Context는 read와 관련된 여러가지 상태정보를 저장해서 재시작시 참조 하도록 지원

 

ItemWriter 

기본개념

  • ItemReader와 1대1 매칭이되는 data종류들을 전달하게 된다.
  • 아이템 하나가 아닌 아이템 리스트를 전달받는다.
  • ItemReader와 마찬가지로 필수 요소이다.

 

ItemProcessor 

기본개념

  • 데이터 출력하기 전에 데이터를 가공, 변형 , 필터링 하는 역할
  • ItemReader 및 ItemWriter 와 분리되어 비즈니스 로직을 구현할 수 있다.
  • ItemProcessor에서 process() 실행결과 null을 반환하면 Chunk<O>에 저장이 안됨으로 결국 ItemWriter로 전달이 되지않는다.
  • 선택적요소이다.

 


ItemStream

기본 개념

  • ItemReader 와 ItemWriter처리 과정중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재 시작하도록 지원함
  • 리소스를 열고 닫아야 하며 입출력 장치 초기화 등의 작업을 해야하는 경우 사용함
  • ExecutionContext 를 매개변수로 받아서 상태 정보를 업데이트 한다.
  • ItemReader 및 ItemWriter 는 ItemStream을 구현하고 있다.

 

public class CustomItemStreamReader<String> implements ItemStreamReader<String> {

    private final List<String> items;
    private int index = -1;
    private boolean restart = false;

    public CustomItemStreamReader(List<String> items) {
        this.items = items;
        this.index = 0;
    }

    @Override
    public String read() throws Exception {
        String item = null;

        if(this.index < this.items.size()) {
            item = this.items.get(index);
            index++;
        }

        if(this.index == 6 && !restart) {
            throw new RuntimeException("Restart is required.");
        }

        return item;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey("index")) {
            index = executionContext.getInt("index");
            this.restart = true;
        }
        else {
            index = 0;
            executionContext.put("index", index);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.put("index", index);
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.println("close");
    }
}

ItemStreamReader 와 ItemStreamWriter 의 open 메소드를 초기화 하는것 처럼 제일 먼저 호출해준다.

현재 ItemStreamReader에서 open 하면서 ExecutionContext에 index가 있으면 꺼내오게 된다. 즉 중간에 실행이 멈췄다고 볼수 있다. -> 일부러 1~5까지 실행하고 index = 6 되면 해당 index부터 실행이 가능하다는 예시를 보여주기 위함이다.
처음 시작하는것이면 idnex를 0 으로 저장하게 된다.

update는 chunk가 실행 될 때마다(여기서는 5 , 총 10개의 whole data를 5개로 2개의 chunk로 쪼갰음)
호출되어 현재 ExecutionContext를 db에 저장하게 된다. StepExecutionContext 테이블에 저장이된다.
Reader, Writer 차례대로 update를 호출하게 된다.

close에서는 reosurce release를 해줘야하는 곳이다.

 


Chunk Process 아키텍처

taskletStep에서 doExecute 에서 Transactiontemplate.execute를 통해서 transcation을 시작해서 Chunk를 실행한다.

10개를 총 5개씩 2개의 Chunk로 쪼개면 Transcation 2번 실행 되는것이다.

https://www.inflearn.com/questions/823291/%ED%8A%B8%EB%9E%9C%EC%9E%AD%EC%85%98-%EA%B2%BD%EA%B3%84-%EC%99%80-%ED%8A%B8%EB%9E%9C%EC%9E%AD%EC%85%98-begin%EC%97%90-%EB%8C%80%ED%95%9C-%EA%B5%AC%EB%B6%84

 

트랜잭션 경계 와 트랜잭션 begin에 대한 구분 - 인프런 | 질문 & 답변

안녕하세요 정수원 선생님 질문이 2개 있습니다.1) 'Chunk Process 아키텍처' 첫번째 사진 설명부분 4:26쯤에 트랜잭션 경계와 실제 트랜잭션 begin을 구분하셨는데 이것이 무슨 차이인가요?아니면 어

www.inflearn.com

 

728x90