만들고자 하는 구조이다. 각각ms 가 서로 다른 db를 쓰고 있으니 kafka connect를 통해서 data sync를 맞추려 한다.
order serviec , catalog service 각각 서로다른 h2 db를 가지고 있다.
하고자하는것은 order service에서 주문을 한 수량 만큼 catalog service의 db에서 해당 수량만큼 줄어들게 하고 싶다.
Order Service Kafka Producer 등록
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
공식문서에는 스프링 부트를 사용시에는 해당 설정을 해주지 않아도 된다고 한다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String,String> kafkaTemplate;
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonString = "";
try{
jsonString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic,jsonString);
log.info("Kafka Producer sent data form the Order microservice: " + orderDto);
return orderDto;
}
}
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
//jpa
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
//send this order to the kafka
kafkaProducer.send("example-catalog-topic",orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
2022-11-03 11:37:52.515 INFO 17221 --- [o-auto-1-exec-4] c.e.o.messagequeue.KafkaProducer : Kafka Producer sent data form the Order microservice: OrderDto(productId=CATALOG-003, qty=15, unitPrice=2000, totalPrice=30000, orderId=d1fa2691-cca2-4a42-acb3-ff6a852478be, userId=cefd489c-3e94-425b-8f81-77c45e1fa90f)
rest api에서 order 주문이 들어오면 order serivce의 db에 생성된 order를 저장을 하고 이 저장된 것을 kafka topic("example-catalog-topic")에 저장을 시도하게 된다.
Catalog Service Kafka Consumer등록
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory () {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
마찬가지로 부트를 사용하면 설정할 필요가 없다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: -> " + kafkaMessage);
Map<Object,Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
String productId = (String) map.get("productId");
CatalogEntity entity = catalogRepository.findByProductId(productId);
if (entity != null) {
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
catalogRepository.save(entity);
}
}
}
2022-11-03 11:37:52.596 INFO 23840 --- [ntainer#0-0-C-1] c.e.c.messagequeue.KafkaConsumer : Kafka Message: -> {"productId":"CATALOG-003","qty":15,"unitPrice":2000,"totalPrice":30000,"orderId":"d1fa2691-cca2-4a42-acb3-ff6a852478be","userId":"cefd489c-3e94-425b-8f81-77c45e1fa90f"}
@KafkaListener로 어떤 topic을 consume할것인지 설정해주고 해당 토픽이 흘러 왔을시에 어떤 비지니스 로직을 수 행할 것인지를 적어주면 된다.
전 포스트에서 왔던 것처럼 JSON 형식으로 consume하게 된다.
kafka connect를 사용하여 2개의 instance에서 data sync
현재 2개의 order service가 떠있는 상태이고 postman에서 order를 5번 생성 요청을 보냈다.
서로 다른 db instance를 보유하고 있고 api gateway 가 round robin 방식으로 돌리면서 호출 했기에 서로 db가 sync 가 맞지 않는 것을 확인 할 수 있다.
order service에서 kafka broker로 보낼때 위와 같은 json 형식으로 보내야 sink connect가 인식하고 topic에 저장할 수 있다.
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String,String> kafkaTemplate;
List<Field> fields = Arrays.asList(new Field("string",true,"order_id"),
new Field("string", true, "user_id"),
new Field("string",true,"product_id"),
new Field("int32",true,"qty"),
new Field("int32",true,"unit_price"),
new Field("int32",true,"total_price"));
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic,jsonInString);
log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
위 사진과 같은 template을 만들기위한 코드이다. json으로 만든뒤 KafkaTemplate으로 통해서 orders 토픽에 저장하게 된다.
그러면 등록 한 sink connector에서 해당 topic을 mysql로 쏴주게 된다.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
/* jpa -> h2 or mysql */
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic",orderDto);
orderProducer.send("orders",orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
더 이상 주문생성 요청이 왔을 때 db에 직접 저장하지 않고 orderProducer를 통해 카프카에게 전달하는 것을 확인 할 수 있다.
이때 kafkaProducer는 order service의 주문 생성 내역을 catalog service에게 전달하는 역할이다. 이 둘은 ms이고 서로 다른 db를 지니고 있기 때문이다.
2개의 instance를 켜더라도 하나의 db이니 직접 각 앱에서 jpa 를 통해 저장을 할 수 있겠지만 kafka를 통하여 토픽을 저장해두게 되면 여러 방면으로 쓰도록 할 수 있다.
한번 저장되면 여러 다른 종류의 Consumer가 해당 topic안의 message를 소비할 수 있게 되니까
'Cloud > SpringCloud로 개발하는 MSA' 카테고리의 다른 글
MS 모니터링 (wsl2 grafana) (0) | 2022.11.05 |
---|---|
장애 처리와 MS 분산 추적 (0) | 2022.11.04 |
MS간의 data 동기화 (1. kafka 기본 이론) (0) | 2022.11.02 |
MS간 통신 (0) | 2022.10.29 |
gateway 인증 + Cloud Config + Cloud Bus(rabbitMQ) + 암호화 (0) | 2022.10.28 |