redis集群批量导入

时间:2022-03-17 14:52:59

把一个长key缩短

package com.org.util;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Scanner;

import org.apache.commons.lang.StringUtils;

public class StringUtil {
	/**
	 * Number of characters in a bucket id.
	 * Each character is one of 64 symbols, so LEN = 4 gives
	 * 64*64*64*64 = 16,777,216 possible keys; spreading 8 billion fields
	 * over them puts roughly 480 fields under each key.
	 */
	private static final int LEN = 4;

	/** The 64-symbol alphabet a bucket id is built from (index 0..63). */
	private static final char CHARS[] = {
			'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
			'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
			'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
			'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
			'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
			'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7',
			'8', '9', '$', '_'};

	/** Bucket id returned when no bucket can be derived from the field. */
	private static final String FALLBACK_BUCKET = "@@@@";

	/**
	 * Maps a (long) field name to the short key of the hash it is stored under.
	 * The first LEN bytes of the MD5 digest of the field are each reduced to a
	 * 6-bit value and translated through CHARS.
	 *
	 * @param field the field name; may be null
	 * @return a LEN-character bucket id, or "@@@@" when field is null
	 */
	public static String getBucketId(String field){
		if (field == null) {
			return FALLBACK_BUCKET;
		}

		// Encode explicitly as UTF-8 so the same field maps to the same bucket
		// on every JVM, independent of the platform default charset.
		// NOTE(review): data imported earlier on a JVM whose default charset was
		// not UTF-8 may have placed non-ASCII fields in a different bucket.
		byte[] raw = field.getBytes(StandardCharsets.UTF_8);

		byte[] md;
		try {
			md = MessageDigest.getInstance("MD5").digest(raw);
		} catch (NoSuchAlgorithmException e) {
			// MD5 is a standard JDK algorithm, so this branch is not expected to
			// run; it keeps the original fallback of bucketing by the raw bytes.
			if (isBlank(field) || field.length() < LEN) {
				return FALLBACK_BUCKET;
			}
			md = raw;
		}

		StringBuilder sb = new StringBuilder(LEN);
		for (int i = 0; i < LEN; i++) {
			// Drop the sign bit, then the lowest bit: 0..127 -> 0..63.
			sb.append(CHARS[(md[i] & 0x7F) >> 1]);
		}
		return sb.toString();
	}

	/** Returns true when the text has no non-whitespace character. */
	private static boolean isBlank(String text) {
		for (int i = 0; i < text.length(); i++) {
			if (!Character.isWhitespace(text.charAt(i))) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Reads one field from standard input and prints the key it maps to.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		String field;
		try (Scanner scan = new Scanner(System.in)) {
			System.out.println("通过field获取key");
			System.out.println("请输入field");
			field = scan.next();
		}
		System.out.println(getBucketId(field));
	}
}

导入文件类

package com.cpic.ffsw;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Scanner;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import com.org.util.DateUtil;
import com.org.util.StringUtil;
import com.org.util.Util;

import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;

/**
 * 指定文件导入
 * @author 1
 *
 */
public class JedisClusterPipeline extends PipelineBase implements Closeable {
	
	// Constants
	private static final int CONNECTION_TIMEOUT = 10000;// connection timeout
	private static final int SO_TIMEOUT = 10000;// timeout while waiting for a reply
	private static final int MAX_ATTEMPTS = 5;// maximum retries when an error occurs
	private static final int BATCH_SIZE = 1500000;// commands queued by write() between two syncs
	
	// Some fields have no accessor, so reflection is the only way to reach them.
	// Subclassing JedisCluster and JedisSlotBasedConnectionHandler to expose them
	// would be an alternative.
	private static final Field FIELD_CONNECTION_HANDLER;
	private static final Field FIELD_CACHE;
	static {
		FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class,
				"connectionHandler");
		FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
	}

	private JedisSlotBasedConnectionHandler connectionHandler;
	private JedisClusterInfoCache clusterInfoCache;
	private Queue<Client> clients = new LinkedList<Client>(); // the Client of every queued command, in command order
	private Map<JedisPool, Map<Long,Jedis>> jedisMap = new HashMap<>(); // borrowed connections, per pool and per thread id
	private boolean hasDataInBuf = false; // whether commands have been queued and not yet read back

