jedis源码分析(三)-多节点实例

时间:2021-09-05 17:27:45
我们现实场景中经常会遇到多个redis服务节点的情况, jedis提供实现分片存储的实现,jedis通过ShardedJedis支持多个节点地址,简单的shardedJedis代码示例:
3,多实例模式:
import java.util.ArrayList;
import java.util.List;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
public class ShardedJedisUtil {
        private ShardedJedis shardedJedis ;
        private String addresss ;
        public ShardedJedisUtil(String addresss ) {
               this . addresss = addresss ;
              initShardedJedis();
       }
        public void initShardedJedis() {
              List<JedisShardInfo> shards = new ArrayList<>();
              JedisShardInfo info = null ;
               for (String address : addresss .split( "," )) {
                     String[] hostAndPort = address .split( ":" );
                      info = new JedisShardInfo( hostAndPort [0], Integer. valueOf ( hostAndPort [1]));
                      shards .add( info );
              }
               shardedJedis = new ShardedJedis( shards );
       }
        public void set(String key , String value ) {
               try {
                      shardedJedis .set( key , value );
              } catch (JedisConnectionException e ) {
                     System. out .println( e .getMessage());
              } finally {
                      shardedJedis .close();
              }
       }
        public String get(String key ) {
              String result = null ;
               try {
                      result = shardedJedis .get( key );
              } catch (JedisConnectionException e ) {
                     System. out .println( e .getMessage());
              } finally {
                      shardedJedis .close();
              }
               return result ;
       }
        public static void main(String args []) {
               for ( int i = 0; i < 100; i ++) {
                     ShardedJedisUtil ShardedJedisUtil = new ShardedJedisUtil( "127.0.0.1:4100,127.0.0.1:4101" );
                      ShardedJedisUtil .set( "key" + i , "value" + i );
                     System. out .println( ShardedJedisUtil .get( "key" + i ));
              }
       }
}

在示例构造方法中调用 initShardedJedis()方法,首先创建一个 JedisShardInfo对象的 List集合shards,将传入的地址解析之后创建两个JedisShardInfo对象放入shards中, JedisShardInfo是继承了 ShardInfo<Jedis>类的一个包含redis节点信息的包装类对象
public class JedisShardInfo extends ShardInfo<Jedis> {
 
  private int connectionTimeout ;
  private int soTimeout ;
  private String host ;
  private int port ;
  private String password = null ;
  private String name = null ;
   private int db = 0;
  private boolean ssl ;
  private SSLSocketFactory sslSocketFactory ;
  private SSLParameters sslParameters ;
  private HostnameVerifier hostnameVerifier ;
 
  public JedisShardInfo(String host ) {
    super (Sharded. DEFAULT_WEIGHT );
    URI uri = URI. create ( host );
    if ( JedisURIHelper . isValid ( uri )) {
      this . host = uri .getHost();
      this . port = uri .getPort();
      this . password = JedisURIHelper . getPassword ( uri );
      this . db = JedisURIHelper . getDBIndex ( uri );
      this . ssl = uri .getScheme().equals( "rediss" );
    } else {
      this . host = host ;
      this . port = Protocol. DEFAULT_PORT ;
    }
  }
  public JedisShardInfo(String host , SSLSocketFactory sslSocketFactory ,
      SSLParameters sslParameters , HostnameVerifier hostnameVerifier ) {
    super (Sharded. DEFAULT_WEIGHT );
    URI uri = URI. create ( host );
    if ( JedisURIHelper . isValid ( uri )) {
      this . host = uri .getHost();
      this . port = uri .getPort();
      this . password = JedisURIHelper . getPassword ( uri );
      this . db = JedisURIHelper . getDBIndex ( uri );
      this . ssl = uri .getScheme().equals( "rediss" );
      this . sslSocketFactory = sslSocketFactory ;
      this . sslParameters = sslParameters ;
      this . hostnameVerifier = hostnameVerifier ;
    } else {
      this . host = host ;
      this . port = Protocol. DEFAULT_PORT ;
    }
  }
  public JedisShardInfo(String host , String name ) {
    this ( host , Protocol. DEFAULT_PORT , name );
  }
  public JedisShardInfo(String host , int port ) {
    this ( host , port , 2000);
  }
.....
}
之后创建了 ShardedJedis对象 ,在ShardedJedis构造方法中
  public ShardedJedis(List<JedisShardInfo> shards ) {
    super ( shards );
  }
  public ShardedJedis (List<JedisShardInfo> shards , Hashing algo ) {
    super ( shards , algo );
  }
  public ShardedJedis(List<JedisShardInfo> shards , Pattern keyTagPattern ) {
    super ( shards , keyTagPattern );
  }
  public ShardedJedis(List<JedisShardInfo> shards , Hashing algo , Pattern keyTagPattern ) {
    super ( shards , algo , keyTagPattern );
  }
