Daniel Qian 10 tahun lalu
induk
melakukan
9fe73319d1

+ 18 - 0
weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdDuplicateChecker.java

@@ -0,0 +1,18 @@
+package me.chanjar.weixin.common.util;
+
+/**
+ * <pre>
+ * 消息重复检查器
+ * 微信服务器在五秒内收不到响应会断掉连接,并且重新发起请求,总共重试三次
+ * </pre>
+ */
+public interface WxMsgIdDuplicateChecker {
+
+  /**
+   * 检查消息ID是否重复
+   * @param wxMsgId
+   * @return 如果是重复消息,返回true,否则返回false
+   */
+  public boolean isDuplicate(Long wxMsgId);
+
+}

+ 84 - 0
weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdInMemoryDuplicateChecker.java

@@ -0,0 +1,84 @@
+package me.chanjar.weixin.common.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * <pre>
+ * 默认消息重复检查器
+ * 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒
+ * </pre>
+ */
+public class WxMsgIdInMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
+
+  /**
+   * 一个消息ID在内存的过期时间:15秒
+   */
+  private final Long TIME_TO_LIVE;
+
+  /**
+   * 每隔多少周期检查消息ID是否过期:5秒
+   */
+  private final Long CLEAR_PERIOD;
+
+  private final ConcurrentHashMap<Long, Long> msgId2Timestamp = new ConcurrentHashMap<Long, Long>();
+
+  /**
+   * WxMsgIdInMemoryDuplicateChecker构造函数
+   * <pre>
+   * 一个消息ID在内存的过期时间:15秒
+   * 每隔多少周期检查消息ID是否过期:5秒
+   * </pre>
+   */
+  public WxMsgIdInMemoryDuplicateChecker() {
+    this.TIME_TO_LIVE = 15 * 1000l;
+    this.CLEAR_PERIOD = 5 * 1000l;
+    this.start();
+  }
+
+  /**
+   * WxMsgIdInMemoryDuplicateChecker构造函数
+   * @param timeToLive 一个消息ID在内存的过期时间:毫秒
+   * @param clearPeriod 每隔多少周期检查消息ID是否过期:毫秒
+   */
+  public WxMsgIdInMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) {
+    this.TIME_TO_LIVE = timeToLive;
+    this.CLEAR_PERIOD = clearPeriod;
+    this.start();
+  }
+
+  private void start() {
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (true) {
+            Thread.sleep(CLEAR_PERIOD);
+            Long now = System.currentTimeMillis();
+            for (Map.Entry<Long, Long> entry : msgId2Timestamp.entrySet()) {
+              if (now - entry.getValue() > TIME_TO_LIVE) {
+                msgId2Timestamp.entrySet().remove(entry);
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    t.setDaemon(true);
+    t.start();
+  }
+
+  @Override
+  public boolean isDuplicate(Long wxMsgId) {
+    Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis());
+    if (timestamp == null) {
+      // 第一次接收到这个消息
+      return false;
+    }
+    return true;
+  }
+
+
+}

+ 38 - 0
weixin-java-common/src/test/java/me/chanjar/weixin/common/util/WxMsgIdInMemoryDuplicateCheckerTest.java

@@ -0,0 +1,38 @@
+package me.chanjar.weixin.common.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Created by qianjia on 15/1/20.
+ */
+@Test
+public class WxMsgIdInMemoryDuplicateCheckerTest {
+
+  public void test() throws InterruptedException {
+    Long[] msgIds = new Long[] { 1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l };
+    WxMsgIdInMemoryDuplicateChecker checker = new WxMsgIdInMemoryDuplicateChecker(2000l, 1000l);
+
+    // 第一次检查
+    for (Long msgId : msgIds) {
+      boolean result = checker.isDuplicate(msgId);
+      Assert.assertFalse(result);
+    }
+
+    // 过1秒再检查
+    Thread.sleep(1000l);
+    for (Long msgId : msgIds) {
+      boolean result = checker.isDuplicate(msgId);
+      Assert.assertTrue(result);
+    }
+
+    // 过1.5秒再检查
+    Thread.sleep(1500l);
+    for (Long msgId : msgIds) {
+      boolean result = checker.isDuplicate(msgId);
+      Assert.assertFalse(result);
+    }
+
+  }
+
+}

+ 1 - 0
weixin-java-common/src/test/resources/testng.xml

@@ -7,6 +7,7 @@
 			<class name="me.chanjar.weixin.common.bean.WxErrorTest" />
             <class name="me.chanjar.weixin.common.bean.WxMenuTest" />
             <class name="me.chanjar.weixin.common.util.crypto.WxCryptUtilTest" />
+            <class name="me.chanjar.weixin.common.util.WxMsgIdInMemoryDuplicateCheckerTest" />
         </classes>
 	</test>
 </suite>

+ 29 - 2
weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMessageRouter.java

@@ -1,5 +1,7 @@
 package me.chanjar.weixin.cp.api;
 
+import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker;
+import me.chanjar.weixin.common.util.WxMsgIdInMemoryDuplicateChecker;
 import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
 import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
 
@@ -45,18 +47,38 @@ public class WxCpMessageRouter {
 
   private final List<Rule> rules = new ArrayList<Rule>();
 
-  private final ExecutorService executorService;
-
   private final WxCpService wxCpService;
 
+  private ExecutorService executorService;
+
+  private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker;
+
   public WxCpMessageRouter(WxCpService wxCpService) {
     this.wxCpService = wxCpService;
     this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
   }
 
   public WxCpMessageRouter(WxCpService wxMpService, int threadPoolSize) {
     this.wxCpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
+  }
+
+  /**
+   * 设置自定义的ExecutorService
+   * @param executorService
+   */
+  public void setExecutorService(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * 设置自定义的WxMsgIdDuplicateChecker
+   * @param wxMsgIdDuplicateChecker
+   */
+  public void setWxMsgIdDuplicateChecker(WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker) {
+    this.wxMsgIdDuplicateChecker = wxMsgIdDuplicateChecker;
   }
 
   /**
@@ -72,6 +94,11 @@ public class WxCpMessageRouter {
    * @param wxMessage
    */
   public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) {
+    if (wxMsgIdDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
+      // 如果是重复消息,那么就不做处理
+      return null;
+    }
+    
     final List<Rule> matchRules = new ArrayList<Rule>();
     // 收集匹配的规则
     for (final Rule rule : rules) {

+ 0 - 88
weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpDuplicateMessageInterceptor.java

@@ -1,88 +0,0 @@
-package me.chanjar.weixin.mp.api;
-
-import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
-import sun.applet.Main;
-
-import java.net.SocketTimeoutException;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * <pre>
- * 消息去重拦截器
- * 微信服务器在五秒内收不到响应会断掉连接,并且重新发起请求,总共重试三次
- * 使用方法:
- * WxMpMessageRouter router = new WxMpMessageRouter();
- * router
- *   .rule()
- *       .interceptor(new WxMpDuplicateMessageInterceptor())
- *   .next()
- *   .rule()
- *       .msgType("MSG_TYPE").event("EVENT").eventKey("EVENT_KEY").content("CONTENT")
- *       .interceptor(interceptor, ...).handler(handler, ...)
- *   .end()
- *   .rule()
- *       // 另外一个匹配规则
- *   .end()
- * ;
- * </pre>
- */
-public class WxMpDuplicateMessageInterceptor implements WxMpMessageInterceptor {
-
-  private static final Long PERIOD = 15 * 1000l;
-
-  private final ConcurrentHashMap<Long, Long> msgId2timestamp;
-
-  public WxMpDuplicateMessageInterceptor() {
-    this.msgId2timestamp = new ConcurrentHashMap<Long, Long>();
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          while (true) {
-            Thread.sleep(PERIOD);
-            Long now = System.currentTimeMillis();
-            for (Map.Entry<Long, Long> entry : msgId2timestamp.entrySet()) {
-              if (now - entry.getValue() > PERIOD) {
-                msgId2timestamp.entrySet().remove(entry);
-              }
-            }
-            msgId2timestamp.clear();
-          }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    });
-    t.setDaemon(true);
-    t.start();
-  }
-
-  @Override
-  public boolean intercept(WxMpXmlMessage wxMessage, Map<String, Object> context, WxMpService wxMpService) {
-    Long now = System.currentTimeMillis();
-    Long timestamp = msgId2timestamp.putIfAbsent(wxMessage.getMsgId(), now);
-    if (timestamp == null) {
-      return true;
-    }
-    if (timestamp.equals(now)) {
-      // 第一次接收到这个消息
-      return true;
-    }
-    return false;
-  }
-
-  public static void main(String[] args) {
-    WxMpDuplicateMessageInterceptor d = new WxMpDuplicateMessageInterceptor();
-    Long endTime = System.currentTimeMillis() + 30 * 1000;
-    Random r = new Random();
-
-    while(System.currentTimeMillis() < endTime) {
-      WxMpXmlMessage m = new WxMpXmlMessage();
-      m.setMsgId(r.nextLong() % 100);
-      d.intercept(m, null, null);
-    }
-
-  }
-}

+ 29 - 2
weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java

@@ -1,5 +1,7 @@
 package me.chanjar.weixin.mp.api;
 
+import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker;
+import me.chanjar.weixin.common.util.WxMsgIdInMemoryDuplicateChecker;
 import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
 import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage;
 
@@ -45,18 +47,38 @@ public class WxMpMessageRouter {
 
   private final List<Rule> rules = new ArrayList<Rule>();
 
-  private final ExecutorService executorService;
-
   private final WxMpService wxMpService;
 
+  private ExecutorService executorService;
+
+  private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker;
+
   public WxMpMessageRouter(WxMpService wxMpService) {
     this.wxMpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
   }
 
   public WxMpMessageRouter(WxMpService wxMpService, int threadPoolSize) {
     this.wxMpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+    this.wxMsgIdDuplicateChecker = new WxMsgIdInMemoryDuplicateChecker();
+  }
+
+  /**
+   * 设置自定义的ExecutorService
+   * @param executorService
+   */
+  public void setExecutorService(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * 设置自定义的WxMsgIdDuplicateChecker
+   * @param wxMsgIdDuplicateChecker
+   */
+  public void setWxMsgIdDuplicateChecker(WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker) {
+    this.wxMsgIdDuplicateChecker = wxMsgIdDuplicateChecker;
   }
 
   /**
@@ -72,6 +94,11 @@ public class WxMpMessageRouter {
    * @param wxMessage
    */
   public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) {
+    if (wxMsgIdDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
+      // 如果是重复消息,那么就不做处理
+      return null;
+    }
+
     final List<Rule> matchRules = new ArrayList<Rule>();
     // 收集匹配的规则
     for (final Rule rule : rules) {