FlatFileItemWriter - 개념 및 API 소개
- 2차원 데이터(표)로 표현된 유형의 파일을 처리하는 ItemWriter
- 고정 위치로 정의된 데이터 필드나 특수 문자에 의해 구별된 데이터의 행을 기록한다
- Resource 와 LineAggregator 두 가지가 요소가 필요하다
LineAggregator
- Item 을 받아서 String 으로 변환하여 리턴한다
- FieldExtractor를 사용해서 처리할 수 있다
- 구현체
- PassThroughLineAggregator, DelimitedLineAggregator, FormatterLineAggregator
Reader에서 봤던 Tokenizer(comma기준으로 쪼개는것처럼) comma 기준으로 다시 합(aggregate) 해준다.
- PassThroughLineAggregator, DelimitedLineAggregator, FormatterLineAggregator
FieldExtractor
- 전달 받은 Item 객체의 필드를 배열로 만들고 배열을 합쳐서 문자열을 만들도록 구현하도록 제공하는 인터페이스
- 구현체
- BeanWrapperFieldExtractor, PassThroughFieldExtractor
| 를 delimiter로 사용하여 Object 배열의 원소값을 하나의 String으로 aggregate한 것이다.
FlatFileItemWriter - delimeted (DelimitedLineAggregator , FormatterLineAggregator)
DelimitedLineAggregator
객체의 필드사이에 구분자를 삽입해서 한 문자열로 변환함
java pojo로 존재하는 것을 object array로 바꾼후 aggregator에서 지정한 delimiter 기준으로 하나의 String으로 합체함
@Bean
public FlatFileItemWriter<Customer> customItemWriter() throws Exception {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerWriter")
.resource(new FileSystemResource("C:\\jsw\\inflearn\\spring-batch-lecture\\src\\main\\resources\\customer.csv"))
.append(true)
.delimited()
.delimiter(",")
.names(new String[] {"id", "name", "age"})
.build();
}
delimited() = DelimitedLineAggregator
formatted() = FormatterLineAggregator 으로 builder가 바뀌게 된다.
append(true) = result.txt에 이미 있는 데이터 이후에 write하게 된다.
shouldDeleteIfEmpty(true) = Write할 data 없는 경우에 result.txt에 data가 있든 없는 삭제해 버려라 ㄷㄷ;
FormatterLineAggregator
객체의 필드를 사용자가 설정한 Formatter 구문을 통해 문자열로 변환한다.
@Bean
public FlatFileItemWriter<Customer> customItemWriter() throws Exception {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerWriter")
.resource(new ClassPathResource("customer.csv"))
.formatted()
.format("%-2d%-15s%-2d")
.names(new String[] {"id", "name", "age"})
.build();
}
직접 사용자가 정의한 포맷대로 찍는것이다. string , int ,자릿수를 잘 구별해서 작성해줘야한다.
XML - StaxEventItemWriter
- XML 쓰는 과정은 읽기 과정에 대칭적이다.
- StaxEventItemWriter는 Resource, marshaller, rootTagName가 필요하다
@Bean
public StaxEventItemWriter customItemWriter() {
return new StaxEventItemWriterBuilder<Customer>()
.name("customersWriter")
.marshaller(itemMarshaller())
.resource(new FileSystemResource("customer.xml"))
.rootTagName("customer")
.overwriteOutput(true)
.build();
}
@Bean
public XStreamMarshaller itemMarshaller() {
Map<String, Class<?>> aliases = new HashMap<>();
aliases.put("customer", Customer.class);
aliases.put("id", Long.class);
aliases.put("firstName", String.class);
aliases.put("lastName", String.class);
aliases.put("birthdate", Date.class);
XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
xStreamMarshaller.setAliases(aliases);
return xStreamMarshaller;
}
JsonFileItemWriter
Java Pojo -> Json으로 변환후에 쓴다.
@Bean
public JsonFileItemWriter<Customer> customItemWriter() {
return new JsonFileItemWriterBuilder<Customer>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("customer.json"))
.name("customerJsonFileItemWriter")
.build();
}
DB - JdbcBatchItemWriter
- JdbcCursorItemReader 설정과 마찬가지로 datasource 를 지정하고, sql 속성에 실행할 쿼리를 설정
- JDBC의 Batch 기능을 사용하여 bulk insert/update/delete 방식으로 처리
- 단건 처리가 아닌 일괄처리이기 때문에 성능에 이점을 가진다
name api는 존재하지않음
여기서 bulk ~ insert 라는것은 매 chunk 마다 io 작업을 하는 것이 아니라 buffer에 write할 chunk들을 저장해 놓았다가 commit 시점에 한꺼번에 write io 하는 것을 의미한다.
beanMapped , columnMapped 둘 중 하나를 선택해서 매핑을 어떻게 할 것인지 결정하게 된다.
JdbcBatchItemWriter#write 메소드안 에서 동적쿼리를 작성하는 방식이 : 를 쓰는 경우와 ?를 쓰는 경우를 구분하고
:를 쓸때 Map , Pojo 둘중 하나의 방식으로 정하게 된다.
DB - JpaItemWriter
- JPA Entity 기반으로 데이터를 처리하며 EntityManagerFactory 를 주입받아 사용한다
- Entity를 하나씩 chunk 크기 만큼 insert 혹은 merge 한 다음 flush 한다
- ItemReader 나 ItemProcessor 로 부터 아이템을 전발 받을 때는 Entity 클래스 타입으로 받아야 한다
JPA를 쓰는것이니 당연히 Entity로 변환해서 전달해줘야한다.
ItemWriterAdapter
ItemReader와 비슷하게 Write할 때 다른 서비스의 메소드를 reflection을 통해서 대신 사용하게 도와준다.
@Bean
public ItemWriterAdapter customItemWriter() {
ItemWriterAdapter<String> writer = new ItemWriterAdapter<>();
writer.setTargetObject(customService());
writer.setTargetMethod("joinCustomer");
return writer;
}
public class ItemWriterAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemWriter<T> {
@Override
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
invokeDelegateMethodWithArgument(item);
}
}
}
write할 item들을 기존 서비스에게 던져주고 대신 써달라한다.
CompositeItemProcessor
- ItemProcessor 들을 연결(Chaining)해서 위임하면 각 ItemProcessor 를 실행시킨다
- 이전 ItemProcessor 반환 값은 다음 ItemProcessor 값 으로 연결된다
여러 Processor들을 하나로 통합해서 chain으로 묶어서 처리하게 한다.
public CompositeItemProcessor<String,String> customItemProcessor() {
CompositeItemProcessor<String,String> compositeProcessor = new CompositeItemProcessor<>();
List<ItemProcessor<String,String>> itemProcessors = new ArrayList();
itemProcessors.add(new CustomItemProcessor1());
itemProcessors.add(new CustomItemProcessor2());
return new CompositeItemProcessorBuilder<String,String>()
.delegates(itemProcessors)
.build();
}
ClassifierCompositeItemProcessor
Classifier 로 라우팅 패턴을 구현해서 ItemProcessor 구현체 중에서 하나를 호출하는 역할을 한다
public class ProcessorClassifier<C,T> implements Classifier<C, T> {
private Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();
@Override
public T classify(C classifiable) {
return (T)processorMap.get(((ProcessorInfo)classifiable).getId());
}
public void setProcessorMap(Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap) {
this.processorMap = processorMap;
}
}
@Bean
public ItemProcessor<ProcessorInfo,ProcessorInfo> customItemProcessor() {
ClassifierCompositeItemProcessor<ProcessorInfo, ProcessorInfo> processor
= new ClassifierCompositeItemProcessor<>();
ProcessorClassifier<ProcessorInfo, ItemProcessor<?, ? extends ProcessorInfo>> classifier
= new ProcessorClassifier();
Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();
processorMap.put(1, new CustomItemProcessor1());
processorMap.put(2, new CustomItemProcessor2());
processorMap.put(3, new CustomItemProcessor3());
classifier.setProcessorMap(processorMap);
processor.setClassifier(classifier);
return processor;
}
예제는 1,2,3 차레대로 반환된다. classifier에서 PrceossInfo 의 id필드를 기준으로 map에서 꺼내온다.
'WEB > Spring Batch' 카테고리의 다른 글
스프링 배치 멀티 스레드 프로세싱 (0) | 2023.03.30 |
---|---|
스프링 배치 반복 및 오류 제어 (1) | 2023.03.25 |
스프링 배치 청크 프로세스 활용 - ItemReader (0) | 2023.03.23 |
스프링 배치 청크 프로세스 이해 (2) | 2023.03.22 |
스프링 배치 실행 - Flow (0) | 2023.03.03 |