浏览代码

任务提交 更新缓存消息消费,更新缓存

张均强 4 年之前
父节点
当前提交
acca1b20a2
共有 1 个文件被更改,包括 18 次插入9 次删除
  1. 18 9
      modules/make-order/src/main/java/com/fire/order/consumer/RocketUpdateConsumer.java

+ 18 - 9
modules/make-order/src/main/java/com/fire/order/consumer/RocketUpdateConsumer.java

@@ -1,5 +1,6 @@
 package com.fire.order.consumer;
 
+import com.alibaba.nacos.api.utils.StringUtils;
 import com.fire.order.service.CacheService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -21,23 +22,31 @@ import static com.fire.common.constants.RocketTopic.UPDATE_TOPIC;
  * @author ZJQ 2021年5月26日14:59:42
  */
 
-
 @Slf4j
 @Component
-@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = UPDATE_TOPIC)
+@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = UPDATE_TOPIC,consumeTimeout  = 3000)
 public class RocketUpdateConsumer implements RocketMQListener<MessageExt> {
 
     @Resource
     private CacheService cacheService;
 
     @Override
-    public void onMessage(MessageExt messageExt) {
-        switch (messageExt.getTags()) {
-            case CONSUMER_PRODUCT_TAG -> cacheService.cacheCustomer();
-            case BLACKLIST_TAG -> cacheService.cacheBlacklist();
-            case MAINTENANCE_TAG -> cacheService.cacheMaintenance();
-            case PHONE_ZONE_TAG -> cacheService.cachePhoneZone();
-            case VIRTUAL_TAG -> cacheService.cacheVirtualNum();
+    public void onMessage(MessageExt msg) {
+        //只消费最近十分钟的消息,因为再之前的消息为历史的更新没有意义
+        Long now = System.currentTimeMillis();
+        Long bron = msg.getBornTimestamp();
+        if(now - bron < 600000){
+            String tag = msg.getTags();
+            if (!StringUtils.isEmpty(tag)) {
+                log.info("消费更新缓存消息:" + tag);
+                switch (tag) {
+                    case CONSUMER_PRODUCT_TAG -> cacheService.cacheCustomer();
+                    case BLACKLIST_TAG -> cacheService.cacheBlacklist();
+                    case MAINTENANCE_TAG -> cacheService.cacheMaintenance();
+                    case PHONE_ZONE_TAG -> cacheService.cachePhoneZone();
+                    case VIRTUAL_TAG -> cacheService.cacheVirtualNum();
+                }
+            }
         }
     }
 }