spark 大型项目实战(九):用户访问session分析(九) --开发JDBC辅助组件(连接池)

时间:2022-04-23 00:50:38

**文章地址:http://www.haha174.top/article/details/253097**
项目源码:https://github.com/haha174/spark-session-project.git
在第八篇文章中介绍到了单例模式
首先我们使用单例模式创建一个连接池保存程序初始启动时获取的链接信息如下

  // 为什么要实现代理化呢?因为它的内部要封装一个简单的内部的数据库连接池
    // 为了保证数据库连接池有且仅有一份,所以就通过单例的方式
    // 保证JDBCHelper只有一个实例,实例中只有一份数据库连接池
    private static JDBCHelper instance = null;

    /**
     * 获取单例
     *
     * @return 单例
     */
    public static JDBCHelper getInstance() {
        if (instance == null) {
            synchronized (JDBCHelper.class) {
                if (instance == null) {
                    instance = new JDBCHelper();
                }
            }
        }
        return instance;
    }
     private JDBCHelper() {
        int datasourceSize = GetValueUtils.getIntegerOrElse(ConfigurationManager.getProperty(Constants.JDBC.JDBC_DATASOURCE_SIZE), 1);
        try {
            initDataSourcre(datasourceSize);
        }catch (Exception e){
            throw new RuntimeException();
        }
    }

下面需要一个取出链接和归还链接的方法

    /**
     * 第四步,提供获取数据库连接的方法
     * 有可能,你去获取的时候,这个时候,连接都被用光了,你暂时获取不到数据库连接
     * 所以我们要自己编码实现一个简单的等待机制,去等待获取到数据库连接
     */
    public synchronized Connection getConnection() {
        try {
            int time = 0;
            while (datasource.size() == 0) {
                time++;
                if (time > getConnectionTime && num <= maxNum) {
                    initDataSourcre(1);
                } else {
                    Thread.sleep(100);
                }
            }
            Connection connection = datasource.poll();
            if (connection.isClosed()) {
                reduceNum(1);
                return getConnection();
            }
            connection.setAutoCommit(true);
            return connection;
        } catch (Exception e) {
            reduceNum(1);
            return getConnection();
        }
    }

    /**
     * 归还数据库连接
     * @param connection
     */
    public synchronized void BackConnection(Connection connection) {
        datasource.add(connection);
        connection=null;
    }

下面提供几个原生的增删改查的方法

   /**
     * 执行增删改SQL语句
     * 自动事务执行 执行update
     * @param sql
     * @return 影响的行数
     */
    public int executeUpdate(String sql, Object[] params) {
        int rtn=0;
        Connection connection=getConnection();
        try {
            PreparedStatement  pstmt =getPrepareStatementSql(connection,sql,params);
            rtn = pstmt.executeUpdate();
        }catch (SQLException e){
            e.printStackTrace();
        }finally {
            BackConnection(connection);
        }
        return rtn;
    }

    /**
     * 执行增删改SQL语句
     * 手动事务执行 执行update
     * @param sql
     * @return 影响的行数
     */
    public int executeUpdate(Connection connection,String sql, Object[] params) {
        int rtn=0;
        try {
            PreparedStatement  pstmt =getPrepareStatementSql(connection,sql,params);
            rtn = pstmt.executeUpdate();
        }catch (SQLException e){
            e.printStackTrace();
            throw new SessionFactoryException(e.getMessage());
        }
        return rtn;
    }

    /**
     * 执行查询SQL语句
     * 自动事务检索
     * @param sql
     * @param params
     */
    public ResultSet executeQuery(String sql, Object[] params){
        Connection connection=getConnection();
        ResultSet rs=null;
        try {
            PreparedStatement  pstmt =getPrepareStatementSql(connection,sql,params);
             rs=pstmt.executeQuery();
        }catch (SQLException e){
            e.printStackTrace();
        }finally {
            BackConnection(connection);
        }
        return rs;
    }

    /**
     *
     * @param connection
     * @param sql
     * @param params
     * @return 带事务的编译
     * @throws SQLException
     */
    private PreparedStatement getPrepareStatementSql(Connection connection,String sql, Object[] params)throws SQLException{
        PreparedStatement  pstmt = connection.prepareStatement(sql);
        if(params!=null&&params.length>0){
            for (int i = 0; i < params.length; i++) {
                pstmt.setObject(i + 1, params[i]);
            }
        }
        return pstmt;
    }

为了方便使用在上述的基础上在封装一层,创建一个SessionFactory 类提供两个对象,一个是自动事务的对象,一般用来做查询,和单条sql 语句的增删改,还有一个是手动事务的对象,一般用于同时更新多张表的情况,例如订单表和库存表,添加订单和减少库存,要么同时成功要么一起回滚,但是使用SessionFactory的手动事务对象需要自行commit 和归还链接也就是调用close 方法 具体如下,后续会在实际的项目中再做调整。

