Przeglądaj źródła

issue #69 添加Session的支持,添加测试用例

Daniel Qian 10 lat temu
rodzic
commit
6368ca6816
16 zmienionych plików z 514 dodań i 132 usunięć
  1. 39 0
      weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSessionManager.java
  2. 6 7
      weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionImpl.java
  3. 7 2
      weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSessionFacade.java
  4. 22 16
      weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InMemorySessionManager.java
  5. 3 0
      weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMessageInMemoryDuplicateChecker.java
  6. 132 0
      weixin-java-common/src/test/java/me/chanjar/weixin/common/session/SessionTest.java
  7. 0 68
      weixin-java-common/src/test/java/me/chanjar/weixin/common/session/TestSession.java
  8. 1 1
      weixin-java-common/src/test/resources/testng.xml
  9. 1 0
      weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMessageHandler.java
  10. 12 13
      weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMessageRouter.java
  11. 1 1
      weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpService.java
  12. 2 2
      weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpServiceImpl.java
  13. 134 0
      weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMessageRouterTest.java
  14. 19 20
      weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
  15. 2 2
      weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpServiceImpl.java
  16. 133 0
      weixin-java-mp/src/test/java/me/chanjar/weixin/mp/api/WxMpMessageRouterTest.java

+ 39 - 0
weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSessionManager.java

@@ -3,6 +3,19 @@ package me.chanjar.weixin.common.session;
 public interface InternalSessionManager {
 
   /**
+   * Return the active Session, associated with this Manager, with the
+   * specified session id (if any); otherwise return <code>null</code>.
+   *
+   * @param id The session id for the session to be returned
+   *
+   * @exception IllegalStateException if a new session cannot be
+   *  instantiated for any reason
+   * @exception java.io.IOException if an input/output error occurs while
+   *  processing this request
+   */
+  InternalSession findSession(String id);
+
+  /**
    * Construct and return a new session object, based on the default
    * settings specified by this Manager's properties.  The session
    * id specified will be used as the session id.
@@ -68,7 +81,33 @@ public interface InternalSessionManager {
    */
   void setMaxInactiveInterval(int interval);
 
+  /**
+   * <pre>
+   * Set the manager checks frequency.
+   * 设置每尝试多少次清理过期session,才真的会执行一次清理动作
+   * 要和{@link #setBackgroundProcessorDelay(int)}联合起来看
+   * 如果把这个数字设置为6(默认),那么就是说manager要等待 6 * backgroundProcessorDay的时间才会清理过期session
+   * </pre>
+   * @param processExpiresFrequency the new manager checks frequency
+   */
   void setProcessExpiresFrequency(int processExpiresFrequency);
 
+  /**
+   * <pre>
+   * Set the manager background processor delay
+   * 设置manager sleep几秒,尝试执行一次background操作(清理过期session)
+   * </pre>
+   * @param backgroundProcessorDelay
+   */
   void setBackgroundProcessorDelay(int backgroundProcessorDelay);
+
+
+  /**
+   * Set the maximum number of active Sessions allowed, or -1 for
+   * no limit.
+   * 设置最大活跃session数,默认无限
+   * @param max The new maximum number of sessions
+   */
+  void setMaxActiveSessions(int max);
+
 }

+ 6 - 7
weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionImpl.java

