本来打算先上一张图来说明这条调用线,可是我的画图工具还没有安装好,所以先把跟 TestClient.java 相关的几个文件的代码贴上来。由于代码行数不少,请大家阅读文章时先不要展开代码,等读到相关的代码解释时再返回去查看。代码出自 FastDFS 的作者鱼大大(Happy Fish / YuQing)。
TestClient.java:
/**
 * Copyright (C) 2008 Happy Fish / YuQing
 *
 * FastDFS Java Client may be copied only under the terms of the GNU Lesser
 * General Public License (LGPL).
 * Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
 **/

package org.csource.fastdfs.test;

import java.io.*;
import java.net.*;
import java.util.*;
import org.csource.common.*;
import org.csource.fastdfs.*;

/**
 * Command-line smoke test for the FastDFS Java client.
 *
 * <p>The program walks through the client API in sequence: it uploads an
 * in-memory buffer, reads and merges metadata, downloads the file, uploads
 * slave files, deletes the first file, uploads a local file in two different
 * ways, and finally runs an active test against a storage server and the
 * tracker server. Progress and results are printed to stdout/stderr.
 *
 * @author Happy Fish / YuQing
 * @version Version 1.18
 */
public class TestClient
{
    private TestClient()
    {
    }

    /**
     * Entry point.
     *
     * @param args command arguments
     *     <ul><li>args[0]: config filename</li></ul>
     *     <ul><li>args[1]: local filename to upload</li></ul>
     */
    public static void main(String args[])
    {
        if (args.length < 2)
        {
            System.out.println("Error: Must have 2 parameters, one is config filename, "
                + "the other is the local filename to upload");
            return;
        }

        System.out.println("java.version=" + System.getProperty("java.version"));

        String conf_filename = args[0];
        String local_filename = args[1];

        // Declared outside the try block so that the finally clause can close the
        // connection on every exit path (early return or exception).
        TrackerServer trackerServer = null;

        try
        {
            ClientGlobal.init(conf_filename);
            System.out.println("network_timeout=" + ClientGlobal.g_network_timeout + "ms");
            System.out.println("charset=" + ClientGlobal.g_charset);

            long startTime;
            String group_name;
            String remote_filename;
            ServerInfo[] servers;
            TrackerClient tracker = new TrackerClient();
            trackerServer = tracker.getConnection();

            StorageServer storageServer = null;

            /*
            storageServer = tracker.getStoreStorage(trackerServer);
            if (storageServer == null)
            {
                System.out.println("getStoreStorage fail, error code: " + tracker.getErrorCode());
                return;
            }
            */

            // storageServer is deliberately left null here.
            StorageClient client = new StorageClient(trackerServer, storageServer);
            byte[] file_buff;
            NameValuePair[] meta_list;
            String[] results;
            String master_filename;
            String prefix_name;
            String file_ext_name;
            String generated_slave_filename;
            int errno;

            meta_list = new NameValuePair[4];
            meta_list[0] = new NameValuePair("width", "800");
            meta_list[1] = new NameValuePair("heigth", "600");
            meta_list[2] = new NameValuePair("bgcolor", "#FFFFFF");
            meta_list[3] = new NameValuePair("author", "Mike");

            file_buff = "this is a test".getBytes(ClientGlobal.g_charset);
            System.out.println("file length: " + file_buff.length);

            // List every storage server the tracker offers for upload (no group preference).
            group_name = null;
            StorageServer[] storageServers = tracker.getStoreStorages(trackerServer, group_name);
            if (storageServers == null)
            {
                System.err.println("get store storage servers fail, error code: " + tracker.getErrorCode());
            }
            else
            {
                System.err.println("store storage servers count: " + storageServers.length);
                for (int k=0; k<storageServers.length; k++)
                {
                    System.err.println((k+1) + ". "
                        + storageServers[k].getInetSocketAddress().getAddress().getHostAddress()
                        + ":" + storageServers[k].getInetSocketAddress().getPort());
                }
                System.err.println("");
            }

            // Step 1: upload an in-memory buffer.
            startTime = System.currentTimeMillis();
            results = client.upload_file(file_buff, "txt", meta_list);
            System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms");

            /*
            group_name = "";
            results = client.upload_file(group_name, file_buff, "txt", meta_list);
            */

            if (results == null)
            {
                System.err.println("upload file fail, error code: " + client.getErrorCode());
                return;
            }
            else
            {
                group_name = results[0];
                remote_filename = results[1];
                System.err.println("group_name: " + group_name + ", remote_filename: " + remote_filename);
                System.err.println(client.get_file_info(group_name, remote_filename));

                servers = tracker.getFetchStorages(trackerServer, group_name, remote_filename);
                if (servers == null)
                {
                    System.err.println("get storage servers fail, error code: " + tracker.getErrorCode());
                }
                else
                {
                    System.err.println("storage servers count: " + servers.length);
                    for (int k=0; k<servers.length; k++)
                    {
                        System.err.println((k+1) + ". " + servers[k].getIpAddr() + ":" + servers[k].getPort());
                    }
                    System.err.println("");
                }

                // Step 2: merge new metadata into the uploaded file and read it back.
                meta_list = new NameValuePair[4];
                meta_list[0] = new NameValuePair("width", "1024");
                meta_list[1] = new NameValuePair("heigth", "768");
                meta_list[2] = new NameValuePair("bgcolor", "#000000");
                meta_list[3] = new NameValuePair("title", "Untitle");

                startTime = System.currentTimeMillis();
                errno = client.set_metadata(group_name, remote_filename, meta_list,
                    ProtoCommon.STORAGE_SET_METADATA_FLAG_MERGE);
                System.out.println("set_metadata time used: " + (System.currentTimeMillis() - startTime) + " ms");
                if (errno == 0)
                {
                    System.err.println("set_metadata success");
                }
                else
                {
                    System.err.println("set_metadata fail, error no: " + errno);
                }

                meta_list = client.get_metadata(group_name, remote_filename);
                if (meta_list != null)
                {
                    for (int i=0; i<meta_list.length; i++)
                    {
                        System.out.println(meta_list[i].getName() + " " + meta_list[i].getValue());
                    }
                }

                //Thread.sleep(30000);

                // Step 3: download the file into memory.
                startTime = System.currentTimeMillis();
                file_buff = client.download_file(group_name, remote_filename);
                System.out.println("download_file time used: " + (System.currentTimeMillis() - startTime) + " ms");

                if (file_buff != null)
                {
                    System.out.println("file length:" + file_buff.length);
                    System.out.println((new String(file_buff)));
                }

                // Step 4: upload a slave file of the first file and check the generated name.
                file_buff = "this is a slave buff".getBytes(ClientGlobal.g_charset);
                master_filename = remote_filename;
                prefix_name = "-part1";
                file_ext_name = "txt";
                startTime = System.currentTimeMillis();
                results = client.upload_file(group_name, master_filename, prefix_name,
                    file_buff, file_ext_name, meta_list);
                System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms");
                if (results != null)
                {
                    System.err.println("slave file group_name: " + results[0] + ", remote_filename: " + results[1]);

                    generated_slave_filename = ProtoCommon.genSlaveFilename(master_filename, prefix_name, file_ext_name);
                    if (!generated_slave_filename.equals(results[1]))
                    {
                        System.err.println("generated slave file: " + generated_slave_filename
                            + "\n != returned slave file: " + results[1]);
                    }

                    System.err.println(client.get_file_info(results[0], results[1]));
                }

                // Step 5: delete the first file.
                startTime = System.currentTimeMillis();
                errno = client.delete_file(group_name, remote_filename);
                System.out.println("delete_file time used: " + (System.currentTimeMillis() - startTime) + " ms");
                if (errno == 0)
                {
                    System.err.println("Delete file success");
                }
                else
                {
                    System.err.println("Delete file fail, error no: " + errno);
                }
            }

            // Step 6: upload the local file by name, build its URL, and download it twice.
            results = client.upload_file(local_filename, null, meta_list);
            if (results != null)
            {
                String file_id;
                int ts;
                String token;
                String file_url;
                InetSocketAddress inetSockAddr;

                group_name = results[0];
                remote_filename = results[1];
                file_id = group_name + StorageClient1.SPLIT_GROUP_NAME_AND_FILENAME_SEPERATOR + remote_filename;

                inetSockAddr = trackerServer.getInetSocketAddress();
                file_url = "http://" + inetSockAddr.getAddress().getHostAddress();
                if (ClientGlobal.g_tracker_http_port != 80)
                {
                    file_url += ":" + ClientGlobal.g_tracker_http_port;
                }
                file_url += "/" + file_id;
                if (ClientGlobal.g_anti_steal_token)
                {
                    ts = (int)(System.currentTimeMillis() / 1000);
                    token = ProtoCommon.getToken(file_id, ts, ClientGlobal.g_secret_key);
                    file_url += "?token=" + token + "&ts=" + ts;
                }

                System.err.println("group_name: " + group_name + ", remote_filename: " + remote_filename);
                System.err.println(client.get_file_info(group_name, remote_filename));
                System.err.println("file url: " + file_url);

                // Download to a local file path.
                errno = client.download_file(group_name, remote_filename, 0, 0,
                    "c:\\" + remote_filename.replaceAll("/", "_"));
                if (errno == 0)
                {
                    System.err.println("Download file success");
                }
                else
                {
                    System.err.println("Download file fail, error no: " + errno);
                }

                // Download again through a DownloadFileWriter object.
                errno = client.download_file(group_name, remote_filename, 0, 0,
                    new DownloadFileWriter("c:\\" + remote_filename.replaceAll("/", "-")));
                if (errno == 0)
                {
                    System.err.println("Download file success");
                }
                else
                {
                    System.err.println("Download file fail, error no: " + errno);
                }

                master_filename = remote_filename;
                prefix_name = "-part2";
                file_ext_name = null;
                startTime = System.currentTimeMillis();
                results = client.upload_file(group_name, master_filename, prefix_name,
                    local_filename, null, meta_list);
                System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms");
                if (results != null)
                {
                    System.err.println("slave file group_name: " + results[0] + ", remote_filename: " + results[1]);

                    generated_slave_filename = ProtoCommon.genSlaveFilename(master_filename, prefix_name, file_ext_name);
                    if (!generated_slave_filename.equals(results[1]))
                    {
                        System.err.println("generated slave file: " + generated_slave_filename
                            + "\n != returned slave file: " + results[1]);
                    }

                    System.err.println(client.get_file_info(results[0], results[1]));
                }
            }

            // Step 7: upload the local file through an UploadLocalFileSender object.
            File f;
            f = new File(local_filename);
            int nPos = local_filename.lastIndexOf('.');
            if (nPos > 0 && local_filename.length() - nPos <= ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + 1)
            {
                file_ext_name = local_filename.substring(nPos+1);
            }
            else
            {
                file_ext_name = null;
            }

            results = client.upload_file(null, f.length(),
                new UploadLocalFileSender(local_filename), file_ext_name, meta_list);
            if (results != null)
            {
                group_name = results[0];
                remote_filename = results[1];

                System.out.println("group name: " + group_name + ", remote filename: " + remote_filename);
                System.out.println(client.get_file_info(group_name, remote_filename));

                master_filename = remote_filename;
                prefix_name = "-part3";
                startTime = System.currentTimeMillis();
                results = client.upload_file(group_name, master_filename, prefix_name, f.length(),
                    new UploadLocalFileSender(local_filename), file_ext_name, meta_list);
                System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms");
                if (results != null)
                {
                    System.err.println("slave file group_name: " + results[0] + ", remote_filename: " + results[1]);

                    generated_slave_filename = ProtoCommon.genSlaveFilename(master_filename, prefix_name, file_ext_name);
                    if (!generated_slave_filename.equals(results[1]))
                    {
                        System.err.println("generated slave file: " + generated_slave_filename
                            + "\n != returned slave file: " + results[1]);
                    }

                    System.err.println(client.get_file_info(results[0], results[1]));
                }
            }
            else
            {
                // Report the error code of this upload; the local errno variable still
                // holds the result of an earlier, unrelated call at this point.
                System.err.println("Upload file fail, error no: " + client.getErrorCode());
            }

            // Step 8: active tests against a storage server and the tracker server.
            storageServer = tracker.getFetchStorage(trackerServer, group_name, remote_filename);
            if (storageServer == null)
            {
                System.out.println("getFetchStorage fail, errno code: " + tracker.getErrorCode());
                return;
            }

            try
            {
                /* for test only */
                System.out.println("active test to storage server: "
                    + ProtoCommon.activeTest(storageServer.getSocket()));
            }
            finally
            {
                // Close the storage connection even if the active test throws.
                storageServer.close();
            }

            /* for test only */
            System.out.println("active test to tracker server: "
                + ProtoCommon.activeTest(trackerServer.getSocket()));
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
        }
        finally
        {
            // Single place where the tracker connection is released, so that early
            // returns and exceptions no longer leave it open.
            if (trackerServer != null)
            {
                try
                {
                    trackerServer.close();
                }
                catch(IOException ex1)
                {
                    ex1.printStackTrace();
                }
            }
        }
    }
}
ClientGlobal.java:
/**
 * Copyright (C) 2008 Happy Fish / YuQing
 *
 * FastDFS Java Client may be copied only under the terms of the GNU Lesser
 * General Public License (LGPL).
 * Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
 **/

package org.csource.fastdfs;

import java.net.*;
import java.io.*;
import java.net.*;
import org.csource.common.*;

/**
 * Global client settings, loaded once from a config file by
 * {@link #init(String)} and then read through the public static fields or
 * their accessors.
 *
 * @author Happy Fish / YuQing
 * @version Version 1.11
 */
public class ClientGlobal
{
    /** Connect timeout in milliseconds. */
    public static int g_connect_timeout;
    /** Network (socket read) timeout in milliseconds. */
    public static int g_network_timeout;
    /** Charset name used when converting strings to bytes. */
    public static String g_charset;
    /** HTTP port of the tracker server. */
    public static int g_tracker_http_port;
    /** Whether the anti-steal token is enabled. */
    public static boolean g_anti_steal_token;
    /** Secret key used to generate tokens; only loaded when the anti-steal token is enabled. */
    public static String g_secret_key;
    /** Tracker servers read from the "tracker_server" config items. */
    public static TrackerGroup g_tracker_group;

    /** Default connect timeout, in seconds. */
    public static final int DEFAULT_CONNECT_TIMEOUT = 5;
    /** Default network timeout, in seconds. */
    public static final int DEFAULT_NETWORK_TIMEOUT = 30;

    private ClientGlobal()
    {
    }

    /**
     * Loads the global variables from a config file.
     *
     * <p>Timeouts are read in seconds and stored in milliseconds; negative
     * values fall back to the defaults. A missing or empty charset falls back
     * to "ISO8859-1".
     *
     * @param conf_filename config filename
     * @throws FileNotFoundException declared for the config reader
     * @throws IOException declared for the config reader
     * @throws MyException if no "tracker_server" item is present, or one of
     *     them is not in host:port form with a numeric port
     */
    public static void init(String conf_filename) throws FileNotFoundException, IOException, MyException
    {
        IniFileReader iniReader;
        String[] szTrackerServers;
        String[] parts;

        iniReader = new IniFileReader(conf_filename);

        g_connect_timeout = iniReader.getIntValue("connect_timeout", DEFAULT_CONNECT_TIMEOUT);
        if (g_connect_timeout < 0)
        {
            g_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
        }
        g_connect_timeout *= 1000; // seconds -> milliseconds

        g_network_timeout = iniReader.getIntValue("network_timeout", DEFAULT_NETWORK_TIMEOUT);
        if (g_network_timeout < 0)
        {
            g_network_timeout = DEFAULT_NETWORK_TIMEOUT;
        }
        g_network_timeout *= 1000; // seconds -> milliseconds

        g_charset = iniReader.getStrValue("charset");
        if (g_charset == null || g_charset.length() == 0)
        {
            g_charset = "ISO8859-1";
        }

        szTrackerServers = iniReader.getValues("tracker_server");
        if (szTrackerServers == null)
        {
            throw new MyException("item \"tracker_server\" in " + conf_filename + " not found");
        }

        InetSocketAddress[] tracker_servers = new InetSocketAddress[szTrackerServers.length];
        for (int i=0; i<szTrackerServers.length; i++)
        {
            parts = szTrackerServers[i].split("\\:", 2);
            if (parts.length != 2)
            {
                throw new MyException("the value of item \"tracker_server\" is invalid, the correct format is host:port");
            }

            int port;
            try
            {
                port = Integer.parseInt(parts[1].trim());
            }
            catch (NumberFormatException ex)
            {
                // Report a malformed port the same way as a malformed host:port
                // pair instead of letting an unchecked exception escape.
                throw new MyException("the value of item \"tracker_server\" is invalid, the port is not a number: "
                    + szTrackerServers[i]);
            }

            tracker_servers[i] = new InetSocketAddress(parts[0].trim(), port);
        }
        g_tracker_group = new TrackerGroup(tracker_servers);

        g_tracker_http_port = iniReader.getIntValue("http.tracker_http_port", 80);
        g_anti_steal_token = iniReader.getBoolValue("http.anti_steal_token", false);
        if (g_anti_steal_token)
        {
            g_secret_key = iniReader.getStrValue("http.secret_key");
        }
    }

    /**
     * Constructs a connected Socket object.
     *
     * @param ip_addr ip address or hostname
     * @param port port number
     * @return connected Socket object
     * @throws IOException if the connection cannot be established
     */
    public static Socket getSocket(String ip_addr, int port) throws IOException
    {
        return getSocket(new InetSocketAddress(ip_addr, port));
    }

