第7章:基于zookeeper的分布式session

时间:2022-03-01 23:11:01

1、使用场景

分布式session功能在于集中式管理session信息,强化session的高可用,例如单机版session存储,当用户请求的服务器发生故障时,此时将用户的访问请求重定向到另外一台服务器就会出现session丢失而用户*退出,分布式session可以集中管理,去掉中心化带来的系统扩展性弱和高可用性差等问题。一般分布式的缓存服务可以充当session存储,由于zookeeper的节点最大存储内容体可以达到1M,并且session的特点基本是读多写少,所以zookeeper也可以作为分布式session实现的一个承载体。本章我们就zookeeper实现的一个分布式session进行一个详细阐述。

2、实现逻辑

第7章:基于zookeeper的分布式session

系统首先需要session将访问zookeeper接口做一层封装,对外提供基于zookeeper的增加,删除,获取session的接口,同时需要设计一个sessionData的类,用来存储session的原数据,其中有一个map就是用来存储session的属性值的。接下来需要自定义一个filter,将所有请求进行过滤,并且将request转化为httpRequestWapper,包装之后应用系统通过request任何操作session的接口都会被代理到我们事先事先的httpRequestWapper里面,进而通过httpRequestWapper将session数据存储到zookeeper,或者从zookeeper读取。

3、代码实现

首先我们定义一个session元数据类,用来操作所有session的属性,如下
/**
*
*/
package com.flykingmz.zookeeper.dSession.model;

import java.io.Serializable;
import java.util.Map;

/**
* @author flyking
*
*/
public class DSessionData implements Serializable {

/**
*
*/
private static final long serialVersionUID = 1L;

/**
* sessionId
*/
private String id;

/**
* session创建时间
*/
private Long createTime;

/**
* 最后一次访问时间
*/
private Long lastAccessTime;


public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public Long getCreateTime() {
return createTime;
}

public Long getLastAccessTime() {
return lastAccessTime;
}

public void setLastAccessTime(Long lastAccessTime) {
this.lastAccessTime = lastAccessTime;
}

public Map<String, String> getSessionContext() {
return sessionContext;
}

public void setSessionContext(Map<String, String> sessionContext) {
this.sessionContext = sessionContext;
}

/**
* session内容体
*/
private Map<String, String> sessionContext;

public DSessionData() {
this.createTime = System.currentTimeMillis();
this.lastAccessTime = this.createTime;
}

}
接下来定义基于zookeeper的dao层封装实现。
/**
*
*/
package com.flykingmz.zookeeper.dSession.dao;

import org.I0Itec.zkclient.ZkClient;
import com.flykingmz.zookeeper.dSession.json.Json;
import com.flykingmz.zookeeper.dSession.model.DSessionData;

/**
* @author flyking
*
*/
public class DSessionDaoImpl implements DSessionDao {

private ZkClient client;

private String zookeeperURL;

public void setZookeeperURL(String zookeeperURL) {
this.zookeeperURL = zookeeperURL;
}

public void init() {
this.client = new ZkClient(zookeeperURL);
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.dao.DSessionDao#addSession(java.lang
* .String, com.flykingmz.zookeeper.dSession.model.DSessionData)
*/
public void addSession(String sessionId, DSessionData data) {
this.client.createPersistent(sessionId, Json.toJson(data));
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.dao.DSessionDao#getSession(java.lang
* .String)
*/
public DSessionData getSession(String sessionId) {
String sessionData = this.client.readData(sessionId);
return Json.toObject(sessionData, DSessionData.class);
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.dao.DSessionDao#delSession(java.lang
* .String)
*/
public void delSession(String sessionId) {
this.client.delete(sessionId);
}

}
dao层实现接下来就需要做一层service实现,主要处理session获取,过期、删除等操作。
/**
*
*/
package com.flykingmz.zookeeper.dSession.service;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.flykingmz.zookeeper.dSession.dao.DSessionDao;
import com.flykingmz.zookeeper.dSession.model.DSessionData;