@@ -6,7 +6,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class SessionImpl implements WxSession, InternalSession {
+public class StandardSession implements WxSession, InternalSession {
 
   /**
    * The string manager for this package.
@@ -129,7 +129,7 @@ public class SessionImpl implements WxSession, InternalSession {
    * The facade associated with this session.  NOTE:  This value is not
    * included in the serialized version of this object.
    */
-  protected transient InternalSessionFacade facade = null;
+  protected transient StandardSessionFacade facade = null;
 
   /**
    * The access count for this session.
@@ -137,7 +137,7 @@ public class SessionImpl implements WxSession, InternalSession {
   protected transient AtomicInteger accessCount = null;
 
 
-  public SessionImpl(InternalSessionManager manager) {
+  public StandardSession(InternalSessionManager manager) {
     this.manager = manager;
     this.accessCount = new AtomicInteger();
   }
@@ -147,7 +147,7 @@ public class SessionImpl implements WxSession, InternalSession {
   public WxSession getSession() {
 
     if (facade == null){
-      facade = new InternalSessionFacade(this);
+      facade = new StandardSessionFacade(this);
     }
     return (facade);
 
@@ -281,7 +281,6 @@ public class SessionImpl implements WxSession, InternalSession {
 
   @Override
   public void setMaxInactiveInterval(int interval) {
-    int oldMaxInactiveInterval = this.maxInactiveInterval;
     this.maxInactiveInterval = interval;
   }
 
@@ -311,9 +310,9 @@ public class SessionImpl implements WxSession, InternalSession {
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
-    if (!(o instanceof SessionImpl)) return false;
+    if (!(o instanceof StandardSession)) return false;
 
-    SessionImpl session = (SessionImpl) o;
+    StandardSession session = (StandardSession) o;
 
     if (creationTime != session.creationTime) return false;
     if (expiring != session.expiring) return false;

+ 7 - 2
weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSessionFacade.java

@@ -2,17 +2,21 @@ package me.chanjar.weixin.common.session;
 
 import java.util.Enumeration;
 
-public class InternalSessionFacade implements WxSession {
+public class StandardSessionFacade implements WxSession {
 
   /**
    * Wrapped session object.
    */
   private WxSession session = null;
 
-  public InternalSessionFacade(WxSession session) {
+  public StandardSessionFacade(StandardSession session) {
     this.session = session;
   }
 
+  public InternalSession getInternalSession() {
+    return (InternalSession) session;
+  }
+
   @Override
   public Object getAttribute(String name) {
     return session.getAttribute(name);
@@ -37,4 +41,5 @@ public class InternalSessionFacade implements WxSession {
   public void invalidate() {
     session.invalidate();
   }
+
 }

+ 22 - 16
weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InMemorySessionManager.java

@@ -8,9 +8,12 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class InMemorySessionManager implements WxSessionManager, InternalSessionManager {
+/**
+ * 基于内存的session manager
+ */
+public class StandardSessionManager implements WxSessionManager, InternalSessionManager {
 
-  protected final Logger log = LoggerFactory.getLogger(InMemorySessionManager.class);
+  protected final Logger log = LoggerFactory.getLogger(StandardSessionManager.class);
 
   protected static final StringManager sm =
       StringManager.getManager(Constants.Package);
@@ -128,18 +131,9 @@ public class InMemorySessionManager implements WxSessionManager, InternalSession
   }
 
 
-  /**
-   * Return the active Session, associated with this Manager, with the
-   * specified session id (if any); otherwise return <code>null</code>.
-   *
-   * @param id The session id for the session to be returned
-   *
-   * @exception IllegalStateException if a new session cannot be
-   *  instantiated for any reason
-   * @exception java.io.IOException if an input/output error occurs while
-   *  processing this request
-   */
-  protected InternalSession findSession(String id) {
+
+  @Override
+  public InternalSession findSession(String id) {
 
     if (id == null)
       return (null);
@@ -189,12 +183,11 @@ public class InMemorySessionManager implements WxSessionManager, InternalSession
     return (getNewSession());
   }
 
-
   /**
    * Get new session class to be used in the doLoad() method.
    */
   protected InternalSession getNewSession() {
-    return new SessionImpl(this);
+    return new StandardSession(this);
   }
 
 
@@ -312,4 +305,17 @@ public class InMemorySessionManager implements WxSessionManager, InternalSession
 
   }
 
+  /**
+   * Set the maximum number of active Sessions allowed, or -1 for
+   * no limit.
+   *
+   * @param max The new maximum number of sessions
+   */
+  @Override
+  public void setMaxActiveSessions(int max) {
+
+    this.maxActiveSessions = max;
+
+  }
+
 }

+ 3 - 0
weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMessageInMemoryDuplicateChecker.java

@@ -82,6 +82,9 @@ public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChec
 
   @Override
   public boolean isDuplicate(Long wxMsgId) {
+    if (wxMsgId == null) {
+      return false;
+    }
     checkBackgroundProcessStarted();
     Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis());
     if (timestamp == null) {

+ 132 - 0
weixin-java-common/src/test/java/me/chanjar/weixin/common/session/SessionTest.java

@@ -0,0 +1,132 @@
+package me.chanjar.weixin.common.session;
+
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class SessionTest {
+
+  @DataProvider
+  public Object[][] getSessionManager() {
+
+    return new Object[][] {
+        new Object[] { new StandardSessionManager() }
+    };
+
+  }
+
+
+  @Test(dataProvider = "getSessionManager", expectedExceptions = IllegalStateException.class)
+  public void testInvalidate(WxSessionManager sessionManager) {
+
+    WxSession session = sessionManager.getSession("abc");
+    session.invalidate();
+    session.getAttributeNames();
+
+  }
+
+  @Test(dataProvider = "getSessionManager")
+  public void testInvalidate2(InternalSessionManager sessionManager) {
+
+    Assert.assertEquals(sessionManager.getActiveSessions(), 0);
+    WxSession session = ((WxSessionManager) sessionManager).getSession("abc");
+    Assert.assertEquals(sessionManager.getActiveSessions(), 1);
+    session.invalidate();
+    Assert.assertEquals(sessionManager.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "getSessionManager")
+  public void testGetSession(WxSessionManager sessionManager) {
+
+    WxSession session1 = sessionManager.getSession("abc");
+    WxSession session2 = sessionManager.getSession("abc");
+    Assert.assertEquals(session1, session2);
+    Assert.assertTrue(session1 == session2);
+
+    WxSession abc1 = sessionManager.getSession("abc1");
+    Assert.assertNotEquals(session1, abc1);
+
+    WxSession abc1b = sessionManager.getSession("abc1", false);
+    Assert.assertEquals(abc1, abc1b);
+
+    WxSession def = sessionManager.getSession("def", false);
+    Assert.assertNull(def);
+
+  }
+
+  @Test(dataProvider = "getSessionManager")
+  public void testInvalidateAngGet(WxSessionManager sessionManager) {
+
+    WxSession session1 = sessionManager.getSession("abc");
+    session1.invalidate();
+    WxSession session2 = sessionManager.getSession("abc");
+    Assert.assertNotEquals(session1, session2);
+    InternalSessionManager ism = (InternalSessionManager) sessionManager;
+    Assert.assertEquals(ism.getActiveSessions(), 1);
+
+  }
+
+  @Test(dataProvider = "getSessionManager")
+  public void testBackgroundProcess(WxSessionManager sessionManager) throws InterruptedException {
+
+    InternalSessionManager ism = (InternalSessionManager) sessionManager;
+    ism.setMaxInactiveInterval(1);
+    ism.setProcessExpiresFrequency(1);
+    ism.setBackgroundProcessorDelay(1);
+
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+    InternalSession abc = ism.createSession("abc");
+    abc.endAccess();
+
+    Thread.sleep(2000l);
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "getSessionManager")
+  public void testBackgroundProcess2(WxSessionManager sessionManager) throws InterruptedException {
+
+    InternalSessionManager ism = (InternalSessionManager) sessionManager;
+    ism.setMaxInactiveInterval(100);
+    ism.setProcessExpiresFrequency(1);
+    ism.setBackgroundProcessorDelay(1);
+
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+    InternalSession abc = ism.createSession("abc");
+    abc.setMaxInactiveInterval(1);
+    abc.endAccess();
+
+    Thread.sleep(2000l);
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "getSessionManager")
+  public void testMaxActive(WxSessionManager sessionManager) throws InterruptedException {
+
+    InternalSessionManager ism = (InternalSessionManager) sessionManager;
+    ism.setMaxActiveSessions(2);
+
+    ism.createSession("abc");
+    ism.createSession("abc");
+    ism.createSession("def");
+
+  }
+
+  @Test(dataProvider = "getSessionManager", expectedExceptions = TooManyActiveSessionsException.class)
+  public void testMaxActive2(WxSessionManager sessionManager) throws InterruptedException {
+
+    InternalSessionManager ism = (InternalSessionManager) sessionManager;
+    ism.setMaxActiveSessions(2);
+
+    ism.createSession("abc");
+    ism.createSession("abc");
+    ism.createSession("def");
+    ism.createSession("xyz");
+
+  }
+}

+ 0 - 68
weixin-java-common/src/test/java/me/chanjar/weixin/common/session/TestSession.java

@@ -1,68 +0,0 @@
-package me.chanjar.weixin.common.session;
-
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-@Test
-public class TestSession {
-
-  @DataProvider
-  public Object[][] getSessionManager() {
-    return new Object[][] {
-        new Object[] { new InMemorySessionManager() }
-    };
-  }
-
-
-  @Test(dataProvider = "getSessionManager", expectedExceptions = IllegalStateException.class)
-  public void testInvalidate(WxSessionManager sessionManager) {
-    WxSession session = sessionManager.getSession("abc");
-    session.invalidate();
-    session.getAttributeNames();
-  }
-
-  @Test(dataProvider = "getSessionManager")
-  public void testInvalidate2(InternalSessionManager sessionManager) {
-    Assert.assertEquals(sessionManager.getActiveSessions(), 0);
-    WxSession session = ((WxSessionManager) sessionManager).getSession("abc");
-    Assert.assertEquals(sessionManager.getActiveSessions(), 1);
-    session.invalidate();
-    Assert.assertEquals(sessionManager.getActiveSessions(), 0);
-  }
-
-  @Test(dataProvider = "getSessionManager")
-  public void testGetSession(WxSessionManager sessionManager) {
-    WxSession session1 = sessionManager.getSession("abc");
-    WxSession session2 = sessionManager.getSession("abc");
-    Assert.assertTrue(session1.equals(session2));
-
-    WxSession abc1 = sessionManager.getSession("abc1");
-    Assert.assertFalse(session1.equals(abc1));
-
-    WxSession abc1b = sessionManager.getSession("abc1", false);
-    Assert.assertTrue(abc1.equals(abc1b));
-
-    WxSession def = sessionManager.getSession("def", false);
-    Assert.assertNull(def);
-  }
-
-  @Test(dataProvider = "getSessionManager")
-  public void testBackgroundProcess(WxSessionManager sessionManager) throws InterruptedException {
-
-    InternalSessionManager ism = (InternalSessionManager) sessionManager;
-    ism.setMaxInactiveInterval(1);
-    ism.setProcessExpiresFrequency(1);
-    ism.setBackgroundProcessorDelay(1);
-
-    Assert.assertEquals(ism.getActiveSessions(), 0);
-
-    InternalSession abc = ism.createSession("abc");
-    abc.endAccess();
-
-    Thread.sleep(2000l);
-    Assert.assertEquals(ism.getActiveSessions(), 0);
-
-  }
-
-}

+ 1 - 1
weixin-java-common/src/test/resources/testng.xml

@@ -8,7 +8,7 @@
       <class name="me.chanjar.weixin.common.bean.WxMenuTest"/>
       <class name="me.chanjar.weixin.common.util.crypto.WxCryptUtilTest"/>
       <class name="me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateCheckerTest"/>
-      <class name="me.chanjar.weixin.common.session.TestSession" />
+      <class name="me.chanjar.weixin.common.session.SessionTest" />
     </classes>
   </test>
 </suite>

+ 1 - 0
weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMessageHandler.java

@@ -1,5 +1,6 @@
 package me.chanjar.weixin.cp.api;
 
+import me.chanjar.weixin.common.session.WxSession;
 import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
 import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;

+ 12 - 13
weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMessageRouter.java

@@ -1,9 +1,6 @@
 package me.chanjar.weixin.cp.api;
 
-import me.chanjar.weixin.common.session.InternalSession;
-import me.chanjar.weixin.common.session.WxSession;
-import me.chanjar.weixin.common.session.InMemorySessionManager;
-import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.common.session.*;
 import me.chanjar.weixin.common.util.WxMessageDuplicateChecker;
 import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker;
 import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
@@ -69,7 +66,7 @@ public class WxCpMessageRouter {
     this.wxCpService = wxCpService;
     this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
-    this.sessionManager = new InMemorySessionManager();
+    this.sessionManager = new StandardSessionManager();
   }
 
   /**
@@ -152,10 +149,12 @@ public class WxCpMessageRouter {
         );
       } else {
         res = rule.service(wxMessage);
+        // 在同步操作结束,session访问结束
+        log.trace("End session access after sync operation finish {}", wxMessage.getFromUserName());
+        sessionEndAccess(wxMessage);
       }
     }
 
-    // 告诉session,它已经用不着了
     if (futures.size() > 0) {
       executorService.submit(new Runnable() {
         @Override
@@ -163,19 +162,17 @@ public class WxCpMessageRouter {
           for (Future future : futures) {
             try {
               future.get();
+              log.trace("End session access after async operation finish {}", wxMessage.getFromUserName());
+              // 异步操作结束,session访问结束
+              sessionEndAccess(wxMessage);
             } catch (InterruptedException e) {
               log.error("Error happened when wait task finish", e);
             } catch (ExecutionException e) {
               log.error("Error happened when wait task finish", e);
             }
           }
-          // 在这里session再也不会被使用了
-          sessionEndAccess(wxMessage);
         }
       });
-    } else {
-      // 在这里session再也不会被使用了
-      sessionEndAccess(wxMessage);
     }
     return res;
   }
@@ -185,10 +182,12 @@ public class WxCpMessageRouter {
    * @param wxMessage
    */
   protected void sessionEndAccess(WxCpXmlMessage wxMessage) {
-    WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false);
+
+    InternalSession session = ((InternalSessionManager)sessionManager).findSession(wxMessage.getFromUserName());
     if (session != null) {
-      ((InternalSession) session).endAccess();
+      session.endAccess();
     }
+
   }
 
   public static class Rule {

+ 1 - 1
weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpService.java

@@ -392,7 +392,7 @@ public interface WxCpService {
   /**
    * <pre>
    * 设置WxSessionManager,只有当需要使用个性化的WxSessionManager的时候才需要调用此方法,
-   * WxCpService默认使用的是{@link me.chanjar.weixin.common.session.InMemorySessionManager}
+   * WxCpService默认使用的是{@link me.chanjar.weixin.common.session.StandardSessionManager}
    * </pre>
    * @param sessionManager
    */

+ 2 - 2
weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpServiceImpl.java

@@ -12,7 +12,7 @@ import me.chanjar.weixin.common.bean.WxMenu;
 import me.chanjar.weixin.common.bean.result.WxError;
 import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
 import me.chanjar.weixin.common.exception.WxErrorException;
-import me.chanjar.weixin.common.session.InMemorySessionManager;
+import me.chanjar.weixin.common.session.StandardSessionManager;
 import me.chanjar.weixin.common.session.WxSession;
 import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.common.util.StringUtils;
@@ -66,7 +66,7 @@ public class WxCpServiceImpl implements WxCpService {
 
   private int maxRetryTimes = 5;
 
-  protected WxSessionManager sessionManager = new InMemorySessionManager();
+  protected WxSessionManager sessionManager = new StandardSessionManager();
 
   public boolean checkSignature(String msgSignature, String timestamp, String nonce, String data) {
     try {

+ 134 - 0
weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMessageRouterTest.java

@@ -1,6 +1,7 @@
 package me.chanjar.weixin.cp.api;
 
 import me.chanjar.weixin.common.api.WxConsts;
+import me.chanjar.weixin.common.session.StandardSessionManager;
 import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
 import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
@@ -159,4 +160,137 @@ public class WxCpMessageRouterTest {
 
   }
 
+  @DataProvider
+  public Object[][] standardSessionManager() {
+
+    // 故意把session存活时间变短,清理更频繁
+    StandardSessionManager ism = new StandardSessionManager();
+    ism.setMaxInactiveInterval(1);
+    ism.setProcessExpiresFrequency(1);
+    ism.setBackgroundProcessorDelay(1);
+
+    return new Object[][] {
+        new Object[] { ism }
+    };
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean1(StandardSessionManager ism) throws InterruptedException {
+
+    // 两个同步请求,看是否处理完毕后会被清理掉
+    final WxCpMessageRouter router = new WxCpMessageRouter(null);
+    router.setSessionManager(ism);
+    router
+        .rule().async(false).handler(new WxSessionMessageHandler()).next()
+        .rule().async(false).handler(new WxSessionMessageHandler()).end();
+
+    WxCpXmlMessage msg = new WxCpXmlMessage();
+    msg.setFromUserName("abc");
+    router.route(msg);
+
+    Thread.sleep(2000l);
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean2(StandardSessionManager ism) throws InterruptedException {
+
+    // 1个同步,1个异步请求,看是否处理完毕后会被清理掉
+    {
+      final WxCpMessageRouter router = new WxCpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(false).handler(new WxSessionMessageHandler()).next()
+          .rule().async(true).handler(new WxSessionMessageHandler()).end();
+
+      WxCpXmlMessage msg = new WxCpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+    {
+      final WxCpMessageRouter router = new WxCpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(true).handler(new WxSessionMessageHandler()).next()
+          .rule().async(false).handler(new WxSessionMessageHandler()).end();
+
+      WxCpXmlMessage msg = new WxCpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean3(StandardSessionManager ism) throws InterruptedException {
+
+    // 2个异步请求,看是否处理完毕后会被清理掉
+    final WxCpMessageRouter router = new WxCpMessageRouter(null);
+    router.setSessionManager(ism);
+    router
+        .rule().async(true).handler(new WxSessionMessageHandler()).next()
+        .rule().async(true).handler(new WxSessionMessageHandler()).end();
+
+    WxCpXmlMessage msg = new WxCpXmlMessage();
+    msg.setFromUserName("abc");
+    router.route(msg);
+
+    Thread.sleep(2000l);
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean4(StandardSessionManager ism) throws InterruptedException {
+
+    // 一个同步请求,看是否处理完毕后会被清理掉
+    {
+      final WxCpMessageRouter router = new WxCpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(false).handler(new WxSessionMessageHandler()).end();
+
+      WxCpXmlMessage msg = new WxCpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+
+    {
+      final WxCpMessageRouter router = new WxCpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(true).handler(new WxSessionMessageHandler()).end();
+
+      WxCpXmlMessage msg = new WxCpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+  }
+
+  public static class WxSessionMessageHandler implements  WxCpMessageHandler {
+
+    @Override
+    public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService,
+        WxSessionManager sessionManager) {
+      sessionManager.getSession(wxMessage.getFromUserName());
+      return null;
+    }
+
+
+  }
+
 }

+ 19 - 20
weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java

@@ -1,9 +1,6 @@
 package me.chanjar.weixin.mp.api;
 
-import me.chanjar.weixin.common.session.InternalSession;
-import me.chanjar.weixin.common.session.InMemorySessionManager;
-import me.chanjar.weixin.common.session.WxSession;
-import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.common.session.*;
 import me.chanjar.weixin.common.util.WxMessageDuplicateChecker;
 import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker;
 import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
@@ -67,7 +64,7 @@ public class WxMpMessageRouter {
     this.wxMpService = wxMpService;
     this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
-    this.sessionManager = new InMemorySessionManager();
+    this.sessionManager = new StandardSessionManager();
   }
 
   /**
@@ -131,29 +128,31 @@ public class WxMpMessageRouter {
         }
       }
     }
-    
+
     if (matchRules.size() == 0) {
       return null;
     }
-    
+
     WxMpXmlOutMessage res = null;
     final List<Future> futures = new ArrayList<Future>();
     for (final Rule rule : matchRules) {
       // 返回最后一个非异步的rule的执行结果
       if(rule.async) {
         futures.add(
-          executorService.submit(new Runnable() {
-            public void run() {
-              rule.service(wxMessage);
-            }
-          })
+            executorService.submit(new Runnable() {
+              public void run() {
+                rule.service(wxMessage);
+              }
+            })
         );
       } else {
         res = rule.service(wxMessage);
+        // 在同步操作结束,session访问结束
+        log.trace("End session access after sync operation finish {}", wxMessage.getFromUserName());
+        sessionEndAccess(wxMessage);
       }
     }
 
-    // 告诉session,它已经用不着了
     if (futures.size() > 0) {
       executorService.submit(new Runnable() {
         @Override
@@ -161,19 +160,17 @@ public class WxMpMessageRouter {
           for (Future future : futures) {
             try {
               future.get();
+              log.trace("End session access after async operation finish {}", wxMessage.getFromUserName());
+              // 异步操作结束,session访问结束
+              sessionEndAccess(wxMessage);
             } catch (InterruptedException e) {
               log.error("Error happened when wait task finish", e);
             } catch (ExecutionException e) {
               log.error("Error happened when wait task finish", e);
             }
           }
-          // 在这里session再也不会被使用了
-          sessionEndAccess(wxMessage);
         }
       });
-    } else {
-      // 在这里session再也不会被使用了
-      sessionEndAccess(wxMessage);
     }
     return res;
   }
@@ -183,10 +180,12 @@ public class WxMpMessageRouter {
    * @param wxMessage
    */
   protected void sessionEndAccess(WxMpXmlMessage wxMessage) {
-    WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false);
+
+    InternalSession session = ((InternalSessionManager)sessionManager).findSession(wxMessage.getFromUserName());
     if (session != null) {
-      ((InternalSession) session).endAccess();
+      session.endAccess();
     }
+
   }
 
   public static class Rule {

+ 2 - 2
weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpServiceImpl.java

@@ -10,7 +10,7 @@ import me.chanjar.weixin.common.bean.WxMenu;
 import me.chanjar.weixin.common.bean.result.WxError;
 import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
 import me.chanjar.weixin.common.exception.WxErrorException;
-import me.chanjar.weixin.common.session.InMemorySessionManager;
+import me.chanjar.weixin.common.session.StandardSessionManager;
 import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.common.util.StringUtils;
 import me.chanjar.weixin.common.util.crypto.SHA1;
@@ -68,7 +68,7 @@ public class WxMpServiceImpl implements WxMpService {
 
   private int maxRetryTimes = 5;
 
-  protected WxSessionManager sessionManager = new InMemorySessionManager();
+  protected WxSessionManager sessionManager = new StandardSessionManager();
 
   public boolean checkSignature(String timestamp, String nonce, String signature) {
     try {

+ 133 - 0
weixin-java-mp/src/test/java/me/chanjar/weixin/mp/api/WxMpMessageRouterTest.java

@@ -1,6 +1,7 @@
 package me.chanjar.weixin.mp.api;
 
 import me.chanjar.weixin.common.api.WxConsts;
+import me.chanjar.weixin.common.session.StandardSessionManager;
 import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
 import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage;
@@ -159,4 +160,136 @@ public class WxMpMessageRouterTest {
 
   }
 
+  @DataProvider
+  public Object[][] standardSessionManager() {
+
+    // 故意把session存活时间变短,清理更频繁
+    StandardSessionManager ism = new StandardSessionManager();
+    ism.setMaxInactiveInterval(1);
+    ism.setProcessExpiresFrequency(1);
+    ism.setBackgroundProcessorDelay(1);
+
+    return new Object[][] {
+        new Object[] { ism }
+    };
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean1(StandardSessionManager ism) throws InterruptedException {
+
+    // 两个同步请求,看是否处理完毕后会被清理掉
+    final WxMpMessageRouter router = new WxMpMessageRouter(null);
+    router.setSessionManager(ism);
+    router
+        .rule().async(false).handler(new WxSessionMessageHandler()).next()
+        .rule().async(false).handler(new WxSessionMessageHandler()).end();
+
+    WxMpXmlMessage msg = new WxMpXmlMessage();
+    msg.setFromUserName("abc");
+    router.route(msg);
+
+    Thread.sleep(2000l);
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean2(StandardSessionManager ism) throws InterruptedException {
+
+    // 1个同步,1个异步请求,看是否处理完毕后会被清理掉
+    {
+      final WxMpMessageRouter router = new WxMpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(false).handler(new WxSessionMessageHandler()).next()
+          .rule().async(true).handler(new WxSessionMessageHandler()).end();
+
+      WxMpXmlMessage msg = new WxMpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+    {
+      final WxMpMessageRouter router = new WxMpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(true).handler(new WxSessionMessageHandler()).next()
+          .rule().async(false).handler(new WxSessionMessageHandler()).end();
+
+      WxMpXmlMessage msg = new WxMpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean3(StandardSessionManager ism) throws InterruptedException {
+
+    // 2个异步请求,看是否处理完毕后会被清理掉
+    final WxMpMessageRouter router = new WxMpMessageRouter(null);
+    router.setSessionManager(ism);
+    router
+        .rule().async(true).handler(new WxSessionMessageHandler()).next()
+        .rule().async(true).handler(new WxSessionMessageHandler()).end();
+
+    WxMpXmlMessage msg = new WxMpXmlMessage();
+    msg.setFromUserName("abc");
+    router.route(msg);
+
+    Thread.sleep(2000l);
+    Assert.assertEquals(ism.getActiveSessions(), 0);
+
+  }
+
+  @Test(dataProvider = "standardSessionManager")
+  public void testSessionClean4(StandardSessionManager ism) throws InterruptedException {
+
+    // 一个同步请求,看是否处理完毕后会被清理掉
+    {
+      final WxMpMessageRouter router = new WxMpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(false).handler(new WxSessionMessageHandler()).end();
+
+      WxMpXmlMessage msg = new WxMpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+
+    {
+      final WxMpMessageRouter router = new WxMpMessageRouter(null);
+      router.setSessionManager(ism);
+      router
+          .rule().async(true).handler(new WxSessionMessageHandler()).end();
+
+      WxMpXmlMessage msg = new WxMpXmlMessage();
+      msg.setFromUserName("abc");
+      router.route(msg);
+
+      Thread.sleep(2000l);
+      Assert.assertEquals(ism.getActiveSessions(), 0);
+    }
+  }
+
+  public static class WxSessionMessageHandler implements  WxMpMessageHandler {
+
+    @Override
+    public WxMpXmlOutMessage handle(WxMpXmlMessage wxMessage, Map<String, Object> context, WxMpService wxMpService,
+        WxSessionManager sessionManager) {
+      sessionManager.getSession(wxMessage.getFromUserName());
+      return null;
+    }
+
+  }
+
 }