    /**
     * Constructs a connected Socket object, using the global network timeout
     * as the socket timeout and the global connect timeout for connecting.
     *
     * @param addr InetSocketAddress object, including ip address and port
     * @return connected Socket object
     * @throws IOException if the connection cannot be established; the
     *     half-built socket is closed before the exception propagates
     */
    public static Socket getSocket(InetSocketAddress addr) throws IOException
    {
        Socket sock = new Socket();
        boolean connected = false;
        try
        {
            sock.setSoTimeout(ClientGlobal.g_network_timeout);
            sock.connect(addr, ClientGlobal.g_connect_timeout);
            connected = true;
            return sock;
        }
        finally
        {
            if (!connected)
            {
                // Do not leak the socket when configuring or connecting fails.
                try
                {
                    sock.close();
                }
                catch (IOException ignored)
                {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /** @return the connect timeout in milliseconds */
    public static int getG_connect_timeout()
    {
        return g_connect_timeout;
    }

    /** @param connect_timeout the connect timeout; stored as given */
    public static void setG_connect_timeout(int connect_timeout)
    {
        ClientGlobal.g_connect_timeout = connect_timeout;
    }

    /** @return the network timeout in milliseconds */
    public static int getG_network_timeout()
    {
        return g_network_timeout;
    }

    /** @param network_timeout the network timeout; stored as given */
    public static void setG_network_timeout(int network_timeout)
    {
        ClientGlobal.g_network_timeout = network_timeout;
    }

    /** @return the charset name */
    public static String getG_charset()
    {
        return g_charset;
    }

    /** @param charset the charset name */
    public static void setG_charset(String charset)
    {
        ClientGlobal.g_charset = charset;
    }

    /** @return the tracker HTTP port */
    public static int getG_tracker_http_port()
    {
        return g_tracker_http_port;
    }

    /** @param tracker_http_port the tracker HTTP port */
    public static void setG_tracker_http_port(int tracker_http_port)
    {
        ClientGlobal.g_tracker_http_port = tracker_http_port;
    }

    /** @return whether the anti-steal token is enabled */
    public static boolean getG_anti_steal_token()
    {
        return g_anti_steal_token;
    }

    /** @return whether the anti-steal token is enabled */
    public static boolean isG_anti_steal_token()
    {
        return g_anti_steal_token;
    }

    /** @param anti_steal_token whether the anti-steal token is enabled */
    public static void setG_anti_steal_token(boolean anti_steal_token)
    {
        ClientGlobal.g_anti_steal_token = anti_steal_token;
    }

    /** @return the secret key used to generate tokens */
    public static String getG_secret_key()
    {
        return g_secret_key;
    }

    /** @param secret_key the secret key used to generate tokens */
    public static void setG_secret_key(String secret_key)
    {
        ClientGlobal.g_secret_key = secret_key;
    }

    /** @return the global tracker group */
    public static TrackerGroup getG_tracker_group()
    {
        return g_tracker_group;
    }

    /** @param tracker_group the global tracker group */
    public static void setG_tracker_group(TrackerGroup tracker_group)
    {
        ClientGlobal.g_tracker_group = tracker_group;
    }
}
TrackerClient.java:
/** * Copyright (C) 2008 Happy Fish / YuQing * * FastDFS Java Client may be copied only under the terms of the GNU Lesser * General Public License (LGPL). * Please visit the FastDFS Home Page http://www.csource.org/ for more detail. */ package org.csource.fastdfs; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; import java.net.Socket; /** * Tracker client * @author Happy Fish / YuQing * @version Version 1.19 */ public class TrackerClient { protected TrackerGroup tracker_group; protected byte errno; /** * constructor with global tracker group */ public TrackerClient() { this.tracker_group = ClientGlobal.g_tracker_group; } /** * constructor with specified tracker group * @param tracker_group the tracker group object */ public TrackerClient(TrackerGroup tracker_group) { this.tracker_group = tracker_group; } /** * get the error code of last call * @return the error code of last call */ public byte getErrorCode() { return this.errno; } /** * get a connection to tracker server * @return tracker server Socket object, return null if fail */ public TrackerServer getConnection() throws IOException { return this.tracker_group.getConnection(); } /** * query storage server to upload file * @param trackerServer the tracker server * @return storage server Socket object, return null if fail */ public StorageServer getStoreStorage(TrackerServer trackerServer) throws IOException { final String groupName = null; return this.getStoreStorage(trackerServer, groupName); } /** * query storage server to upload file * @param trackerServer the tracker server * @param groupName the group name to upload file to, can be empty * @return storage server object, return null if fail */ public StorageServer getStoreStorage(TrackerServer trackerServer, String groupName) throws IOException { byte[] header; String ip_addr; int port; byte cmd; int out_len; boolean bNewConnection; byte store_path; Socket trackerSocket; if (trackerServer == null) { trackerServer = 
getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { if (groupName == null || groupName.length() == 0) { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE; out_len = 0; } else { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE; out_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } header = ProtoCommon.packHeader(cmd, out_len, (byte)0); out.write(header); if (groupName != null && groupName.length() > 0) { byte[] bGroupName; byte[] bs; int group_len; bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { group_len = bs.length; } else { group_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, group_len); out.write(bGroupName); } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } ip_addr = new String(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN, ProtoCommon.FDFS_IPADDR_SIZE-1).trim(); port = (int)ProtoCommon.buff2long(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ProtoCommon.FDFS_IPADDR_SIZE-1); store_path = pkgInfo.body[ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN - 1]; return new StorageServer(ip_addr, port, store_path); } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } /** * query storage servers to upload file * @param trackerServer the tracker server * @param 
groupName the group name to upload file to, can be empty * @return storage servers, return null if fail */ public StorageServer[] getStoreStorages(TrackerServer trackerServer, String groupName) throws IOException { byte[] header; String ip_addr; int port; byte cmd; int out_len; boolean bNewConnection; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { if (groupName == null || groupName.length() == 0) { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL; out_len = 0; } else { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL; out_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } header = ProtoCommon.packHeader(cmd, out_len, (byte)0); out.write(header); if (groupName != null && groupName.length() > 0) { byte[] bGroupName; byte[] bs; int group_len; bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { group_len = bs.length; } else { group_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, group_len); out.write(bGroupName); } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } if (pkgInfo.body.length < ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN) { this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } int ipPortLen = pkgInfo.body.length - (ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + 1); final int recordLength = ProtoCommon.FDFS_IPADDR_SIZE - 1 + ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; if (ipPortLen % recordLength != 0) { this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } int 
serverCount = ipPortLen / recordLength; if (serverCount > 16) { this.errno = ProtoCommon.ERR_NO_ENOSPC; return null; } StorageServer[] results = new StorageServer[serverCount]; byte store_path = pkgInfo.body[pkgInfo.body.length - 1]; int offset = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; for (int i=0; i<serverCount; i++) { ip_addr = new String(pkgInfo.body, offset, ProtoCommon.FDFS_IPADDR_SIZE - 1).trim(); offset += ProtoCommon.FDFS_IPADDR_SIZE - 1; port = (int)ProtoCommon.buff2long(pkgInfo.body, offset); offset += ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; results[i] = new StorageServer(ip_addr, port, store_path); } return results; } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } /** * query storage server to download file * @param trackerServer the tracker server * @param groupName the group name of storage server * @param filename filename on storage server * @return storage server Socket object, return null if fail */ public StorageServer getFetchStorage(TrackerServer trackerServer, String groupName, String filename) throws IOException { ServerInfo[] servers = this.getStorages(trackerServer, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE, groupName, filename); if (servers == null) { return null; } else { return new StorageServer(servers[0].getIpAddr(), servers[0].getPort(), 0); } } /** * query storage server to update file (delete file or set meta data) * @param trackerServer the tracker server * @param groupName the group name of storage server * @param filename filename on storage server * @return storage server Socket object, return null if fail */ public StorageServer getUpdateStorage(TrackerServer trackerServer, String groupName, String filename) throws IOException { ServerInfo[] servers = this.getStorages(trackerServer, 
ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE, groupName, filename); if (servers == null) { return null; } else { return new StorageServer(servers[0].getIpAddr(), servers[0].getPort(), 0); } } /** * get storage servers to download file * @param trackerServer the tracker server * @param groupName the group name of storage server * @param filename filename on storage server * @return storage servers, return null if fail */ public ServerInfo[] getFetchStorages(TrackerServer trackerServer, String groupName, String filename) throws IOException { return this.getStorages(trackerServer, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL, groupName, filename); } /** * query storage server to download file * @param trackerServer the tracker server * @param cmd command code, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE or ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE * @param groupName the group name of storage server * @param filename filename on storage server * @return storage server Socket object, return null if fail */ protected ServerInfo[] getStorages(TrackerServer trackerServer, byte cmd, String groupName, String filename) throws IOException { byte[] header; byte[] bFileName; byte[] bGroupName; byte[] bs; int len; String ip_addr; int port; boolean bNewConnection; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; bFileName = filename.getBytes(ClientGlobal.g_charset); if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { len = bs.length; } else { len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, len); header = ProtoCommon.packHeader(cmd, 
ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + bFileName.length, (byte)0); byte[] wholePkg = new byte[header.length + bGroupName.length + bFileName.length]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(bGroupName, 0, wholePkg, header.length, bGroupName.length); System.arraycopy(bFileName, 0, wholePkg, header.length + bGroupName.length, bFileName.length); out.write(wholePkg); ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } if (pkgInfo.body.length < ProtoCommon.TRACKER_QUERY_STORAGE_FETCH_BODY_LEN) { throw new IOException("Invalid body length: " + pkgInfo.body.length); } if ((pkgInfo.body.length - ProtoCommon.TRACKER_QUERY_STORAGE_FETCH_BODY_LEN) % (ProtoCommon.FDFS_IPADDR_SIZE - 1) != 0) { throw new IOException("Invalid body length: " + pkgInfo.body.length); } int server_count = 1 + (pkgInfo.body.length - ProtoCommon.TRACKER_QUERY_STORAGE_FETCH_BODY_LEN) / (ProtoCommon.FDFS_IPADDR_SIZE - 1); ip_addr = new String(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN, ProtoCommon.FDFS_IPADDR_SIZE-1).trim(); int offset = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ProtoCommon.FDFS_IPADDR_SIZE - 1; port = (int)ProtoCommon.buff2long(pkgInfo.body, offset); offset += ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; ServerInfo[] servers = new ServerInfo[server_count]; servers[0] = new ServerInfo(ip_addr, port); for (int i=1; i<server_count; i++) { servers[i] = new ServerInfo(new String(pkgInfo.body, offset, ProtoCommon.FDFS_IPADDR_SIZE-1).trim(), port); offset += ProtoCommon.FDFS_IPADDR_SIZE - 1; } return servers; } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } /** * query storage server to download file * 
@param trackerServer the tracker server
   * @param file_id the file id(including group name and filename)
   * @return the storage server to download from, return null if fail
   * @throws IOException if the exchange with the tracker fails
   */
  public StorageServer getFetchStorage1(TrackerServer trackerServer, String file_id) throws IOException {
    String[] parts = new String[2];
    this.errno = StorageClient1.split_file_id(file_id, parts);
    if (this.errno != 0) {
      return null;
    }

    return this.getFetchStorage(trackerServer, parts[0], parts[1]);
  }

  /**
   * get storage servers to download file
   *
   * @param trackerServer the tracker server
   * @param file_id the file id(including group name and filename)
   * @return storage servers, return null if fail
   * @throws IOException if the exchange with the tracker fails
   */
  public ServerInfo[] getFetchStorages1(TrackerServer trackerServer, String file_id) throws IOException {
    String[] parts = new String[2];
    this.errno = StorageClient1.split_file_id(file_id, parts);
    if (this.errno != 0) {
      return null;
    }

    return this.getFetchStorages(trackerServer, parts[0], parts[1]);
  }

  /**
   * list groups
   * <p>
   * When {@code trackerServer} is null a connection is obtained from
   * {@code getConnection()} and always closed before returning. A connection
   * supplied by the caller is closed only when an IOException occurs during the
   * exchange. Any other exception raised while sending, receiving or decoding
   * is printed and reported as errno ERR_NO_EINVAL with a null result.
   *
   * @param trackerServer the tracker server, null to use a temporary connection
   * @return group stat array, return null if fail
   * @throws IOException on network failure
   */
  public StructGroupStat[] listGroups(TrackerServer trackerServer) throws IOException {
    final boolean bNewConnection = (trackerServer == null);
    if (bNewConnection) {
      trackerServer = getConnection();
      if (trackerServer == null) {
        return null;
      }
    }

    try {
      // Acquired inside the outer try so that a connection opened by this method
      // is still released by the finally block when either call throws.
      Socket trackerSocket = trackerServer.getSocket();
      OutputStream out = trackerSocket.getOutputStream();

      try {
        byte[] header = ProtoCommon.packHeader(ProtoCommon.TRACKER_PROTO_CMD_SERVER_LIST_GROUP, 0, (byte)0);
        out.write(header);

        ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(),
            ProtoCommon.TRACKER_PROTO_CMD_RESP, -1);
        this.errno = pkgInfo.errno;
        if (pkgInfo.errno != 0) {
          return null;
        }

        ProtoStructDecoder<StructGroupStat> decoder = new ProtoStructDecoder<StructGroupStat>();
        return decoder.decode(pkgInfo.body, StructGroupStat.class, StructGroupStat.getFieldsTotalSize());
      } catch (IOException ex) {
        if (!bNewConnection) {
          try {
            trackerServer.close();
          } catch (IOException ex1) {
            ex1.printStackTrace();
          }
        }

        throw ex;
      } catch (Exception ex) {
        ex.printStackTrace();
        this.errno = ProtoCommon.ERR_NO_EINVAL;
        return null;
      }
    } finally {
      if (bNewConnection) {
        try {
          trackerServer.close();
        } catch (IOException ex1) {
          ex1.printStackTrace();
        }
      }
    }
  }

  /**
   * query storage server stat info of the group
   *
   * @param trackerServer the tracker server
   * @param groupName the group name of storage server
   * @return storage server stat array, return null if fail
   * @throws IOException on network failure
   */
  public StructStorageStat[] listStorages(TrackerServer trackerServer, String groupName) throws IOException {
    // typed null: no filter on the storage ip address
    final String storageIpAddr = null;
    return this.listStorages(trackerServer, groupName, storageIpAddr);
  }

  /**
   * query storage server stat info of the group
   * <p>
   * The request body is the group name (zero padded / truncated to
   * FDFS_GROUP_NAME_MAX_LEN bytes), optionally followed by the storage ip
   * address (at most FDFS_IPADDR_SIZE - 1 bytes). Connection handling and
   * error reporting follow the same rules as {@code listGroups}: a temporary
   * connection is used and closed when {@code trackerServer} is null, a caller
   * supplied one is closed only on IOException.
   *
   * @param trackerServer the tracker server, null to use a temporary connection
   * @param groupName the group name of storage server
   * @param storageIpAddr the storage server ip address, can be null or empty
   * @return storage server stat array, return null if fail
   * @throws IOException on network failure
   */
  public StructStorageStat[] listStorages(TrackerServer trackerServer,
      String groupName, String storageIpAddr) throws IOException {
    final boolean bNewConnection = (trackerServer == null);
    if (bNewConnection) {
      trackerServer = getConnection();
      if (trackerServer == null) {
        return null;
      }
    }

    try {
      // Acquired inside the outer try so that a connection opened by this method
      // is still released by the finally block when either call throws.
      Socket trackerSocket = trackerServer.getSocket();
      OutputStream out = trackerSocket.getOutputStream();

      try {
        byte[] bs = groupName.getBytes(ClientGlobal.g_charset);
        // a freshly allocated array is already zero filled, which provides the padding
        byte[] bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN];
        System.arraycopy(bs, 0, bGroupName, 0, Math.min(bs.length, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN));

        int ipAddrLen;
        byte[] bIpAddr;
        if (storageIpAddr != null && storageIpAddr.length() > 0) {
          bIpAddr = storageIpAddr.getBytes(ClientGlobal.g_charset);
          ipAddrLen = Math.min(bIpAddr.length, ProtoCommon.FDFS_IPADDR_SIZE - 1);
        } else {
          bIpAddr = null;
          ipAddrLen = 0;
        }

        byte[] header = ProtoCommon.packHeader(ProtoCommon.TRACKER_PROTO_CMD_SERVER_LIST_STORAGE,
            ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ipAddrLen, (byte)0);
        byte[] wholePkg = new byte[header.length + bGroupName.length + ipAddrLen];
        System.arraycopy(header, 0, wholePkg, 0, header.length);
        System.arraycopy(bGroupName, 0, wholePkg, header.length, bGroupName.length);
        if (ipAddrLen > 0) {
          System.arraycopy(bIpAddr, 0, wholePkg, header.length + bGroupName.length, ipAddrLen);
        }
        out.write(wholePkg);

        ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(),
            ProtoCommon.TRACKER_PROTO_CMD_RESP, -1);
        this.errno = pkgInfo.errno;
        if (pkgInfo.errno != 0) {
          return null;
        }

        ProtoStructDecoder<StructStorageStat> decoder = new ProtoStructDecoder<StructStorageStat>();
        return decoder.decode(pkgInfo.body, StructStorageStat.class, StructStorageStat.getFieldsTotalSize());
      } catch (IOException ex) {
        if (!bNewConnection) {
          try {
            trackerServer.close();
          } catch (IOException ex1) {
            ex1.printStackTrace();
          }
        }

        throw ex;
      } catch (Exception ex) {
        ex.printStackTrace();
        this.errno = ProtoCommon.ERR_NO_EINVAL;
        return null;
      }
    } finally {
      if (bNewConnection) {
        try {
          trackerServer.close();
        } catch (IOException ex1) {
          ex1.printStackTrace();
        }
      }
    }
  }

  /**
   * delete a storage server from the tracker server
   * <p>
   * The connection is neither opened nor closed here; that is left to the caller.
   * The error code of the response is stored in {@code errno}.
   *
   * @param trackerServer the connected tracker server
   * @param groupName the group name of storage server
   * @param storageIpAddr the storage server ip address
   * @return true for success, false for fail
   * @throws IOException on network failure
   */
  private boolean deleteStorage(TrackerServer trackerServer,
      String groupName, String storageIpAddr) throws IOException {
    Socket trackerSocket = trackerServer.getSocket();
    OutputStream out = trackerSocket.getOutputStream();

    byte[] bs = groupName.getBytes(ClientGlobal.g_charset);
    // a freshly allocated array is already zero filled, which provides the padding
    byte[] bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN];
    System.arraycopy(bs, 0, bGroupName, 0, Math.min(bs.length, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN));

    byte[] bIpAddr = storageIpAddr.getBytes(ClientGlobal.g_charset);
    int ipAddrLen = Math.min(bIpAddr.length, ProtoCommon.FDFS_IPADDR_SIZE - 1);

    byte[] header = ProtoCommon.packHeader(ProtoCommon.TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE,
        ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ipAddrLen, (byte)0);
    byte[] wholePkg = new byte[header.length + bGroupName.length + ipAddrLen];
    System.arraycopy(header, 0, wholePkg, 0, header.length);
    System.arraycopy(bGroupName, 0, wholePkg, header.length, bGroupName.length);
    System.arraycopy(bIpAddr, 0, wholePkg, header.length + bGroupName.length, ipAddrLen);
    out.write(wholePkg);

    ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(),
        ProtoCommon.TRACKER_PROTO_CMD_RESP, 0);
    this.errno = pkgInfo.errno;
    return pkgInfo.errno == 0;
  }

  /**
   * delete a storage server from the global FastDFS cluster
   *
   * @param groupName the group name of storage server
   * @param storageIpAddr the storage server ip address
   * @return true for success, false for fail
   * @throws IOException on network failure
   */
  public boolean deleteStorage(String groupName, String storageIpAddr) throws IOException {
    return this.deleteStorage(ClientGlobal.g_tracker_group, groupName, storageIpAddr);
  }

  /**
   * delete a storage server from the FastDFS cluster
   * <p>
   * Works in two passes over every tracker of the group. The first pass lists
   * the storage server on each tracker and gives up with ERR_NO_EBUSY when a
   * tracker reports it as ONLINE or ACTIVE. The second pass sends the delete
   * request to each tracker. Either pass fails with ERR_NO_ENOENT when no
   * tracker knows the storage server, and with ECONNREFUSED as soon as one
   * tracker cannot be connected.
   *
   * @param trackerGroup the tracker server group
   * @param groupName the group name of storage server
   * @param storageIpAddr the storage server ip address
   * @return true for success, false for fail
   * @throws IOException on network failure while talking to a connected tracker
   */
  public boolean deleteStorage(TrackerGroup trackerGroup,
      String groupName, String storageIpAddr) throws IOException {
    int serverIndex;
    int notFoundCount;
    TrackerServer trackerServer;

    // pass 1: make sure the storage server is not in service on any tracker
    notFoundCount = 0;
    for (serverIndex = 0; serverIndex < trackerGroup.tracker_servers.length; serverIndex++) {
      try {
        trackerServer = trackerGroup.getConnection(serverIndex);
      } catch (IOException ex) {
        ex.printStackTrace(System.err);
        this.errno = ProtoCommon.ECONNREFUSED;
        return false;
      }

      try {
        StructStorageStat[] storageStats = listStorages(trackerServer, groupName, storageIpAddr);
        if (storageStats == null) {
          if (this.errno == ProtoCommon.ERR_NO_ENOENT) {
            notFoundCount++;
          } else {
            return false;
          }
        } else if (storageStats.length == 0) {
          notFoundCount++;
        } else if (storageStats[0].getStatus() == ProtoCommon.FDFS_STORAGE_STATUS_ONLINE ||
            storageStats[0].getStatus() == ProtoCommon.FDFS_STORAGE_STATUS_ACTIVE) {
          this.errno = ProtoCommon.ERR_NO_EBUSY;
          return false;
        }
      } finally {
        try {
          trackerServer.close();
        } catch (IOException ex1) {
          ex1.printStackTrace();
        }
      }
    }

    if (notFoundCount == trackerGroup.tracker_servers.length) {
      this.errno = ProtoCommon.ERR_NO_ENOENT;
      return false;
    }

    // pass 2: delete the storage server from every tracker
    notFoundCount = 0;
    for (serverIndex = 0; serverIndex < trackerGroup.tracker_servers.length; serverIndex++) {
      try {
        trackerServer = trackerGroup.getConnection(serverIndex);
      } catch (IOException ex) {
        System.err.println("connect to server " + trackerGroup.tracker_servers[serverIndex].getAddress().getHostAddress() + ":" + trackerGroup.tracker_servers[serverIndex].getPort() + " fail");
        ex.printStackTrace(System.err);
        this.errno = ProtoCommon.ECONNREFUSED;
        return false;
      }

      try {
        if (!this.deleteStorage(trackerServer, groupName, storageIpAddr)) {
          if (this.errno != 0) {
            if (this.errno == ProtoCommon.ERR_NO_ENOENT) {
              notFoundCount++;
            } else if (this.errno != ProtoCommon.ERR_NO_EALREADY) {
              // ERR_NO_EALREADY is tolerated, anything else aborts
              return false;
            }
          }
        }
      } finally {
        try {
          trackerServer.close();
        } catch (IOException ex1) {
          ex1.printStackTrace();
        }
      }
    }

    if (notFoundCount == trackerGroup.tracker_servers.length) {
      this.errno = ProtoCommon.ERR_NO_ENOENT;
      return false;
    }

    // errno holds the result of the last tracker only; a trailing "not found" is
    // not an error because at least one other tracker knew the storage server.
    // NOTE(review): a trailing ERR_NO_EALREADY still yields false here although the
    // loop tolerates it - confirm whether that is intended.
    if (this.errno == ProtoCommon.ERR_NO_ENOENT) {
      this.errno = 0;
    }

    return this.errno == 0;
  }
}
TrackerGroup.java:
/**
 * Copyright (C) 2008 Happy Fish / YuQing
 *
 * FastDFS Java Client may be copied only under the terms of the GNU Lesser
 * General Public License (LGPL).
 * Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
 */