public class SessionFactory {
    /**
     * 连接信息
     */
    private Connection connection;
    /**
     * 标注当前是否是手动事务的还是自动事务的
     */
    private boolean transaction=false;
    /**
     * 获得对象
     */
    private static JDBCHelper jdbcHelper = JDBCHelper.getInstance();

    private SessionFactory(Connection connection,boolean transaction) {
        this.connection = connection;
        this.transaction = transaction;
    }
    private SessionFactory(){}

    public static SessionFactory getSessionFactory() {
        return new SessionFactory(null,false);
    }

    /**
     * 注意 当取得手动事务的session  需要手动去提交事务  和close   sessionFactory  不然会导致 增删改失败和丢失连接数
     * @return
     */
    public static SessionFactory getTransactionSessionFactory() {
        SessionFactory sessionFactory= new SessionFactory(jdbcHelper.getConnection(),true);
        sessionFactory.setAutoCommit(false);
        return sessionFactory;
    }
    /**
     * 提交事务
     *
     * @return
     */
    public boolean commit() {
        try {
            connection.commit();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 回滚事务
     *
     * @return
     */
    public boolean rollback() {
        try {
            connection.rollback();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 设置自动提交  默认的是true
     *
     * @return
     */
    public boolean setAutoCommit(boolean flag) {
        try {
            connection.setAutoCommit(flag);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 增删改方法
     * @param sql
     * @param params
     * @return
     */

    public int executeUpdate(String sql, Object[] params) {
        if(transaction==transaction){
           return jdbcHelper.executeUpdate(connection,sql,params);
        }else{
            return jdbcHelper.executeUpdate(sql,params);
        }
    }
    /**
     * @param sql
     * @param params
     * @return
     */
    public Map<String, Object> queryForMap(String sql, Object[] params) {
        try {
            ResultSet set = jdbcHelper.executeQuery( sql,  params);
            List<Map<String, Object>> list = ResultSetToMap(set);
            if (list != null && list.size() > 0) {
                return list.get(0);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * @param sql
     * @return
     */
    public Map<String, Object> queryForMap(String sql) {
        return queryForMap(sql, null);
    }

    /**
     * 返回一个map  形式的查找结果
     * @param rs
     * @return
     */
    private List<Map<String, Object>> ResultSetToMap(ResultSet rs) {
        try {
            ResultSetMetaData rsmd = rs.getMetaData();
            int count = rsmd.getColumnCount();
            String[] name = new String[count];
            for (int i = 0; i < count; i++) {
                name[i] = rsmd.getColumnName(i + 1);
            }
            List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
            Map<String, Object> map = new HashMap<>();
            while (rs.next()) {
                for (int i = 0; i < count; i++) {
                    map.put(name[i], rs.getObject(name[i]));
                }
                result.add(map);
            }
            return result;
        } catch (SQLException e) {
            e.printStackTrace();
            throw new SessionFactoryException(e.getMessage());
        }
    }


    public <T> T queryForObject(String sql, Class<T> clazz) {
        return queryForObject(sql, null, clazz);
    }

    /**
     * 返回一个对象形式的查找结果
     * @param sql
     * @param params
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T queryForObject(String sql, Object[] params, Class<T> clazz) {
        Map<String, Object> map = queryForMap(sql, params);
        if (null != map) {
            try {
                return BeanUtil.mapToBean(clazz,map);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return null;
    }


    public <T> List<T> queryForList(String sql, Class<T> clazz) {
        return queryForList(sql, null, clazz);
    }

    public <T> List<T> queryForList(String sql, Object[] params, Class<T> clazz) {
        try {
            ResultSet set = jdbcHelper.executeQuery( sql,  params);
            List<Map<String, Object>> list = ResultSetToMap(set);
            return mapsToObjects(list, clazz);
        } catch (Exception e) {
            e.printStackTrace();
            throw new SessionFactoryException(e.getMessage());
        }
    }

    /**
     * 根据sql  获取String
     *
     * @param sql
     * @return
     */
    public String queryForString(String sql) {
        return queryForString(sql, null);
    }

    /**
     * @param sql
     * @param params
     * @return
     */
    public String queryForString(String sql, Object[] params) {
        try {
            ResultSet set = jdbcHelper.executeQuery( sql,  params);
            List<Map<String, Object>> list = ResultSetToMap(set);
            if (list != null && list.size() > 0) {
                return com.alibaba.fastjson.JSON.toJSONString(list.get(0));
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new SessionFactoryException(e.getMessage());
        }
        return "";
    }


    private <T> List<T> mapsToObjects(List<Map<String, Object>> maps, Class<T> clazz) throws Exception {
        List<T> list = Lists.newArrayList();
        if (maps != null && maps.size() > 0) {
            T bean = null;
            for (Map<String, Object> map : maps) {
                bean = BeanUtil.mapToBean(clazz,map);
                list.add(bean);
            }
        }
        return list;
    }

    /**
     * 关闭连接
     */
    public void close() {
        JDBCHelper.getInstance().BackConnection(connection);
        this.connection=null;
    }

}

欢迎关注,更多福利

spark 大型项目实战(九):用户访问session分析(九) --开发JDBC辅助组件(连接池)