	/**
	 * Creates a pipeline on top of an existing JedisCluster instance.
	 * 
	 * @param jedisCluster the cluster whose connection handler and slot cache are reused
	 */
	public JedisClusterPipeline(JedisCluster jedisCluster){
		setJedisCluster(jedisCluster);
	}

	/** Extracts the connection handler and the slot cache from the cluster via reflection. */
	private void setJedisCluster(JedisCluster jedis) {
		connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
		clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
	}

	/**
	 * Refreshes the cluster slot information; call it when the cluster topology changed.
	 */
	private void refreshCluster() {
		connectionHandler.renewSlotCache();
	}

	/**
	 * Reads back all pending replies without collecting them.
	 * Same as syncAndReturnAll() except that no result list is built.
	 */
	private void sync() {
		innerSync(null);
	}

	/**
	 * Reads back all pending replies and returns them as a list.
	 * 
	 * @return the replies, in the order the commands were queued
	 */
	private List<Object> syncAndReturnAll() {
		List<Object> responseList = new ArrayList<Object>();

		innerSync(responseList);

		return responseList;
	}

	/**
	 * Reads one reply per queued command and then releases all connections.
	 * 
	 * @param formatted list receiving the replies, or null to discard them
	 */
	private void innerSync(List<Object> formatted) {
		HashSet<Client> clientSet = new HashSet<Client>();

		try {
			for (Client client : clients) {
				// sync() does not need the parsed data, but without calling get() an error
				// such as JedisMovedDataException would go unnoticed, so get() is called to
				// surface it. Reading Response's data field directly would avoid the parsing
				// cost, but it has no accessor and would need yet more reflection.
				Object data = generateResponse(client.getOne()).get();
				if (null != formatted) {
					formatted.add(data);
				}

				// Equal sizes mean every client has been recorded already; skip the add.
				if (clientSet.size() != jedisMap.size()) {
					clientSet.add(client);
				}
			}
		} catch (JedisRedirectionException jre) {
			if (jre instanceof JedisMovedDataException) {
				// if MOVED redirection occurred, rebuilds cluster's slot cache,
				// recommended by Redis cluster specification
				refreshCluster();
			}
			jre.printStackTrace();
			throw jre;
		} finally {
			if (clientSet.size() != jedisMap.size()) {
				// Clients that were never read must still be drained, otherwise their
				// pending replies would pollute later commands once the connection is
				// back in the pool.
				for(Map.Entry<JedisPool, Map<Long,Jedis>> poolEntry:jedisMap.entrySet()){
					for (Map.Entry<Long, Jedis> jedisEntry : poolEntry.getValue().entrySet()) {
						if (clientSet.contains(jedisEntry.getValue().getClient())) {
							continue;
						}

						flushCachedData(jedisEntry.getValue());
					}
				}
			}

			hasDataInBuf = false;
			close();
		}
	}

	/**
	 * Discards queued state, drains connections that still hold unread replies and
	 * closes every cached Jedis instance.
	 */
	@Override
	public void close() {
		clean();

		clients.clear();

		for (Map.Entry<JedisPool, Map<Long, Jedis>> poolEntry:jedisMap.entrySet()) {
			for(Map.Entry<Long, Jedis> jedisEntry:poolEntry.getValue().entrySet()){
				if (hasDataInBuf) {
					flushCachedData(jedisEntry.getValue());
				}
				jedisEntry.getValue().close();
			}
		}

		jedisMap.clear();

		hasDataInBuf = false;
	}

	/** Reads and drops every pending reply on the connection; failures are only printed. */
	private void flushCachedData(Jedis jedis) {
		try {
			jedis.getClient().getAll();
		} catch (RuntimeException ex) {
			ex.printStackTrace();
		}
	}

	@Override
	protected Client getClient(String key) {
		byte[] bKey = SafeEncoder.encode(key);

		return getClient(bKey);
	}

	/**
	 * Picks the connection for the key's slot and records it in the command queue.
	 */
	@Override
	protected Client getClient(byte[] key) {
		Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

		Client client = jedis.getClient();
		clients.add(client);

		return client;
	}