package org.csource.fastdfs;

import java.io.*;
import java.util.*;
import java.net.*;
import org.csource.common.*;

/**
 * Tracker server group: a list of tracker addresses plus the index of the
 * tracker that was used last, so that connections rotate over the trackers.
 *
 * @author Happy Fish / YuQing
 * @version Version 1.17
 */
public class TrackerGroup {
  /** monitor guarding {@link #tracker_server_index} */
  protected Integer lock;
  /** index of the tracker picked by the last call of {@link #getConnection()} */
  public int tracker_server_index;
  /** addresses of the tracker servers of this group */
  public InetSocketAddress[] tracker_servers;

  /**
   * Constructor
   *
   * @param tracker_servers tracker servers
   */
  public TrackerGroup(InetSocketAddress[] tracker_servers) {
    this.tracker_servers = tracker_servers;
    // Deliberately "new": Integer.valueOf(0) would return a cached instance
    // shared with unrelated code, which must never be used as a monitor.
    this.lock = new Integer(0);
    this.tracker_server_index = 0;
  }

  /**
   * Connects to the tracker server at the given index.
   * <p>
   * The socket is closed again when configuring or connecting it fails, so a
   * failed attempt does not leak a file descriptor.
   *
   * @param serverIndex index into {@link #tracker_servers}
   * @return connected tracker server
   * @throws IOException if the connection cannot be established
   */
  public TrackerServer getConnection(int serverIndex) throws IOException {
    Socket sock = new Socket();
    boolean connected = false;
    try {
      sock.setReuseAddress(true);
      sock.setSoTimeout(ClientGlobal.g_network_timeout);
      sock.connect(this.tracker_servers[serverIndex], ClientGlobal.g_connect_timeout);
      connected = true;
    } finally {
      if (!connected) {
        try {
          sock.close();
        } catch (IOException ignored) {
          // the failure that brought us here is the one worth reporting
        }
      }
    }

    return new TrackerServer(sock, this.tracker_servers[serverIndex]);
  }

  /**
   * Connects to the next tracker server in rotation; when that one cannot be
   * reached every other tracker is tried in index order.
   *
   * @return connected tracker server, null when the group has no tracker or
   *         none of them can be connected
   * @throws IOException declared for compatibility; connection failures are
   *         printed to System.err and the next tracker is tried
   */
  public TrackerServer getConnection() throws IOException {
    // nothing to connect to; without this check index 0 of an empty array would be used
    if (this.tracker_servers.length == 0) {
      return null;
    }

    int current_index;
    synchronized (this.lock) {
      this.tracker_server_index++;
      if (this.tracker_server_index >= this.tracker_servers.length) {
        this.tracker_server_index = 0;
      }

      current_index = this.tracker_server_index;
    }

    try {
      return this.getConnection(current_index);
    } catch (IOException ex) {
      System.err.println("connect to server " + this.tracker_servers[current_index].getAddress().getHostAddress() + ":" + this.tracker_servers[current_index].getPort() + " fail");
      ex.printStackTrace(System.err);
    }

    for (int i = 0; i < this.tracker_servers.length; i++) {
      if (i == current_index) {
        continue;
      }

      try {
        TrackerServer trackerServer = this.getConnection(i);

        synchronized (this.lock) {
          // move the rotation to the working tracker unless another thread advanced it meanwhile
          if (this.tracker_server_index == current_index) {
            this.tracker_server_index = i;
          }
        }

        return trackerServer;
      } catch (IOException ex) {
        System.err.println("connect to server " + this.tracker_servers[i].getAddress().getHostAddress() + ":" + this.tracker_servers[i].getPort() + " fail");
        ex.printStackTrace(System.err);
      }
    }

    return null;
  }

