|
@@ -0,0 +1,37 @@
|
|
|
+package com.fire.savedata.consumer;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fire.common.constants.RocketTopic;
|
|
|
+import com.fire.dto.TransactionFlow;
|
|
|
+import com.fire.savedata.mapper.TransactionFlowMapper;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@RocketMQMessageListener(consumerGroup = "TransactionTidb",topic = RocketTopic.TRANSACTION_TOPIC,consumeMode = ConsumeMode.ORDERLY)
|
|
|
+public class TransactionFlowTidbConsumer implements RocketMQListener<MessageExt> {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private TransactionFlowMapper transactionFlowMapper;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(MessageExt msg) {
|
|
|
+ String s = new String(msg.getBody());
|
|
|
+ log.info("transaction message: " + s);
|
|
|
+ ObjectMapper om = new ObjectMapper();
|
|
|
+ try {
|
|
|
+ TransactionFlow transactionFlow = om.readValue(s, TransactionFlow.class);
|
|
|
+ transactionFlowMapper.insert(transactionFlow);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ log.error(e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|