Prechádzať zdrojové kódy

任务提交 分发逻辑完成

张均强 4 rokov pred
rodič
commit
77877d7b30
25 zmenil súbory, kde vykonal 424 pridanie a 73 odobranie
  1. 20 0
      common/fire-common/src/main/java/com/fire/common/constants/RocketTags.java
  2. 4 0
      common/fire-common/src/main/java/com/fire/common/constants/RocketTopic.java
  3. 5 1
      common/fire-dto/src/main/java/com/fire/dto/ChannelInfo.java
  4. 1 1
      common/fire-dto/src/main/java/com/fire/dto/FlowOrderInfo.java
  5. 7 5
      common/fire-dto/src/main/java/com/fire/dto/MobileFlowDispatchRec.java
  6. 3 1
      common/fire-dto/src/main/java/com/fire/dto/enums/OrderStatus.java
  7. 2 2
      common/fire-dto/src/main/java/com/fire/dto/enums/RedisKey.java
  8. 1 1
      common/fire-utils/src/main/java/com/fire/utils/string/FireStringUtils.java
  9. 0 3
      modules/admin/src/main/java/com/fire/admin/service/impl/CustomerServiceImpl.java
  10. 2 0
      modules/distribution/src/main/java/com/fire/dist/DistributionApplication.java
  11. 50 0
      modules/distribution/src/main/java/com/fire/dist/consumer/RedisOrderConsumer.java
  12. 6 16
      modules/distribution/src/main/java/com/fire/dist/consumer/RocketOrderConsumer.java
  13. 1 1
      modules/distribution/src/main/java/com/fire/dist/consumer/RocketUpdateConsumer.java
  14. 2 2
      modules/distribution/src/main/java/com/fire/dist/service/DistOrderService.java
  15. 25 0
      modules/distribution/src/main/java/com/fire/dist/service/OrderUpdateService.java
  16. 1 1
      modules/distribution/src/main/java/com/fire/dist/service/impl/CacheServiceImpl.java
  17. 146 30
      modules/distribution/src/main/java/com/fire/dist/service/impl/DistOrderServiceImpl.java
  18. 84 0
      modules/distribution/src/main/java/com/fire/dist/service/impl/OrderUpdateServiceImpl.java
  19. 2 0
      modules/distribution/src/main/java/com/fire/dist/task/CacheCustomerTask.java
  20. 44 0
      modules/distribution/src/main/java/com/fire/dist/utils/WeightRand.java
  21. 1 0
      modules/distribution/src/main/resources/bootstrap.yml
  22. 3 1
      modules/distribution/src/main/resources/mapper/ChannelInfoMapper.xml
  23. 1 1
      modules/distribution/src/main/resources/mapper/ChannelProductInfoMapper.xml
  24. 1 0
      modules/distribution/src/main/resources/mapper/FlowAppInfoMapper.xml
  25. 12 7
      modules/make-order/src/main/java/com/fire/order/service/impl/MakeOrderServiceImpl.java

+ 20 - 0
common/fire-common/src/main/java/com/fire/common/constants/RocketTags.java