  /**
   * Creates a new group with a copy of every tracker address; the rotation
   * index of the copy starts at 0.
   *
   * @return the new TrackerGroup
   */
  public Object clone() {
    InetSocketAddress[] trackerServers = new InetSocketAddress[this.tracker_servers.length];
    for (int i = 0; i < trackerServers.length; i++) {
      trackerServers[i] = new InetSocketAddress(this.tracker_servers[i].getAddress().getHostAddress(), this.tracker_servers[i].getPort());
    }

    return new TrackerGroup(trackerServers);
  }
}
StorageClient.java:
/** * Copyright (C) 2008 Happy Fish / YuQing * * FastDFS Java Client may be copied only under the terms of the GNU Lesser * General Public License (LGPL). * Please visit the FastDFS Home Page http://www.csource.org/ for more detail. */ package org.csource.fastdfs; import java.io.IOException; import java.io.OutputStream; import java.io.FileOutputStream; import java.io.InputStream; import java.io.FileInputStream; import java.io.File; import java.util.Arrays; import java.net.Socket; import org.csource.common.MyException; import org.csource.common.NameValuePair; import org.csource.common.Base64; /** * Storage client for 2 fields file id: group name and filename * @author Happy Fish / YuQing * @version Version 1.24 */ public class StorageClient { /** * Upload file by file buff * @author Happy Fish / YuQing * @version Version 1.12 */ public static class UploadBuff implements UploadCallback { private byte[] fileBuff; private int offset; private int length; /** * constructor * @param fileBuff the file buff for uploading */ public UploadBuff(byte[] fileBuff, int offset, int length) { super(); this.fileBuff = fileBuff; this.offset = offset; this.length = length; } /** * send file content callback function, be called only once when the file uploaded * @param out output stream for writing file content * @return 0 success, return none zero(errno) if fail */ public int send(OutputStream out) throws IOException { out.write(this.fileBuff, this.offset, this.length); return 0; } } public final static Base64 base64 = new Base64('-', '_', '.', 0); protected TrackerServer trackerServer; protected StorageServer storageServer; protected byte errno; /** * constructor using global settings in class ClientGlobal */ public StorageClient() { this.trackerServer = null; this.storageServer = null; } /** * constructor with tracker server and storage server * @param trackerServer the tracker server, can be null * @param storageServer the storage server, can be null */ public 
StorageClient(TrackerServer trackerServer, StorageServer storageServer) { this.trackerServer = trackerServer; this.storageServer = storageServer; } /** * get the error code of last call * @return the error code of last call */ public byte getErrorCode() { return this.errno; } /** * upload file to storage server (by file name) * @param local_filename local filename to upload * @param file_ext_name file ext name, do not include dot(.), null to extract ext name from the local filename * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file </li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(String local_filename, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String group_name = null; return this.upload_file(group_name, local_filename, file_ext_name, meta_list); } /** * upload file to storage server (by file name) * @param group_name the group name to upload file to, can be empty * @param local_filename local filename to upload * @param file_ext_name file ext name, do not include dot(.), null to extract ext name from the local filename * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file </li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ protected String[] upload_file(String group_name, String local_filename, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final byte cmd = ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_FILE; return this.upload_file(cmd, group_name, local_filename, file_ext_name, meta_list); } /** * upload file to storage server (by file name) * @param cmd the command * @param group_name the group name to upload file to, can be empty * @param local_filename local filename to upload * @param file_ext_name file 
ext name, do not include dot(.), null to extract ext name from the local filename * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file </li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ protected String[] upload_file(byte cmd, String group_name, String local_filename, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { File f = new File(local_filename); FileInputStream fis = new FileInputStream(f); if (file_ext_name == null) { int nPos = local_filename.lastIndexOf('.'); if (nPos > 0 && local_filename.length() - nPos <= ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + 1) { file_ext_name = local_filename.substring(nPos+1); } } try { return this.do_upload_file(cmd, group_name, null, null, file_ext_name, f.length(), new UploadStream(fis, f.length()), meta_list); } finally { fis.close(); } } /** * upload file to storage server (by file buff) * @param file_buff file content/buff * @param offset start offset of the buff * @param length the length of buff to upload * @param file_ext_name file ext name, do not include dot(.) * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(byte[] file_buff, int offset, int length, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String group_name = null; return this.upload_file(group_name, file_buff, offset, length, file_ext_name, meta_list); } /** * upload file to storage server (by file buff) * @param group_name the group name to upload file to, can be empty * @param file_buff file content/buff * @param offset start offset of the buff * @param length the length of buff to upload * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(String group_name, byte[] file_buff, int offset, int length, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_FILE, group_name, null, null, file_ext_name, length, new UploadBuff(file_buff, offset, length), meta_list); } /** * upload file to storage server (by file buff) * @param file_buff file content/buff * @param file_ext_name file ext name, do not include dot(.) * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String group_name = null; return this.upload_file(group_name, file_buff, 0, file_buff.length, file_ext_name, meta_list); } /** * upload file to storage server (by file buff) * @param group_name the group name to upload file to, can be empty * @param file_buff file content/buff * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(String group_name, byte[] file_buff, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_FILE, group_name, null, null, file_ext_name, file_buff.length, new UploadBuff(file_buff, 0, file_buff.length), meta_list); } /** * upload file to storage server (by callback) * @param group_name the group name to upload file to, can be empty * @param file_size the file size * @param callback the write data callback object * @param file_ext_name file ext name, do not include dot(.) * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(String group_name, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String master_filename = null; final String prefix_name = null; return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_FILE, group_name, master_filename, prefix_name, file_ext_name, file_size, callback, meta_list); } /** * upload file to storage server (by file name, slave file mode) * @param group_name the group name of master file * @param master_filename the master file name to generate the slave file * @param prefix_name the prefix name to generate the slave file * @param local_filename local filename to upload * @param file_ext_name file ext name, do not include dot(.), null to extract ext name from the local filename * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name 
to store the file </li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(String group_name, String master_filename, String prefix_name, String local_filename, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { if ((group_name == null || group_name.length() == 0) || (master_filename == null || master_filename.length() == 0) || (prefix_name == null)) { throw new MyException("invalid arguement"); } File f = new File(local_filename); FileInputStream fis = new FileInputStream(f); if (file_ext_name == null) { int nPos = local_filename.lastIndexOf('.'); if (nPos > 0 && local_filename.length() - nPos <= ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + 1) { file_ext_name = local_filename.substring(nPos+1); } } try { return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, group_name, master_filename, prefix_name, file_ext_name, f.length(), new UploadStream(fis, f.length()), meta_list); } finally { fis.close(); } } /** * upload file to storage server (by file buff, slave file mode) * @param group_name the group name of master file * @param master_filename the master file name to generate the slave file * @param prefix_name the prefix name to generate the slave file * @param file_buff file content/buff * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array
   * @return 2 elements string array if success:<br>
   * <ul><li>results[0]: the group name to store the file</li></ul>
   * <ul><li>results[1]: the new created filename</li></ul>
   * return null if fail
   */
  public String[] upload_file(String group_name, String master_filename, String prefix_name,
      byte[] file_buff, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException {
    // a slave file needs its group, its master file and a (possibly empty) prefix
    final boolean groupMissing = group_name == null || group_name.length() == 0;
    final boolean masterMissing = master_filename == null || master_filename.length() == 0;
    if (groupMissing || masterMissing || prefix_name == null) {
      throw new MyException("invalid arguement");
    }

    final UploadBuff wholeBuffer = new UploadBuff(file_buff, 0, file_buff.length);
    return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, group_name,
        master_filename, prefix_name, file_ext_name, file_buff.length, wholeBuffer, meta_list);
  }

  /**
   * upload file to storage server (by file buff, slave file mode)
   * @param group_name the group name of master file
   * @param master_filename the master file name to generate the slave file
   * @param prefix_name the prefix name to generate the slave file
   * @param file_buff file content/buff
   * @param offset start offset of the buff
   * @param length the length of buff to upload
   * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array
   * @return 2 elements string array if success:<br>
   * <ul><li>results[0]: the group name to store the file</li></ul>
   * <ul><li>results[1]: the new created filename</li></ul>
   * return null if fail
   */
  public String[] upload_file(String group_name, String master_filename, String prefix_name,
      byte[] file_buff, int offset, int length, String file_ext_name,
      NameValuePair[] meta_list) throws IOException, MyException {
    // a slave file needs its group, its master file and a (possibly empty) prefix
    final boolean groupMissing = group_name == null || group_name.length() == 0;
    final boolean masterMissing = master_filename == null || master_filename.length() == 0;
    if (groupMissing || masterMissing || prefix_name == null) {
      throw new MyException("invalid arguement");
    }

    final UploadBuff region = new UploadBuff(file_buff, offset, length);
    return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, group_name,
        master_filename, prefix_name, file_ext_name, length, region, meta_list);
  }

  /**
   * upload file to storage server (by callback, slave file mode)
   * @param group_name the group name to upload file to, can be empty
   * @param master_filename the master file name to generate the slave file
   * @param prefix_name the prefix name to generate the slave file
   * @param file_size the file size
   * @param callback the write data callback object
   * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_file(String group_name, String master_filename, String prefix_name, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, group_name, master_filename, prefix_name, file_ext_name, file_size, callback, meta_list); } /** * upload appender file to storage server (by file name) * @param local_filename local filename to upload * @param file_ext_name file ext name, do not include dot(.), null to extract ext name from the local filename * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file </li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_appender_file(String local_filename, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String group_name = null; return this.upload_appender_file(group_name, local_filename, file_ext_name, meta_list); } /** * upload appender file to storage server (by file name) * @param group_name the group name to upload file to, can be empty * @param local_filename local filename to upload * @param file_ext_name file ext name, do not include dot(.), null to extract ext name from the local filename * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file </li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ protected String[] upload_appender_file(String group_name, String local_filename, String file_ext_name, NameValuePair[] meta_list) throws 
IOException, MyException { final byte cmd = ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE; return this.upload_file(cmd, group_name, local_filename, file_ext_name, meta_list); } /** * upload appender file to storage server (by file buff) * @param file_buff file content/buff * @param offset start offset of the buff * @param length the length of buff to upload * @param file_ext_name file ext name, do not include dot(.) * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_appender_file(byte[] file_buff, int offset, int length, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String group_name = null; return this.upload_appender_file(group_name, file_buff, offset, length, file_ext_name, meta_list); } /** * upload appender file to storage server (by file buff) * @param group_name the group name to upload file to, can be empty * @param file_buff file content/buff * @param offset start offset of the buff * @param length the length of buff to upload * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_appender_file(String group_name, byte[] file_buff, int offset, int length, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, group_name, null, null, file_ext_name, length, new UploadBuff(file_buff, offset, length), meta_list); } /** * upload appender file to storage server (by file buff) * @param file_buff file content/buff * @param file_ext_name file ext name, do not include dot(.) * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_appender_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String group_name = null; return this.upload_appender_file(group_name, file_buff, 0, file_buff.length, file_ext_name, meta_list); } /** * upload appender file to storage server (by file buff) * @param group_name the group name to upload file to, can be empty * @param file_buff file content/buff * @param file_ext_name file ext name, do not include dot(.) 
* @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_appender_file(String group_name, byte[] file_buff, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, group_name, null, null, file_ext_name, file_buff.length, new UploadBuff(file_buff, 0, file_buff.length), meta_list); } /** * upload appender file to storage server (by callback) * @param group_name the group name to upload file to, can be empty * @param file_size the file size * @param callback the write data callback object * @param file_ext_name file ext name, do not include dot(.) * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li>results[0]: the group name to store the file</li></ul> * <ul><li>results[1]: the new created filename</li></ul> * return null if fail */ public String[] upload_appender_file(String group_name, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws IOException, MyException { final String master_filename = null; final String prefix_name = null; return this.do_upload_file(ProtoCommon.STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, group_name, master_filename, prefix_name, file_ext_name, file_size, callback, meta_list); } /** * append file to storage server (by file name) * @param group_name the group name of appender file * @param appender_filename the appender filename * @param local_filename local filename to append * @return 0 for success, != 0 for error (error no) */ public int append_file(String group_name, String appender_filename, String local_filename) throws IOException, MyException { File f = new File(local_filename); FileInputStream fis = new FileInputStream(f); try { return 
this.do_append_file(group_name, appender_filename, f.length(), new UploadStream(fis, f.length())); } finally { fis.close(); } } /** * append file to storage server (by file buff) * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_buff file content/buff * @return 0 for success, != 0 for error (error no) */ public int append_file(String group_name, String appender_filename, byte[] file_buff) throws IOException, MyException { return this.do_append_file(group_name, appender_filename, file_buff.length, new UploadBuff(file_buff, 0, file_buff.length)); } /** * append file to storage server (by file buff) * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_buff file content/buff * @param offset start offset of the buff * @param length the length of buff to append * @return 0 for success, != 0 for error (error no) */ public int append_file(String group_name, String appender_filename, byte[] file_buff, int offset, int length) throws IOException, MyException { return this.do_append_file(group_name, appender_filename, length, new UploadBuff(file_buff, offset, length)); } /** * append file to storage server (by callback) * @param group_name the group name to append file to * @param appender_filename the appender filename * @param file_size the file size * @param callback the write data callback object * @return 0 for success, != 0 for error (error no) */ public int append_file(String group_name, String appender_filename, long file_size, UploadCallback callback) throws IOException, MyException { return this.do_append_file(group_name, appender_filename, file_size, callback); } /** * modify appender file to storage server (by file name) * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_offset the offset of appender file * @param local_filename local filename to append * @return 0 for success, 
!= 0 for error (error no) */ public int modify_file(String group_name, String appender_filename, long file_offset, String local_filename) throws IOException, MyException { File f = new File(local_filename); FileInputStream fis = new FileInputStream(f); try { return this.do_modify_file(group_name, appender_filename, file_offset, f.length(), new UploadStream(fis, f.length())); } finally { fis.close(); } } /** * modify appender file to storage server (by file buff) * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_offset the offset of appender file * @param file_buff file content/buff * @return 0 for success, != 0 for error (error no) */ public int modify_file(String group_name, String appender_filename, long file_offset, byte[] file_buff) throws IOException, MyException { return this.do_modify_file(group_name, appender_filename, file_offset, file_buff.length, new UploadBuff(file_buff, 0, file_buff.length)); } /** * modify appender file to storage server (by file buff) * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_offset the offset of appender file * @param file_buff file content/buff * @param buffer_offset start offset of the buff * @param buffer_length the length of buff to modify * @return 0 for success, != 0 for error (error no) */ public int modify_file(String group_name, String appender_filename, long file_offset, byte[] file_buff, int buffer_offset, int buffer_length) throws IOException, MyException { return this.do_modify_file(group_name, appender_filename, file_offset, buffer_length, new UploadBuff(file_buff, buffer_offset, buffer_length)); } /** * modify appender file to storage server (by callback) * @param group_name the group name to modify file to * @param appender_filename the appender filename * @param file_offset the offset of appender file * @param modify_size the modify size * @param callback the write data callback 
object * @return 0 for success, != 0 for error (error no) */ public int modify_file(String group_name, String appender_filename, long file_offset, long modify_size, UploadCallback callback) throws IOException, MyException { return this.do_modify_file(group_name, appender_filename, file_offset, modify_size, callback); } /** * upload file to storage server * @param cmd the command code * @param group_name the group name to upload file to, can be empty * @param master_filename the master file name to generate the slave file * @param prefix_name the prefix name to generate the slave file * @param file_ext_name file ext name, do not include dot(.) * @param file_size the file size * @param callback the write data callback object * @param meta_list meta info array * @return 2 elements string array if success:<br> * <ul><li> results[0]: the group name to store the file</li></ul> * <ul><li> results[1]: the new created filename</li></ul> * return null if fail */ protected String[] do_upload_file(byte cmd, String group_name, String master_filename, String prefix_name, String file_ext_name, long file_size, UploadCallback callback, NameValuePair[] meta_list) throws IOException, MyException { byte[] header; byte[] ext_name_bs; String new_group_name; String remote_filename; boolean bNewConnection; Socket storageSocket; byte[] sizeBytes; byte[] hexLenBytes; byte[] masterFilenameBytes; boolean bUploadSlave; int offset; long body_len; bUploadSlave = ((group_name != null && group_name.length() > 0) && (master_filename != null && master_filename.length() > 0) && (prefix_name != null)); if (bUploadSlave) { bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename); } else { bNewConnection = this.newWritableStorageConnection(group_name); } try { storageSocket = this.storageServer.getSocket(); ext_name_bs = new byte[ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN]; Arrays.fill(ext_name_bs, (byte)0); if (file_ext_name != null && file_ext_name.length() > 0) { byte[] bs = 
file_ext_name.getBytes(ClientGlobal.g_charset); int ext_name_len = bs.length; if (ext_name_len > ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN) { ext_name_len = ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN; } System.arraycopy(bs, 0, ext_name_bs, 0, ext_name_len); } if (bUploadSlave) { masterFilenameBytes = master_filename.getBytes(ClientGlobal.g_charset); sizeBytes = new byte[2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE]; body_len = sizeBytes.length + ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN + ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + masterFilenameBytes.length + file_size; hexLenBytes = ProtoCommon.long2buff(master_filename.length()); System.arraycopy(hexLenBytes, 0, sizeBytes, 0, hexLenBytes.length); offset = hexLenBytes.length; } else { masterFilenameBytes = null; sizeBytes = new byte[1 + 1 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE]; body_len = sizeBytes.length + ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + file_size; sizeBytes[0] = (byte)this.storageServer.getStorePathIndex(); offset = 1; } hexLenBytes = ProtoCommon.long2buff(file_size); System.arraycopy(hexLenBytes, 0, sizeBytes, offset, hexLenBytes.length); OutputStream out = storageSocket.getOutputStream(); header = ProtoCommon.packHeader(cmd, body_len, (byte)0); byte[] wholePkg = new byte[(int)(header.length + body_len - file_size)]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(sizeBytes, 0, wholePkg, header.length, sizeBytes.length); offset = header.length + sizeBytes.length; if (bUploadSlave) { byte[] prefix_name_bs = new byte[ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN]; byte[] bs = prefix_name.getBytes(ClientGlobal.g_charset); int prefix_name_len = bs.length; Arrays.fill(prefix_name_bs, (byte)0); if (prefix_name_len > ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN) { prefix_name_len = ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN; } if (prefix_name_len > 0) { System.arraycopy(bs, 0, prefix_name_bs, 0, prefix_name_len); } System.arraycopy(prefix_name_bs, 0, wholePkg, offset, prefix_name_bs.length); offset += prefix_name_bs.length; 
} System.arraycopy(ext_name_bs, 0, wholePkg, offset, ext_name_bs.length); offset += ext_name_bs.length; if (bUploadSlave) { System.arraycopy(masterFilenameBytes, 0, wholePkg, offset, masterFilenameBytes.length); offset += masterFilenameBytes.length; } out.write(wholePkg); if ((this.errno=(byte)callback.send(out)) != 0) { return null; } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } if (pkgInfo.body.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { throw new MyException("body length: " + pkgInfo.body.length + " <= " + ProtoCommon.FDFS_GROUP_NAME_MAX_LEN); } new_group_name = new String(pkgInfo.body, 0, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN).trim(); remote_filename = new String(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN, pkgInfo.body.length - ProtoCommon.FDFS_GROUP_NAME_MAX_LEN); String[] results = new String[2]; results[0] = new_group_name; results[1] = remote_filename; if (meta_list == null || meta_list.length == 0) { return results; } int result = 0; try { result = this.set_metadata(new_group_name, remote_filename, meta_list, ProtoCommon.STORAGE_SET_METADATA_FLAG_OVERWRITE); } catch(IOException ex) { result = 5; throw ex; } finally { if (result != 0) { this.errno = (byte)result; this.delete_file(new_group_name, remote_filename); return null; } } return results; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * append file to storage server * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_size the file size * @param callback the write data 
callback object * @return return true for success, false for fail */ protected int do_append_file(String group_name, String appender_filename, long file_size, UploadCallback callback) throws IOException, MyException { byte[] header; boolean bNewConnection; Socket storageSocket; byte[] hexLenBytes; byte[] appenderFilenameBytes; int offset; long body_len; if ((group_name == null || group_name.length() == 0) || (appender_filename == null || appender_filename.length() == 0)) { this.errno = ProtoCommon.ERR_NO_EINVAL; return this.errno; } bNewConnection = this.newUpdatableStorageConnection(group_name, appender_filename); try { storageSocket = this.storageServer.getSocket(); appenderFilenameBytes = appender_filename.getBytes(ClientGlobal.g_charset); body_len = 2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE + appenderFilenameBytes.length + file_size; header = ProtoCommon.packHeader(ProtoCommon.STORAGE_PROTO_CMD_APPEND_FILE, body_len, (byte)0); byte[] wholePkg = new byte[(int)(header.length + body_len - file_size)]; System.arraycopy(header, 0, wholePkg, 0, header.length); offset = header.length; hexLenBytes = ProtoCommon.long2buff(appender_filename.length()); System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; hexLenBytes = ProtoCommon.long2buff(file_size); System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; OutputStream out = storageSocket.getOutputStream(); System.arraycopy(appenderFilenameBytes, 0, wholePkg, offset, appenderFilenameBytes.length); offset += appenderFilenameBytes.length; out.write(wholePkg); if ((this.errno=(byte)callback.send(out)) != 0) { return this.errno; } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, 0); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return this.errno; } return 0; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } 
catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * modify appender file to storage server * @param group_name the group name of appender file * @param appender_filename the appender filename * @param file_offset the offset of appender file * @param modify_size the modify size * @param callback the write data callback object * @return return true for success, false for fail */ protected int do_modify_file(String group_name, String appender_filename, long file_offset, long modify_size, UploadCallback callback) throws IOException, MyException { byte[] header; boolean bNewConnection; Socket storageSocket; byte[] hexLenBytes; byte[] appenderFilenameBytes; int offset; long body_len; if ((group_name == null || group_name.length() == 0) || (appender_filename == null || appender_filename.length() == 0)) { this.errno = ProtoCommon.ERR_NO_EINVAL; return this.errno; } bNewConnection = this.newUpdatableStorageConnection(group_name, appender_filename); try { storageSocket = this.storageServer.getSocket(); appenderFilenameBytes = appender_filename.getBytes(ClientGlobal.g_charset); body_len = 3 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE + appenderFilenameBytes.length + modify_size; header = ProtoCommon.packHeader(ProtoCommon.STORAGE_PROTO_CMD_MODIFY_FILE, body_len, (byte)0); byte[] wholePkg = new byte[(int)(header.length + body_len - modify_size)]; System.arraycopy(header, 0, wholePkg, 0, header.length); offset = header.length; hexLenBytes = ProtoCommon.long2buff(appender_filename.length()); System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; hexLenBytes = ProtoCommon.long2buff(file_offset); System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; hexLenBytes = 
ProtoCommon.long2buff(modify_size); System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; OutputStream out = storageSocket.getOutputStream(); System.arraycopy(appenderFilenameBytes, 0, wholePkg, offset, appenderFilenameBytes.length); offset += appenderFilenameBytes.length; out.write(wholePkg); if ((this.errno=(byte)callback.send(out)) != 0) { return this.errno; } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, 0); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return this.errno; } return 0; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * delete file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @return 0 for success, none zero for fail (error code) */ public int delete_file(String group_name, String remote_filename) throws IOException, MyException { boolean bNewConnection = this.newUpdatableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { this.send_package(ProtoCommon.STORAGE_PROTO_CMD_DELETE_FILE, group_name, remote_filename); ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, 0); this.errno = pkgInfo.errno; return pkgInfo.errno; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { 
ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * truncate appender file to size 0 from storage server * @param group_name the group name of storage server * @param appender_filename the appender filename * @return 0 for success, none zero for fail (error code) */ public int truncate_file(String group_name, String appender_filename) throws IOException, MyException { final long truncated_file_size = 0; return this.truncate_file(group_name, appender_filename, truncated_file_size); } /** * truncate appender file from storage server * @param group_name the group name of storage server * @param appender_filename the appender filename * @param truncated_file_size truncated file size * @return 0 for success, none zero for fail (error code) */ public int truncate_file(String group_name, String appender_filename, long truncated_file_size) throws IOException, MyException { byte[] header; boolean bNewConnection; Socket storageSocket; byte[] hexLenBytes; byte[] appenderFilenameBytes; int offset; int body_len; if ((group_name == null || group_name.length() == 0) || (appender_filename == null || appender_filename.length() == 0)) { this.errno = ProtoCommon.ERR_NO_EINVAL; return this.errno; } bNewConnection = this.newUpdatableStorageConnection(group_name, appender_filename); try { storageSocket = this.storageServer.getSocket(); appenderFilenameBytes = appender_filename.getBytes(ClientGlobal.g_charset); body_len = 2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE + appenderFilenameBytes.length; header = ProtoCommon.packHeader(ProtoCommon.STORAGE_PROTO_CMD_TRUNCATE_FILE, body_len, (byte)0); byte[] wholePkg = new byte[header.length + body_len]; System.arraycopy(header, 0, wholePkg, 0, header.length); offset = header.length; hexLenBytes = ProtoCommon.long2buff(appender_filename.length()); System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; hexLenBytes = ProtoCommon.long2buff(truncated_file_size); 
System.arraycopy(hexLenBytes, 0, wholePkg, offset, hexLenBytes.length); offset += hexLenBytes.length; OutputStream out = storageSocket.getOutputStream(); System.arraycopy(appenderFilenameBytes, 0, wholePkg, offset, appenderFilenameBytes.length); offset += appenderFilenameBytes.length; out.write(wholePkg); ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, 0); this.errno = pkgInfo.errno; return pkgInfo.errno; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * download file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @return file content/buff, return null if fail */ public byte[] download_file(String group_name, String remote_filename) throws IOException, MyException { final long file_offset = 0; final long download_bytes = 0; return this.download_file(group_name, remote_filename, file_offset, download_bytes); } /** * download file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @param file_offset the start offset of the file * @param download_bytes download bytes, 0 for remain bytes from offset * @return file content/buff, return null if fail */ public byte[] download_file(String group_name, String remote_filename, long file_offset, long download_bytes) throws IOException, MyException { boolean bNewConnection = this.newReadableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { ProtoCommon.RecvPackageInfo pkgInfo; this.send_download_package(group_name, remote_filename, 
file_offset, download_bytes); pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } return pkgInfo.body; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * download file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @param local_filename filename on local * @return 0 success, return none zero errno if fail */ public int download_file(String group_name, String remote_filename, String local_filename) throws IOException, MyException { final long file_offset = 0; final long download_bytes = 0; return this.download_file(group_name, remote_filename, file_offset, download_bytes, local_filename); } /** * download file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @param file_offset the start offset of the file * @param download_bytes download bytes, 0 for remain bytes from offset * @param local_filename filename on local * @return 0 success, return none zero errno if fail */ public int download_file(String group_name, String remote_filename, long file_offset, long download_bytes, String local_filename) throws IOException, MyException { boolean bNewConnection = this.newReadableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { ProtoCommon.RecvHeaderInfo header; FileOutputStream out = new FileOutputStream(local_filename); try { this.errno = 0; this.send_download_package(group_name, remote_filename, file_offset, download_bytes); InputStream 
in = storageSocket.getInputStream(); header = ProtoCommon.recvHeader(in, ProtoCommon.STORAGE_PROTO_CMD_RESP, -1); this.errno = header.errno; if (header.errno != 0) { return header.errno; } byte[] buff = new byte[256 * 1024]; long remainBytes = header.body_len; int bytes; //System.out.println("expect_body_len=" + header.body_len); while (remainBytes > 0) { if ((bytes=in.read(buff, 0, remainBytes > buff.length ? buff.length : (int)remainBytes)) < 0) { throw new IOException("recv package size " + (header.body_len - remainBytes) + " != " + header.body_len); } out.write(buff, 0, bytes); remainBytes -= bytes; //System.out.println("totalBytes=" + (header.body_len - remainBytes)); } return 0; } catch(IOException ex) { if (this.errno == 0) { this.errno = ProtoCommon.ERR_NO_EIO; } throw ex; } finally { out.close(); if (this.errno != 0) { (new File(local_filename)).delete(); } } } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * download file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @param callback call callback.recv() when data arrive * @return 0 success, return none zero errno if fail */ public int download_file(String group_name, String remote_filename, DownloadCallback callback) throws IOException, MyException { final long file_offset = 0; final long download_bytes = 0; return this.download_file(group_name, remote_filename, file_offset, download_bytes, callback); } /** * download file from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @param file_offset the start offset of the file * @param download_bytes 
download bytes, 0 for remain bytes from offset * @param callback call callback.recv() when data arrive * @return 0 success, return none zero errno if fail */ public int download_file(String group_name, String remote_filename, long file_offset, long download_bytes, DownloadCallback callback) throws IOException, MyException { int result; boolean bNewConnection = this.newReadableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { ProtoCommon.RecvHeaderInfo header; this.send_download_package(group_name, remote_filename, file_offset, download_bytes); InputStream in = storageSocket.getInputStream(); header = ProtoCommon.recvHeader(in, ProtoCommon.STORAGE_PROTO_CMD_RESP, -1); this.errno = header.errno; if (header.errno != 0) { return header.errno; } byte[] buff = new byte[2 * 1024]; long remainBytes = header.body_len; int bytes; //System.out.println("expect_body_len=" + header.body_len); while (remainBytes > 0) { if ((bytes=in.read(buff, 0, remainBytes > buff.length ? 
buff.length : (int)remainBytes)) < 0) { throw new IOException("recv package size " + (header.body_len - remainBytes) + " != " + header.body_len); } if ((result=callback.recv(header.body_len, buff, bytes)) != 0) { this.errno = (byte)result; return result; } remainBytes -= bytes; //System.out.println("totalBytes=" + (header.body_len - remainBytes)); } return 0; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * get all metadata items from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @return meta info array, return null if fail */ public NameValuePair[] get_metadata(String group_name, String remote_filename)throws IOException, MyException { boolean bNewConnection = this.newUpdatableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { ProtoCommon.RecvPackageInfo pkgInfo; this.send_package(ProtoCommon.STORAGE_PROTO_CMD_GET_METADATA, group_name, remote_filename); pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } return ProtoCommon.split_metadata(new String(pkgInfo.body, ClientGlobal.g_charset)); } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * set metadata items to storage server * @param group_name the group name 
of storage server * @param remote_filename filename on storage server * @param meta_list meta item array * @param op_flag flag, can be one of following values: <br> * <ul><li> ProtoCommon.STORAGE_SET_METADATA_FLAG_OVERWRITE: overwrite all old * metadata items</li></ul> * <ul><li> ProtoCommon.STORAGE_SET_METADATA_FLAG_MERGE: merge, insert when * the metadata item not exist, otherwise update it</li></ul> * @return 0 for success, !=0 fail (error code) */ public int set_metadata(String group_name, String remote_filename, NameValuePair[] meta_list, byte op_flag) throws IOException, MyException { boolean bNewConnection = this.newUpdatableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { byte[] header; byte[] groupBytes; byte[] filenameBytes; byte[] meta_buff; byte[] bs; int groupLen; byte[] sizeBytes; ProtoCommon.RecvPackageInfo pkgInfo; if (meta_list == null) { meta_buff = new byte[0]; } else { meta_buff = ProtoCommon.pack_metadata(meta_list).getBytes(ClientGlobal.g_charset); } filenameBytes = remote_filename.getBytes(ClientGlobal.g_charset); sizeBytes = new byte[2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE]; Arrays.fill(sizeBytes, (byte)0); bs = ProtoCommon.long2buff(filenameBytes.length); System.arraycopy(bs, 0, sizeBytes, 0, bs.length); bs = ProtoCommon.long2buff(meta_buff.length); System.arraycopy(bs, 0, sizeBytes, ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE, bs.length); groupBytes = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; bs = group_name.getBytes(ClientGlobal.g_charset); Arrays.fill(groupBytes, (byte)0); if (bs.length <= groupBytes.length) { groupLen = bs.length; } else { groupLen = groupBytes.length; } System.arraycopy(bs, 0, groupBytes, 0, groupLen); header = ProtoCommon.packHeader(ProtoCommon.STORAGE_PROTO_CMD_SET_METADATA, 2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE + 1 + groupBytes.length + filenameBytes.length + meta_buff.length, (byte)0); OutputStream out = storageSocket.getOutputStream(); byte[] wholePkg = 
new byte[header.length + sizeBytes.length + 1 + groupBytes.length + filenameBytes.length]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(sizeBytes, 0, wholePkg, header.length, sizeBytes.length); wholePkg[header.length+sizeBytes.length] = op_flag; System.arraycopy(groupBytes, 0, wholePkg, header.length+sizeBytes.length+1, groupBytes.length); System.arraycopy(filenameBytes, 0, wholePkg, header.length+sizeBytes.length+1+groupBytes.length, filenameBytes.length); out.write(wholePkg); if (meta_buff.length > 0) { out.write(meta_buff); } pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, 0); this.errno = pkgInfo.errno; return pkgInfo.errno; } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * get file info decoded from the filename, fetch from the storage if necessary * @param group_name the group name * @param remote_filename the filename * @return FileInfo object for success, return null for fail */ public FileInfo get_file_info(String group_name, String remote_filename) throws IOException, MyException { if (remote_filename.length() < ProtoCommon.FDFS_FILE_PATH_LEN + ProtoCommon.FDFS_FILENAME_BASE64_LENGTH + ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + 1) { this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } byte[] buff = base64.decodeAuto(remote_filename.substring(ProtoCommon.FDFS_FILE_PATH_LEN, ProtoCommon.FDFS_FILE_PATH_LEN + ProtoCommon.FDFS_FILENAME_BASE64_LENGTH)); long file_size = ProtoCommon.buff2long(buff, 4 * 2); if (((remote_filename.length() > ProtoCommon.TRUNK_LOGIC_FILENAME_LENGTH) || ((remote_filename.length() > ProtoCommon.NORMAL_LOGIC_FILENAME_LENGTH) && ((file_size & 
ProtoCommon.TRUNK_FILE_MARK_SIZE) == 0))) || ((file_size & ProtoCommon.APPENDER_FILE_SIZE) != 0)) { //slave file or appender file FileInfo fi = this.query_file_info(group_name, remote_filename); if (fi == null) { return null; } return fi; } FileInfo fileInfo = new FileInfo(file_size, 0, 0, ProtoCommon.getIpAddress(buff, 0)); fileInfo.setCreateTimestamp(ProtoCommon.buff2int(buff, 4)); if ((file_size >> 63) != 0) { file_size &= 0xFFFFFFFFL; //low 32 bits is file size fileInfo.setFileSize(file_size); } fileInfo.setCrc32(ProtoCommon.buff2int(buff, 4 * 4)); return fileInfo; } /** * get file info from storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @return FileInfo object for success, return null for fail */ public FileInfo query_file_info(String group_name, String remote_filename) throws IOException, MyException { boolean bNewConnection = this.newUpdatableStorageConnection(group_name, remote_filename); Socket storageSocket = this.storageServer.getSocket(); try { byte[] header; byte[] groupBytes; byte[] filenameBytes; byte[] bs; int groupLen; ProtoCommon.RecvPackageInfo pkgInfo; filenameBytes = remote_filename.getBytes(ClientGlobal.g_charset); groupBytes = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; bs = group_name.getBytes(ClientGlobal.g_charset); Arrays.fill(groupBytes, (byte)0); if (bs.length <= groupBytes.length) { groupLen = bs.length; } else { groupLen = groupBytes.length; } System.arraycopy(bs, 0, groupBytes, 0, groupLen); header = ProtoCommon.packHeader(ProtoCommon.STORAGE_PROTO_CMD_QUERY_FILE_INFO, + groupBytes.length + filenameBytes.length, (byte)0); OutputStream out = storageSocket.getOutputStream(); byte[] wholePkg = new byte[header.length + groupBytes.length + filenameBytes.length]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(groupBytes, 0, wholePkg, header.length, groupBytes.length); System.arraycopy(filenameBytes, 0, wholePkg, header.length + 
groupBytes.length, filenameBytes.length); out.write(wholePkg); pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(), ProtoCommon.STORAGE_PROTO_CMD_RESP, 3 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE + ProtoCommon.FDFS_IPADDR_SIZE); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } long file_size = ProtoCommon.buff2long(pkgInfo.body, 0); int create_timestamp = (int)ProtoCommon.buff2long(pkgInfo.body, ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE); int crc32 = (int)ProtoCommon.buff2long(pkgInfo.body, 2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE); String source_ip_addr = (new String(pkgInfo.body, 3 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE, ProtoCommon.FDFS_IPADDR_SIZE)).trim(); return new FileInfo(file_size, create_timestamp, crc32, source_ip_addr); } catch(IOException ex) { if (!bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } throw ex; } finally { if (bNewConnection) { try { this.storageServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } finally { this.storageServer = null; } } } } /** * check storage socket, if null create a new connection * @param group_name the group name to upload file to, can be empty * @return true if create a new connection */ protected boolean newWritableStorageConnection(String group_name) throws IOException, MyException { if (this.storageServer != null) { return false; } else { TrackerClient tracker = new TrackerClient(); this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name); if (this.storageServer == null) { throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode()); } return true; } } /** * check storage socket, if null create a new connection * @param group_name the group name of storage server * @param remote_filename filename on storage server * @return true if create a new connection */ protected boolean newReadableStorageConnection(String group_name, String 
remote_filename) throws IOException, MyException { if (this.storageServer != null) { return false; } else { TrackerClient tracker = new TrackerClient(); this.storageServer = tracker.getFetchStorage(this.trackerServer, group_name, remote_filename); if (this.storageServer == null) { throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode()); } return true; } } /** * check storage socket, if null create a new connection * @param group_name the group name of storage server * @param remote_filename filename on storage server * @return true if create a new connection */ protected boolean newUpdatableStorageConnection(String group_name, String remote_filename) throws IOException, MyException { if (this.storageServer != null) { return false; } else { TrackerClient tracker = new TrackerClient(); this.storageServer = tracker.getUpdateStorage(this.trackerServer, group_name, remote_filename); if (this.storageServer == null) { throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode()); } return true; } } /** * send package to storage server * @param cmd which command to send * @param group_name the group name of storage server * @param remote_filename filename on storage server */ protected void send_package(byte cmd, String group_name, String remote_filename) throws IOException { byte[] header; byte[] groupBytes; byte[] filenameBytes; byte[] bs; int groupLen; groupBytes = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; bs = group_name.getBytes(ClientGlobal.g_charset); filenameBytes = remote_filename.getBytes(ClientGlobal.g_charset); Arrays.fill(groupBytes, (byte)0); if (bs.length <= groupBytes.length) { groupLen = bs.length; } else { groupLen = groupBytes.length; } System.arraycopy(bs, 0, groupBytes, 0, groupLen); header = ProtoCommon.packHeader(cmd, groupBytes.length + filenameBytes.length, (byte)0); byte[] wholePkg = new byte[header.length + groupBytes.length + filenameBytes.length]; System.arraycopy(header, 0, wholePkg, 0, 
header.length); System.arraycopy(groupBytes, 0, wholePkg, header.length, groupBytes.length); System.arraycopy(filenameBytes, 0, wholePkg, header.length+groupBytes.length, filenameBytes.length); this.storageServer.getSocket().getOutputStream().write(wholePkg); } /** * send package to storage server * @param group_name the group name of storage server * @param remote_filename filename on storage server * @param file_offset the start offset of the file * @param download_bytes download bytes */ protected void send_download_package(String group_name, String remote_filename, long file_offset, long download_bytes) throws IOException { byte[] header; byte[] bsOffset; byte[] bsDownBytes; byte[] groupBytes; byte[] filenameBytes; byte[] bs; int groupLen; bsOffset = ProtoCommon.long2buff(file_offset); bsDownBytes = ProtoCommon.long2buff(download_bytes); groupBytes = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; bs = group_name.getBytes(ClientGlobal.g_charset); filenameBytes = remote_filename.getBytes(ClientGlobal.g_charset); Arrays.fill(groupBytes, (byte)0); if (bs.length <= groupBytes.length) { groupLen = bs.length; } else { groupLen = groupBytes.length; } System.arraycopy(bs, 0, groupBytes, 0, groupLen); header = ProtoCommon.packHeader(ProtoCommon.STORAGE_PROTO_CMD_DOWNLOAD_FILE, bsOffset.length + bsDownBytes.length + groupBytes.length + filenameBytes.length, (byte)0); byte[] wholePkg = new byte[header.length + bsOffset.length + bsDownBytes.length + groupBytes.length + filenameBytes.length]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(bsOffset, 0, wholePkg, header.length, bsOffset.length); System.arraycopy(bsDownBytes, 0, wholePkg, header.length+bsOffset.length, bsDownBytes.length); System.arraycopy(groupBytes, 0, wholePkg, header.length+bsOffset.length+bsDownBytes.length, groupBytes.length); System.arraycopy(filenameBytes, 0, wholePkg, header.length+bsOffset.length+bsDownBytes.length+groupBytes.length, filenameBytes.length); 
this.storageServer.getSocket().getOutputStream().write(wholePkg); } }
ProtoCommon.java:
/**
 * Copyright (C) 2008 Happy Fish / YuQing
 *
 * FastDFS Java Client may be copied only under the terms of the GNU Lesser
 * General Public License (LGPL).
 * Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
 **/

