|
@@ -0,0 +1,47 @@
|
|
|
|
+package com.fire.savedata.consumer;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
+import com.fire.common.constants.RocketTopic;
|
|
|
|
+import com.fire.es.DispatchDto;
|
|
|
|
+import com.fire.es.OrderEsDto;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
|
|
|
+import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
|
|
|
|
+import org.springframework.data.elasticsearch.core.query.IndexQuery;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+@Slf4j
|
|
|
|
+@Service
|
|
|
|
+@RocketMQMessageListener(consumerGroup = "DispatchEs",topic = RocketTopic.CHILD_ORDER_TOPIC,consumeMode = ConsumeMode.ORDERLY)
|
|
|
|
+public class DispatchEsConsumer implements RocketMQListener<MessageExt> {
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private ElasticsearchRestTemplate restTemplate;
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onMessage(MessageExt msg) {
|
|
|
|
+ String s = new String(msg.getBody());
|
|
|
|
+ log.info("dispathES message: " + s);
|
|
|
|
+ ObjectMapper om = new ObjectMapper();
|
|
|
|
+ try {
|
|
|
|
+ DispatchDto dispatchDto = om.readValue(s, DispatchDto.class);
|
|
|
|
+ IndexQuery indexQuery = new IndexQuery();
|
|
|
|
+ indexQuery.setId(dispatchDto.getRecId());
|
|
|
|
+ indexQuery.setObject(dispatchDto);
|
|
|
|
+ IndexCoordinates indexCoordinates = IndexCoordinates.of("mobile_flow_dispatch_rec");
|
|
|
|
+ restTemplate.index(indexQuery, indexCoordinates);
|
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ log.error(e.toString());
|
|
|
|
+ //throw new BaseException("ES消费出错");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|