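Thrift is a remote procedure call framework with cross-language support. Using Thrift together with the thriftfs module from Hadoop 1.x, I wrote a thriftfs for Hadoop 2.x that external programs can call.
1. Preparation
1.1 Building Boost
Boost download: http://120.52.72.39/jaist.dl.sourceforge.net/c3pr90ntcsf0/project/boost/boost/1.60.0/boost_1_60_0.tar.gz
Extract Boost. On Windows, run bootstrap.bat, then run the generated b2.exe.
If several Visual Studio toolchains are installed and you want to build with a specific one, open that version's command prompt (Start -> VS -> Tools -> Command Prompt xxx), cd into the extracted Boost directory, and run b2.exe.
1.2 Building Thrift
Thrift download: http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz
After extracting, go to the lib\cpp folder, open thrift.sln (this needs VS2010 or later), select the libthrift project, and add the Boost header directory to its include paths.
During the build, remove the .h and .cpp files you do not need and add the ones you do (this depends on your environment).
1.3 Compiling hadoopfs.thrift
Download the prebuilt Thrift compiler: http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.3/thrift-0.9.3.exe
Modify the hadoopfs.thrift file from Hadoop 1.x.
Contents of hadoopfs.thrift: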
#!/usr/local/bin/thrift -java
#
# Thrift Service exported by Hadoop File System
# Dhruba Borthakur (dhruba@gmail.com)
#

/**
 * The available types in Thrift:
 *
 *  bool        Boolean, one byte
 *  byte        Signed byte
 *  i16         Signed 16-bit integer
 *  i32         Signed 32-bit integer
 *  i64         Signed 64-bit integer
 *  double      64-bit floating point value
 *  string      String
 *  binary      Blob (byte array)
 *  map<t1,t2>  Map from one type to another
 *  list<t1>    Ordered list of one type
 *  set<t1>     Set of unique elements of one type
 *
 */

namespace java org.apache.hadoop.thriftfs.api
namespace php hadoopfs

struct ThriftHandle {
  1: i64 id
}

struct Pathname {
  1: string pathname
}

struct FileStatus {
  1: string path,
  2: i64 length,
  3: bool isdir,
  4: i16 block_replication,
  5: i64 blocksize,
  6: i64 modification_time,
  7: string permission,
  8: string owner,
  9: string group
}

struct BlockLocation {
  1: list<string> hosts,  /* hostnames of datanodes */
  2: list<string> names,  /* hostname:portNumber of datanodes */
  3: i64 offset,          /* offset of the block in the file */
  4: i64 length           /* length of data */
}

exception MalformedInputException {
  1: string message
}

exception ThriftIOException {
  1: string message
}

service ThriftHadoopFileSystem
{
  // set inactivity timeout period. The period is specified in seconds.
  // if there are no RPC calls to the HadoopThrift server for this much
  // time, then the server kills itself.
  void setInactivityTimeoutPeriod(1:i64 periodInSeconds),

  // close session
  void shutdown(1:i32 status),

  // create a file and open it for writing
  ThriftHandle create(1:Pathname path) throws (1:ThriftIOException ouch),

  // create a file and open it for writing
  ThriftHandle createFile(1:Pathname path, 2:i16 mode,
                          3:bool overwrite, 4:i32 bufferSize,
                          5:i16 block_replication, 6:i64 blocksize)
                          throws (1:ThriftIOException ouch),

  // returns a handle to an existing file for reading
  ThriftHandle open(1:Pathname path) throws (1:ThriftIOException ouch),

  // returns a handle to an existing file for appending to it.
  ThriftHandle append(1:Pathname path) throws (1:ThriftIOException ouch),

  // write binary data to the open handle for the file
  bool write(1:ThriftHandle handle, 2:binary data) throws (1:ThriftIOException ouch),

  // read some bytes from the open handle for the file
  binary read(1:ThriftHandle handle, 2:i64 offset, 3:i32 size) throws (1:ThriftIOException ouch),

  // close file
  bool close(1:ThriftHandle out) throws (1:ThriftIOException ouch),

  // delete file(s) or directory(s)
  bool rm(1:Pathname path, 2:bool recursive) throws (1:ThriftIOException ouch),

  // rename file(s) or directory(s)
  bool rename(1:Pathname path, 2:Pathname dest) throws (1:ThriftIOException ouch),

  // create directory
  bool mkdirs(1:Pathname path) throws (1:ThriftIOException ouch),

  // Does this pathname exist?
  bool exists(1:Pathname path) throws (1:ThriftIOException ouch),

  // Returns status about the path
  FileStatus stat(1:Pathname path) throws (1:ThriftIOException ouch),

  // If the path is a directory, then returns the list of pathnames in that directory
  list<FileStatus> listStatus(1:Pathname path) throws (1:ThriftIOException ouch),

  // Set permission for this file
  void chmod(1:Pathname path, 2:i16 mode) throws (1:ThriftIOException ouch),

  // set the owner and group of the file.
  void chown(1:Pathname path, 2:string owner, 3:string group) throws (1:ThriftIOException ouch),

  // set the replication factor for all blocks of the specified file
  void setReplication(1:Pathname path, 2:i16 replication) throws (1:ThriftIOException ouch),

  // get the locations of the blocks of this file
  list<BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 start, 3:i64 length) throws (1:ThriftIOException ouch),
}
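Generating the C++ and Java code
In cmd, change to the folder that contains thrift.exe, copy hadoopfs.thrift into the same directory, and run each of the following: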
thrift -gen java hadoopfs.thrift
thrift -gen cpp hadoopfs.thrift
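This creates the gen-cpp and gen-java folders, which contain the generated source files.
2. Writing the HDFS server program (Java)
2.1 In Eclipse, create a libthrift project. Copy the code under <thrift extract path>/lib/java/src into the project's src directory, then copy the code from the gen-java folder generated in 1.3 into src as well.
Extract Hadoop 2.x (download: http://mirrors.cnnic.cn/apache/hadoop/common/hadoop-2.6.3/hadoop-2.6.3.tar.gz).
Add the required jars to the Eclipse build path: under hadoop2.x/share/hadoop/, add every jar in the common, common/lib, hdfs, and hdfs/lib folders.
Change the HadoopThriftServer code shipped with Hadoop 1.x to the following: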
package org.apache.hadoop.thriftfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
// Include Generated code
import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;

/**
* ThriftHadoopFileSystem
* A thrift wrapper around the Hadoop File System
*/
public class HadoopThriftServer extends ThriftHadoopFileSystem {

static int serverPort = 0; // default port
TServer server = null;

public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
{

public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");

// HDFS glue
Configuration conf;
FileSystem fs;

// structure that maps each Thrift object into a hadoop object
private long nextId = new Random().nextLong();
private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
private Daemon inactivityThread = null;

// Detect inactive session
private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
private static volatile long inactivityRecheckInterval = 60 * 1000;
private static volatile boolean fsRunning = true;
private static long now;

// allow outsider to change the hadoopthrift path
public void setOption(String key, String val) {
}

/**
* Current system time.
* @return current time in msec.
*/
static long now() {
return System.currentTimeMillis();
}

/**
* getVersion
*
* @return current version of the interface.
*/
public String getVersion() {
return "0.1";
}

/**
* shutdown
*
* cleanly closes everything and exit.
*/
@Override
public void shutdown(int status) {
LOG.info("HadoopThriftServer shutting down.");
try {
fs.close();
} catch (IOException e) {
LOG.warn("Unable to close file system");
}
Runtime.getRuntime().exit(status);
}

/**
* Periodically checks to see if there is inactivity
*/
class InactivityMonitor implements Runnable {
@Override
public void run() {
while (fsRunning) {
try {
if (now() > now + inactivityPeriod) {
LOG.warn("HadoopThriftServer Inactivity period of " +
inactivityPeriod + " expired... Stopping Server.");
shutdown(-1);
}
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
try {
Thread.sleep(inactivityRecheckInterval);
} catch (InterruptedException ie) {
}
}
}
}

/**
* HadoopThriftServer
*
* Constructor for the HadoopThriftServer glue with Thrift Class.
*
* @param name - the name of this handler
*/
public HadoopThriftHandler(String name) {
conf = new Configuration();
now = now();
try {
inactivityThread = new Daemon(new InactivityMonitor());
fs = FileSystem.get(conf);
} catch (IOException e) {
LOG.warn("Unable to open hadoop file system...");
Runtime.getRuntime().exit(-1);
}
}

/**
* printStackTrace
*
* Helper function to print an exception stack trace to the log and not stderr
*
* @param e the exception
*
*/
static private void printStackTrace(Exception e) {
for(StackTraceElement s: e.getStackTrace()) {
LOG.error(s);
}
}

/**
* Lookup a thrift object into a hadoop object
*/
private synchronized Object lookup(long id) {
return hadoopHash.get(new Long(id));
}

/**
* Insert a thrift object into a hadoop object. Return its id.
*/
private synchronized long insert(Object o) {
nextId++;
hadoopHash.put(nextId, o);
return nextId;
}

/**
* Delete a thrift object from the hadoop store.
*/
private synchronized Object remove(long id) {
return hadoopHash.remove(new Long(id));
}

/**
* Implement the API exported by this thrift server
*/

/** Set inactivity timeout period. The period is specified in seconds.
* if there are no RPC calls to the HadoopThrift server for this much
* time, then the server kills itself.
*/
@Override
public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
inactivityPeriod = periodInSeconds * 1000; // in milli seconds
if (inactivityRecheckInterval > inactivityPeriod ) {
inactivityRecheckInterval = inactivityPeriod;
}
}

/**
* Create a file and open it for writing
*/
@Override
public ThriftHandle create(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("create: " + path);
FSDataOutputStream out = fs.create(new Path(path.pathname));
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Create a file and open it for writing, delete file if it exists
*/
@Override
public ThriftHandle createFile(Pathname path,
short mode,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("create: " + path +
" permission: " + mode +
" overwrite: " + overwrite +
" bufferSize: " + bufferSize +
" replication: " + replication +
" blockSize: " + blockSize);
FSDataOutputStream out = fs.create(new Path(path.pathname),
new FsPermission(mode),
overwrite,
bufferSize,
replication,
blockSize,
null); // progress
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Opens an existing file and returns a handle to read it
*/
@Override
public ThriftHandle open(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("open: " + path);
FSDataInputStream out = fs.open(new Path(path.pathname));
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Opens an existing file to append to it.
*/
@Override
public ThriftHandle append(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("append: " + path);
FSDataOutputStream out = fs.append(new Path(path.pathname));
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* write to a file
*/
@Override
public boolean write(ThriftHandle tout, ByteBuffer data)
throws ThriftIOException, TException {
try {
now = now();
HadoopThriftHandler.LOG.debug("write: " + tout.id);
FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
// Retrieve the readable bytes only; the buffer may start at a non-zero position
byte[] bytes = new byte[data.remaining()];
// transfer bytes from this buffer into the given destination array
data.get(bytes);
out.write(bytes, 0, bytes.length);
data.clear();
HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
return true;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* read from a file
*/
@Override
public ByteBuffer read(ThriftHandle tout, long offset, int length)
throws ThriftIOException, TException {
try {
now = now();
HadoopThriftHandler.LOG.debug("read: " + tout.id +
" offset: " + offset +
" length: " + length);
FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
if (in.getPos() != offset) {
in.seek(offset);
}
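byte[] tmp = new byte[length];
int numbytes = in.read(offset, tmp, 0, length);
HadoopThriftHandler.LOG.debug("read done: " + tout.id);
// a positional read returns -1 at end of file; return an empty buffer in that case
if (numbytes < 0) {
numbytes = 0;
}
return ByteBuffer.wrap(tmp,0,numbytes);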
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Delete a file/directory
*/
@Override
public boolean rm(Pathname path, boolean recursive)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("rm: " + path +
" recursive: " + recursive);
boolean ret = fs.delete(new Path(path.pathname), recursive);
HadoopThriftHandler.LOG.debug("rm: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Move a file/directory
*/
@Override
public boolean rename(Pathname path, Pathname dest)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("rename: " + path +
" destination: " + dest);
boolean ret = fs.rename(new Path(path.pathname),
new Path(dest.pathname));
HadoopThriftHandler.LOG.debug("rename: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* close file
*/
@Override
public boolean close(ThriftHandle tout) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("close: " + tout.id);
Object obj = remove(tout.id);
if (obj instanceof FSDataOutputStream) {
FSDataOutputStream out = (FSDataOutputStream)obj;
out.close();
} else if (obj instanceof FSDataInputStream) {
FSDataInputStream in = (FSDataInputStream)obj;
in.close();
} else {
throw new ThriftIOException("Unknown thrift handle.");
}
HadoopThriftHandler.LOG.debug("closed: " + tout.id);
return true;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Create a directory
*/
@Override
public boolean mkdirs(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("mkdirs: " + path);
boolean ret = fs.mkdirs(new Path(path.pathname));
HadoopThriftHandler.LOG.debug("mkdirs: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Does this pathname exist?
*/
@Override
public boolean exists(Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("exists: " + path);
boolean ret = fs.exists(new Path(path.pathname));
HadoopThriftHandler.LOG.debug("exists done: " + path);
return ret;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Returns status about the specified pathname
*/
@Override
public org.apache.hadoop.thriftfs.api.FileStatus stat(
Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("stat: " + path);
org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
new Path(path.pathname));
HadoopThriftHandler.LOG.debug("stat done: " + path);
return new org.apache.hadoop.thriftfs.api.FileStatus(
stat.getPath().toString(),
stat.getLen(),
stat.isDir(),
stat.getReplication(),
stat.getBlockSize(),
stat.getModificationTime(),
stat.getPermission().toString(),
stat.getOwner(),
stat.getGroup());
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* If the specified pathname is a directory, then return the
* list of pathnames in this directory
*/
@Override
public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
Pathname path) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("listStatus: " + path);

org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
new Path(path.pathname));
HadoopThriftHandler.LOG.debug("listStatus done: " + path);
org.apache.hadoop.thriftfs.api.FileStatus tmp;
List<org.apache.hadoop.thriftfs.api.FileStatus> value =
new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();

for (int i = 0; i < stat.length; i++) {
tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
stat[i].getPath().toString(),
stat[i].getLen(),
stat[i].isDir(),
stat[i].getReplication(),
stat[i].getBlockSize(),
stat[i].getModificationTime(),
stat[i].getPermission().toString(),
stat[i].getOwner(),
stat[i].getGroup());
value.add(tmp);
}
return value;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Sets the permission of a pathname
*/
@Override
public void chmod(Pathname path, short mode) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("chmod: " + path +
" mode " + mode);
fs.setPermission(new Path(path.pathname), new FsPermission(mode));
HadoopThriftHandler.LOG.debug("chmod done: " + path);
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Sets the owner & group of a pathname
*/
@Override
public void chown(Pathname path, String owner, String group)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("chown: " + path +
" owner: " + owner +
" group: " + group);
fs.setOwner(new Path(path.pathname), owner, group);
HadoopThriftHandler.LOG.debug("chown done: " + path);
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Sets the replication factor of a file
*/
@Override
public void setReplication(Pathname path, short repl) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("setrepl: " + path +
" replication factor: " + repl);
fs.setReplication(new Path(path.pathname), repl);
HadoopThriftHandler.LOG.debug("setrepl done: " + path);
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}

/**
* Returns the block locations of this file
*/
@Override
public List<org.apache.hadoop.thriftfs.api.BlockLocation>
getFileBlockLocations(Pathname path, long start, long length)
throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);

org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
new Path(path.pathname));

org.apache.hadoop.fs.BlockLocation[] stat =
fs.getFileBlockLocations(status, start, length);
HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);

org.apache.hadoop.thriftfs.api.BlockLocation tmp;
List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();

for (int i = 0; i < stat.length; i++) {

// construct the list of hostnames from the array returned
// by HDFS
List<String> hosts = new LinkedList<String>();
String[] hostsHdfs = stat[i].getHosts();
for (int j = 0; j < hostsHdfs.length; j++) {
hosts.add(hostsHdfs[j]);
}

// construct the list of host:port from the array returned
// by HDFS
List<String> names = new LinkedList<String>();
String[] namesHdfs = stat[i].getNames();
for (int j = 0; j < namesHdfs.length; j++) {
names.add(namesHdfs[j]);
}
tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
hosts, names, stat[i].getOffset(), stat[i].getLength());
value.add(tmp);
}
return value;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
}

// Bind to port. If the specified port is 0, then bind to random port.
private ServerSocket createServerSocket(int port) throws IOException {
try {
ServerSocket sock = new ServerSocket();
// Prevent 2MSL delay problem on server restarts
sock.setReuseAddress(true);
// Bind to listening port
if (port == 0) {
sock.bind(null);
serverPort = sock.getLocalPort();
} else {
sock.bind(new InetSocketAddress(port));
}
return sock;
} catch (IOException ioe) {
throw new IOException("Could not create ServerSocket on port " + port + "." +
ioe);
}
}

/**
* Constructs a server object
*/
public HadoopThriftServer(String [] args) {

if (args.length > 0) {
serverPort = Integer.parseInt(args[0]);
}
try {
ServerSocket ssock = createServerSocket(serverPort);
TServerTransport serverTransport = new TServerSocket(ssock);
Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
TThreadPoolServer.Args options = new TThreadPoolServer.Args(serverTransport);
options.minWorkerThreads(10);
options.processor(processor);
server = new TThreadPoolServer(options);
System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
System.out.flush();

} catch (Exception x) {
x.printStackTrace();
}
}

public static void main(String [] args) {
HadoopThriftServer me = new HadoopThriftServer(args);
me.server.serve();
}
};
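The modified parts were marked in red in the original post. Note that file data is transferred with Thrift's binary type here.
Reference: "使用Thrift传输二进制数据遇到的问题" (Problems encountered when transferring binary data with Thrift)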
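Because write and read are declared with the binary type, the generated Java code passes the handler a java.nio.ByteBuffer. Such a buffer can be a window into a larger array, so the number of readable bytes is remaining(), not limit(). The sketch below illustrates this with the JDK only; the class and method names are made up for illustration and are not part of Thrift or Hadoop.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BinaryPayload {
    /** Copies exactly the readable bytes without moving the caller's position. */
    static byte[] toBytes(ByteBuffer data) {
        ByteBuffer view = data.duplicate();        // own position/limit, shared content
        byte[] bytes = new byte[view.remaining()]; // remaining() = limit - position
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] frame = "HEADERpayload".getBytes(StandardCharsets.US_ASCII);
        // Simulate a payload that sits in the middle of a larger receive buffer.
        ByteBuffer slice = ByteBuffer.wrap(frame, 6, 7);
        System.out.println(new String(toBytes(slice), StandardCharsets.US_ASCII));
    }
}
```

Here the buffer has position 6 and limit 13, so allocating limit() bytes and calling get would underflow, while remaining() yields the 7 payload bytes.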
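3. Windows C++ client
3.1 Create a project. Add the Boost directory and Thrift's lib/cpp/src folder to the include paths, and put the libthrift.lib built in 1.2 in the project root (or add its location to the library search path).
3.2 Copy the code in the gen-cpp folder generated in 1.3 into the project root and add it to the project.
3.3 Write the thriftfs client wrapper class: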
HdfsClient.h
#pragma once
#include <string>
#include <vector>
#include "hadoopfs_types.h"
#include "ThriftHadoopFileSystem.h"
#include <boost/shared_ptr.hpp>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using boost::shared_ptr;

#define FILEOPEN_SUCCESS 0

// Thin wrapper around the generated ThriftHadoopFileSystemClient
class HdfsClient
{
private:
    bool m_IsConn;
    shared_ptr<TTransport> m_Socket;
    shared_ptr<TBufferedTransport> m_Transport;
    shared_ptr<TBinaryProtocol> m_Protocol;
    shared_ptr<ThriftHadoopFileSystemClient> m_Client;
    ThriftHandle m_Handler; // handle of the file currently being read or written
public:
    HdfsClient(void);
    ~HdfsClient(void);

    bool connect(const std::string server,int port);
    bool shutdown();
    bool put(const std::string& localfile,const std::string& rem_path);
    bool append(const std::string& localfile,const std::string& rem_path);
    bool get(const std::string& rem_path,const std::string& localfile);
    bool rm(const std::string& rem_path, const bool recursive=false);
    bool mv(const std::string& src_path,const std::string& dst_path);
    bool mkdirs(const std::string& rem_path);
    bool exists(const std::string& rem_path);
    void ls(std::vector<FileStatus> & result, const std::string& path);
    void chmod(const std::string& path, const int16_t mode);
    void chown(const std::string& path, const std::string& owner);
    void setReplication(const std::string& path, const int16_t replication);
    void getFileBlockLocations(std::vector<BlockLocation> & result, const std::string& path, const int64_t start, const int64_t length);
};
HdfsClient.cpp
#include "StdAfx.h"
#include "HdfsClient.h"
#include <stdio.h>

HdfsClient::HdfsClient(void)
{
    m_IsConn = false;
}

HdfsClient::~HdfsClient(void)
{
    if(m_IsConn)
        shutdown();
}

bool HdfsClient::connect(const std::string server,int port)
{
    m_Socket = shared_ptr<TTransport>(new TSocket(server,port));
    m_Transport = shared_ptr<TBufferedTransport>(new TBufferedTransport(m_Socket));
    m_Protocol = shared_ptr<TBinaryProtocol>(new TBinaryProtocol(m_Transport));
    m_Client = shared_ptr<ThriftHadoopFileSystemClient>(new ThriftHadoopFileSystemClient(m_Protocol));

    try
    {
        m_Transport->open();
        // tell the HadoopThrift server to die after 60 minutes of inactivity
        m_Client->setInactivityTimeoutPeriod(60 * 60); // the period is given in seconds
        m_IsConn = true;
    }
    catch (const ThriftIOException& ex)
    {
        printf("ERROR: %s\n",ex.message.c_str());
        return false;
    }
    catch (const TException& ex) // e.g. the socket could not be opened
    {
        printf("ERROR: %s\n",ex.what());
        return false;
    }
    return true;
}

bool HdfsClient::shutdown()
{
try
{
m_Transport->close();
m_IsConn = false;
}
catch (const ThriftIOException& ex)
{
printf("ERROR: %s",ex.message.c_str());
return false;
}
return true;
}

bool HdfsClient::put(const std::string& localfile,const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    m_Client->create(m_Handler,ptname);//Create the specified file. Returns a handle to write data.

    if(m_Handler.id == 0)//error
        return false;
    else
    {
        FILE* fp = fopen(localfile.c_str(),"rb");
        if(fp == NULL) // fopen reports failure through its return value
        {
            m_Client->close(m_Handler);
            return false;
        }

        /*//A. read the whole file into content;
        //move the file position to the end of the file
        fseek(fp,0L,SEEK_END);
        //get the file length
        long length=ftell(fp);
        fseek(fp,0,SEEK_SET);//move the file position back to the start
        char* buffer = new char[length];
        //memset(buffer,'\0',length);
        fread(buffer,sizeof(char),length,fp);
        //write data to hdfs
        std::string content;
        content.append(buffer,buffer+length);//string
        m_Client->write(m_Handler,content);*/

        //B. read in chunks and upload each one; a memory-mapped file could also be used to load the data before sending it to hdfs
        size_t bufferSize = 1 << 20;//1M
        size_t readSize = 0;
        char* buffer = new char[bufferSize];
        // loop on the fread result so no empty chunk is sent at end of file
        while((readSize = fread(buffer,sizeof(char),bufferSize,fp)) > 0)
        {
            //write data to hdfs
            std::string content;
            content.append(buffer,buffer+readSize);//string
            m_Client->write(m_Handler,content);
        }
        fclose(fp);
        delete[] buffer;
        return m_Client->close(m_Handler);
    }
}

bool HdfsClient::append(const std::string& localfile,const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    m_Client->append(m_Handler,ptname);

    if(m_Handler.id == 0)//error
        return false;
    else
    {
        FILE* fp = fopen(localfile.c_str(),"rb");
        if(fp == NULL) // fopen reports failure through its return value
        {
            m_Client->close(m_Handler);
            return false;
        }

        /*//A. read the whole file into content;
        //move the file position to the end of the file
        fseek(fp,0L,SEEK_END);
        //get the file length
        long length=ftell(fp);
        fseek(fp,0,SEEK_SET);//move the file position back to the start
        char* buffer = new char[length];
        //memset(buffer,'\0',length);
        fread(buffer,sizeof(char),length,fp);
        //write data to hdfs
        std::string content;
        content.append(buffer,buffer+length);//string
        m_Client->write(m_Handler,content);*/

        //B. read in chunks and upload each one; a memory-mapped file could also be used to load the data before sending it to hdfs
        size_t bufferSize = 1 << 20;//1M
        size_t readSize = 0;
        char* buffer = new char[bufferSize];
        // loop on the fread result so no empty chunk is sent at end of file
        while((readSize = fread(buffer,sizeof(char),bufferSize,fp)) > 0)
        {
            //write data to hdfs
            std::string content;
            content.append(buffer,buffer+readSize);//string
            m_Client->write(m_Handler,content);
        }
        fclose(fp);
        delete[] buffer;
        return m_Client->close(m_Handler);
    }
}

bool HdfsClient::get(const std::string& rem_path,const std::string& localfile)
{
    Pathname ptname;
    ptname.__set_pathname(rem_path);
    m_Client->open(m_Handler,ptname);

    if(m_Handler.id == 0)//error
        return false;
    else
    {
        FileStatus rfstat;
        m_Client->stat(rfstat,ptname);

        int64_t offset = 0;
        int bufferSize = 1 << 20;//1M
        std::string content;
        int contentlen = 0;
        FILE* fp = fopen(localfile.c_str(),"wb+");
        if(fp == NULL) // fopen reports failure through its return value
        {
            m_Client->close(m_Handler);
            return false;
        }

        // request the file chunk by chunk until the reported length is reached
        while(offset < rfstat.length)
        {
            m_Client->read(content,m_Handler,offset,bufferSize);
            contentlen = (int)content.length();
            if(contentlen > 0)
            {
                fwrite(content.data(),sizeof(char),contentlen,fp);//todo: can use multi thread to read and write
                offset += contentlen;
            }
            else
                break;
        }
        fclose(fp);
        return m_Client->close(m_Handler);
    }
}

bool HdfsClient::rm(const std::string& rem_path, const bool recursive)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->rm(ptname,recursive);
}

bool HdfsClient::mv(const std::string& src_path,const std::string& dst_path)
{
    Pathname src_ptname,dst_ptname;
    src_ptname.pathname = src_path;
    dst_ptname.pathname = dst_path;
    return m_Client->rename(src_ptname,dst_ptname);
}

bool HdfsClient::mkdirs(const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->mkdirs(ptname);
}

bool HdfsClient::exists(const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->exists(ptname);
}

void HdfsClient::ls(std::vector<FileStatus> & result, const std::string& path)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->listStatus(result,ptname);
}

void HdfsClient::chmod(const std::string& path, const int16_t mode)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->chmod(ptname,mode);
}

void HdfsClient::chown(const std::string& path, const std::string& owner)
{
    Pathname ptname;
    ptname.pathname = path;

    // keep the current group and change only the owner
    FileStatus rfstat;
    m_Client->stat(rfstat,ptname);
    m_Client->chown(ptname,owner,rfstat.group);
}

void HdfsClient::setReplication(const std::string& path, const int16_t replication)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->setReplication(ptname,replication);
}

void HdfsClient::getFileBlockLocations(std::vector<BlockLocation> & result, const std::string& path, const int64_t start, const int64_t length)
{
    Pathname ptname;
    ptname.pathname = path;

    m_Client->getFileBlockLocations(result,ptname,start,length);
}

int main()
{
    std::string host = "192.168.0.111";
    int port = ; // set this to the port the server prints at startup
    HdfsClient hdfs;
    std::string local_file = ".\\hadoop1.1.2-thriftfs.rar";
    std::string local_file2 = ".\\test.rar";
    std::string rem_file = "hdfs://master:9000/test.txt";
    std::string rem_dir = "hdfs://master:9000/";
    hdfs.connect(host,port);
    std::vector<FileStatus> result;
    hdfs.put(local_file,rem_file);
    //hdfs.append(local_file,rem_file);
    //hdfs.rm(rem_file);
    hdfs.ls(result,rem_dir);
    for (std::vector<FileStatus>::const_iterator itr = result.begin();
        itr != result.end(); itr++)
    {
        // length is a 64-bit value, so print it as long long
        printf("%s\t%lld\n",itr->path.c_str(),(long long)itr->length);
    }
    hdfs.get(rem_file,local_file2);
    getchar();
    return 0;
}
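The chunked transfer used by put and get can be looked at separately from Thrift. The sketch below is written in Java (to match the server code) and uses only the JDK. ChunkSink and RangeReader are hypothetical stand-ins for the remote write and read calls, not types from the generated code, and the in-memory streams stand in for the local file and for HDFS.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class ChunkedTransfer {
    /** Hypothetical stand-in for the remote write(handle, data) call. */
    interface ChunkSink {
        void write(byte[] chunk);
    }

    /** Hypothetical stand-in for the remote read(handle, offset, size) call. */
    interface RangeReader {
        byte[] read(long offset, int size);
    }

    /** Sends the stream in chunks of at most chunkSize bytes and returns the total sent. */
    static long upload(InputStream in, int chunkSize, ChunkSink sink) {
        byte[] buffer = new byte[chunkSize];
        long total = 0;
        try {
            int n;
            while ((n = in.read(buffer)) != -1) {      // -1 marks end of stream
                sink.write(Arrays.copyOf(buffer, n));  // forward only the bytes actually read
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    /** Pulls length bytes by offset, stopping early if the reader returns nothing. */
    static long download(RangeReader reader, long length, int chunkSize, OutputStream out) {
        long offset = 0;
        try {
            while (offset < length) {
                byte[] chunk = reader.read(offset, chunkSize);
                if (chunk.length == 0) {
                    break;                             // nothing more to fetch
                }
                out.write(chunk);
                offset += chunk.length;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offset;
    }

    public static void main(String[] args) {
        byte[] source = new byte[2500];
        Arrays.fill(source, (byte) 7);

        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        long sent = upload(new ByteArrayInputStream(source), 1024,
                chunk -> remote.write(chunk, 0, chunk.length));

        byte[] stored = remote.toByteArray();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        long received = download((offset, size) -> {
            int end = (int) Math.min(stored.length, offset + size);
            return Arrays.copyOfRange(stored, (int) offset, end);
        }, stored.length, 1024, local);

        System.out.println(sent + " bytes sent, " + received + " bytes received");
    }
}
```

Driving the upload loop from the return value of read, rather than from an end-of-file flag checked before reading, avoids sending a trailing empty chunk.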
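4. Testing
4.1 Install and configure a Hadoop 2.x environment
(See online guides for the detailed steps.)
4.2 Writing the script that starts the server
First compile the server-side Java code and package it as a jar (libthrift.jar), placed in the libthrift folder.
Then copy core-site.xml and hdfs-site.xml from <hadoop install dir>/etc/hadoop/ into the directory that contains the script (they are used when accessing HDFS) (see: http://blog.csdn.net/kkdelta/article/details/19908209).
start_thrift_server.sh script: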
#!/bin/sh

# CLASSPATH starts empty, so the joined value begins with ':'; that empty
# entry puts the current directory (with the copied *-site.xml files) on the classpath
CLASSPATH=
# set this to your Hadoop 2.x install directory
HADOOP_DIR=/usr/hadoop-2.6.

# the hadoop common libraries
for f in $HADOOP_DIR/share/hadoop/common/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the apache libraries
for f in $HADOOP_DIR/share/hadoop/common/lib/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the hadoop hdfs libraries
for f in $HADOOP_DIR/share/hadoop/hdfs/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the apache libraries
for f in $HADOOP_DIR/share/hadoop/hdfs/lib/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the thrift libraries
for f in ./libthrift/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

java -Dcom.sun.management.jmxremote -cp "$CLASSPATH" org.apache.hadoop.thriftfs.HadoopThriftServer "$@"
Run the script and note the port number the program prints, so that the client can use it.
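The port has to be read from the output because the server's default port is 0, which asks the operating system for any free port. A minimal sketch of that behaviour with the JDK's ServerSocket follows; PortProbe and bindAndReport are illustrative names, not part of the server code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class PortProbe {
    /** Binds to the requested port (0 lets the OS choose) and returns the port actually bound. */
    static int bindAndReport(int requestedPort) {
        try (ServerSocket sock = new ServerSocket()) {
            sock.setReuseAddress(true);                       // same option the server sets
            sock.bind(new InetSocketAddress(requestedPort));
            return sock.getLocalPort();                       // the real port, even when 0 was requested
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int requested = args.length > 0 ? Integer.parseInt(args[0]) : 0;
        System.out.println("bound to port " + bindAndReport(requested));
    }
}
```

Passing a fixed port as the first argument to the start script should make the server port predictable, since the server reads args[0] as its port.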
Test the C++ client and check that upload, download, and the other operations work as expected.
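One simple way to check an upload followed by a download is to compare checksums of the original file and the downloaded copy. The sketch below does this with the JDK's MessageDigest; RoundTripCheck and its methods are illustrative names, and the two file paths are whatever you passed to put and get.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class RoundTripCheck {
    /** Returns the SHA-256 digest of the stream as lowercase hex. */
    static String sha256Hex(InputStream in) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                md.update(buffer, 0, n);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is a required JDK algorithm
        }
    }

    /** Hashes a file on disk. */
    static String sha256Hex(String path) {
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            return sha256Hex(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: RoundTripCheck <uploaded-file> <downloaded-file>");
            return;
        }
        boolean same = sha256Hex(args[0]).equals(sha256Hex(args[1]));
        System.out.println(same ? "match" : "MISMATCH");
    }
}
```

With the file names from the client's main function, this would be run against .\hadoop1.1.2-thriftfs.rar and .\test.rar.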