package org.csource.fastdfs;

import java.io.InputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.net.Socket;
import org.csource.common.MyException;
import org.csource.common.NameValuePair;

/**
 * Protocol common functions: command/status constants plus helpers that pack
 * and receive the fixed-size package header, convert big-endian integers,
 * (un)pack metadata strings and derive tokens / slave filenames.
 *
 * @author Happy Fish / YuQing
 * @version Version 1.18
 */
public class ProtoCommon {
    /**
     * Result of receiving a whole package: the status byte and, when the
     * status is 0, the body bytes ({@code null} otherwise).
     */
    public static class RecvPackageInfo {
        public byte errno;
        public byte[] body;

        public RecvPackageInfo(byte errno, byte[] body) {
            this.errno = errno;
            this.body = body;
        }
    }

    /**
     * Result of receiving a package header: the status byte and the body
     * length announced by the header (0 when the status is non-zero).
     */
    public static class RecvHeaderInfo {
        public byte errno;
        public long body_len;

        public RecvHeaderInfo(byte errno, long body_len) {
            this.errno = errno;
            this.body_len = body_len;
        }
    }

    public static final byte FDFS_PROTO_CMD_QUIT = 82;
    public static final byte TRACKER_PROTO_CMD_SERVER_LIST_GROUP = 91;
    public static final byte TRACKER_PROTO_CMD_SERVER_LIST_STORAGE = 92;
    public static final byte TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE = 93;

    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE = 101;
    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE = 102;
    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE = 103;
    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE = 104;
    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL = 105;
    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL = 106;
    public static final byte TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL = 107;
    public static final byte TRACKER_PROTO_CMD_RESP = 100;
    public static final byte FDFS_PROTO_CMD_ACTIVE_TEST = 111;

    public static final byte STORAGE_PROTO_CMD_UPLOAD_FILE = 11;
    public static final byte STORAGE_PROTO_CMD_DELETE_FILE = 12;
    public static final byte STORAGE_PROTO_CMD_SET_METADATA = 13;
    public static final byte STORAGE_PROTO_CMD_DOWNLOAD_FILE = 14;
    public static final byte STORAGE_PROTO_CMD_GET_METADATA = 15;
    public static final byte STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE = 21;
    public static final byte STORAGE_PROTO_CMD_QUERY_FILE_INFO = 22;
    public static final byte STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE = 23; // create appender file
    public static final byte STORAGE_PROTO_CMD_APPEND_FILE = 24;          // append file
    public static final byte STORAGE_PROTO_CMD_MODIFY_FILE = 34;          // modify appender file
    public static final byte STORAGE_PROTO_CMD_TRUNCATE_FILE = 36;        // truncate appender file
    public static final byte STORAGE_PROTO_CMD_RESP = TRACKER_PROTO_CMD_RESP;

    public static final byte FDFS_STORAGE_STATUS_INIT = 0;
    public static final byte FDFS_STORAGE_STATUS_WAIT_SYNC = 1;
    public static final byte FDFS_STORAGE_STATUS_SYNCING = 2;
    public static final byte FDFS_STORAGE_STATUS_IP_CHANGED = 3;
    public static final byte FDFS_STORAGE_STATUS_DELETED = 4;
    public static final byte FDFS_STORAGE_STATUS_OFFLINE = 5;
    public static final byte FDFS_STORAGE_STATUS_ONLINE = 6;
    public static final byte FDFS_STORAGE_STATUS_ACTIVE = 7;
    public static final byte FDFS_STORAGE_STATUS_NONE = 99;

    /**
     * for overwrite all old metadata
     */
    public static final byte STORAGE_SET_METADATA_FLAG_OVERWRITE = 'O';

    /**
     * for replace, insert when the meta item not exist, otherwise update it
     */
    public static final byte STORAGE_SET_METADATA_FLAG_MERGE = 'M';

    public static final int FDFS_PROTO_PKG_LEN_SIZE = 8;
    public static final int FDFS_PROTO_CMD_SIZE = 1;
    public static final int FDFS_GROUP_NAME_MAX_LEN = 16;
    public static final int FDFS_IPADDR_SIZE = 16;
    public static final int FDFS_DOMAIN_NAME_MAX_SIZE = 128;
    public static final int FDFS_VERSION_SIZE = 6;
    public static final int FDFS_STORAGE_ID_MAX_SIZE = 16;

    public static final String FDFS_RECORD_SEPERATOR = "\u0001";
    public static final String FDFS_FIELD_SEPERATOR = "\u0002";

    public static final int TRACKER_QUERY_STORAGE_FETCH_BODY_LEN = FDFS_GROUP_NAME_MAX_LEN
            + FDFS_IPADDR_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE;
    public static final int TRACKER_QUERY_STORAGE_STORE_BODY_LEN = FDFS_GROUP_NAME_MAX_LEN
            + FDFS_IPADDR_SIZE + FDFS_PROTO_PKG_LEN_SIZE;

    // Header layout (see packHeader): 8 length bytes, then the command byte, then the status byte.
    protected static final int PROTO_HEADER_CMD_INDEX = FDFS_PROTO_PKG_LEN_SIZE;
    protected static final int PROTO_HEADER_STATUS_INDEX = FDFS_PROTO_PKG_LEN_SIZE + 1;