构造参数有一个JedisShardInfo对象的集合, JedisShardInfo对象中包含host,port,password等属性信息,
ShardedJedis继承了 BinaryShardedJedis类, BinaryShardedJedis 继承了 Sharded<Jedis, JedisShardInfo>,并实现了 BinaryJedisCommands接口, Sharded 的构造方法
public Sharded(List<S> shards ) {
               this ( shards , Hashing. MURMUR_HASH ); // MD5 is really not good as we works
               // with 64-bits not 128
       }
        public Sharded(List<S> shards , Hashing algo ) {
               this . algo = algo ;
              initialize( shards );
       }
        public Sharded(List<S> shards , Pattern tagPattern ) {
               this ( shards , Hashing. MURMUR_HASH , tagPattern ); // MD5 is really not good
               // as we works with
               // 64-bits not 128
       }
        public Sharded (List<S> shards , Hashing algo , Pattern tagPattern ) {
               this . algo = algo ;
               this . tagPattern = tagPattern ;
              initialize( shards );
       }
Sharded 构造方法中有一个参数 Hashing. MURMUR_HASH, Austin Appleby在2008年发明,之后衍生出MURMUR2,MURMUR3等版本, Hashing. MURMUR_HASH是一个高运算低碰撞的哈希算法,
Sharded 构造方法中还调用了initialize( shards )方法:
private void initialize (List<S> shards ) {
               nodes = new TreeMap<Long, S>();
               for ( int i = 0; i != shards .size(); ++ i ) {
                      final S shardInfo = shards .get( i );
                      if ( shardInfo .getName() == null )
                            for ( int n = 0; n < 160 * shardInfo .getWeight(); n ++) {
                                   nodes .put( this . algo .hash( "SHARD-" + i + "-NODE-" + n ), shardInfo );
                           }
                      else
                            for ( int n = 0; n < 160 * shardInfo .getWeight(); n ++) {
                                   nodes .put( this . algo .hash( shardInfo .getName() + "*" + shardInfo .getWeight() + n ), shardInfo );
                           }
                      resources .put( shardInfo , shardInfo .createResource());
              }
       }
initialize方法实现了对redis节点的分片处理,存放在定义TreeMap类型的nodes中,nodes的key通过murmur_hash哈希算法计算的哈希值,value值是之前传入的 JedisShardInfo 对象,默认分片大小是160*weight,weight是权重大小,默认为1。最后将 JedisShardInfo创建的资源放入resuources的 LinkedHashMap中。
初始化shardedJedis对象完成之后,调用set方法,
  public String set(String key , String value ) {
    Jedis j = getShard( key );
    System. out .println( "master:" + j .getClient().getHost()+ ":" + j .getClient().getPort());
    return j .set( key , value );
  }
在set方法中首先调用了一个getShard(key)方法,该方法获取分片后的具体某一个分片信息,也就是分片中放入的jedis对象,
public R getShard (String key ) {
               return resources .get(getShardInfo( key ));
       }
public S getShardInfo(String key ) {
               return getShardInfo(SafeEncoder. encode (getKeyTag( key )));
       }
public S getShardInfo( byte [] key ) {
              SortedMap<Long, S> tail = nodes .tailMap( algo .hash( key ));
              S s = null ;
              Long num = null ;
               if ( tail .isEmpty()) {
                      num = nodes .firstKey();
                      s = nodes .get( num );
                      return s ;
              }
               num = tail .firstKey();
               s = tail .get( num );
               return s ;
       }
最终调用到 getShardInfo( byte [] key )方法,该方法中nodes调用tailMap方法,查询文档可知tailMap 方法用于返回此映射,其键大于或等于fromKey的部分视图。 nodes .tailMap( algo .hash( key )) 将nodes集合中将key键的值大于等于algo.hash(key)哈希值的所有键值对都返回。这里采用的哈希算法与初始化shardedJedis的分片算法一致都采用Murmur_hash算法。
然后通过tail的firstKey方法获取到第一个键值对的键值,也就是一个第一个键值对的key,在通过tail.get(key)获取到初始化时放入的 Jedis 对象。最后调用jedis的set方法将key,value放入其中。