Bladeren bron

bug修改 在缓存通道-产品信息中,唯一确定了通道产品,所以缓存数据结构需要修改

张均强 4 jaren geleden
bovenliggende
commit
f67187fc51

+ 1 - 1
common/fire-dto/src/main/java/com/fire/dto/ChannelInfo.java

@@ -82,7 +82,7 @@ public class ChannelInfo {
     @TableField(exist = false)
     @ApiModelProperty(value = "通道产品 根据通道id分第一层 根据区域分第二次 根据面额分第三层", hidden = true)
     @JsonIgnoreProperties
-    Map<String, Map<Long, List<ChannelProductInfo>>> productListMap;
+    Map<String, Map<Long, ChannelProductInfo>> productListMap;
 
     @ApiModelProperty("供应商名称")
     @TableField(exist = false)

+ 1 - 1
common/fire-dto/src/main/java/com/fire/dto/CustomerProduct.java

@@ -45,7 +45,7 @@ public class CustomerProduct {
     private Integer price;
 
     @ApiModelProperty(value = "产品面额")
-    private Integer facePrice;
+    private Long facePrice;
 
     @ApiModelProperty(value = "是否有效 (1:有效 0:无效)")
     private Integer isEffective;

+ 1 - 5
common/fire-dto/src/main/java/com/fire/dto/DistributeGroup.java

@@ -34,7 +34,7 @@ public class DistributeGroup {
     private Long channelGroupId;
 
     @ApiModelProperty("组策略")
-    private String policy;
+    private Integer policy;
 
     @ApiModelProperty("运营商 1-移动 2-电信 3-联通")
     private Integer operator;
@@ -61,10 +61,6 @@ public class DistributeGroup {
     @TableLogic
     private Integer isDelete;
 
-    @TableField(exist = false)
-    @ApiModelProperty("通道组列表")
-    private List<ChannelGroup> channelGroups;
-
     @TableField(exist = false)
     @ApiModelProperty("通道组列表")
     private List<ChannelInfo> channelInfos;

+ 1 - 1
common/fire-dto/src/main/java/com/fire/dto/FlowOrderInfo.java

@@ -65,7 +65,7 @@ public class FlowOrderInfo {
     private String gwErrorCode;
 
     @ApiModelProperty(value = "面额")
-    private Integer flowAmount;
+    private Long flowAmount;
 
     @ApiModelProperty(value = "流量下发适配器")
     private String adapterName;

+ 40 - 0
common/fire-dto/src/main/java/com/fire/dto/enums/DistributePolicy.java

@@ -0,0 +1,40 @@
+package com.fire.dto.enums;
+
+/**
+ * 分发策略
+ */
+public enum DistributePolicy {
+
+    COST_PRIOR(1, "成本优先"),
+    THROUGHPUT_PRIOR(2, "吞吐量优先"),
+    SPEED_PRIOR(3, "速度优先"),
+    SUCCESS_RATE_QUEUE(4, "成功率优先"),
+    DEFAULT_POLICY(0, "默认");
+
+    public final Integer code;
+
+    public final String desc;
+
+    DistributePolicy(Integer code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public Integer code() {
+        return code;
+    }
+
+    public String desc() {
+        return desc;
+    }
+
+    public static DistributePolicy getByCode(Integer code) {
+        for (DistributePolicy enums : DistributePolicy.values()) {
+            if (enums.code().equals(code)) {
+                return enums;
+            }
+        }
+        return DEFAULT_POLICY;
+    }
+
+}

+ 11 - 1
modules/distribution/src/main/java/com/fire/dist/consumer/RocketOrderConsumer.java

@@ -5,9 +5,11 @@ import com.fire.common.exception.BaseException;
 import com.fire.dist.priority.RedisPriorityQueue;
 import com.fire.dto.FlowOrderInfo;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.stereotype.Component;
@@ -31,11 +33,14 @@ import static org.apache.rocketmq.spring.annotation.ConsumeMode.ORDERLY;
 @Component
 @RefreshScope
 @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = ORDER_TOPIC, consumeMode = ORDERLY, consumeThreadMax = 1)
-public class RocketOrderConsumer implements RocketMQListener<MessageExt> {
+public class RocketOrderConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
 
     @Value("${sleep.consume}")
     private Boolean sleepConsume;
 
+    @Value("${rocketmq.consumer.group}")
+    private String instanceName;
+
     @Resource
     private RedisPriorityQueue redisPriorityQueue;
 
@@ -67,4 +72,9 @@ public class RocketOrderConsumer implements RocketMQListener<MessageExt> {
 
     }
 
+    @Override
+    public void prepareStart(DefaultMQPushConsumer consumer) {
+        consumer.setInstanceName(instanceName);
+    }
+
 }

+ 37 - 0
modules/distribution/src/main/java/com/fire/dist/rest/TestRest.java

@@ -0,0 +1,37 @@
+package com.fire.dist.rest;
+
+import com.fire.dist.data.DataPool;
+import com.fire.dist.service.CacheService;
+import com.fire.dto.ChannelGroup;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+/**
+ * 测试接口rest类
+ *
+ * @author ZJQ 2021年6月3日11:26:39
+ */
+@Api(tags = "测试接口")
+@RestController
+public class TestRest {
+
+    @Resource
+    private CacheService cacheService;
+
+    @GetMapping("/dist/getChannel")
+    @ApiOperation(value = "查看通道缓存")
+    public Map<Long, ChannelGroup> getChannel() {
+        return DataPool.channelGroupMap;
+    }
+
+    @GetMapping("/dist/cacheChannel")
+    @ApiOperation(value = "缓存通道信息")
+    public void cacheChannel() {
+        cacheService.cacheChannel();
+    }
+}

+ 4 - 3
modules/distribution/src/main/java/com/fire/dist/service/impl/CacheServiceImpl.java

@@ -2,6 +2,7 @@ package com.fire.dist.service.impl;
 
 
 import com.alibaba.nacos.common.utils.CollectionUtils;
+import com.alibaba.nacos.common.utils.StringUtils;
 import com.fire.dist.data.DataPool;
 import com.fire.dist.mapper.*;
 import com.fire.dist.service.CacheService;
@@ -90,15 +91,15 @@ public class CacheServiceImpl implements CacheService {
 
         if (CollectionUtils.isNotEmpty(productInfos) && CollectionUtils.isNotEmpty(channelInfos) && CollectionUtils.isNotEmpty(distributeGroups) && CollectionUtils.isNotEmpty(channelGroups)) {
             //通道产品 根据 通道id第一层 区域第二层 面额分第三层 进行分组
-            Map<Long, Map<String, Map<Long, List<ChannelProductInfo>>>> productListMap = productInfos.stream().collect(Collectors.groupingBy(ChannelProductInfo::getChannelId, Collectors.groupingBy(ChannelProductInfo::getAreaNum, Collectors.groupingBy(ChannelProductInfo::getStandardPrice))));
+            Map<Long, Map<String, Map<Long, ChannelProductInfo>>> productListMap = productInfos.stream().filter(a -> a.getChannelId() != null && !StringUtils.isEmpty(a.getAreaNum()) && a.getStandardPrice() != null).collect(Collectors.groupingBy(ChannelProductInfo::getChannelId, Collectors.groupingBy(ChannelProductInfo::getAreaNum, Collectors.toMap(ChannelProductInfo::getStandardPrice, a -> a, (k1, k2) -> k2))));
             //将产品挂载到通道上面
             channelInfos.forEach(a -> a.setProductListMap(productListMap.get(a.getChannelId())));
             //通道按分发组id进行分组
-            Map<Integer, List<ChannelInfo>> channelListMap = channelInfos.stream().collect(Collectors.groupingBy(ChannelInfo::getDistributeGroupId));
+            Map<Integer, List<ChannelInfo>> channelListMap = channelInfos.stream().filter(a -> a.getDistributeGroupId() != null).collect(Collectors.groupingBy(ChannelInfo::getDistributeGroupId));
             //将通道挂载到分发组上面
             distributeGroups.forEach(a -> a.setChannelInfos(channelListMap.get(a.getDistributeGroupId())));
             //分发组按 通道组id第一层 运营商第二层 进行分组
-            Map<Long, Map<Integer, List<DistributeGroup>>> distributeGroupListMap = distributeGroups.stream().collect(Collectors.groupingBy(DistributeGroup::getChannelGroupId, Collectors.groupingBy(DistributeGroup::getOperator)));
+            Map<Long, Map<Integer, List<DistributeGroup>>> distributeGroupListMap = distributeGroups.stream().filter(a -> a.getChannelGroupId() != null && a.getOperator() != null).collect(Collectors.groupingBy(DistributeGroup::getChannelGroupId, Collectors.groupingBy(DistributeGroup::getOperator)));
             //将分发组进行排序
             distributeGroupListMap.forEach((k1, v1) -> v1.forEach((k2, v2) -> v2.sort(Comparator.comparingInt(DistributeGroup::getWeight).reversed())));
             //将分发组挂载到通道组上

+ 55 - 18
modules/distribution/src/main/java/com/fire/dist/service/impl/DistOrderServiceImpl.java

@@ -2,17 +2,21 @@ package com.fire.dist.service.impl;
 
 
 import com.alibaba.nacos.common.utils.CollectionUtils;
+import com.alibaba.nacos.common.utils.StringUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fire.dist.data.DataPool;
 import com.fire.dist.service.DistOrderService;
 import com.fire.dto.*;
+import com.fire.dto.enums.DistributePolicy;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import redis.clients.jedis.JedisCluster;
 
 import javax.annotation.Resource;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,18 +42,18 @@ public class DistOrderServiceImpl implements DistOrderService {
         Long orderId = orderInfo.getOrderId();
         if (flowAppInfo == null) {
             log.error("客户信息为空,请检查缓存机制,本次订单号为:" + orderId);
-            //todo  入回调队列
+            //todo  入待发送
             return;
         }
         CustomerInfo customerInfo = flowAppInfo.getCustomerInfo();
         if (customerInfo == null) {
             log.error("客户信息为空,请检查缓存机制,本次订单号为:" + orderId);
-            //todo  入回调队列
+            //todo  入待发送
             return;
         }
         //客户是否暂停
         if (SUSPEND.status().equals(customerInfo.getStatus())) {
-            //todo 入待发送db
+            //todo 入待发送
             return;
         }
         //下单时间
@@ -61,7 +65,7 @@ public class DistOrderServiceImpl implements DistOrderService {
         int timeOut = flowAppInfo.getTime() * 1000;
         long now = System.currentTimeMillis();
         if (now - applyDate.getTime() > timeOut) {
-            //todo 超时回调  入回调队列
+            //todo 入回调队列
             return;
         }
         //取分发记录
@@ -83,34 +87,67 @@ public class DistOrderServiceImpl implements DistOrderService {
         //找到通道组
         ChannelGroup channelGroup = DataPool.channelGroupMap.get(flowAppInfo.getChannelId());
         if (channelGroup == null) {
-            //todo 回调队列 直接失败
+            //todo 入待发送
             return;
         }
         //提取通道下面的分发组
         Map<Integer, List<DistributeGroup>> distributeGroupsMap = channelGroup.getDistributeGroupsMap();
         if (distributeGroupsMap == null) {
-            //todo 回调队列 直接失败
+            //todo 入待发送
             return;
         }
         //找到对应运营商的分发组列表
         List<DistributeGroup> distributeGroups = distributeGroupsMap.get(orderInfo.getPhoneOperator());
         if (CollectionUtils.isEmpty(distributeGroups)) {
-            //todo 回调队列 直接失败
+            //todo 入待发送
             return;
         }
-        //分发组已经按权重排序
-        distributeGroups.forEach(a -> {
-
-        });
-
-        //当取不到分发记录时
-        if (childOrder == null) {
-            childOrder = new MobileFlowDispatchRec();
-            childOrder.setRecId(orderId + nowSendCount);
-
+        //分发组发送次数 这里当分发组找不到合适的产品时 分发组发送次数才加1
+        int batchCount = orderInfo.getBatchCount() == null ? 1 : orderInfo.getBatchCount();
+        //对分发次数取模确定选择分发组
+        int distributeGroupSize = distributeGroups.size();
+        //分发组下标 由分发组发送次数-1 取模
+        int distributeIndex = (batchCount - 1) % distributeGroupSize;
+        //提取到分发组
+        DistributeGroup distributeGroup = distributeGroups.get(distributeIndex);
+        List<ChannelInfo> channelInfos = distributeGroup.getChannelInfos();
+        if (CollectionUtils.isEmpty(channelInfos)) {
+            //todo 入待发送
+            return;
         }
+        //根据分发组策略进行不同的分发操作
+        DistributePolicy policy = DistributePolicy.getByCode(distributeGroup.getPolicy());
+        //分发记录中存的分发日志
+        Map<Integer, Map<Long, Integer>> recordLog = new HashMap<>();
+        if (childOrder != null && StringUtils.isEmpty(childOrder.getRecordLog())) {
+            try {
+                recordLog = om.readValue(childOrder.getRecordLog(), new TypeReference<>() {
+                });
+            } catch (JsonProcessingException e) {
+                log.error("分发记录日志转换异常", e);
+            }
+        }
+        ChannelInfo change = null;
+        //策略
+        switch (policy) {
+            case COST_PRIOR -> channelInfos.forEach(a -> {
+                //根据产品价格提取通道
+                Map<String, Map<Long, ChannelProductInfo>> productListMap = a.getProductListMap();
+                if (productListMap != null) {
+                    //根据区域提取产品
+                    Map<Long, ChannelProductInfo> longListMap = productListMap.get(orderInfo.getAreaCode());
+                    if (longListMap != null) {
+                        ChannelProductInfo productInfo = longListMap.get(orderInfo.getFlowAmount());
+                    }
 
+                }
+            });
+            case SPEED_PRIOR -> channelInfos.forEach(a -> {
+                a.getChannelId();
+            });
 
+        }
     }
 
-}
+
+}

+ 19 - 0
modules/distribution/src/main/java/com/fire/dist/task/CacheCustomerTask.java

@@ -39,5 +39,24 @@ public class CacheCustomerTask {
         log.info("spring启动时执行缓存初始化客户信息");
     }
 
+    /**
+     * 每天凌晨3时0分0秒定时缓存通道信息
+     */
+    @Async
+    @Scheduled(cron = "0 0 3 * * ?")
+    public void cacheChannel() {
+        cacheService.cacheChannel();
+    }
+
+    /**
+     * spring启动时执行缓存初始化通道信息
+     */
+    @Async
+    @PostConstruct
+    public void initChannel() {
+        cacheService.cacheChannel();
+        log.info("spring启动时执行缓存初始化通道信息");
+    }
+
 
 }

+ 0 - 1
modules/distribution/src/main/resources/mapper/ChannelGroupMapper.xml

@@ -15,7 +15,6 @@
     <!--查询全部-->
     <select id="queryAll" resultMap="AccessChannelGroupMap">
         <include refid="baseSql"/>
-        where channel_group_id = #{channelGroupId}
     </select>
 
     <sql id="baseSql">

+ 3 - 4
modules/distribution/src/main/resources/mapper/CustomerProductMapper.xml

@@ -5,12 +5,11 @@
     <resultMap type="com.fire.dto.CustomerProduct" id="CustomerProductMap">
         <result property="customerProductId" column="customer_product_id" jdbcType="INTEGER"/>
         <result property="customerId" column="customer_id" jdbcType="INTEGER"/>
-        <result property="productId" column="product_id" jdbcType="INTEGER"/>
         <result property="packageId" column="package_id" jdbcType="VARCHAR"/>
         <result property="type" column="type" jdbcType="INTEGER"/>
         <result property="price" column="price" jdbcType="NUMERIC"/>
         <result property="facePrice" column="face_price" jdbcType="NUMERIC"/>
-        <result property="isValid" column="is_valid" jdbcType="INTEGER"/>
+        <result property="isEffective" column="is_effective" jdbcType="INTEGER"/>
         <result property="areaName" column="area_name" jdbcType="VARCHAR"/>
         <result property="operator" column="operator" jdbcType="INTEGER"/>
         <result property="areaNum" column="area_num" jdbcType="VARCHAR"/>
@@ -24,7 +23,7 @@
                a.type,
                a.price,
                a.face_price,
-               a.is_valid,
+               a.is_effective,
                b.area_name,
                b.operator,
                b.area_num,
@@ -40,7 +39,7 @@
                a.type,
                a.price,
                a.face_price,
-               a.is_valid
+               a.is_effective
         from customer_product a
     </sql>
 

+ 1 - 1
modules/make-order/src/main/java/com/fire/order/service/impl/MakeOrderServiceImpl.java

@@ -327,7 +327,7 @@ public class MakeOrderServiceImpl implements MakeOrderService {
         order.setFlowAmount(product.getFacePrice());
         order.setAreaCode(product.getAreaNum());
         order.setPrice(product.getPrice());
-        order.setProductId(product.getProductId());
+//        order.setProductId(product.getProductId());
         order.setCustomerProductId(product.getCustomerProductId());
         order.setPhoneOperator(product.getOperator());
         order.setPhoneCity(phoneZone.getAreaCode());