    public static final byte FDFS_FILE_EXT_NAME_MAX_LEN = 6;
    public static final byte FDFS_FILE_PREFIX_MAX_LEN = 16;
    public static final byte FDFS_FILE_PATH_LEN = 10;
    public static final byte FDFS_FILENAME_BASE64_LENGTH = 27;
    public static final byte FDFS_TRUNK_FILE_INFO_LEN = 16;

    public static final byte ERR_NO_ENOENT = 2;
    public static final byte ERR_NO_EIO = 5;
    public static final byte ERR_NO_EBUSY = 16;
    public static final byte ERR_NO_EINVAL = 22;
    public static final byte ERR_NO_ENOSPC = 28;
    public static final byte ECONNREFUSED = 61;
    public static final byte ERR_NO_EALREADY = 114;

    public static final long INFINITE_FILE_SIZE = 256 * 1024L * 1024 * 1024 * 1024 * 1024L;
    public static final long APPENDER_FILE_SIZE = INFINITE_FILE_SIZE;
    public static final long TRUNK_FILE_MARK_SIZE = 512 * 1024L * 1024 * 1024 * 1024 * 1024L;
    public static final long NORMAL_LOGIC_FILENAME_LENGTH = FDFS_FILE_PATH_LEN
            + FDFS_FILENAME_BASE64_LENGTH + FDFS_FILE_EXT_NAME_MAX_LEN + 1;
    public static final long TRUNK_LOGIC_FILENAME_LENGTH = NORMAL_LOGIC_FILENAME_LENGTH
            + FDFS_TRUNK_FILE_INFO_LEN;

    private ProtoCommon() {
    }

    /**
     * map a storage status code to its caption
     * @param status one of the FDFS_STORAGE_STATUS_* constants
     * @return the caption, or "UNKOWN" for an unrecognized code
     */
    public static String getStorageStatusCaption(byte status) {
        switch (status) {
            case FDFS_STORAGE_STATUS_INIT:
                return "INIT";
            case FDFS_STORAGE_STATUS_WAIT_SYNC:
                return "WAIT_SYNC";
            case FDFS_STORAGE_STATUS_SYNCING:
                return "SYNCING";
            case FDFS_STORAGE_STATUS_IP_CHANGED:
                return "IP_CHANGED";
            case FDFS_STORAGE_STATUS_DELETED:
                return "DELETED";
            case FDFS_STORAGE_STATUS_OFFLINE:
                return "OFFLINE";
            case FDFS_STORAGE_STATUS_ONLINE:
                return "ONLINE";
            case FDFS_STORAGE_STATUS_ACTIVE:
                return "ACTIVE";
            case FDFS_STORAGE_STATUS_NONE:
                return "NONE";
            default:
                // spelling kept as-is: callers may compare against this exact string
                return "UNKOWN";
        }
    }

    /**
     * pack header by FastDFS transfer protocol
     * @param cmd which command to send
     * @param pkg_len package body length
     * @param errno status code, should be (byte)0
     * @return packed byte buffer of FDFS_PROTO_PKG_LEN_SIZE + 2 bytes
     */
    public static byte[] packHeader(byte cmd, long pkg_len, byte errno) throws UnsupportedEncodingException {
        // a fresh array is already zero-filled
        byte[] header = new byte[FDFS_PROTO_PKG_LEN_SIZE + 2];
        byte[] lenBytes = long2buff(pkg_len);
        System.arraycopy(lenBytes, 0, header, 0, lenBytes.length);
        header[PROTO_HEADER_CMD_INDEX] = cmd;
        header[PROTO_HEADER_STATUS_INDEX] = errno;
        return header;
    }

    /**
     * Read from the stream until {@code length} bytes have been stored or the
     * stream ends. A single InputStream.read call may legally return fewer
     * bytes than requested, so callers that need a fixed-size chunk must loop.
     * @return the number of bytes actually read (less than length only on EOF)
     */
    private static int readFully(InputStream in, byte[] buff, int offset, int length) throws IOException {
        int total = 0;
        while (total < length) {
            int n = in.read(buff, offset + total, length - total);
            if (n < 0) {
                break;
            }
            total += n;
        }
        return total;
    }

    /**
     * receive pack header
     * @param in input stream
     * @param expect_cmd expect response command
     * @param expect_body_len expect response package body length, negative to skip the length check
     * @return RecvHeaderInfo: errno and pkg body length
     * @throws IOException if the header is truncated, the command does not match
     *         or the body length is negative / not the expected one
     */
    public static RecvHeaderInfo recvHeader(InputStream in, byte expect_cmd, long expect_body_len) throws IOException {
        byte[] header = new byte[FDFS_PROTO_PKG_LEN_SIZE + 2];

        // loop until the whole header arrived: one read() may deliver only part of it
        int bytes = readFully(in, header, 0, header.length);
        if (bytes != header.length) {
            throw new IOException("recv package size " + bytes + " != " + header.length);
        }

        if (header[PROTO_HEADER_CMD_INDEX] != expect_cmd) {
            throw new IOException("recv cmd: " + header[PROTO_HEADER_CMD_INDEX]
                    + " is not correct, expect cmd: " + expect_cmd);
        }

        if (header[PROTO_HEADER_STATUS_INDEX] != 0) {
            return new RecvHeaderInfo(header[PROTO_HEADER_STATUS_INDEX], 0);
        }

        long pkg_len = buff2long(header, 0);
        if (pkg_len < 0) {
            throw new IOException("recv body length: " + pkg_len + " < 0!");
        }

        if (expect_body_len >= 0 && pkg_len != expect_body_len) {
            throw new IOException("recv body length: " + pkg_len
                    + " is not correct, expect length: " + expect_body_len);
        }

        return new RecvHeaderInfo((byte) 0, pkg_len);
    }

    /**
     * receive whole pack
     * @param in input stream
     * @param expect_cmd expect response command
     * @param expect_body_len expect response package body length, negative to skip the length check
     * @return RecvPackageInfo: errno and response body (byte buff); body is null when errno != 0
     * @throws IOException if the header is invalid, the body is too large to buffer
     *         or the stream ends before the whole body arrived
     */
    public static RecvPackageInfo recvPackage(InputStream in, byte expect_cmd, long expect_body_len) throws IOException {
        RecvHeaderInfo header = recvHeader(in, expect_cmd, expect_body_len);
        if (header.errno != 0) {
            return new RecvPackageInfo(header.errno, null);
        }

        // the body is buffered in a byte[]; reject lengths an int cannot represent
        if (header.body_len > Integer.MAX_VALUE) {
            throw new IOException("recv body length: " + header.body_len + " is too large");
        }

        byte[] body = new byte[(int) header.body_len];
        int totalBytes = readFully(in, body, 0, body.length);
        if (totalBytes != header.body_len) {
            throw new IOException("recv package size " + totalBytes + " != " + header.body_len);
        }

        return new RecvPackageInfo((byte) 0, body);
    }

    /**
     * split metadata to name value pair array, using the default separators
     * @param meta_buff metadata
     * @return name value pair array
     */
    public static NameValuePair[] split_metadata(String meta_buff) {
        return split_metadata(meta_buff, FDFS_RECORD_SEPERATOR, FDFS_FIELD_SEPERATOR);
    }

    /**
     * split metadata to name value pair array
     * @param meta_buff metadata
     * @param recordSeperator record/row seperator (handed to String.split, i.e. treated as a regex)
     * @param filedSeperator field/column seperator (handed to String.split, i.e. treated as a regex)
     * @return name value pair array
     */
    public static NameValuePair[] split_metadata(String meta_buff, String recordSeperator, String filedSeperator) {
        String[] rows = meta_buff.split(recordSeperator);
        NameValuePair[] meta_list = new NameValuePair[rows.length];
        for (int i = 0; i < rows.length; i++) {
            // limit 2: everything after the first field separator belongs to the value
            String[] cols = rows[i].split(filedSeperator, 2);
            meta_list[i] = new NameValuePair(cols[0]);
            if (cols.length == 2) {
                meta_list[i].setValue(cols[1]);
            }
        }
        return meta_list;
    }

    /**
     * pack metadata array to string
     * @param meta_list metadata array
     * @return packed metadata, "" for an empty array
     */
    public static String pack_metadata(NameValuePair[] meta_list) {
        if (meta_list.length == 0) {
            return "";
        }

        StringBuilder sb = new StringBuilder(32 * meta_list.length);
        for (int i = 0; i < meta_list.length; i++) {
            if (i > 0) {
                sb.append(FDFS_RECORD_SEPERATOR);
            }
            sb.append(meta_list[i].getName()).append(FDFS_FIELD_SEPERATOR).append(meta_list[i].getValue());
        }
        return sb.toString();
    }

    /**
     * send quit command to server and close socket
     * @param sock the Socket object
     */
    public static void closeSocket(Socket sock) throws IOException {
        try {
            byte[] header = packHeader(FDFS_PROTO_CMD_QUIT, 0, (byte) 0);
            sock.getOutputStream().write(header);
        } finally {
            // close even when the QUIT command could not be sent, otherwise the socket leaks
            sock.close();
        }
    }

    /**
     * send ACTIVE_TEST command to server, test if network is ok and the server is alive
     * @param sock the Socket object
     * @return true if the server answered with status 0
     */
    public static boolean activeTest(Socket sock) throws IOException {
        byte[] header = packHeader(FDFS_PROTO_CMD_ACTIVE_TEST, 0, (byte) 0);
        sock.getOutputStream().write(header);

        RecvHeaderInfo headerInfo = recvHeader(sock.getInputStream(), TRACKER_PROTO_CMD_RESP, 0);
        return headerInfo.errno == 0;
    }

    /**
     * long convert to buff (big-endian)
     * @param n long number
     * @return 8 bytes buff
     */
    public static byte[] long2buff(long n) {
        byte[] bs = new byte[8];
        for (int i = 0; i < bs.length; i++) {
            // most significant byte first
            bs[i] = (byte) ((n >> (56 - 8 * i)) & 0xFF);
        }
        return bs;
    }

    /**
     * buff convert to long
     * @param bs the buffer (big-endian)
     * @param offset the start position based 0
     * @return long number
     */
    public static long buff2long(byte[] bs, int offset) {
        long result = 0;
        for (int i = 0; i < 8; i++) {
            // & 0xFF turns the signed byte into its unsigned value
            result = (result << 8) | (bs[offset + i] & 0xFFL);
        }
        return result;
    }

    /**
     * buff convert to int
     * @param bs the buffer (big-endian)
     * @param offset the start position based 0
     * @return int number
     */
    public static int buff2int(byte[] bs, int offset) {
        return ((bs[offset] & 0xFF) << 24)
                | ((bs[offset + 1] & 0xFF) << 16)
                | ((bs[offset + 2] & 0xFF) << 8)
                | (bs[offset + 3] & 0xFF);
    }

    /**
     * buff convert to ip address
     * @param bs the buffer (big-endian)
     * @param offset the start position based 0
     * @return dotted ip address, "" when the first or last of the four bytes is 0
     */
    public static String getIpAddress(byte[] bs, int offset) {
        // storage server ID rather than an IP; checked relative to offset, not to index 0
        if (bs[offset] == 0 || bs[offset + 3] == 0) {
            return "";
        }

        StringBuilder sbResult = new StringBuilder(16);
        for (int i = offset; i < offset + 4; i++) {
            if (sbResult.length() > 0) {
                sbResult.append(".");
            }
            sbResult.append(bs[i] & 0xFF);
        }
        return sbResult.toString();
    }

    /**
     * md5 function
     * @param source the input buffer
     * @return md5 string (32 lowercase hex digits)
     */
    public static String md5(byte[] source) throws NoSuchAlgorithmException {
        final char[] hexDigits = {'0', '1', '2', '3', '4', '5', '6', '7',
                                  '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
        java.security.MessageDigest md = java.security.MessageDigest.getInstance("MD5");
        md.update(source);
        byte[] digest = md.digest();

        char[] str = new char[digest.length * 2];
        int k = 0;
        for (int i = 0; i < digest.length; i++) {
            str[k++] = hexDigits[(digest[i] >>> 4) & 0xf];
            str[k++] = hexDigits[digest[i] & 0xf];
        }
        return new String(str);
    }

    /**
     * get token for file URL: md5 over filename bytes + secret key bytes + decimal timestamp bytes
     * @param remote_filename the filename return by FastDFS server
     * @param ts unix timestamp, unit: second
     * @param secret_key the secret key
     * @return token string
     */
    public static String getToken(String remote_filename, int ts, String secret_key)
            throws UnsupportedEncodingException, NoSuchAlgorithmException, MyException {
        byte[] bsFilename = remote_filename.getBytes(ClientGlobal.g_charset);
        byte[] bsKey = secret_key.getBytes(ClientGlobal.g_charset);
        byte[] bsTimestamp = Integer.toString(ts).getBytes(ClientGlobal.g_charset);

        byte[] buff = new byte[bsFilename.length + bsKey.length + bsTimestamp.length];
        System.arraycopy(bsFilename, 0, buff, 0, bsFilename.length);
        System.arraycopy(bsKey, 0, buff, bsFilename.length, bsKey.length);
        System.arraycopy(bsTimestamp, 0, buff, bsFilename.length + bsKey.length, bsTimestamp.length);

        return md5(buff);
    }

    /**
     * generate slave filename
     * @param master_filename the master filename to generate the slave filename
     * @param prefix_name the prefix name to generate the slave filename
     * @param ext_name the extension name of slave filename, null for same as the master extension name
     * @return slave filename string
     * @throws MyException if the master filename is too short, or the prefix is "-m"
     *         while the resulting extension is empty
     */
    public static String genSlaveFilename(String master_filename, String prefix_name, String ext_name)
            throws MyException {
        if (master_filename.length() < 28 + FDFS_FILE_EXT_NAME_MAX_LEN) {
            throw new MyException("master filename \"" + master_filename + "\" is invalid");
        }

        // only look for the dot within the last FDFS_FILE_EXT_NAME_MAX_LEN + 1 characters
        int dotIndex = master_filename.indexOf('.',
                master_filename.length() - (FDFS_FILE_EXT_NAME_MAX_LEN + 1));

        String true_ext_name;
        if (ext_name == null) {
            true_ext_name = (dotIndex < 0) ? "" : master_filename.substring(dotIndex);
        } else if (ext_name.length() == 0) {
            true_ext_name = "";
        } else if (ext_name.charAt(0) == '.') {
            true_ext_name = ext_name;
        } else {
            true_ext_name = "." + ext_name;
        }

        if (true_ext_name.length() == 0 && prefix_name.equals("-m")) {
            throw new MyException("prefix_name \"" + prefix_name + "\" is invalid");
        }

        String base = (dotIndex < 0) ? master_filename : master_filename.substring(0, dotIndex);
        return base + prefix_name + true_ext_name;
    }
}
差不多就这几个程序了。接下来分析一下。
TestClient中首先就调用ClientGlobal中的static方法init对程序运行时传进来的参数conf_filename文件进行了处理,把里面的一些相关参数进行了初始化,其中最重要的就是g_tracker_group这个东西的初始化,它是通过调用TrackerGroup的构造函数完成的,这一下之后,g_tracker_group和TrackerGroup中的三个变量便都有值了。接下来TestClient程序构造TrackerClient对象,大家一步步找过去会发现这个对象内部的tracker_group成员引用的就是咱之前的g_tracker_group,也就是说现在TrackerClient对象中已经保存了配置文件中的跟tracker_server相关的列表了。接下来,通过TrackerClient对象的getConnection操作去生成一个TrackerServer对象,最终源头在TrackerGroup中的getConnection函数。而这一步说到底是通过TrackerClient来调用的,而tracker_group已经在咱之前生成TrackerClient对象的时候初始化完毕,所以说还是对那配置文件里面的那个tracker_server列表进行处理,TrackerGroup中的getConnection函数部分有个synchronized(同步),而tracker_server_index初始化是0,所以最终返回给TestClient的是一个可用的且尚未被使用的TrackerServer对象。接下来是生成一个StorageClient对象,然后是group_name = null;
StorageServer[] storageServers = tracker.getStoreStorages(trackerServer, group_name);这一步要做的事情是,把和TrackerClient能建立起连接的StorageServer对象保存在一个数组里存起来。到此为止,TrackerServer跟StorageServer的对应关系已经建立起来了,TestClient中剩下的部分就是通过StorageClient对象client去upload_file,delete_file,download_file等等的测试。但是之前返回来的StorageServer的数组对象有什么用我还没有看出来,我也是刚开始看客户端源码,如果有对文中描述不清楚的地方欢迎留言交流。
我自己接下来要做的事情:
1.把(ip,port)这个二元组与TrackerServer的对应关系先保存下来。(客户端代码中原来是tracker连过去,因为有同步锁(synchronized),所以取一个当前可用的trackerServer对象,那么我猜想自己需要做的部分是在ClientGlobal中的init这个地方。)
2.trackerServer的getStoreStorages操作结束之后,会返回当前可用且尚未被使用的StorageServer对象的数组。我要做的事情是把storageServer跟(ip,port)这个二元组的对应关系先找出来。
我为什么要做这些事情呢?是因为每次去连接,都会去找一次哪些storageserver是可用的;把它们保存起来之后通过ip地址+port映射过去,那么我就知道当前可用的storageserver,然后就可以进行上传下载等操作了。
最后想反映一下,关于FastDFS的相关资料实在是太少了,昨天想找一个关于协议的东西,官网上喊了好久都没人理。这篇东西是我在中午吃饭之前赶出来的,可能会比较粗糙,请见谅。欢迎大家留言交流。
----------------------------------------------------------
完成了那两个部分,现在来贴上代码,其中有三个类做过改动,改动的部分我用//做了标记。
ClientGlobal.java:其中init部分有添加
/**
 * Copyright (C) 2008 Happy Fish / YuQing
 *
 * FastDFS Java Client may be copied only under the terms of the GNU Lesser
 * General Public License (LGPL).
 * Please visit the FastDFS Home Page http://www.csource.org/ for more detail.
 **/

package org.csource.fastdfs;

import java.net.*;
import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.csource.common.*;

/**
 * Global variables shared by the client classes, loaded from the config file
 * by {@link #init(String)}.
 *
 * @author Happy Fish / YuQing
 * @version Version 1.11
 */
public class ClientGlobal {
    public static int g_connect_timeout; //millisecond
    public static int g_network_timeout; //millisecond
    public static String g_charset;
    public static int g_tracker_http_port;
    public static boolean g_anti_steal_token;  //if anti-steal token
    public static String g_secret_key;   //generate token secret key
    public static TrackerGroup g_tracker_group;

