把一个长key缩短
package com.org.util; import java.security.MessageDigest; import java.util.Scanner; import org.apache.commons.lang.StringUtils; public class StringUtil { /** * 1个key下面挂480个field,80亿数据 len=4,64*64*64*64=1677万key * 具体怎么控制见方法体 */ private static final int LEN = 4; private static final char CHARS[] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '$', '_'}; /** * 根据field获取key 80亿 len=4 * @param field * @param bit 2的byte次方=key数量 * @return */ public static String getBucketId(String field){ byte[] md; try { MessageDigest mdInst = MessageDigest.getInstance("MD5"); md = mdInst.digest(field.getBytes()); } catch (Exception e) { if(StringUtils.isBlank(field)||field.length()<LEN){ return "@@@@"; }else{ md = field.getBytes(); } } StringBuffer sb = new StringBuffer(); //前4位取所有值 byte b; for(int i=0; i<LEN; i++){ b = md[i]; if(b<0){ b&=127; } sb.append(CHARS[b>>1]); } return sb.toString(); } /** * 通过field获取key * @param args */ public static void main(String[] args) { Scanner scan = new Scanner(System.in); //通过field获取key System.out.println("通过field获取key"); System.out.println("请输入field"); String field = scan.next(); scan.close(); System.out.println(getBucketId(field)); } }
导入文件类
package com.cpic.ffsw; import java.io.BufferedReader; import java.io.Closeable; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Scanner; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import com.org.util.DateUtil; import com.org.util.StringUtil; import com.org.util.Util; import redis.clients.jedis.BinaryJedisCluster; import redis.clients.jedis.Client; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisClusterConnectionHandler; import redis.clients.jedis.JedisClusterInfoCache; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisSlotBasedConnectionHandler; import redis.clients.jedis.PipelineBase; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; import redis.clients.util.SafeEncoder; /** * 指定文件导入 * @author 1 * */ public class JedisClusterPipeline extends PipelineBase implements Closeable { //常量定义 private static final int CONNECTION_TIMEOUT = 10000;//连接超时时间 private static final int SO_TIMEOUT = 10000;//返回值的超时时间 private static final int MAX_ATTEMPTS = 5;//出现异常最大重试次数 // 部分字段没有对应的获取方法,只能采用反射来做 // 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口 private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache"); } private JedisSlotBasedConnectionHandler connectionHandler; private JedisClusterInfoCache clusterInfoCache; private Queue<Client> clients = new LinkedList<Client>(); // 根据顺序存储每个命令对应的Client private Map<JedisPool, Map<Long,Jedis>> jedisMap = new HashMap<>(); // 用于缓存连接 private boolean hasDataInBuf = false; // 是否有数据在缓存区 /** * 根据jedisCluster实例生成对应的JedisClusterPipeline * * @param * @return */ public JedisClusterPipeline(JedisCluster jedisCluster){ setJedisCluster(jedisCluster); } private void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, FIELD_CACHE); } /** * 刷新集群信息,当集群信息发生变更时调用 * * @param * @return */ private void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化 */ private void sync() { innerSync(null); } /** * 同步读取所有数据 并按命令顺序返回一个列表 * * @return 按照命令的顺序返回所有的数据 */ private List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<Object>(); innerSync(responseList); return responseList; } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。 // 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同说明所有的client都已经添加,就不用再调用add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } jre.printStackTrace(); throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染 for(Map.Entry<JedisPool, Map<Long,Jedis>> poolEntry:jedisMap.entrySet()){ for (Map.Entry<Long, Jedis> jedisEntry : poolEntry.getValue().entrySet()) { if (clientSet.contains(jedisEntry.getValue().getClient())) { continue; } flushCachedData(jedisEntry.getValue()); } } } hasDataInBuf = false; close(); } } @Override public void close() { clean(); clients.clear(); for (Map.Entry<JedisPool, Map<Long, Jedis>> poolEntry:jedisMap.entrySet()) { for(Map.Entry<Long, Jedis> jedisEntry:poolEntry.getValue().entrySet()){ if (hasDataInBuf) { flushCachedData(jedisEntry.getValue()); } jedisEntry.getValue().close(); } } jedisMap.clear(); hasDataInBuf = false; } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { ex.printStackTrace(); } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { //根据线程id从缓存中获取Jedis Jedis jedis = null; Map<Long,Jedis> tmpMap = null; //获取线程id long id = Thread.currentThread().getId(); //获取jedispool JedisPool pool = clusterInfoCache.getSlotPool(slot); if (jedisMap.containsKey(pool)) { tmpMap = jedisMap.get(pool); if(tmpMap.containsKey(id)){ jedis = tmpMap.get(id); }else{ jedis = pool.getResource(); tmpMap.put(id, jedis); } }else{ tmpMap = new HashMap<>(); jedis = pool.getResource(); tmpMap.put(id, jedis); jedisMap.put(pool, tmpMap); } hasDataInBuf = true; return jedis; } private static Field getField(Class<?> cls, String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { e.printStackTrace(); throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e); } } @SuppressWarnings({ "unchecked" }) private static <T> T getValue(Object obj, Field field) { try { return (T) field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); throw new RuntimeException(e); } } public static long write(String host,int port,String pwd,String fileName){ long s = System.currentTimeMillis(); File file = new File(fileName); String theadName = file.getName(); System.out.println(theadName+"-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())); BufferedReader br = null; JedisCluster jc = getJedisCluster(host, port, pwd); JedisClusterPipeline jcp = new JedisClusterPipeline(jc); jcp.refreshCluster(); List<Object> batchResult = null; int totalCount = 0; try { // batch write br = new BufferedReader(new FileReader(file)); int count = 0; String current; while (null!=(current=br.readLine())) { count++; totalCount++; String[] keyValue = current.split(",",2); jcp.hset(StringUtil.getBucketId(keyValue[0]),keyValue[0],keyValue[1]); if(count==1500000){ jcp.sync(); batchResult = jcp.syncAndReturnAll(); count = 0; System.out.println(theadName+"-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗时:"+(System.currentTimeMillis() - s)/60000+"min,batchResult.size():"+batchResult.size()); } } if(count>0){ jcp.sync(); batchResult = jcp.syncAndReturnAll(); System.out.println(theadName+"-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗时:"+(System.currentTimeMillis() - s)/60000+"min,batchResult.size():"+batchResult.size()); } }catch(Exception e){ e.printStackTrace(); } finally { try { jcp.close(); } catch (Exception e) { e.printStackTrace(); } try { jc.close(); } catch (Exception e) { e.printStackTrace(); } try { br.close(); } catch (Exception e) { e.printStackTrace(); } } // output time System.out.println(theadName+" over-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗时:"+(System.currentTimeMillis() - s)/60000+"min"); return totalCount; } /** * 通过fields批量获取values * @param fields * @param host * @param port * @param pwd * @return */ public static List<String> mhget(List<String> fields,String host,int port,String pwd){ List<String> results = new ArrayList<String>(); JedisCluster jc = getJedisCluster(host, port, pwd); JedisClusterPipeline jcp = new JedisClusterPipeline(jc); jcp.refreshCluster(); try { for (String field:fields) { jcp.hget(StringUtil.getBucketId(field),field); } List<Object> list=jcp.syncAndReturnAll(); for(Object o:list){ if(o!=null){ results.add(o.toString()); }else{ results.add(null); } } }catch(Exception e){ e.printStackTrace(); } finally { try { jcp.close(); } catch (Exception e) { e.printStackTrace(); } } return results; } /** * 通过field获取value * @param field * @param host * @param port * @param pwd * @return */ public static String hget(String field,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hget(StringUtil.getBucketId(field),field); } /** * 通过field赋值 * @param field * @param value * @param host * @param port * @param pwd * @return */ public static long hset(String field,String value,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hset(StringUtil.getBucketId(field),field,value); } /** * 通过field判断是否存在 * @param field * @param value * @param host * @param port * @param pwd * @return */ public static boolean hexists(String field,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hexists(StringUtil.getBucketId(field),field); } /** * 通过field删除值 * @param field * @param host * @param port * @param pwd * @return */ public static long hdel(String field,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hdel(StringUtil.getBucketId(field),field); } /** * 通过key获取value * @param key * @param host * @param port * @param pwd * @return */ public static String get(String key,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.get(key); } /** * 通过key删除 * @param key * @param host * @param port * @param pwd * @return */ public static long del(String key,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.del(key); } /** * 通过key查询剩余有效期 * @param key * @param host * @param port * @param pwd * @return */ public static long ttl(String key,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.ttl(key); } private static JedisCluster getJedisCluster(String host,int port,String pwd){ HostAndPort node = new HostAndPort(host,port); return new JedisCluster(node, CONNECTION_TIMEOUT, SO_TIMEOUT, MAX_ATTEMPTS, pwd, new GenericObjectPoolConfig()); } public static void main(String[] args) throws IOException { //控制台输入文件地址 Scanner scanner= new Scanner(System.in); System.out.println("请输入redis主机/端口/密码认证,例如:10.186.1.1/7380/2wsx3edc"); String hostPortPwdStr;//redis主机/端口/密码认证 String host;//redis主机 int port;//redis端口 String pwd;//redis密码认证 while(true){ try { hostPortPwdStr = scanner.next(); String hostPortPwd[] = hostPortPwdStr.split("/"); if(hostPortPwd.length == 3){ host = hostPortPwd[0]; port = Integer.parseInt(hostPortPwd[1]); pwd = hostPortPwd[2]; break; }else{ System.out.println("redis主机/端口/密码认证格式错误,请重新输入"); } } catch(NumberFormatException e1){ System.out.println("redis密码认证错误,请重新输入"); }catch (Exception e) { System.out.println("redis主机/端口/密码认证错误,请重新输入"); } } System.out.println("请输入文件地址,例如:/app/redis/pipe/CUSTNO_TO_BRANCHID/0_CUSTNO_TO_BRANCHID.txt"); String filePath = scanner.next(); while(!new File(filePath).isFile()){ System.out.println("文件地址错误,请重新输入"); filePath = scanner.next(); } scanner.close(); write(host, port, pwd, filePath); } }