	/**
	 * Returns the cached connection for the slot's pool and the current thread,
	 * borrowing one from the pool on first use.
	 */
	private Jedis getJedis(int slot) {
		long threadId = Thread.currentThread().getId();
		JedisPool pool = clusterInfoCache.getSlotPool(slot);

		Map<Long,Jedis> perThread = jedisMap.get(pool);
		Jedis jedis = (perThread == null) ? null : perThread.get(threadId);
		if (jedis == null) {
			// Borrow first, register afterwards, so a failed borrow leaves no
			// empty entry behind in jedisMap.
			jedis = pool.getResource();
			if (perThread == null) {
				perThread = new HashMap<>();
				jedisMap.put(pool, perThread);
			}
			perThread.put(threadId, jedis);
		}

		hasDataInBuf = true;
		return jedis;
	}

	/** Looks up a declared field and makes it accessible. */
	private static Field getField(Class<?> cls, String fieldName) {
		try {
			Field field = cls.getDeclaredField(fieldName);
			field.setAccessible(true);

			return field;
		} catch (NoSuchFieldException | SecurityException e) {
			e.printStackTrace();
			throw new RuntimeException("cannot find or access field '"
					+ fieldName + "' from " + cls.getName(), e);
		}
	}

	/** Reads a field value via reflection, cast to the caller's expected type. */
	@SuppressWarnings({ "unchecked" })
	private static <T> T getValue(Object obj, Field field) {
		try {
			return (T) field.get(obj);
		} catch (IllegalArgumentException | IllegalAccessException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

	/** Closes a resource if it is non-null; a failure is printed, never thrown. */
	private static void closeQuietly(Closeable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (IOException | RuntimeException e) {
			e.printStackTrace();
		}
	}

	/** Prints one progress line for write(). */
	private static void logProgress(String name, int totalCount, long startMillis, int batchResultSize) {
		System.out.println(name+"-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗时:"+(System.currentTimeMillis() - startMillis)/60000+"min,batchResult.size():"+batchResultSize);
	}

	/**
	 * Imports a "field,value" text file into the cluster with pipelined HSET commands.
	 * Each line is split at its first comma; the hash key is derived from the field
	 * with StringUtil.getBucketId. Replies are read back every BATCH_SIZE commands.
	 * Lines without a comma are skipped with a warning and are not counted.
	 * An exception during the import is printed and ends the import early.
	 * 
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @param fileName path of the file to import
	 * @return number of lines queued for import
	 */
	public static long write(String host,int port,String pwd,String fileName){
		long s = System.currentTimeMillis();
		File file = new File(fileName);
		String threadName = file.getName();
		System.out.println(threadName+"-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date()));
		JedisCluster jc = getJedisCluster(host, port, pwd);
		JedisClusterPipeline jcp = null;
		int totalCount = 0;
		try {
			jcp = new JedisClusterPipeline(jc);
			jcp.refreshCluster();
			// NOTE(review): FileReader decodes with the platform default charset.
			try (BufferedReader br = new BufferedReader(new FileReader(file))) {
				int count = 0;
				long lineNo = 0;
				String current;
				while (null!=(current=br.readLine())) {
					lineNo++;
					String[] keyValue = current.split(",",2);
					if (keyValue.length < 2) {
						// A line without a separator used to abort the whole import.
						System.err.println(threadName+"-->第"+lineNo+"行缺少逗号分隔符,已跳过");
						continue;
					}
					count++;
					totalCount++;
					jcp.hset(StringUtil.getBucketId(keyValue[0]),keyValue[0],keyValue[1]);
					if(count==BATCH_SIZE){
						// One sync only: a second one would find nothing left to read.
						int batchResultSize = jcp.syncAndReturnAll().size();
						count = 0;
						logProgress(threadName, totalCount, s, batchResultSize);
					}
				}
				if(count>0){
					int batchResultSize = jcp.syncAndReturnAll().size();
					logProgress(threadName, totalCount, s, batchResultSize);
				}
			} catch(Exception e){
				e.printStackTrace();
			}
		} finally {
			closeQuietly(jcp);
			closeQuietly(jc);
		}

		// output time
		System.out.println(threadName+" over-->当前时间:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗时:"+(System.currentTimeMillis() - s)/60000+"min");
		return totalCount;
	}
	
	/**
	 * Fetches the values of several fields in one pipelined round.
	 * An exception while fetching is printed and the results collected so far are returned.
	 * 
	 * @param fields the fields to look up
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return one entry per reply, in command order; null where the reply was null
	 */
	public static List<String> mhget(List<String> fields,String host,int port,String pwd){
		List<String> results = new ArrayList<String>(); 
		JedisCluster jc = getJedisCluster(host, port, pwd);
		JedisClusterPipeline jcp = null;
		try {
			jcp = new JedisClusterPipeline(jc);
			jcp.refreshCluster();
			try {
				for (String field:fields) {
					jcp.hget(StringUtil.getBucketId(field),field);
				}
				for (Object o:jcp.syncAndReturnAll()) {
					results.add(o!=null ? o.toString() : null);
				}
			} catch(Exception e){
				e.printStackTrace();
			}
		} finally {
			closeQuietly(jcp);
			// The cluster client is created per call, so it must be released here.
			closeQuietly(jc);
		}

		return results;
	}
	
	/**
	 * Gets the value stored for a field.
	 * 
	 * @param field the field to look up
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of HGET on the field's bucket key
	 */
	public static String hget(String field,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.hget(StringUtil.getBucketId(field),field);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/**
	 * Stores a value for a field.
	 * 
	 * @param field the field to write
	 * @param value the value to store
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of HSET on the field's bucket key
	 */
	public static long hset(String field,String value,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.hset(StringUtil.getBucketId(field),field,value);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/**
	 * Checks whether a field exists.
	 * 
	 * @param field the field to check
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of HEXISTS on the field's bucket key
	 */
	public static boolean hexists(String field,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.hexists(StringUtil.getBucketId(field),field);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/**
	 * Deletes a field.
	 * 
	 * @param field the field to delete
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of HDEL on the field's bucket key
	 */
	public static long hdel(String field,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.hdel(StringUtil.getBucketId(field),field);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/**
	 * Gets the value of a plain key.
	 * 
	 * @param key the key to read
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of GET
	 */
	public static String get(String key,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.get(key);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/**
	 * Deletes a plain key.
	 * 
	 * @param key the key to delete
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of DEL
	 */
	public static long del(String key,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.del(key);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/**
	 * Queries the remaining time to live of a key.
	 * 
	 * @param key the key to query
	 * @param host redis host
	 * @param port redis port
	 * @param pwd redis password
	 * @return the reply of TTL
	 */
	public static long ttl(String key,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		try {
			return jc.ttl(key);
		} finally {
			closeQuietly(jc);
		}
	}
	
	/** Builds a new cluster client from a single seed node; the caller must close it. */
	private static JedisCluster getJedisCluster(String host,int port,String pwd){
		HostAndPort node = new HostAndPort(host,port);
		return new JedisCluster(node, CONNECTION_TIMEOUT, SO_TIMEOUT, MAX_ATTEMPTS, pwd, new GenericObjectPoolConfig());
	}
	
	/**
	 * Interactive entry point: asks for "host/port/password" and a file path on
	 * standard input, then imports that file with write().
	 */
	public static void main(String[] args) throws IOException {
		Scanner scanner= new Scanner(System.in);
		System.out.println("请输入redis主机/端口/密码认证,例如:10.186.1.1/7380/2wsx3edc");
		String host;// redis host
		int port;// redis port
		String pwd;// redis password
		while(true){
			// No blanket catch here: if standard input is exhausted, next() throws and
			// the program ends instead of re-prompting forever.
			String hostPortPwd[] = scanner.next().split("/");
			if(hostPortPwd.length != 3){
				System.out.println("redis主机/端口/密码认证格式错误,请重新输入");
				continue;
			}
			try {
				port = Integer.parseInt(hostPortPwd[1]);
			} catch(NumberFormatException e){
				// Only the port is parsed as a number, so this is a port error.
				System.out.println("redis端口格式错误,请重新输入");
				continue;
			}
			host = hostPortPwd[0];
			pwd = hostPortPwd[2];
			break;
		}
		System.out.println("请输入文件地址,例如:/app/redis/pipe/CUSTNO_TO_BRANCHID/0_CUSTNO_TO_BRANCHID.txt");
		String filePath = scanner.next();
		while(!new File(filePath).isFile()){
			System.out.println("文件地址错误,请重新输入");
			filePath = scanner.next();
		}
		scanner.close();
		write(host, port, pwd, filePath);
	}
}