    public static final int DEFAULT_CONNECT_TIMEOUT = 5;  //second
    public static final int DEFAULT_NETWORK_TIMEOUT = 30; //second

    // ---- added by the article's author ----
    /**
     * Connected tracker servers, filled by init(). The key is
     * InetAddress.toString() of the tracker address, i.e. "hostname/literal-ip"
     * (the hostname part is empty when the config gives a bare IP).
     */
    public static Map<String,ArrayList<TrackerServer> > mp;
    /**
     * Storage servers by address. Not assigned anywhere in this class.
     * NOTE(review): presumably populated by TrackerClient.getStoreStorages2 - confirm
     * it is initialized before first use.
     */
    public static Map<String,ArrayList<StorageServer> >mp2;
    // ---- end of addition ----

    private ClientGlobal() {
    }

    /**
     * load global variables, then open one connection to every configured tracker server
     * @param conf_filename config filename
     * @throws MyException if "tracker_server" is missing or not in host:port form
     * @throws IOException if the config cannot be read or a tracker server cannot be connected;
     *         in the latter case every tracker socket opened by this call is closed again
     */
    public static void init(String conf_filename) throws FileNotFoundException, IOException, MyException {
        IniFileReader iniReader;
        String[] szTrackerServers;
        String[] parts;

        iniReader = new IniFileReader(conf_filename);

        // timeouts are configured in seconds and stored in milliseconds
        g_connect_timeout = iniReader.getIntValue("connect_timeout", DEFAULT_CONNECT_TIMEOUT);
        if (g_connect_timeout < 0) {
            g_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
        }
        g_connect_timeout *= 1000; //millisecond

        g_network_timeout = iniReader.getIntValue("network_timeout", DEFAULT_NETWORK_TIMEOUT);
        if (g_network_timeout < 0) {
            g_network_timeout = DEFAULT_NETWORK_TIMEOUT;
        }
        g_network_timeout *= 1000; //millisecond

        g_charset = iniReader.getStrValue("charset");
        if (g_charset == null || g_charset.length() == 0) {
            g_charset = "ISO8859-1";
        }

        szTrackerServers = iniReader.getValues("tracker_server");
        if (szTrackerServers == null) {
            throw new MyException("item \"tracker_server\" in " + conf_filename + " not found");
        }

        InetSocketAddress[] tracker_servers = new InetSocketAddress[szTrackerServers.length];
        for (int i = 0; i < szTrackerServers.length; i++) {
            parts = szTrackerServers[i].split("\\:", 2);
            if (parts.length != 2) {
                throw new MyException("the value of item \"tracker_server\" is invalid, the correct format is host:port");
            }

            tracker_servers[i] = new InetSocketAddress(parts[0].trim(), Integer.parseInt(parts[1].trim()));
        }
        g_tracker_group = new TrackerGroup(tracker_servers);

        g_tracker_http_port = iniReader.getIntValue("http.tracker_http_port", 80);
        g_anti_steal_token = iniReader.getBoolValue("http.anti_steal_token", false);
        if (g_anti_steal_token) {
            g_secret_key = iniReader.getStrValue("http.secret_key");
        }

        // added by the article's author: remember a live connection per tracker address
        connectTrackerServers(tracker_servers);
    }

    /**
     * Connect to every tracker address and register the resulting TrackerServer
     * objects in {@link #mp}. If any connection fails, everything opened so far
     * is closed and the map is emptied before the exception is rethrown, so a
     * failed init does not leave sockets behind.
     */
    private static void connectTrackerServers(InetSocketAddress[] tracker_servers) throws IOException {
        mp = new HashMap<String,ArrayList<TrackerServer> >();
        try {
            for (int i = 0; i < tracker_servers.length; i++) {
                Socket sock = openSocket(tracker_servers[i], true);

                // safe after a successful connect: the address is resolved at this point
                String key = tracker_servers[i].getAddress().toString();
                ArrayList<TrackerServer> servers = mp.get(key);
                if (servers == null) {
                    servers = new ArrayList<TrackerServer>();
                    mp.put(key, servers);
                }
                servers.add(new TrackerServer(sock, tracker_servers[i]));
            }
        } catch (IOException ex) {
            for (ArrayList<TrackerServer> servers : mp.values()) {
                for (TrackerServer server : servers) {
                    try {
                        server.close();
                    } catch (IOException ignored) {
                        // best effort cleanup; the connect failure below is the error to report
                    }
                }
            }
            mp.clear();
            throw ex;
        }
    }

    /**
     * Create a socket with the global network timeout and connect it using the
     * global connect timeout. The socket is closed again if anything fails, so
     * no half-initialized socket escapes.
     */
    private static Socket openSocket(InetSocketAddress addr, boolean reuseAddress) throws IOException {
        Socket sock = new Socket();
        boolean connected = false;
        try {
            if (reuseAddress) {
                sock.setReuseAddress(true);
            }
            sock.setSoTimeout(ClientGlobal.g_network_timeout);
            sock.connect(addr, ClientGlobal.g_connect_timeout);
            connected = true;
            return sock;
        } finally {
            if (!connected) {
                try {
                    sock.close();
                } catch (IOException ignored) {
                    // nothing useful to do; the original failure propagates
                }
            }
        }
    }

    /**
     * construct Socket object
     * @param ip_addr ip address or hostname
     * @param port port number
     * @return connected Socket object
     */
    public static Socket getSocket(String ip_addr, int port) throws IOException {
        return openSocket(new InetSocketAddress(ip_addr, port), false);
    }

    /**
     * construct Socket object
     * @param addr InetSocketAddress object, including ip address and port
     * @return connected Socket object
     */
    public static Socket getSocket(InetSocketAddress addr) throws IOException {
        return openSocket(addr, false);
    }

    public static int getG_connect_timeout() {
        return g_connect_timeout;
    }

    public static void setG_connect_timeout(int connect_timeout) {
        ClientGlobal.g_connect_timeout = connect_timeout;
    }

    public static int getG_network_timeout() {
        return g_network_timeout;
    }

    public static void setG_network_timeout(int network_timeout) {
        ClientGlobal.g_network_timeout = network_timeout;
    }

    public static String getG_charset() {
        return g_charset;
    }

    public static void setG_charset(String charset) {
        ClientGlobal.g_charset = charset;
    }

    public static int getG_tracker_http_port() {
        return g_tracker_http_port;
    }

    public static void setG_tracker_http_port(int tracker_http_port) {
        ClientGlobal.g_tracker_http_port = tracker_http_port;
    }

    public static boolean getG_anti_steal_token() {
        return g_anti_steal_token;
    }

    public static boolean isG_anti_steal_token() {
        return g_anti_steal_token;
    }

    public static void setG_anti_steal_token(boolean anti_steal_token) {
        ClientGlobal.g_anti_steal_token = anti_steal_token;
    }

    public static String getG_secret_key() {
        return g_secret_key;
    }

    public static void setG_secret_key(String secret_key) {
        ClientGlobal.g_secret_key = secret_key;
    }

    public static TrackerGroup getG_tracker_group() {
        return g_tracker_group;
    }

