练习1 编写Java程序实现以下函数:
1.向HDFS中上传文件
2.从HDFS下载文件到本地
3.显示文件目录
4.移动文件
5.新建文件夹
6.移除文件夹
package cn.itcast.hadoop.hdfs; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test; public class temp { static FileSystem fs = null;
/*
* initiation
*/
@Before
public void init() throws IOException{
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://zpfbuaa:9000/");
fs = FileSystem.get(configuration);
}
/*
* upload files
*/
@Test
public void upload() throws IOException{
init(); Path dstPath = new Path("hdfs://zpfbuaa:9000/aa/my.jar"); FSDataOutputStream os = fs.create(dstPath); FileInputStream is = new FileInputStream("/home/hadoop/download/my.jar"); IOUtils.copy(is, os);
}
/*
* upload files to HDFS
*/
@Test
public void upload2() throws IOException{
fs.copyFromLocalFile(new Path("/home/hadoop/download/my.jar"), new Path("hdfs://zpfbuaa:9000/aaa/bbb/ccc/my3.jar"));
}
/*
* download files to local
*/
public void download(){ }
/*
* list the information of files
*/
@Test
public void listfile() throws FileNotFoundException, IllegalArgumentException, IOException{ RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(new Path("/"), true); while(filesIterator.hasNext()){
LocatedFileStatus fileStatus = filesIterator.next();
Path path = fileStatus.getPath();
String filename = path.getName();
System.out.println(filename);
} System.out.println("---------------------------------------------"); FileStatus[] listStatus = fs.listStatus(new Path("/"));
for(FileStatus status : listStatus){
String name = status.getPath().getName();
System.out.println(name + (status.isDirectory()?" is a dir":" is a file"));
}
}
/*
* make a new file
*/
@Test
public void makdir() throws IllegalArgumentException, IOException{
fs.mkdirs(new Path("/aaa/bbb/ccc")); }
/*
* delete a old file
*/ public void rm() throws IllegalArgumentException, IOException{
fs.delete(new Path("/aaa/bbb"), true);
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://zpfbuaa:9000/");
fs = FileSystem.get(configuration);
FSDataInputStream is = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); FileOutputStream os = new FileOutputStream("/home/hadoop/download/my.jar"); IOUtils.copy(is,os);
} }
练习2 编写Java程序实现客户端和服务器端的socket信息交互以及函数调用
LoginServiceImpl.class 服务器实例类
package cn.itcast.hadoop.rpc; public class LoginServiceImpl implements LoginServiceInterface{ @Override
public String Login(String username, String password) { return username + " logged in successfully!";
} }
package cn.itcast.hadoop.rpc; public interface LoginServiceInterface { public static final long versionID = 1L; public String Login(String username,String password);
}
package cn.itcast.hadoop.rpc; import java.io.IOException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RPC.Builder; public class starter { public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { Builder builder = new RPC.Builder(new Configuration()); builder.setBindAddress("zpfbuaa").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl()); Server server = builder.build(); } }
LoginController登录类
package cn.itcast.hadoop.rpc; import java.io.IOException;
import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC; public class LoginController { public static void main(String[] args) throws IOException { LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("zpfbuaa", 10000), new Configuration()); String result = proxy.Login("zpfbuaa", "123456789"); System.out.println(result);
}
}
LoginServiceInterface 接口类
package cn.itcast.hadoop.rpc; public interface LoginServiceInterface { public static final long versionID = 1L; public String Login(String username,String password);
}
需要注意的是:
1.为了进行远程调用的模仿,将LoginServiceImpl.class以及LoginServiceInterface.class接口类和 starter.class类放在虚拟机上。本地放LoginController类以及LoginServiceInterface接口类。
2.首先需要将服务器端的服务启动,上述例子会监听虚拟机的10000端口。
3.容易忽略的地方:版本号versionID. 对于不同的版本拥有不同的版本号。在上述例子中简单的均定义版本号为Long类型 并且为final类型 赋值为1L。
4.jar包的导入以及版本的控制。
5.本地以及服务器端的函数都要实现一样的接口类,但是为了防止调用时版本的不对应,所以在Build实例的时候需要将版本号也就是versionID声明清楚,这样调用的时候可以通过版本号的不同将函数进行区别开。
Hadoop自身的远程调用实现机制RPC主要步骤如下:
1.将本地socket以及接口类封装为一个proxy,生成动态本地代理实例。
2.该实例调用相对应的函数并且传入相应的参数。
3.本地socket得到动态代理调用的函数以及传入的参数。
4.使用网络传输协议实现本地socket与远程服务器的socket进行连接,实现信息传递。
5.服务器端socket得到调用的函数以及传入的参数,生成动态服务器端的代理实例。
6.该服务器端实例调用服务器端的函数,并且传入得到的参数。
7.函数调用结果返回给服务器端socket。
8.服务器端socket将返回结果通过网络传输协议传递给本地socket。
9.本地socket将返回结果传递给本地动态代理proxy。
RPC的优点:
1.实现了controller和implement的分离
2.利用RPC机制可以实现信息的有效传递。
3.保障数据的可靠性(DataNode需要定时向NameNode传递自身保存的blocks信息,以便NameNode进行blocks的维护)。
远程调用的底层实现机制: 实现RPC机制:查看FileSystem fs = FileSystem.get(new Configuration());
一步一步查看fs的生成过程!加入断点后,逐步进行查看!