WEB/Spring Batch

스프링 배치 청크 프로세스 활용 - ItemWriter , ItemProcessor

Tony Lim 2023. 3. 24. 19:03
728x90

FlatFileItemWriter - 개념 및 API 소개

  • 2차원 데이터(표)로 표현된 유형의 파일을 처리하는 ItemWriter
  • 고정 위치로 정의된 데이터 필드나 특수 문자에 의해 구별된 데이터의 행을 기록한다
  • Resource 와 LineAggregator 두 가지가 요소가 필요하다

LineAggregator

  • Item 을 받아서 String 으로 변환하여 리턴한다
  • FieldExtractor를 사용해서 처리할 수 있다
  • 구현체
    • PassThroughLineAggregator, DelimitedLineAggregator, FormatterLineAggregator
      Reader에서 봤던 Tokenizer(comma기준으로 쪼개는것처럼) comma 기준으로 다시 합(aggregate) 해준다.

FieldExtractor

  • 전달 받은 Item 객체의 필드를 배열로 만들고 배열을 합쳐서 문자열을 만들도록 구현하도록 제공하는 인터페이스

  • 구현체 
    • BeanWrapperFieldExtractor, PassThroughFieldExtractor

| 를 delimiter로 사용하여 Object 배열의 원소값을 하나의 String으로 aggregate한 것이다.


FlatFileItemWriter - delimeted (DelimitedLineAggregator , FormatterLineAggregator)

DelimitedLineAggregator 

객체의 필드사이에 구분자를 삽입해서 한 문자열로 변환함

java pojo로 존재하는 것을 object array로 바꾼후 aggregator에서 지정한 delimiter 기준으로 하나의 String으로 합체함

@Bean
public FlatFileItemWriter<Customer> customItemWriter() throws Exception {
    return new FlatFileItemWriterBuilder<Customer>()
            .name("customerWriter")
            .resource(new FileSystemResource("C:\\jsw\\inflearn\\spring-batch-lecture\\src\\main\\resources\\customer.csv"))
            .append(true)
            .delimited()
            .delimiter(",")
            .names(new String[] {"id", "name", "age"})
            .build();
}

delimited() = DelimitedLineAggregator  
formatted() = FormatterLineAggregator 으로    builder가 바뀌게 된다.

append(true) = result.txt에 이미 있는 데이터 이후에 write하게 된다. 
shouldDeleteIfEmpty(true) = Write할 data 없는 경우에 result.txt에 data가 있든 없는 삭제해 버려라 ㄷㄷ;

 

FormatterLineAggregator

객체의 필드를 사용자가 설정한 Formatter 구문을 통해 문자열로 변환한다.

@Bean
public FlatFileItemWriter<Customer> customItemWriter() throws Exception {
    return new FlatFileItemWriterBuilder<Customer>()
            .name("customerWriter")
            .resource(new ClassPathResource("customer.csv"))
            .formatted()
            .format("%-2d%-15s%-2d")
            .names(new String[] {"id", "name", "age"})
            .build();
}

직접 사용자가 정의한 포맷대로 찍는것이다. string , int ,자릿수를 잘 구별해서 작성해줘야한다.

 


XML - StaxEventItemWriter

  • XML 쓰는 과정은 읽기 과정에 대칭적이다. 
  • StaxEventItemWriter는 Resource, marshaller, rootTagName가 필요하다

@Bean
public StaxEventItemWriter customItemWriter() {
    return new StaxEventItemWriterBuilder<Customer>()
            .name("customersWriter")
            .marshaller(itemMarshaller())
            .resource(new FileSystemResource("customer.xml"))
            .rootTagName("customer")
            .overwriteOutput(true)
            .build();

}

@Bean
public XStreamMarshaller itemMarshaller() {
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("customer", Customer.class);
    aliases.put("id", Long.class);
    aliases.put("firstName", String.class);
    aliases.put("lastName", String.class);
    aliases.put("birthdate", Date.class);
    XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
    xStreamMarshaller.setAliases(aliases);
    return xStreamMarshaller;
}

 


JsonFileItemWriter

Java Pojo -> Json으로 변환후에 쓴다.

@Bean
public JsonFileItemWriter<Customer> customItemWriter() {
    return new JsonFileItemWriterBuilder<Customer>()
            .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
            .resource(new ClassPathResource("customer.json"))
            .name("customerJsonFileItemWriter")
            .build();
}

