瀏覽代碼

Merge branch 'feature/dup-message' into develop

Daniel Qian 10 年之前
父節點
當前提交
759795c363

+ 18 - 0
weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdDuplicateChecker.java

@@ -0,0 +1,18 @@
+package me.chanjar.weixin.common.util;
+
+/**
+ * <pre>
+ * 消息重复检查器
+ * 微信服务器在五秒内收不到响应会断掉连接,并且重新发起请求,总共重试三次
+ * </pre>
+ */
+public interface WxMsgIdDuplicateChecker {
+
+  /**
+   * 检查消息ID是否重复
+   * @param wxMsgId
+   * @return 如果是重复消息,返回true,否则返回false
+   */
+  public boolean isDuplicate(Long wxMsgId);
+
+}

+ 84 - 0
weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdInMemoryDuplicateChecker.java

@@ -0,0 +1,84 @@
+package me.chanjar.weixin.common.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * <pre>
+ * 默认消息重复检查器
+ * 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒
+ * </pre>
+ */
+public class WxMsgIdInMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
+
+  /**
+   * 一个消息ID在内存的过期时间:15秒
+   */
+  private final Long TIME_TO_LIVE;
+
+  /**
+   * 每隔多少周期检查消息ID是否过期:5秒
+   */
+  private final Long CLEAR_PERIOD;
+
+  private final ConcurrentHashMap<Long, Long> msgId2Timestamp = new ConcurrentHashMap<Long, Long>();
+
+  /**
+   * WxMsgIdInMemoryDuplicateChecker构造函数
+   * <pre>
+   * 一个消息ID在内存的过期时间:15秒
+   * 每隔多少周期检查消息ID是否过期:5秒
+   * </pre>
+   */
+  public WxMsgIdInMemoryDuplicateChecker() {
+    this.TIME_TO_LIVE = 15 * 1000l;
+    this.CLEAR_PERIOD = 5 * 1000l;
+    this.start();
+  }
+
+  /**
+   * WxMsgIdInMemoryDuplicateChecker构造函数
+   * @param timeToLive 一个消息ID在内存的过期时间:毫秒
+   * @param clearPeriod 每隔多少周期检查消息ID是否过期:毫秒
+   */
+  public WxMsgIdInMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) {
+    this.TIME_TO_LIVE = timeToLive;
+    this.CLEAR_PERIOD = clearPeriod;
+    this.start();
+  }
+
+  private void start() {
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (true) {
+            Thread.sleep(CLEAR_PERIOD);
+            Long now = System.currentTimeMillis();
+            for (Map.Entry<Long, Long> entry : msgId2Timestamp.entrySet()) {
+              if (now - entry.getValue() > TIME_TO_LIVE) {
+                msgId2Timestamp.entrySet().remove(entry);
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    t.setDaemon(true);
+    t.start();
+  }
+
+  @Override
+  public boolean isDuplicate(Long wxMsgId) {
+    Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis());
+    if (timestamp == null) {
+      // 第一次接收到这个消息
+      return false;
+    }
+    return true;
+  }
+
+
+}

+ 38 - 0
weixin-java-common/src/test/java/me/chanjar/weixin/common/util/WxMsgIdInMemoryDuplicateCheckerTest.java

@@ -0,0 +1,38 @@
+package me.chanjar.weixin.common.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Created by qianjia on 15/1/20.
+ */
+@Test
+public class WxMsgIdInMemoryDuplicateCheckerTest {
+
+  public void test() throws InterruptedException {
+    Long[] msgIds = new Long[] { 1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l };
+    WxMsgIdInMemoryDuplicateChecker checker = new WxMsgIdInMemoryDuplicateChecker(2000l, 1000l);
+
+    // 第一次检查
+    for (Long msgId : msgIds) {
+      boolean result = checker.isDuplicate(msgId);
+      Assert.assertFalse(result);
+    }
+
+    // 过1秒再检查
+    Thread.sleep(1000l);
+    for (Long msgId : msgIds) {
+      boolean result = checker.isDuplicate(msgId);
+      Assert.assertTrue(result);
+    }
+
+    // 过1.5秒再检查
+    Thread.sleep(1500l);
+    for (Long msgId : msgIds) {
+      boolean result = checker.isDuplicate(msgId);
+      Assert.assertFalse(result);
+    }
+
+  }
+
+}

+ 1 - 0
weixin-java-common/src/test/resources/testng.xml

@@ -7,6 +7,7 @@
 			<class name="me.chanjar.weixin.common.bean.WxErrorTest" />
             <class name="me.chanjar.weixin.common.bean.WxMenuTest" />
             <class name="me.chanjar.weixin.common.util.crypto.WxCryptUtilTest" />
+            <class name="me.chanjar.weixin.common.util.WxMsgIdInMemoryDuplicateCheckerTest" />
         </classes>
 	</test>
 </suite>

+ 29 - 2
weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMessageRouter.java

@@ -1,5 +1,7 @@
 package me.chanjar.weixin.cp.api;
 
+import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker;
+import me.chanjar.weixin.common.util.WxMsgIdInMemoryDuplicateChecker;
 import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
 import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
 
@@ -45,18 +47,38 @@ public class WxCpMessageRouter {
 
   private final List<Rule> rules = new ArrayList<Rule>();
 
-  private final ExecutorService executorService;
-
   private final WxCpService wxCpService;
 
+  private ExecutorService executorService;
+
+  private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker;
+
   public WxCpMessageRouter(WxCpService wxCpService) {
     this.wxCpService = wxCpService;
     this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
   }
 
   public WxCpMessageRouter(WxCpService wxMpService, int threadPoolSize) {
     this.wxCpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
+  }
+
+  /**
+   * 设置自定义的ExecutorService
+   * @param executorService
+   */
+  public void setExecutorService(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * 设置自定义的WxMsgIdDuplicateChecker
+   * @param wxMsgIdDuplicateChecker
+   */
+  public void setWxMsgIdDuplicateChecker(WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker) {
+    this.wxMsgIdDuplicateChecker = wxMsgIdDuplicateChecker;
   }
 
   /**
@@ -72,6 +94,11 @@ public class WxCpMessageRouter {
    * @param wxMessage
    */
   public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) {
+    if (wxMsgIdDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
+      // 如果是重复消息,那么就不做处理
+      return null;
+    }
+    
     final List<Rule> matchRules = new ArrayList<Rule>();
     // 收集匹配的规则
     for (final Rule rule : rules) {

+ 29 - 2
weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java

@@ -1,5 +1,7 @@
 package me.chanjar.weixin.mp.api;
 
+import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker;
+import me.chanjar.weixin.common.util.WxMsgIdInMemoryDuplicateChecker;
 import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
 import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage;
 
@@ -45,18 +47,38 @@ public class WxMpMessageRouter {
 
   private final List<Rule> rules = new ArrayList<Rule>();
 
-  private final ExecutorService executorService;
-
   private final WxMpService wxMpService;
 
+  private ExecutorService executorService;
+
+  private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker;
+
   public WxMpMessageRouter(WxMpService wxMpService) {
     this.wxMpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
   }
 
   public WxMpMessageRouter(WxMpService wxMpService, int threadPoolSize) {
     this.wxMpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
+  }
+
+  /**
+   * 设置自定义的ExecutorService
+   * @param executorService
+   */
+  public void setExecutorService(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * 设置自定义的WxMsgIdDuplicateChecker
+   * @param wxMsgIdDuplicateChecker
+   */
+  public void setWxMsgIdDuplicateChecker(WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker) {
+    this.wxMsgIdDuplicateChecker = wxMsgIdDuplicateChecker;
   }
 
   /**
@@ -72,6 +94,11 @@ public class WxMpMessageRouter {
    * @param wxMessage
    */
   public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) {
+    if (wxMsgIdDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
+      // 如果是重复消息,那么就不做处理
+      return null;
+    }
+
     final List<Rule> matchRules = new ArrayList<Rule>();
     // 收集匹配的规则
     for (final Rule rule : rules) {