    public static void setG_tracker_group(TrackerGroup tracker_group) {
        ClientGlobal.g_tracker_group = tracker_group;
    }
}
TrackerClient.java:其中我把原来代码中的getStoreStorages函数复制了一份,也可以不复制,反正返回的数组只是为了测试用,并无实际用途。
/** * Copyright (C) 2008 Happy Fish / YuQing * * FastDFS Java Client may be copied only under the terms of the GNU Lesser * General Public License (LGPL). * Please visit the FastDFS Home Page http://www.csource.org/ for more detail. */ package org.csource.fastdfs; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.net.InetSocketAddress; import java.net.Socket; /** * Tracker client * @author Happy Fish / YuQing * @version Version 1.19 */ public class TrackerClient { protected TrackerGroup tracker_group; protected byte errno; /** * constructor with global tracker group */ public TrackerClient() { this.tracker_group = ClientGlobal.g_tracker_group; } /** * constructor with specified tracker group * @param tracker_group the tracker group object */ public TrackerClient(TrackerGroup tracker_group) { this.tracker_group = tracker_group; } /** * get the error code of last call * @return the error code of last call */ public byte getErrorCode() { return this.errno; } /** * get a connection to tracker server * @return tracker server Socket object, return null if fail */ public TrackerServer getConnection() throws IOException { return this.tracker_group.getConnection(); } /** * query storage server to upload file * @param trackerServer the tracker server * @return storage server Socket object, return null if fail */ public StorageServer getStoreStorage(TrackerServer trackerServer) throws IOException { final String groupName = null; return this.getStoreStorage(trackerServer, groupName); } /** * query storage server to upload file * @param trackerServer the tracker server * @param groupName the group name to upload file to, can be empty * @return storage server object, return null if fail */ public StorageServer getStoreStorage(TrackerServer trackerServer, String groupName) throws IOException { byte[] header; String ip_addr; int port; byte cmd; int out_len; boolean bNewConnection; 
byte store_path; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { if (groupName == null || groupName.length() == 0) { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE; out_len = 0; } else { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE; out_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } header = ProtoCommon.packHeader(cmd, out_len, (byte)0); out.write(header); if (groupName != null && groupName.length() > 0) { byte[] bGroupName; byte[] bs; int group_len; bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { group_len = bs.length; } else { group_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, group_len); out.write(bGroupName); } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } ip_addr = new String(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN, ProtoCommon.FDFS_IPADDR_SIZE-1).trim(); port = (int)ProtoCommon.buff2long(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ProtoCommon.FDFS_IPADDR_SIZE-1); store_path = pkgInfo.body[ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN - 1]; return new StorageServer(ip_addr, port, store_path); } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } /** * query 
storage servers to upload file * @param trackerServer the tracker server * @param groupName the group name to upload file to, can be empty * @return storage servers, return null if fail */ public StorageServer[] getStoreStorages1(TrackerServer trackerServer, String groupName) throws IOException { byte[] header; String ip_addr; int port; byte cmd; int out_len; boolean bNewConnection; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { if (groupName == null || groupName.length() == 0) { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL; out_len = 0; } else { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL; out_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } header = ProtoCommon.packHeader(cmd, out_len, (byte)0); out.write(header); if (groupName != null && groupName.length() > 0) { byte[] bGroupName; byte[] bs; int group_len; bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { group_len = bs.length; } else { group_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, group_len); out.write(bGroupName); } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } if (pkgInfo.body.length < ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN) { this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } int ipPortLen = pkgInfo.body.length - (ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + 1); final int recordLength = ProtoCommon.FDFS_IPADDR_SIZE - 1 + ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; if 
(ipPortLen % recordLength != 0) { this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } int serverCount = ipPortLen / recordLength; if (serverCount > 16) { this.errno = ProtoCommon.ERR_NO_ENOSPC; return null; } StorageServer[] results = new StorageServer[serverCount]; byte store_path = pkgInfo.body[pkgInfo.body.length - 1]; int offset = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; for (int i=0; i<serverCount; i++) { ip_addr = new String(pkgInfo.body, offset, ProtoCommon.FDFS_IPADDR_SIZE - 1).trim(); offset += ProtoCommon.FDFS_IPADDR_SIZE - 1; port = (int)ProtoCommon.buff2long(pkgInfo.body, offset); offset += ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; results[i] = new StorageServer(ip_addr, port, store_path); } return results; } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } //////////////////////////////////////////////////////////////////////////////////////////////// public void getStoreStorages2(TrackerServer trackerServer, String groupName) throws IOException { byte[] header; String ip_addr; int port; byte cmd; int out_len; boolean bNewConnection; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return ; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { if (groupName == null || groupName.length() == 0) { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL; out_len = 0; } else { cmd = ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL; out_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } header = ProtoCommon.packHeader(cmd, out_len, (byte)0); out.write(header); if (groupName != null && groupName.length() > 0) { byte[] bGroupName; byte[] bs; int group_len; bs = 
groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { group_len = bs.length; } else { group_len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, group_len); out.write(bGroupName); } ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return ; } if (pkgInfo.body.length < ProtoCommon.TRACKER_QUERY_STORAGE_STORE_BODY_LEN) { this.errno = ProtoCommon.ERR_NO_EINVAL; return ; } int ipPortLen = pkgInfo.body.length - (ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + 1); final int recordLength = ProtoCommon.FDFS_IPADDR_SIZE - 1 + ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; if (ipPortLen % recordLength != 0) { this.errno = ProtoCommon.ERR_NO_EINVAL; return ; } int serverCount = ipPortLen / recordLength; if (serverCount > 16) { this.errno = ProtoCommon.ERR_NO_ENOSPC; return ; } StorageServer[] results = new StorageServer[serverCount]; byte store_path = pkgInfo.body[pkgInfo.body.length - 1]; int offset = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; for (int i=0; i<serverCount; i++) { ip_addr = new String(pkgInfo.body, offset, ProtoCommon.FDFS_IPADDR_SIZE - 1).trim(); offset += ProtoCommon.FDFS_IPADDR_SIZE - 1; port = (int)ProtoCommon.buff2long(pkgInfo.body, offset); offset += ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; /////////////////////////////////////////////////////////////////// ClientGlobal.mp2=new HashMap<String,ArrayList<StorageServer> >(); ClientGlobal.mp2.clear(); String s=(new InetSocketAddress(ip_addr,port)).getAddress().toString(); StorageServer storageserver=new StorageServer(ip_addr, port, store_path); if(ClientGlobal.mp2.get(s)==null) { ArrayList<StorageServer> al=new ArrayList<StorageServer>(); al.add(storageserver); ClientGlobal.mp2.put(s, al); } else { ArrayList<StorageServer> 
al=ClientGlobal.mp2.get(s); al.add(storageserver); ClientGlobal.mp2.put(s, al); } //////////////////////////////////////////////////////////////////// } return ; } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } ////////////////////////////////////////////////////////////////////////// /** * query storage server to download file * @param trackerServer the tracker server * @param groupName the group name of storage server * @param filename filename on storage server * @return storage server Socket object, return null if fail */ public StorageServer getFetchStorage(TrackerServer trackerServer, String groupName, String filename) throws IOException { ServerInfo[] servers = this.getStorages(trackerServer, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE, groupName, filename); if (servers == null) { return null; } else { return new StorageServer(servers[0].getIpAddr(), servers[0].getPort(), 0); } } /** * query storage server to update file (delete file or set meta data) * @param trackerServer the tracker server * @param groupName the group name of storage server * @param filename filename on storage server * @return storage server Socket object, return null if fail */ public StorageServer getUpdateStorage(TrackerServer trackerServer, String groupName, String filename) throws IOException { ServerInfo[] servers = this.getStorages(trackerServer, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE, groupName, filename); if (servers == null) { return null; } else { return new StorageServer(servers[0].getIpAddr(), servers[0].getPort(), 0); } } /** * get storage servers to download file * @param trackerServer the tracker server * @param groupName the group name of storage server * @param filename filename on storage server * @return storage servers, return null if 
fail */ public ServerInfo[] getFetchStorages(TrackerServer trackerServer, String groupName, String filename) throws IOException { return this.getStorages(trackerServer, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL, groupName, filename); } /** * query storage server to download file * @param trackerServer the tracker server * @param cmd command code, ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE or ProtoCommon.TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE * @param groupName the group name of storage server * @param filename filename on storage server * @return storage server Socket object, return null if fail */ protected ServerInfo[] getStorages(TrackerServer trackerServer, byte cmd, String groupName, String filename) throws IOException { byte[] header; byte[] bFileName; byte[] bGroupName; byte[] bs; int len; String ip_addr; int port; boolean bNewConnection; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; bFileName = filename.getBytes(ClientGlobal.g_charset); if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { len = bs.length; } else { len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, len); header = ProtoCommon.packHeader(cmd, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + bFileName.length, (byte)0); byte[] wholePkg = new byte[header.length + bGroupName.length + bFileName.length]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(bGroupName, 0, wholePkg, header.length, bGroupName.length); System.arraycopy(bFileName, 0, wholePkg, header.length + bGroupName.length, bFileName.length); out.write(wholePkg); 
ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } if (pkgInfo.body.length < ProtoCommon.TRACKER_QUERY_STORAGE_FETCH_BODY_LEN) { throw new IOException("Invalid body length: " + pkgInfo.body.length); } if ((pkgInfo.body.length - ProtoCommon.TRACKER_QUERY_STORAGE_FETCH_BODY_LEN) % (ProtoCommon.FDFS_IPADDR_SIZE - 1) != 0) { throw new IOException("Invalid body length: " + pkgInfo.body.length); } int server_count = 1 + (pkgInfo.body.length - ProtoCommon.TRACKER_QUERY_STORAGE_FETCH_BODY_LEN) / (ProtoCommon.FDFS_IPADDR_SIZE - 1); ip_addr = new String(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN, ProtoCommon.FDFS_IPADDR_SIZE-1).trim(); int offset = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ProtoCommon.FDFS_IPADDR_SIZE - 1; port = (int)ProtoCommon.buff2long(pkgInfo.body, offset); offset += ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE; ServerInfo[] servers = new ServerInfo[server_count]; servers[0] = new ServerInfo(ip_addr, port); for (int i=1; i<server_count; i++) { servers[i] = new ServerInfo(new String(pkgInfo.body, offset, ProtoCommon.FDFS_IPADDR_SIZE-1).trim(), port); offset += ProtoCommon.FDFS_IPADDR_SIZE - 1; } return servers; } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } /** * query storage server to download file * @param trackerServer the tracker server * @param file_id the file id(including group name and filename) * @return storage server Socket object, return null if fail */ public StorageServer getFetchStorage1(TrackerServer trackerServer, String file_id) throws IOException { String[] parts = new String[2]; this.errno = StorageClient1.split_file_id(file_id, parts); if (this.errno != 0) { return null; } 
return this.getFetchStorage(trackerServer, parts[0], parts[1]); } /** * get storage servers to download file * @param trackerServer the tracker server * @param file_id the file id(including group name and filename) * @return storage servers, return null if fail */ public ServerInfo[] getFetchStorages1(TrackerServer trackerServer, String file_id) throws IOException { String[] parts = new String[2]; this.errno = StorageClient1.split_file_id(file_id, parts); if (this.errno != 0) { return null; } return this.getFetchStorages(trackerServer, parts[0], parts[1]); } /** * list groups * @param trackerServer the tracker server * @return group stat array, return null if fail */ public StructGroupStat[] listGroups(TrackerServer trackerServer) throws IOException { byte[] header; String ip_addr; int port; byte cmd; int out_len; boolean bNewConnection; byte store_path; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { header = ProtoCommon.packHeader(ProtoCommon.TRACKER_PROTO_CMD_SERVER_LIST_GROUP, 0, (byte)0); out.write(header); ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } ProtoStructDecoder<StructGroupStat> decoder = new ProtoStructDecoder<StructGroupStat>(); return decoder.decode(pkgInfo.body, StructGroupStat.class, StructGroupStat.getFieldsTotalSize()); } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } catch(Exception ex) { ex.printStackTrace(); this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { 
ex1.printStackTrace(); } } } } /** * query storage server stat info of the group * @param trackerServer the tracker server * @param groupName the group name of storage server * @return storage server stat array, return null if fail */ public StructStorageStat[] listStorages(TrackerServer trackerServer, String groupName) throws IOException { final String storageIpAddr = null; return this.listStorages(trackerServer, groupName, storageIpAddr); } /** * query storage server stat info of the group * @param trackerServer the tracker server * @param groupName the group name of storage server * @param storageIpAddr the storage server ip address, can be null or empty * @return storage server stat array, return null if fail */ public StructStorageStat[] listStorages(TrackerServer trackerServer, String groupName, String storageIpAddr) throws IOException { byte[] header; byte[] bGroupName; byte[] bs; int len; boolean bNewConnection; Socket trackerSocket; if (trackerServer == null) { trackerServer = getConnection(); if (trackerServer == null) { return null; } bNewConnection = true; } else { bNewConnection = false; } trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); try { bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { len = bs.length; } else { len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, len); int ipAddrLen; byte[] bIpAddr; if (storageIpAddr != null && storageIpAddr.length() > 0) { bIpAddr = storageIpAddr.getBytes(ClientGlobal.g_charset); if (bIpAddr.length < ProtoCommon.FDFS_IPADDR_SIZE) { ipAddrLen = bIpAddr.length; } else { ipAddrLen = ProtoCommon.FDFS_IPADDR_SIZE - 1; } } else { bIpAddr = null; ipAddrLen = 0; } header = ProtoCommon.packHeader(ProtoCommon.TRACKER_PROTO_CMD_SERVER_LIST_STORAGE, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ipAddrLen, 
(byte)0); byte[] wholePkg = new byte[header.length + bGroupName.length + ipAddrLen]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(bGroupName, 0, wholePkg, header.length, bGroupName.length); if (ipAddrLen > 0) { System.arraycopy(bIpAddr, 0, wholePkg, header.length + bGroupName.length, ipAddrLen); } out.write(wholePkg); ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, -1); this.errno = pkgInfo.errno; if (pkgInfo.errno != 0) { return null; } ProtoStructDecoder<StructStorageStat> decoder = new ProtoStructDecoder<StructStorageStat>(); return decoder.decode(pkgInfo.body, StructStorageStat.class, StructStorageStat.getFieldsTotalSize()); } catch(IOException ex) { if (!bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } throw ex; } catch(Exception ex) { ex.printStackTrace(); this.errno = ProtoCommon.ERR_NO_EINVAL; return null; } finally { if (bNewConnection) { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } } /** * delete a storage server from the tracker server * @param trackerServer the connected tracker server * @param groupName the group name of storage server * @param storageIpAddr the storage server ip address * @return true for success, false for fail */ private boolean deleteStorage(TrackerServer trackerServer, String groupName, String storageIpAddr) throws IOException { byte[] header; byte[] bGroupName; byte[] bs; int len; Socket trackerSocket; trackerSocket = trackerServer.getSocket(); OutputStream out = trackerSocket.getOutputStream(); bs = groupName.getBytes(ClientGlobal.g_charset); bGroupName = new byte[ProtoCommon.FDFS_GROUP_NAME_MAX_LEN]; if (bs.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN) { len = bs.length; } else { len = ProtoCommon.FDFS_GROUP_NAME_MAX_LEN; } Arrays.fill(bGroupName, (byte)0); System.arraycopy(bs, 0, bGroupName, 0, len); int ipAddrLen; byte[] 
bIpAddr = storageIpAddr.getBytes(ClientGlobal.g_charset); if (bIpAddr.length < ProtoCommon.FDFS_IPADDR_SIZE) { ipAddrLen = bIpAddr.length; } else { ipAddrLen = ProtoCommon.FDFS_IPADDR_SIZE - 1; } header = ProtoCommon.packHeader(ProtoCommon.TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN + ipAddrLen, (byte)0); byte[] wholePkg = new byte[header.length + bGroupName.length + ipAddrLen]; System.arraycopy(header, 0, wholePkg, 0, header.length); System.arraycopy(bGroupName, 0, wholePkg, header.length, bGroupName.length); System.arraycopy(bIpAddr, 0, wholePkg, header.length + bGroupName.length, ipAddrLen); out.write(wholePkg); ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(trackerSocket.getInputStream(), ProtoCommon.TRACKER_PROTO_CMD_RESP, 0); this.errno = pkgInfo.errno; return pkgInfo.errno == 0; } /** * delete a storage server from the global FastDFS cluster * @param groupName the group name of storage server * @param storageIpAddr the storage server ip address * @return true for success, false for fail */ public boolean deleteStorage(String groupName, String storageIpAddr) throws IOException { return this.deleteStorage(ClientGlobal.g_tracker_group, groupName, storageIpAddr); } /** * delete a storage server from the FastDFS cluster * @param trackerGroup the tracker server group * @param groupName the group name of storage server * @param storageIpAddr the storage server ip address * @return true for success, false for fail */ public boolean deleteStorage(TrackerGroup trackerGroup, String groupName, String storageIpAddr) throws IOException { int serverIndex; int notFoundCount; TrackerServer trackerServer; notFoundCount = 0; for (serverIndex=0; serverIndex<trackerGroup.tracker_servers.length; serverIndex++) { try { trackerServer = trackerGroup.getConnection(serverIndex); } catch(IOException ex) { ex.printStackTrace(System.err); this.errno = ProtoCommon.ECONNREFUSED; return false; } try { StructStorageStat[] storageStats = 
listStorages(trackerServer, groupName, storageIpAddr); if (storageStats == null) { if (this.errno == ProtoCommon.ERR_NO_ENOENT) { notFoundCount++; } else { return false; } } else if (storageStats.length == 0) { notFoundCount++; } else if (storageStats[0].getStatus() == ProtoCommon.FDFS_STORAGE_STATUS_ONLINE || storageStats[0].getStatus() == ProtoCommon.FDFS_STORAGE_STATUS_ACTIVE) { this.errno = ProtoCommon.ERR_NO_EBUSY; return false; } } finally { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } if (notFoundCount == trackerGroup.tracker_servers.length) { this.errno = ProtoCommon.ERR_NO_ENOENT; return false; } notFoundCount = 0; for (serverIndex=0; serverIndex<trackerGroup.tracker_servers.length; serverIndex++) { try { trackerServer = trackerGroup.getConnection(serverIndex); } catch(IOException ex) { System.err.println("connect to server " + trackerGroup.tracker_servers[serverIndex].getAddress().getHostAddress() + ":" + trackerGroup.tracker_servers[serverIndex].getPort() + " fail"); ex.printStackTrace(System.err); this.errno = ProtoCommon.ECONNREFUSED; return false; } try { if (!this.deleteStorage(trackerServer, groupName, storageIpAddr)) { if (this.errno != 0) { if (this.errno == ProtoCommon.ERR_NO_ENOENT) { notFoundCount++; } else if (this.errno != ProtoCommon.ERR_NO_EALREADY) { return false; } } } } finally { try { trackerServer.close(); } catch(IOException ex1) { ex1.printStackTrace(); } } } if (notFoundCount == trackerGroup.tracker_servers.length) { this.errno = ProtoCommon.ERR_NO_ENOENT; return false; } if (this.errno == ProtoCommon.ERR_NO_ENOENT) { this.errno = 0; } return this.errno == 0; } }
ForReady.java:这是我自己新建的一个类,也在org.csource.fastdfs包下。其中前半部分是我自己为了测试加进去的,后半部分与org.csource.fastdfs.test包下的TestClient.java的后半部分差不多。
package org.csource.fastdfs; import java.io.File; import java.net.InetSocketAddress; import java.util.Iterator; import java.util.Set; import org.csource.common.NameValuePair; import org.csource.fastdfs.test.DownloadFileWriter; import org.csource.fastdfs.test.UploadLocalFileSender; public class ForReady { public static void main(String args[]) { if (args.length < 2) { System.out.println("Error: Must have 2 parameters, one is config filename, " + "the other is the local filename to upload"); return; } System.out.println("java.version=" + System.getProperty("java.version")); String conf_filename = args[0]; String local_filename = args[1]; // String str="/home/niuguoqin/workspace/FastDFS_Test/src/fdfs_client.conf"; try { ClientGlobal.init(conf_filename); // ClientGlobal.init(str); System.out.println("network_timeout=" + ClientGlobal.g_network_timeout + "ms"); System.out.println("charset=" + ClientGlobal.g_charset); long startTime; String group_name; String remote_filename; ServerInfo[] servers; TrackerClient tracker = new TrackerClient(); Set<String> ss=ClientGlobal.mp.keySet(); Iterator<String> it=ss.iterator(); while(it.hasNext()) { String s=it.next(); // System.out.println(s); // System.out.println(ClientGlobal.mp.get(s).size()); Iterator<TrackerServer> i=ClientGlobal.mp.get(s).iterator(); while(i.hasNext()) { TrackerServer trackerServer=i.next(); // trackerServer.sock.connect(trackerServer.getInetSocketAddress(), ClientGlobal.g_connect_timeout); group_name = null; tracker.getStoreStorages2(trackerServer, group_name); // System.out.println(i.next()); } } /////////test Set<String> s1=ClientGlobal.mp.keySet(); Iterator<String> it1=ss.iterator(); while(it1.hasNext()) { String s=it1.next(); System.out.println(s); Iterator<TrackerServer> i1=ClientGlobal.mp.get(s).iterator(); while(i1.hasNext()) { TrackerServer trackerserver=i1.next(); // System.out.println(trackerserver.toString()); ////////////////////////////////////////////////////////////////////// String 
ip=trackerserver.getInetSocketAddress().getAddress().toString(); int port=trackerserver.getInetSocketAddress().getPort(); System.out.println("The ip of the trackerServer is " + ip); System.out.println("The port of the trackerServer is " + port); ////////////////////////////////////////////////////////////////////// Iterator<StorageServer> i2=ClientGlobal.mp2.get(s).iterator(); while(i2.hasNext()) { StorageServer storageserver=i2.next(); System.out.println(storageserver.toString()); StorageClient client = new StorageClient(trackerserver, storageserver);////////////////// /////////////////upload byte[] file_buff; NameValuePair[] meta_list; String[] results; String master_filename; String prefix_name; String file_ext_name; String generated_slave_filename; int errno; meta_list = new NameValuePair[4]; meta_list[0] = new NameValuePair("width", "800"); meta_list[1] = new NameValuePair("heigth", "600"); meta_list[2] = new NameValuePair("bgcolor", "#FFFFFF"); meta_list[3] = new NameValuePair("author", "Mike"); file_buff = "this is a test".getBytes(ClientGlobal.g_charset); System.out.println("file length: " + file_buff.length); startTime = System.currentTimeMillis(); results = client.upload_file(file_buff, "txt", meta_list); System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (results == null) { System.err.println("upload file fail, error code: " + client.getErrorCode()); return; } else { group_name = results[0]; remote_filename = results[1]; System.err.println("group_name: " + group_name + ", remote_filename: " + remote_filename); System.err.println(client.get_file_info(group_name, remote_filename)); //////////////////////////////////////////////////////////////// ip=storageserver.getInetSocketAddress().getAddress().toString(); port=storageserver.getInetSocketAddress().getPort(); System.out.println("The ip of the storageServer is " + ip); System.out.println("The port of the storageServer is " + port); 
//////////////////////////////////////////////////////////////// meta_list = new NameValuePair[4]; meta_list[0] = new NameValuePair("width", "1024"); meta_list[1] = new NameValuePair("heigth", "768"); meta_list[2] = new NameValuePair("bgcolor", "#000000"); meta_list[3] = new NameValuePair("title", "Untitle"); startTime = System.currentTimeMillis(); errno=client.set_metadata(group_name, remote_filename, meta_list, ProtoCommon.STORAGE_SET_METADATA_FLAG_MERGE); System.out.println("set_metadata time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (errno == 0) { System.err.println("set_metadata success"); } else { System.err.println("set_metadata fail, error no: " + errno); } meta_list = client.get_metadata(group_name, remote_filename); if (meta_list != null) { for (int i=0; i<meta_list.length; i++) { System.out.println(meta_list[i].getName() + " " + meta_list[i].getValue()); } } //Thread.sleep(30000); startTime = System.currentTimeMillis(); file_buff = client.download_file(group_name, remote_filename); System.out.println("download_file time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (file_buff != null) { System.out.println("file length:" + file_buff.length); System.out.println((new String(file_buff))); } file_buff = "this is a slave buff".getBytes(ClientGlobal.g_charset); master_filename = remote_filename; prefix_name = "-part1"; file_ext_name = "txt"; startTime = System.currentTimeMillis(); results = client.upload_file(group_name, master_filename, prefix_name, file_buff, file_ext_name, meta_list); System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (results != null) { System.err.println("slave file group_name: " + results[0] + ", remote_filename: " + results[1]); generated_slave_filename = ProtoCommon.genSlaveFilename(master_filename, prefix_name, file_ext_name); if (!generated_slave_filename.equals(results[1])) { System.err.println("generated slave file: " + generated_slave_filename 
+ "\n != returned slave file: " + results[1]); } System.err.println(client.get_file_info(results[0], results[1])); } startTime = System.currentTimeMillis(); errno = client.delete_file(group_name, remote_filename); System.out.println("delete_file time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (errno == 0) { System.err.println("Delete file success"); } else { System.err.println("Delete file fail, error no: " + errno); } } results = client.upload_file(local_filename, null, meta_list); if (results != null) { String file_id; int ts; String token; String file_url; InetSocketAddress inetSockAddr; group_name = results[0]; remote_filename = results[1]; file_id = group_name + StorageClient1.SPLIT_GROUP_NAME_AND_FILENAME_SEPERATOR + remote_filename; inetSockAddr = trackerserver.getInetSocketAddress(); file_url = "http://" + inetSockAddr.getAddress().getHostAddress(); if (ClientGlobal.g_tracker_http_port != 80) { file_url += ":" + ClientGlobal.g_tracker_http_port; } file_url += "/" + file_id; if (ClientGlobal.g_anti_steal_token) { ts = (int)(System.currentTimeMillis() / 1000); token = ProtoCommon.getToken(file_id, ts, ClientGlobal.g_secret_key); file_url += "?token=" + token + "&ts=" + ts; } System.err.println("group_name: " + group_name + ", remote_filename: " + remote_filename); System.err.println(client.get_file_info(group_name, remote_filename)); System.err.println("file url: " + file_url); errno = client.download_file(group_name, remote_filename, 0, 0, "c:\\" + remote_filename.replaceAll("/", "_")); if (errno == 0) { System.err.println("Download file success"); } else { System.err.println("Download file fail, error no: " + errno); } errno = client.download_file(group_name, remote_filename, 0, 0, new DownloadFileWriter("c:\\" + remote_filename.replaceAll("/", "-"))); if (errno == 0) { System.err.println("Download file success"); } else { System.err.println("Download file fail, error no: " + errno); } master_filename = remote_filename; prefix_name = 
"-part2"; file_ext_name = null; startTime = System.currentTimeMillis(); results = client.upload_file(group_name, master_filename, prefix_name, local_filename, null, meta_list); System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (results != null) { System.err.println("slave file group_name: " + results[0] + ", remote_filename: " + results[1]); generated_slave_filename = ProtoCommon.genSlaveFilename(master_filename, prefix_name, file_ext_name); if (!generated_slave_filename.equals(results[1])) { System.err.println("generated slave file: " + generated_slave_filename + "\n != returned slave file: " + results[1]); } System.err.println(client.get_file_info(results[0], results[1])); } } File f; f = new File(local_filename); int nPos = local_filename.lastIndexOf('.'); if (nPos > 0 && local_filename.length() - nPos <= ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + 1) { file_ext_name = local_filename.substring(nPos+1); } else { file_ext_name = null; } results = client.upload_file(null, f.length(), new UploadLocalFileSender(local_filename), file_ext_name, meta_list); if (results != null) { group_name = results[0]; remote_filename = results[1]; System.out.println("group name: " + group_name + ", remote filename: " + remote_filename); System.out.println(client.get_file_info(group_name, remote_filename)); master_filename = remote_filename; prefix_name = "-part3"; startTime = System.currentTimeMillis(); results = client.upload_file(group_name, master_filename, prefix_name, f.length(), new UploadLocalFileSender(local_filename), file_ext_name, meta_list); System.out.println("upload_file time used: " + (System.currentTimeMillis() - startTime) + " ms"); if (results != null) { System.err.println("slave file group_name: " + results[0] + ", remote_filename: " + results[1]); generated_slave_filename = ProtoCommon.genSlaveFilename(master_filename, prefix_name, file_ext_name); if (!generated_slave_filename.equals(results[1])) { 
System.err.println("generated slave file: " + generated_slave_filename + "\n != returned slave file: " + results[1]); } System.err.println(client.get_file_info(results[0], results[1])); } } else { System.err.println("Upload file fail, error no: " + errno); } //////////////////////////////////////////////////////////////// ip=storageserver.getInetSocketAddress().getAddress().toString(); port=storageserver.getInetSocketAddress().getPort(); System.out.println("The ip of the storageServer is " + ip); System.out.println("The port of the storageServer is " + port); //////////////////////////////////////////////////////////////// /* for test only */ System.out.println("active test to storage server: " + ProtoCommon.activeTest(storageserver.getSocket())); storageserver.close(); /* for test only */ System.out.println("active test to tracker server: " + ProtoCommon.activeTest(trackerserver.getSocket())); trackerserver.close(); } } } } catch(Exception ex) { ex.printStackTrace(); } } }
测试的时候配置文件也需要再新写几个,因为我只是在自己机器上测试,所以只是改变了其中的tracker_port和监听端口而已。
很希望大家留言交流,自己学习太孤单了。