DB - JdbcBatchItemWriter

  • JdbcCursorItemReader 설정과 마찬가지로 datasource 를 지정하고, sql 속성에 실행할 쿼리를 설정 
  • JDBC의 Batch 기능을 사용하여 bulk insert/update/delete 방식으로 처리
  • 단건 처리가 아닌 일괄처리이기 때문에 성능에 이점을 가진다

name api는 존재하지않음 

여기서 bulk ~ insert 라는것은 매 chunk 마다 io 작업을 하는 것이 아니라 buffer에 write할 chunk들을 저장해 놓았다가 commit 시점에 한꺼번에 write io 하는 것을 의미한다.

beanMapped , columnMapped 둘 중 하나를 선택해서 매핑을 어떻게 할 것인지 결정하게 된다.

JdbcBatchItemWriter#write 메소드안 에서 동적쿼리를 작성하는 방식이 : 를 쓰는 경우와 ?를 쓰는 경우를 구분하고

:를 쓸때 Map , Pojo 둘중 하나의 방식으로 정하게 된다.


DB - JpaItemWriter

  • JPA Entity 기반으로 데이터를 처리하며 EntityManagerFactory 를 주입받아 사용한다
  • Entity를 하나씩 chunk 크기 만큼 insert  혹은 merge 한 다음 flush 한다
  • ItemReader 나 ItemProcessor 로 부터 아이템을 전발 받을 때는 Entity 클래스 타입으로 받아야 한다
    JPA를 쓰는것이니 당연히 Entity로 변환해서 전달해줘야한다.


ItemWriterAdapter

ItemReader와 비슷하게 Write할 때 다른 서비스의 메소드를 reflection을 통해서 대신 사용하게 도와준다.

@Bean
public ItemWriterAdapter customItemWriter() {

    ItemWriterAdapter<String>  writer = new ItemWriterAdapter<>();
     writer.setTargetObject(customService());
     writer.setTargetMethod("joinCustomer");
    return  writer;
}
public class ItemWriterAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemWriter<T> {

   @Override
   public void write(List<? extends T> items) throws Exception {
      for (T item : items) {
         invokeDelegateMethodWithArgument(item);
      }
   }

}

write할 item들을 기존 서비스에게 던져주고 대신 써달라한다.

 


CompositeItemProcessor

  • ItemProcessor 들을 연결(Chaining)해서 위임하면 각 ItemProcessor 를 실행시킨다
  • 이전 ItemProcessor 반환 값은 다음 ItemProcessor 값 으로 연결된다

여러 Processor들을 하나로 통합해서 chain으로 묶어서 처리하게 한다.

public CompositeItemProcessor<String,String> customItemProcessor() {

    CompositeItemProcessor<String,String> compositeProcessor = new CompositeItemProcessor<>();
    List<ItemProcessor<String,String>> itemProcessors = new ArrayList();
    itemProcessors.add(new CustomItemProcessor1());
    itemProcessors.add(new CustomItemProcessor2());

    return new CompositeItemProcessorBuilder<String,String>()
            .delegates(itemProcessors)
         .build();
}

 


ClassifierCompositeItemProcessor

Classifier 로 라우팅 패턴을 구현해서 ItemProcessor 구현체 중에서 하나를 호출하는 역할을 한다

public class ProcessorClassifier<C,T> implements Classifier<C, T> {

    private Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();

    @Override
    public T classify(C classifiable) {
        return (T)processorMap.get(((ProcessorInfo)classifiable).getId());
    }

    public void setProcessorMap(Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap) {
        this.processorMap = processorMap;
    }
}
@Bean
public ItemProcessor<ProcessorInfo,ProcessorInfo> customItemProcessor() {

    ClassifierCompositeItemProcessor<ProcessorInfo, ProcessorInfo> processor 
    = new ClassifierCompositeItemProcessor<>();

    ProcessorClassifier<ProcessorInfo, ItemProcessor<?, ? extends ProcessorInfo>> classifier 
    = new ProcessorClassifier();
    
    Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();
    processorMap.put(1, new CustomItemProcessor1());
    processorMap.put(2, new CustomItemProcessor2());
    processorMap.put(3, new CustomItemProcessor3());
    classifier.setProcessorMap(processorMap);
    processor.setClassifier(classifier);

    return processor;
}

예제는 1,2,3 차레대로 반환된다. classifier에서 PrceossInfo 의 id필드를 기준으로 map에서 꺼내온다.

 

 

728x90