/**
* @author flyking
*
*/
@Service
public class DSessionServiceImpl implements DSessionService {
@Autowired
private DSessionDao dSessionDaoImpl;

private Long session_expire_time = 30*60*1000L;

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.service.DSessionService#getSession(java
* .lang.String)
*/
public Map<String, String> getSession(String sessionId) {
DSessionData sessionData = dSessionDaoImpl.getSession(sessionId);
if((System.currentTimeMillis() - sessionData.getCreateTime())>=session_expire_time){
dSessionDaoImpl.delSession(sessionId);
return null;
}
sessionData.setLastAccessTime(System.currentTimeMillis());
dSessionDaoImpl.addSession(sessionId, sessionData);
return sessionData.getSessionContext();
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.service.DSessionService#saveSession(
* java.lang.String, java.util.Map)
*/
public void saveSession(String sessionId, Map<String, String> session) {
DSessionData sessionData = new DSessionData();
sessionData.setSessionContext(session);
sessionData.setId(sessionId);
dSessionDaoImpl.addSession(sessionId, sessionData);
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.service.DSessionService#removeSession
* (java.lang.String)
*/
public void removeSession(String sessionId) {
dSessionDaoImpl.delSession(sessionId);
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.service.DSessionService#setSessionAttribute
* (java.lang.String, java.lang.String, java.lang.String)
*/
public void setSessionAttribute(String sessionId, String key, String value) {
DSessionData sessionData = dSessionDaoImpl.getSession(sessionId);
sessionData.getSessionContext().put(key, value);
sessionData.setLastAccessTime(System.currentTimeMillis());
dSessionDaoImpl.addSession(sessionId, sessionData);
}

/*
* (non-Javadoc)
*
* @see com.flykingmz.zookeeper.dSession.service.DSessionService#
* removeSessionAttribute(java.lang.String, java.lang.String)
*/
public void removeSessionAttribute(String sessionId, String key) {
DSessionData sessionData = dSessionDaoImpl.getSession(sessionId);
sessionData.getSessionContext().remove(key);
sessionData.setLastAccessTime(System.currentTimeMillis());
dSessionDaoImpl.addSession(sessionId, sessionData);
}

/*
* (non-Javadoc)
*
* @see
* com.flykingmz.zookeeper.dSession.service.DSessionService#getSessionAttribute
* (java.lang.String, java.lang.String)
*/
public String getSessionAttribute(String sessionId, String key) {
DSessionData sessionData = dSessionDaoImpl.getSession(sessionId);
Map<String,String> sessionContext = sessionData.getSessionContext();
if(sessionContext == null){
return null;
}
sessionData.setLastAccessTime(System.currentTimeMillis());
dSessionDaoImpl.addSession(sessionId, sessionData);
return sessionContext.get(key);
}

}
以上属于业务层面的实现,接下来就需要封装httprequest了,首先需要定义requestWraaper以及HttpSessionSessionIdWrapper
package com.flykingmz.zookeeper.dSession.filter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
* @author flyking
*/
public class HttpServletRequestWrapper extends javax.servlet.http.HttpServletRequestWrapper {
private String sessionId = "";

public HttpServletRequestWrapper(String sessionId , HttpServletRequest request) {
super(request);
this.sessionId = sessionId;
}

public HttpSession getSession(boolean create) {
return new HttpSessionSessionIdWrapper(this.sessionId, super.getSession(create));
}

public HttpSession getSession() {
return new HttpSessionSessionIdWrapper(this.sessionId, super.getSession());
}

}
package com.flykingmz.zookeeper.dSession.filter;import java.util.Enumeration;import java.util.Map;import javax.servlet.http.HttpSession;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.flykingmz.zookeeper.dSession.DistributedContextContainer;import com.flykingmz.zookeeper.dSession.Enumerator;/** * @author flyking */public class HttpSessionSessionIdWrapper extends HttpSessionWrapper {private final static Logger logger = LoggerFactory.getLogger(HttpSessionSessionIdWrapper.class);private String sessionId;public HttpSessionSessionIdWrapper(String sessionId, HttpSession session) {super(session);this.sessionId = sessionId;}public Object getAttribute(String key) {return DistributedContextContainer.getSessionService().getSessionAttribute(sessionId, key);}public Enumeration getAttributeNames() {Map<String, String> session = DistributedContextContainer.getSessionService().getSession(sessionId);return (new Enumerator(session.keySet(), true));}public void invalidate() {DistributedContextContainer.getSessionService().removeSession(sessionId);}public void removeAttribute(String key) {DistributedContextContainer.getSessionService().removeSessionAttribute(sessionId, key);}@SuppressWarnings("unchecked")public void setAttribute(String key, Object value) {if (value instanceof String) {DistributedContextContainer.getSessionService().setSessionAttribute(sessionId, key, (String) value);} else {logger.warn("session unsupport not serializable string." + "[key="+ key + "]" + "[value=" + value + "]");}}@Overridepublic String getId() {return sessionId;}}
这样request可以被包装处理实现,而发起这个包装的触发点就在filter里面,我们看下filter的实现。
package com.flykingmz.zookeeper.dSession.filter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;


public class DistributedSessionFilter implements Filter {
private final static Logger logger = LoggerFactory
.getLogger(DistributedSessionFilter.class);
/**
*
*/
private static final long serialVersionUID = -1L;

private String sessionIdName = "D_SESSION_ID";

private String cookieDomain = "";

private String cookiePath = "/";

private List<String> excludeUrl = new ArrayList<String>();

public void doFilter(ServletRequest servletRequest,
ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
HttpServletResponse response = (HttpServletResponse) servletResponse;
String uri = ((HttpServletRequest) request).getRequestURI();

if (this.excludeUrl != null && this.isMatchExcludeUrl(uri)) {
filterChain.doFilter(request, response);
return;
}

//设置cookieDomain
initCookieDomain(request);


String sessionId = getSessionId(request, response);
HttpServletRequestWrapper httpServletRequestWrapper = new HttpServletRequestWrapper(sessionId, request);
logger.debug("sessionId:"+sessionId);


filterChain.doFilter(httpServletRequestWrapper, response);
}



private void initCookieDomain(HttpServletRequest request) {
String serverName = request.getServerName();
cookieDomain = serverName;
}


private String getSessionId(HttpServletRequest request,
HttpServletResponse response) {
Cookie cookies[] = request.getCookies();
Cookie sCookie = null;
String sessionId = "";
if (cookies != null && cookies.length > 0) {
for (int i = 0; i < cookies.length; i++) {
sCookie = cookies[i];
if (sCookie.getName().equals(sessionIdName)) {
sessionId = sCookie.getValue();
}
}
}

if (sessionId == null || sessionId.length() == 0) {
sessionId = java.util.UUID.randomUUID().toString();
response.addHeader("Set-Cookie", sessionIdName + "=" + sessionId
+ ";domain=" + this.cookieDomain + ";Path="
+ this.cookiePath + ";HTTPOnly");
}
return sessionId;
}



public void init(FilterConfig filterConfig) throws ServletException {
this.cookieDomain = filterConfig.getInitParameter("cookieDomain");
if (this.cookieDomain == null) {
this.cookieDomain = "";
}
this.cookiePath = filterConfig.getInitParameter("cookiePath");
if (this.cookiePath == null || this.cookiePath.length() == 0) {
this.cookiePath = "/";
}
String excludeUrlsString = filterConfig.getInitParameter("excludeUrls");
if (!StringUtils.isEmpty(excludeUrlsString)) {
String[] urls = excludeUrlsString.split(",");
this.excludeUrl = Arrays.asList(urls);
}
}

private boolean isMatchExcludeUrl(String uri) {
if (StringUtils.isEmpty(uri)) {
return false;
}
// 修复类型匹配规则
for (String regexUrl : this.excludeUrl) {
if (uri.endsWith(regexUrl)) {
return true;
}
}
return false;
}

public void destroy() {
this.excludeUrl = null;
this.cookieDomain = null;
this.cookiePath = null;
}

}
以上就是一个基于zookeeper实现的分布式session主要逻辑代码,具体源码实现可以参考

https://github.com/flykingmz/zookeeper-step.git 项目名称为:distributedSession