Преглед на файлове

功能提交 交易流水表入ES

秦国才 преди 4 години
родител
ревизия
cb5473cd3c

+ 1 - 1
common/fire-dto/src/main/java/com/fire/es/DispatchDto.java

@@ -9,7 +9,7 @@ import org.springframework.data.elasticsearch.annotations.*;
 import java.util.Date;
 
 @Data
-@Document(indexName = "mobile_flow_dispatch_rec", shards = 3)
+@Document(indexName = "mobile_flow_dispatch_rec", shards = 6)
 public class DispatchDto {
     /**
      * 分发ID号

+ 1 - 1
common/fire-dto/src/main/java/com/fire/es/OrderEsDto.java

@@ -10,7 +10,7 @@ import org.springframework.data.elasticsearch.annotations.*;
 import java.util.Date;
 
 @Data
-@Document(indexName = "flow_order_info", shards = 3)
+@Document(indexName = "flow_order_info", shards = 6)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class OrderEsDto {
     /**

+ 71 - 0
common/fire-dto/src/main/java/com/fire/es/TransactionESDto.java

@@ -0,0 +1,71 @@
+package com.fire.es;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.elasticsearch.annotations.*;
+
+import java.util.Date;
+
+@ApiModel(value = "交易流水ES实体")
+@Data
+@Document(indexName = "transaction_flow", shards = 6)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TransactionESDto {
+
+    @Id
+    @ApiModelProperty(value = "流水号")
+    private String seqNo;
+
+    @ApiModelProperty(value = "入账时间")
+    @Field(type = FieldType.Date)
+    private Date createTime;
+
+    @ApiModelProperty(value = "服务类型:1、订单扣款 2、订单退款 3、财务加款4、财务撤销")
+    @Field(type = FieldType.Integer)
+    private Integer serviceType;
+
+    @ApiModelProperty(value = "操作前金额")
+    @Field(type = FieldType.Long)
+    private Long beforeAmount;
+
+    @ApiModelProperty(value = "操作金额")
+    @Field(type = FieldType.Long)
+    private Long operatingAmount;
+
+    @ApiModelProperty(value = "操作后金额")
+    @Field(type = FieldType.Long)
+    private Long afterAmount;
+
+    @ApiModelProperty(value = "1、客户流水;2、供应商流水")
+    @Field(type = FieldType.Integer)
+    private Integer distinguish;
+
+    @ApiModelProperty(value = "客户/供应商账户")
+    @Field(type = FieldType.Long)
+    private Long relationId;
+
+    @ApiModelProperty(value = "商户订单号")
+    @Field(type = FieldType.Keyword)
+    private String extorderId;
+
+    @ApiModelProperty(value = "业务描述")
+    @MultiField(
+            mainField = @Field(type=FieldType.Text, analyzer = "ik_max_word"),
+            otherFields = @InnerField(suffix = "keyword", type=FieldType.Keyword))
+    private String note;
+
+    @ApiModelProperty(value = "订单号")
+    @Field(type = FieldType.Long)
+    private Long orderId;
+
+    @ApiModelProperty(value = "操作人id")
+    @Field(type = FieldType.Long)
+    private Long operatorId;
+}

+ 11 - 0
modules/admin/src/main/java/com/fire/admin/mapper/TransactionRepository.java

@@ -0,0 +1,11 @@
+package com.fire.admin.mapper;
+
+import com.fire.es.TransactionESDto;
+import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
+import org.springframework.stereotype.Service;
+
+@Service
+public interface TransactionRepository extends ElasticsearchRepository<TransactionESDto, String> {
+
+
+}

+ 49 - 0
modules/save-data/src/main/java/com/fire/savedata/consumer/TransactionFlowEsConsumer.java

@@ -0,0 +1,49 @@
+package com.fire.savedata.consumer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fire.common.constants.RocketTopic;
+import com.fire.dto.TransactionFlow;
+import com.fire.es.OrderEsDto;
+import com.fire.es.TransactionESDto;
+import com.fire.savedata.mapper.TransactionFlowMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
+import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
+import org.springframework.data.elasticsearch.core.query.IndexQuery;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+@Slf4j
+@Service
+@RocketMQMessageListener(consumerGroup = "TransactionEs",topic = RocketTopic.TRANSACTION_TOPIC,consumeMode = ConsumeMode.ORDERLY)
+public class TransactionFlowEsConsumer implements RocketMQListener<MessageExt> {
+
+
+    @Autowired
+    private ElasticsearchRestTemplate restTemplate;
+
+    @Override
+    public void onMessage(MessageExt msg) {
+        String s = new String(msg.getBody());
+        log.info("transactionES message: " + s);
+        ObjectMapper om = new ObjectMapper();
+        try {
+            TransactionESDto transactionESDto =  om.readValue(s, TransactionESDto.class);
+            IndexQuery indexQuery = new IndexQuery();
+            indexQuery.setId(transactionESDto.getSeqNo());
+            indexQuery.setObject(transactionESDto);
+            IndexCoordinates indexCoordinates = IndexCoordinates.of("transaction_flow");
+            restTemplate.index(indexQuery, indexCoordinates);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error(e.toString());
+        }
+    }
+}

+ 1 - 1
modules/save-data/src/main/java/com/fire/savedata/consumer/TransactionFlowTidbConsumer.java

@@ -24,7 +24,7 @@ public class TransactionFlowTidbConsumer implements RocketMQListener<MessageExt>
     @Override
     public void onMessage(MessageExt msg) {
         String s = new String(msg.getBody());
-        log.info("transaction message: " + s);
+        log.info("transactionTidb message: " + s);
         ObjectMapper om = new ObjectMapper();
         try {
             TransactionFlow transactionFlow = om.readValue(s, TransactionFlow.class);