@@ -10,6 +10,26 @@ public class RocketTags {
      * 下单的订单tag
      */
     public static final String MAKE_TAG = "make";
+    /**
+     * 待发送的tag
+     */
+    public static final String WAIT_TAG = "wait";
+    /**
+     * 待发送的tag
+     */
+    public static final String EXCEPTION_TAG = "exception";
+    /**
+     * 充值中
+     */
+    public static final String SENDING_TAG = "sending";
+    /**
+     * 失败订单
+     */
+    public static final String FAIL_TAG = "fail";
+    /**
+     * 成功订单
+     */
+    public static final String SUCCESS_TAG = "success";
     /**
      * 客户+产品tag
      */

+ 4 - 0
common/fire-common/src/main/java/com/fire/common/constants/RocketTopic.java

@@ -10,6 +10,10 @@ public class RocketTopic {
      * 订单topic
      */
     public static final String ORDER_TOPIC = "order";
+    /**
+     * 子订单topic
+     */
+    public static final String CHILD_ORDER_TOPIC = "child";
     /**
      * 缓存更新topic
      */

+ 5 - 1
common/fire-dto/src/main/java/com/fire/dto/ChannelInfo.java

@@ -77,7 +77,7 @@ public class ChannelInfo {
 
     @TableField(exist = false)
     @ApiModelProperty(value = "分发组id", hidden = true)
-    private Integer distributeGroupId;
+    private Long distributeGroupId;
 
     @TableField(exist = false)
     @ApiModelProperty(value = "通道产品 根据通道id分第一层 根据面额分第二次 根据区域分第三层", hidden = true)
@@ -87,4 +87,8 @@ public class ChannelInfo {
     @ApiModelProperty("供应商名称")
     @TableField(exist = false)
     private String supplierName;
+
+    @TableField(exist = false)
+    @ApiModelProperty(value = "通道权重", hidden = true)
+    private Integer weight;
 }

+ 1 - 1
common/fire-dto/src/main/java/com/fire/dto/FlowOrderInfo.java

@@ -95,7 +95,7 @@ public class FlowOrderInfo {
     private Long customerProductId;
 
     @ApiModelProperty(value = "通道ID")
-    private Integer channelId;
+    private Long channelId;
 
     @ApiModelProperty(value = "回调时间")
     private Date callbackTime;

+ 7 - 5
common/fire-dto/src/main/java/com/fire/dto/MobileFlowDispatchRec.java

@@ -4,17 +4,19 @@ import java.util.Date;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import lombok.Builder;
 import lombok.Data;
 
 @ApiModel(value = "分发记录实体")
 @Data
+@Builder
 public class MobileFlowDispatchRec {
 
     @ApiModelProperty(value = "分发ID号")
     private Long recId;
 
     @ApiModelProperty(value = "分发订单号")
-    private String orderId;
+    private Long orderId;
 
     @ApiModelProperty(value = "手机号码")
     private String phoneNo;
@@ -23,7 +25,7 @@ public class MobileFlowDispatchRec {
     private String packageId;
 
     @ApiModelProperty(value = "面额")
-    private Integer flowAmount;
+    private Long flowAmount;
 
     @ApiModelProperty(value = "分发次数")
     private Integer sendCount;
@@ -59,19 +61,19 @@ public class MobileFlowDispatchRec {
     private String lastCallbackMsg;
 
     @ApiModelProperty(value = "运营商结算价格")
-    private Integer operatorBalancePrice;
+    private Long operatorBalancePrice;
 
     @ApiModelProperty(value = "通道组次数")
     private Integer batchCount;
 
     @ApiModelProperty(value = "客户产品id")
-    private String customerProductsId;
+    private Long customerProductsId;
 
     @ApiModelProperty(value = "客户名称")
     private String customerName;
 
     @ApiModelProperty(value = "通道产品id")
-    private String channelProductsId;
+    private Long channelProductsId;
 
     @ApiModelProperty(value = "运营商订单号")
     private String operatorId;

+ 3 - 1
common/fire-dto/src/main/java/com/fire/dto/enums/OrderStatus.java

@@ -7,7 +7,9 @@ public enum OrderStatus {
     ORDER_SENT(2, "已发"),
     ORDER_FAIL(4, "失败"),
     ORDER_RISK(5, "风控"),
-    ORDER_SUCCESS(6, "成功");
+    ORDER_SUCCESS(6, "成功"),
+    ORDER_IN_QUEUE(7, "队列中"),
+    ORDER_EXCEPTION(8, "异常订单");
 
     private final Integer status;
 

+ 2 - 2
common/fire-dto/src/main/java/com/fire/dto/enums/RedisKey.java

@@ -5,8 +5,8 @@ public enum RedisKey {
     //系统
     ORDER_ID_INCR("order.id.incr", "自增订单id"),
     GLOBAL_ID_INCR("global.id.incr", "全局自增id"),
-    ORDER_INFO("order.info", "订单详情"),
-    CHILD_ORDER_INFO("child.order.info", "子订单详情"),
+    ORDER_INFO("order.info.", "订单详情"),
+    CHILD_ORDER_INFO("child.order.info.", "子订单详情"),
     CUSTOMER_AMOUNT("{customer.amount}", "客户金额"),
     SUPPLIER_AMOUNT("{supplier.amount}", "供应商金额"),
     ORDER_DOWN("{down.order}", "客户下单号 非订单号"),

+ 1 - 1
common/fire-utils/src/main/java/com/fire/utils/string/FireStringUtils.java

@@ -21,7 +21,7 @@ public class FireStringUtils {
         if (null == str || "".equals(str)) {
             return b;
         }
-        Pattern p = Pattern.compile("^[1][3,4,5,7,8][0-9]{9}$"); // 验证手机号
+        Pattern p = Pattern.compile("^[1][3,4,5,6,7,8,9][0-9]{9}$"); // 验证手机号
         Matcher m = p.matcher(str);
         b = m.matches();
         return b;

+ 0 - 3
modules/admin/src/main/java/com/fire/admin/service/impl/CustomerServiceImpl.java

@@ -33,12 +33,9 @@ import java.time.LocalDateTime;
 import java.util.List;
 
 import static com.fire.common.constants.RocketTags.CONSUMER_PRODUCT_TAG;
-import static com.fire.common.constants.RocketTags.MAKE_TAG;
-import static com.fire.common.constants.RocketTopic.ORDER_TOPIC;
 import static com.fire.common.constants.RocketTopic.UPDATE_TOPIC;
 import static com.fire.dto.enums.RedisKey.CUSTOMER_AMOUNT;
 import static com.fire.dto.enums.RedisKey.GLOBAL_ID_INCR;
-import static com.fire.dto.enums.RocketQueue.ORDER_QUEUE;
 
 /**
  * @author: liuliu

+ 2 - 0
modules/distribution/src/main/java/com/fire/dist/DistributionApplication.java

@@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 /**
@@ -18,6 +19,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @EnableSwagger2Doc
 @MapperScan({"com.fire.dist.mapper"})
 @EnableScheduling
+@EnableAsync
 public class DistributionApplication {
     public static void main(String[] args) {
         SpringApplication.run(DistributionApplication.class, args);

+ 50 - 0
modules/distribution/src/main/java/com/fire/dist/consumer/RedisOrderConsumer.java

@@ -0,0 +1,50 @@
+package com.fire.dist.consumer;
+
+import com.fire.common.redis.RedisPriorityQueueScript;
+import com.fire.dist.service.DistOrderService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+
+/**
+ * redis优先级队列消费
+ *
+ * @author ZJQ 2021年6月15日14:59:42
+ */
+
+@Slf4j
+@Component
+@RefreshScope
+public class RedisOrderConsumer {
+
+    @Value("${sleep.consume}")
+    private Boolean sleepConsume;
+
+    @Resource
+    private RedisPriorityQueueScript redisPriorityQueueScript;
+
+    @Resource
+    private DistOrderService distOrderService;
+
+    /**
+     * 监听redis优先级队列
+     */
+    @PostConstruct
+    public void onMessage() {
+        //如果不暂停那么立刻取消息出来
+        while (!sleepConsume) {
+            //消费队列
+            String orderStr = redisPriorityQueueScript.getContent();
+            if (orderStr != null) {
+                distOrderService.distOrder(orderStr);
+            }
+        }
+    }
+
+
+}

+ 6 - 16
modules/distribution/src/main/java/com/fire/dist/consumer/RocketOrderConsumer.java

@@ -2,14 +2,12 @@ package com.fire.dist.consumer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fire.common.exception.BaseException;
-import com.fire.dist.priority.RedisPriorityQueue;
+import com.fire.common.redis.RedisPriorityQueueScript;
 import com.fire.dto.FlowOrderInfo;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.stereotype.Component;
@@ -32,17 +30,14 @@ import static org.apache.rocketmq.spring.annotation.ConsumeMode.ORDERLY;
 @Slf4j
 @Component
 @RefreshScope
-@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = ORDER_TOPIC, consumeMode = ORDERLY, consumeThreadMax = 1)
-public class RocketOrderConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
+@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = ORDER_TOPIC, consumeThreadMax = 100)
+public class RocketOrderConsumer implements RocketMQListener<MessageExt> {
 
     @Value("${sleep.consume}")
     private Boolean sleepConsume;
 
-    @Value("${rocketmq.consumer.group}")
-    private String instanceName;
-
     @Resource
-    private RedisPriorityQueue redisPriorityQueue;
+    private RedisPriorityQueueScript redisPriorityQueueScript;
 
     @Override
     public void onMessage(MessageExt msg) {
@@ -62,19 +57,14 @@ public class RocketOrderConsumer implements RocketMQListener<MessageExt>, Rocket
 
             try {
                 FlowOrderInfo orderInfo = om.readValue(orderStr, FlowOrderInfo.class);
-                redisPriorityQueue.push(orderInfo.getOrderId(), orderStr);
+                redisPriorityQueueScript.addContent(orderInfo.getOrderId(), orderStr);
             } catch (Exception e) {
                 log.error("订单消息解析失败,消息体为:" + orderStr, e);
-                throw new BaseException("订单消息解析失败");
+                throw new BaseException("订单消息解析失败",e);
             }
         }
 
 
     }
 
-    @Override
-    public void prepareStart(DefaultMQPushConsumer consumer) {
-        consumer.setInstanceName(instanceName);
-    }
-
 }

+ 1 - 1
modules/distribution/src/main/java/com/fire/dist/consumer/RocketUpdateConsumer.java

@@ -24,7 +24,7 @@ import static com.fire.common.constants.RocketTopic.UPDATE_TOPIC;
 
 @Slf4j
 @Component
-@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = UPDATE_TOPIC)
+@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.update-group}", topic = UPDATE_TOPIC)
 public class RocketUpdateConsumer implements RocketMQListener<MessageExt> {
 
     @Resource

+ 2 - 2
modules/distribution/src/main/java/com/fire/dist/service/DistOrderService.java

@@ -13,8 +13,8 @@ public interface DistOrderService {
     /**
      * 分发方法
      *
-     * @param orderInfo 订单
+     * @param orderStr 订单
      */
-    void distOrder(FlowOrderInfo orderInfo);
+    void distOrder(String orderStr);
 
 }

+ 25 - 0
modules/distribution/src/main/java/com/fire/dist/service/OrderUpdateService.java

@@ -0,0 +1,25 @@
+package com.fire.dist.service;
+
+
+import com.fire.dto.FlowOrderInfo;
+import com.fire.dto.MobileFlowDispatchRec;
+
+/**
+ * 客户服务层
+ *
+ * @author ZJQ 2021年6月17日15:51:14
+ */
+public interface OrderUpdateService {
+
+    /**
+     * 订单更新redis 和发送更新消息
+     */
+    void updateOrder(FlowOrderInfo orderInfo, String tag);
+
+    /**
+     * redis 和发送更新消息
+     */
+    void updateChildOrder(MobileFlowDispatchRec childOrder, String tag);
+
+
+}

+ 1 - 1
modules/distribution/src/main/java/com/fire/dist/service/impl/CacheServiceImpl.java

@@ -95,7 +95,7 @@ public class CacheServiceImpl implements CacheService {
             //将产品挂载到通道上面
             channelInfos.forEach(a -> a.setProductListMap(productListMap.get(a.getChannelId())));
             //通道按分发组id进行分组
-            Map<Integer, List<ChannelInfo>> channelListMap = channelInfos.stream().filter(a -> a.getDistributeGroupId() != null).collect(Collectors.groupingBy(ChannelInfo::getDistributeGroupId));
+            Map<Long, List<ChannelInfo>> channelListMap = channelInfos.stream().filter(a -> a.getDistributeGroupId() != null).collect(Collectors.groupingBy(ChannelInfo::getDistributeGroupId));
             //将通道挂载到分发组上面
             distributeGroups.forEach(a -> a.setChannelInfos(channelListMap.get(a.getDistributeGroupId())));
             //分发组按 通道组id第一层 运营商第二层 进行分组

+ 146 - 30
modules/distribution/src/main/java/com/fire/dist/service/impl/DistOrderServiceImpl.java

@@ -8,15 +8,21 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fire.dist.data.DataPool;
 import com.fire.dist.service.DistOrderService;
+import com.fire.dist.service.OrderUpdateService;
+import com.fire.dist.utils.WeightRand;
 import com.fire.dto.*;
 import com.fire.dto.enums.DistributePolicy;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import redis.clients.jedis.JedisCluster;
 
 import javax.annotation.Resource;
 import java.util.*;
 
+import static com.fire.common.constants.RocketTags.*;
+import static com.fire.dto.enums.DistributePolicy.DEFAULT_POLICY;
+import static com.fire.dto.enums.OrderStatus.*;
 import static com.fire.dto.enums.Province.QG_ALL;
 import static com.fire.dto.enums.RedisKey.CHILD_ORDER_INFO;
 import static com.fire.dto.enums.ValidStatus.SUSPEND;
@@ -32,45 +38,74 @@ public class DistOrderServiceImpl implements DistOrderService {
 
     @Resource
     private JedisCluster jedisCluster;
+    @Resource
+    private OrderUpdateService orderUpdateService;
 
     @Override
-    public void distOrder(FlowOrderInfo orderInfo) {
+    @Async
+    public void distOrder(String orderStr) {
+
+        FlowOrderInfo orderInfo;
+        ObjectMapper om = new ObjectMapper();
+        try {
+            orderInfo = om.readValue(orderStr, FlowOrderInfo.class);
+        } catch (Exception e) {
+            //TODO 订单解析异常告警
+            log.error("订单消息解析失败,消息体为:" + orderStr, e);
+            return;
+        }
         //取客户信息,进行分发校验
         FlowAppInfo flowAppInfo = DataPool.flowAppInfoMap.get(orderInfo.getAppId());
         Long orderId = orderInfo.getOrderId();
         if (flowAppInfo == null) {
+            //未找到客户信息异常,更新为异常
             log.error("客户信息为空,请检查缓存机制,本次订单号为:" + orderId);
-            //todo  入待发送
+            orderInfo.setStatus(ORDER_EXCEPTION.status());
+            orderInfo.setNote("未获取到该订单对应的客户信息,检查客户信息缓存");
+            orderUpdateService.updateOrder(orderInfo, EXCEPTION_TAG);
             return;
         }
         CustomerInfo customerInfo = flowAppInfo.getCustomerInfo();
         if (customerInfo == null) {
+            //未找到客户信息异常,更新为异常
             log.error("客户信息为空,请检查缓存机制,本次订单号为:" + orderId);
-            //todo  入待发送
+            orderInfo.setStatus(ORDER_WAIT_SEND.status());
+            orderInfo.setNote("未获取到该订单对应的客户信息,检查客户信息缓存");
+            orderUpdateService.updateOrder(orderInfo, EXCEPTION_TAG);
             return;
         }
         //客户是否暂停
         if (SUSPEND.status().equals(customerInfo.getStatus())) {
-            //todo 入待发送
+            //客户暂停,入待发送
+            orderInfo.setStatus(ORDER_WAIT_SEND.status());
+            orderInfo.setNote("未获取到该订单对应的客户信息,检查客户信息缓存");
+            orderUpdateService.updateOrder(orderInfo, WAIT_TAG);
             return;
         }
         //下单时间
         Date applyDate = orderInfo.getApplyDate();
         if (applyDate == null) {
-            log.error("订单下单时间为空,请检查系统bug,订单号为:" + orderId);
+            //下单时间为空,入异常
+            orderInfo.setStatus(ORDER_WAIT_SEND.status());
+            orderInfo.setNote("下单时间为空,异常请检查代码");
+            orderUpdateService.updateOrder(orderInfo, EXCEPTION_TAG);
             return;
         }
-        int timeOut = flowAppInfo.getTime() * 1000;
+        Integer timeOut = flowAppInfo.getTime();
+
         long now = System.currentTimeMillis();
-        if (now - applyDate.getTime() > timeOut) {
-            //todo 入回调队列
+        if (timeOut != null && now - applyDate.getTime() > timeOut * 1000) {
+            //订单超时失败
+            orderInfo.setStatus(ORDER_FAIL.status());
+            orderInfo.setNote("订单超时,失败");
+            orderUpdateService.updateOrder(orderInfo, FAIL_TAG);
             return;
         }
         //取分发记录
         Integer sendCount = orderInfo.getSendCount();
         Integer nowSendCount = sendCount == null ? 1 : sendCount + 1;
         MobileFlowDispatchRec childOrder = null;
-        ObjectMapper om = new ObjectMapper();
+
         if (sendCount != null) {
             Long childId = orderId + sendCount;
             String obj = jedisCluster.hget(CHILD_ORDER_INFO.key(), String.valueOf(childId));
@@ -85,19 +120,28 @@ public class DistOrderServiceImpl implements DistOrderService {
         //找到通道组
         ChannelGroup channelGroup = DataPool.channelGroupMap.get(flowAppInfo.getChannelId());
         if (channelGroup == null) {
-            //todo 入待发送
+            //客户未找到通道组,入待发送
+            orderInfo.setStatus(ORDER_WAIT_SEND.status());
+            orderInfo.setNote("未找到通道组,入待发送");
+            orderUpdateService.updateOrder(orderInfo, WAIT_TAG);
             return;
         }
         //提取通道下面的分发组
         Map<Integer, List<DistributeGroup>> distributeGroupsMap = channelGroup.getDistributeGroupsMap();
         if (distributeGroupsMap == null) {
-            //todo 入待发送
+            //没有合适的分发组
+            orderInfo.setStatus(ORDER_WAIT_SEND.status());
+            orderInfo.setNote("没有合适的分发组,入待发送");
+            orderUpdateService.updateOrder(orderInfo, WAIT_TAG);
             return;
         }
         //找到对应运营商的分发组列表
         List<DistributeGroup> distributeGroups = distributeGroupsMap.get(orderInfo.getPhoneOperator());
         if (CollectionUtils.isEmpty(distributeGroups)) {
-            //todo 入待发送
+            //没有合适的分发组
+            orderInfo.setStatus(ORDER_WAIT_SEND.status());
+            orderInfo.setNote("没有合适的分发组,入待发送");
+            orderUpdateService.updateOrder(orderInfo, WAIT_TAG);
             return;
         }
         //分发记录中存的分发日志
@@ -116,6 +160,7 @@ public class DistOrderServiceImpl implements DistOrderService {
         //选择符合条件的通道产品,存入list中
         List<ChannelProductInfo> changeProducts = new ArrayList<>();
         //当取不到分发组通道 或者 通道产品时  分发组发送次数+1
+        DistributePolicy policy = DEFAULT_POLICY;
         while (batchCount <= flowAppInfo.getTotalCount()) {
             //对分发次数取模确定选择分发组
             int distributeGroupSize = distributeGroups.size();
@@ -125,11 +170,11 @@ public class DistOrderServiceImpl implements DistOrderService {
             DistributeGroup distributeGroup = distributeGroups.get(distributeIndex);
             List<ChannelInfo> channelInfos = distributeGroup.getChannelInfos();
             if (CollectionUtils.isEmpty(channelInfos)) {
+                //
                 batchCount += 1;
                 continue;
             }
-            //根据分发组策略进行不同的分发操作
-            DistributePolicy policy = DistributePolicy.getByCode(distributeGroup.getPolicy());
+
             //当前分发组发送次数的发送记录
             Map<Long, Integer> thisLog = recordLog.get(batchCount);
 
@@ -148,11 +193,13 @@ public class DistOrderServiceImpl implements DistOrderService {
                         ChannelProductInfo productInfo = strListMap.get(orderInfo.getAreaCode());
                         //分省资源
                         if (productInfo != null) {
+                            productInfo.setChannelInfo(channelInfo);
                             changeProducts.add(productInfo);
                         }
                         ChannelProductInfo productInfoAll = strListMap.get(QG_ALL.getCode());
                         if (productInfoAll != null) {
-                            changeProducts.add(productInfo);
+                            productInfoAll.setChannelInfo(channelInfo);
+                            changeProducts.add(productInfoAll);
                         }
 
                     }
@@ -162,24 +209,93 @@ public class DistOrderServiceImpl implements DistOrderService {
                 batchCount += 1;
             } else {
                 //如果 取到了产品 那么退出
+                //取到分发组策略
+                policy = DistributePolicy.getByCode(distributeGroup.getPolicy());
                 break;
             }
         }
-
-
-        //策略
-//        switch (policy) {
-//            case COST_PRIOR -> {
-//
-//            }
-//            case SPEED_PRIOR -> {
-//                for (ChannelInfo channelInfo : channelInfos) {
-//                    //根据产品价格提取通道
-//
-//                }
-//            }
-//
-//        }
+        //最终都没取到有两种情况 1是分发过找不到 2是没分发过找不到
+        if (CollectionUtils.isEmpty(changeProducts)) {
+            orderInfo.setSendCount(nowSendCount);
+            orderInfo.setBatchCount(batchCount);
+            //分发过那么直接入失败队列
+            if (childOrder != null) {
+                orderInfo.setStatus(ORDER_FAIL.status());
+                orderInfo.setNote("达到最大重试次数,失败");
+                orderUpdateService.updateOrder(orderInfo, FAIL_TAG);
+            } else {
+                //没有合适的通道
+                orderInfo.setStatus(ORDER_WAIT_SEND.status());
+                orderInfo.setNote("没有合适的通道,入待发送");
+                orderUpdateService.updateOrder(orderInfo, WAIT_TAG);
+            }
+            return;
+        }
+        //找到了就根据策略来分发
+        //策略 最终选择到的产品
+        ChannelProductInfo changeChannelProduct = null;
+        switch (policy) {
+            case COST_PRIOR -> {
+                //成本优先 按成本排序 升序排列
+                changeProducts.sort(Comparator.comparingLong(ChannelProductInfo::getPrice));
+                //提取最低成本
+                List<ChannelProductInfo> minList = new ArrayList<>(3);
+                for (ChannelProductInfo productInfo : changeProducts) {
+                    if (productInfo.getPrice().equals(changeProducts.get(0).getPrice())) {
+                        minList.add(productInfo);
+                    } else {
+                        break;
+                    }
+                }
+                //取出最低成本的通道 按吞吐量优先
+                changeChannelProduct = WeightRand.nextItem(minList);
+            }
+            case THROUGHPUT_PRIOR -> changeChannelProduct = WeightRand.nextItem(changeProducts);
+        }
+        if (changeChannelProduct != null) {
+            orderInfo.setSendCount(nowSendCount);
+            orderInfo.setBatchCount(batchCount);
+            orderInfo.setStatus(ORDER_SENT.status());
+            orderInfo.setNote("没有合适的通道,入待发送");
+            orderInfo.setChannelId(changeChannelProduct.getChannelId());
+            orderUpdateService.updateOrder(orderInfo, SENDING_TAG);
+            //更新历史发送记录
+            Map<Long, Integer> thisLog = recordLog.get(batchCount);
+            if (thisLog == null) {
+                thisLog = new HashMap<>();
+            }
+            thisLog.put(changeChannelProduct.getChannelId(), 1);
+            recordLog.put(batchCount, thisLog);
+            String logStr = null;
+            try {
+                logStr = om.writeValueAsString(thisLog);
+            } catch (JsonProcessingException e) {
+                log.error("存储分发记录中,分发日志转换时异常", e);
+            }
+            MobileFlowDispatchRec newChildOrder = MobileFlowDispatchRec.builder()
+                    .orderId(orderId)
+                    .recId(orderId + nowSendCount)
+                    .batchCount(batchCount)
+                    .phoneNo(orderInfo.getPhoneNo())
+                    .packageId(changeChannelProduct.getPackageId())
+                    .flowAmount(changeChannelProduct.getFacePrice())
+                    .createDate(new Date())
+                    .sendStatus(ORDER_SENT.status())
+                    .phoneHome(orderInfo.getPhoneHome())
+                    .appId(orderInfo.getAppId())
+                    .operatorBalancePrice(changeChannelProduct.getPrice())
+                    .batchCount(batchCount)
+                    .sendCount(nowSendCount)
+                    .customerProductsId(orderInfo.getCustomerProductId())
+                    .customerName(orderInfo.getCustomerName())
+                    .channelProductsId(changeChannelProduct.getChannelProductId())
+                    .recordLog(logStr)
+                    .channelName(changeChannelProduct.getChannelInfo().getChannelName())
+                    .phoneOperator(orderInfo.getPhoneOperator())
+                    .build();
+            //分发记录入redis,并且发送消息,后面入db和es
+            orderUpdateService.updateChildOrder(newChildOrder, MAKE_TAG);
+        }
     }
 
 

+ 84 - 0
modules/distribution/src/main/java/com/fire/dist/service/impl/OrderUpdateServiceImpl.java

@@ -0,0 +1,84 @@
+package com.fire.dist.service.impl;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fire.dist.service.OrderUpdateService;
+import com.fire.dto.FlowOrderInfo;
+import com.fire.dto.MobileFlowDispatchRec;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+import redis.clients.jedis.JedisCluster;
+
+import javax.annotation.Resource;
+
+import static com.fire.common.constants.RocketTopic.CHILD_ORDER_TOPIC;
+import static com.fire.common.constants.RocketTopic.ORDER_TOPIC;
+import static com.fire.dto.enums.RedisKey.CHILD_ORDER_INFO;
+import static com.fire.dto.enums.RedisKey.ORDER_INFO;
+
+/**
+ * 订单更新实现层
+ *
+ * @author ZJQ 2021年6月16日17:13:54
+ */
+@Service
+@Slf4j
+public class OrderUpdateServiceImpl implements OrderUpdateService {
+
+    @Resource
+    private JedisCluster jedisCluster;
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+
+    /**
+     * 订单更新
+     *
+     * @param orderInfo 订单详情
+     * @param tag       mq的tag
+     */
+    @Override
+    public void updateOrder(FlowOrderInfo orderInfo, String tag) {
+        Long orderId = orderInfo.getOrderId();
+        String orderStr;
+        ObjectMapper om = new ObjectMapper();
+        try {
+            orderStr = om.writeValueAsString(orderInfo);
+            //订单转换为json更新入redis
+            long hKey = orderId / 10000000;
+            jedisCluster.hset(ORDER_INFO.key() + hKey, String.valueOf(orderId), orderStr);
+            //订单入队列
+            rocketMQTemplate.syncSendOrderly(ORDER_TOPIC + ":" + tag, MessageBuilder.withPayload(orderStr).build(), String.valueOf(orderId));
+
+        } catch (JsonProcessingException e) {
+            //订单转换的其他异常 如果执行到这里说明系统有bug
+            log.error("订单在分发的时候转换成json失败:", e);
+        }
+    }
+
+    /**
+     * 子订单更新
+     *
+     * @param childOrder 子订单详情
+     * @param tag        mq的tag
+     */
+    @Override
+    public void updateChildOrder(MobileFlowDispatchRec childOrder, String tag) {
+        Long recId = childOrder.getRecId();
+        String childOrderStr;
+        ObjectMapper om = new ObjectMapper();
+        try {
+            childOrderStr = om.writeValueAsString(childOrder);
+            //订单转换为json更新入redis
+            long hKey = recId / 10000000;
+            jedisCluster.hset(CHILD_ORDER_INFO.key() + hKey, String.valueOf(recId), childOrderStr);
+            //订单入队列
+            rocketMQTemplate.syncSendOrderly(CHILD_ORDER_TOPIC + ":" + tag, MessageBuilder.withPayload(childOrderStr).build(), String.valueOf(recId));
+        } catch (JsonProcessingException e) {
+            //订单转换的其他异常 如果执行到这里说明系统有bug
+            log.error("子订单在分发的时候转换成json失败:", e);
+        }
+    }
+}

+ 2 - 0
modules/distribution/src/main/java/com/fire/dist/task/CacheCustomerTask.java

@@ -2,6 +2,7 @@ package com.fire.dist.task;
 
 import com.fire.dist.service.CacheService;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -17,6 +18,7 @@ import javax.annotation.Resource;
 @Component
 @Slf4j
 public class CacheCustomerTask {
+
     @Resource
     private CacheService cacheService;
 

+ 44 - 0
modules/distribution/src/main/java/com/fire/dist/utils/WeightRand.java

@@ -0,0 +1,44 @@
+package com.fire.dist.utils;
+
+import com.fire.dto.ChannelProductInfo;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * 带权重的选择通道
+ */
+public class WeightRand {
+
+    /**
+     * nextItem()方法根据权重随机选择一个,具体就是,首先生成一个0~1的数,
+     * 然后使用二分查找,如果没找到,返回结果是-(插入点)-1,所以-index-1就是插入点,插入点的位置就对应选项的索引。
+     *
+     * @return ChannelInfo
+     */
+    public static ChannelProductInfo nextItem(List<ChannelProductInfo> productInfos) {
+        //随机取0-1之间的数
+        Random rnd = new Random();
+        double randomValue = rnd.nextDouble();
+        //weights为所有权重累计
+        int weights = 0;
+        for (ChannelProductInfo productInfo : productInfos) {
+            weights += productInfo.getChannelInfo().getWeight();
+        }
+        //新建一个数组 存当前权重累计/所有权重累计
+        double[] probabilities = new double[productInfos.size()];
+        int sum = 0;
+        for (int i = 0; i < productInfos.size(); i++) {
+            sum += productInfos.get(i).getChannelInfo().getWeight();
+            probabilities[i] = sum / (double) weights;
+        }
+        //通过二分查找到插入点 就可以取到数组的位置
+        int index = Arrays.binarySearch(probabilities, randomValue);
+        if (index < 0) {
+            index = -index - 1;
+        }
+        return productInfos.get(index);
+    }
+
+}

+ 1 - 0
modules/distribution/src/main/resources/bootstrap.yml

@@ -31,6 +31,7 @@ spring:
         server-addr: 192.168.2.114:8848
         file-extension: yaml
         namespace: fire
+        prefix: distribution
       discovery:
         server-addr: 192.168.2.114:8848
         namespace: fire

+ 3 - 1
modules/distribution/src/main/resources/mapper/ChannelInfoMapper.xml

@@ -18,6 +18,7 @@
         <result property="channelType" column="channel_type" jdbcType="INTEGER"/>
         <result property="isDelete" column="is_delete" jdbcType="INTEGER"/>
         <result property="distributeGroupId" column="distribute_group_id" jdbcType="INTEGER"/>
+        <result property="weight" column="weight" jdbcType="INTEGER"/>
     </resultMap>
 
     <!--查询全部-->
@@ -32,7 +33,8 @@
                a.discount,
                a.channel_type,
                a.is_delete,
-               b.distribute_group_id
+               b.distribute_group_id,
+               b.weight
         from access_channel_info a
                  left join distribute_group_channel b on a.channel_id = b.channel_id
     </select>

+ 1 - 1
modules/distribution/src/main/resources/mapper/ChannelProductInfoMapper.xml

@@ -30,7 +30,7 @@
                a.discount,
                b.area_num
         from channel_product_info a
-                 left join flow_product_info b on a.product_id = b.product_id
+                 left join flow_product_info b on a.package_id = b.package_id
     </select>
 
 

+ 1 - 0
modules/distribution/src/main/resources/mapper/FlowAppInfoMapper.xml

@@ -37,6 +37,7 @@
                ip_address,
                dispatch_channel,
                total_count,
+               time,
                channel_id
         from flow_app_info
     </sql>

+ 12 - 7
modules/make-order/src/main/java/com/fire/order/service/impl/MakeOrderServiceImpl.java

@@ -37,11 +37,10 @@ import static com.fire.common.constants.RocketTags.MAKE_TAG;
 import static com.fire.common.constants.RocketTopic.ORDER_TOPIC;
 import static com.fire.dto.enums.AmountOper.ADD;
 import static com.fire.dto.enums.AmountOper.SUB;
-import static com.fire.dto.enums.OrderStatus.ORDER_SENT;
+import static com.fire.dto.enums.OrderStatus.ORDER_IN_QUEUE;
 import static com.fire.dto.enums.PriceCheck.CHECK;
 import static com.fire.dto.enums.Province.QG_ALL;
 import static com.fire.dto.enums.RedisKey.*;
-import static com.fire.dto.enums.RocketQueue.ORDER_QUEUE;
 import static com.fire.dto.enums.Status.*;
 import static com.fire.dto.enums.ValidStatus.INVALID;
 
@@ -299,7 +298,8 @@ public class MakeOrderServiceImpl implements MakeOrderService {
 
 
         //自增id 取自redis,长度17位,后面加两位00 为主订单号,后面分发订单号后两位从 01开始
-        Long orderId = jedisCluster.incr(ORDER_ID_INCR.key()) * 100;
+        Long orderIncr = jedisCluster.incr(ORDER_ID_INCR.key());
+        Long orderId = orderIncr * 100;
 
         //客户订单id校验 这里订单id拼接上客户id,保证唯一
         String customerOrder = content.getExtOrder() + "." + flowAppInfo.getCustomerId();
@@ -323,7 +323,7 @@ public class MakeOrderServiceImpl implements MakeOrderService {
         order.setPhoneNo(phoneNo);
         order.setExtorderId(content.getExtOrder());
         order.setApplyDate(new Date());
-        order.setStatus(ORDER_SENT.status());
+        order.setStatus(ORDER_IN_QUEUE.status());
         order.setFlowAmount(product.getFacePrice());
         order.setAreaCode(product.getAreaNum());
         order.setPrice(product.getPrice());
@@ -356,11 +356,11 @@ public class MakeOrderServiceImpl implements MakeOrderService {
 
         try {
             //订单入redis
-
-            jedisCluster.hset(ORDER_INFO.key(), String.valueOf(orderId), orderStr);
+            long hKey = orderId / 10000000;
+            jedisCluster.hset(ORDER_INFO.key() + hKey, String.valueOf(orderId), orderStr);
 
             //订单入队列
-            rocketMQTemplate.syncSendOrderly(ORDER_TOPIC + ":" + MAKE_TAG, MessageBuilder.withPayload(orderStr).build(), ORDER_QUEUE.queue());
+            rocketMQTemplate.syncSendOrderly(ORDER_TOPIC + ":" + MAKE_TAG, MessageBuilder.withPayload(orderStr).build(), String.valueOf(orderId));
         } catch (Exception e) {
             //其他异常 扣减补偿
             log.error("订单存储或者发送到队列失败", e);
@@ -372,4 +372,9 @@ public class MakeOrderServiceImpl implements MakeOrderService {
         return responseDto;
     }
 
+    public static void main(String[] args) {
+
+        System.out.println("key:" + (1000000000019999900L / 10000000));
+    }
+
 }