1. Project introduction:
The big-data team receives data that other departments deliver to the data center, and most companies transfer that data as JSON files, so a small server program and a matching client program are needed. I was mainly responsible for the server-side code, and I also wrote the corresponding client-side test code. One point worth highlighting: while receiving the JSON files, the server also has to write the data into HBase, and it does so with batch inserts, that is, multiple records are written in a single call.
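Since batch insertion is the key point on the HBase side, here is a minimal, self-contained sketch of what a batched write looks like with the HBase 1.x client API. It only illustrates the technique: the table name "nihao" matches the default used later, but the column family "info", the row keys and the values are made-up assumptions, and the ZooKeeper address is simply the one configured in the thread class further down.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.27.233"); // same ZooKeeper quorum as used below
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Table table = connection.getTable(TableName.valueOf("nihao"))) {
            // collect several records into one list of Puts
            List<Put> puts = new ArrayList<Put>();
            for (int i = 0; i < 3; i++) {
                Put put = new Put(Bytes.toBytes("row-" + i)); // row key
                put.addColumn(Bytes.toBytes("info"),          // assumed column family
                        Bytes.toBytes("title"), Bytes.toBytes("value-" + i));
                puts.add(put);
            }
            // a single call writes every record in the list
            table.put(puts);
        }
    }
}

The thread class shown later hands its records to HBase through the same List<Put> / table.put(list) call.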
With that background, here is a brief walk-through of the server-side program I implemented.
2. To let the server listen for and serve client connections, the server side is implemented with multiple threads, and sockets are used for the network communication. The concrete implementation is as follows.
The server's main program:
package com.yiban.datacenter.finalversion;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class HbaseServer {

    public static void main(String[] args) {
        backprocess();
    }

    public static void backprocess() {
        try {
            ServerSocket ss = new ServerSocket(11111);
            while (true) {
                Socket s = ss.accept();
                // hand each accepted client connection to its own worker thread
                Thread deal = new Thread(new DealUserThread(s));
                deal.setDaemon(true); // run the worker as a daemon (background) thread
                deal.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The data-handling thread class:
This class receives the data and writes it into HBase.
Alongside those two main goals, it also records each client request in a log file on the server's local disk so the requests can be reviewed later.
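Before the full listing, here is a minimal sketch of the json-lib (net.sf.json) parsing step the thread relies on. The field names match the ones handled in the code below, but the sample payload itself is made up for illustration.

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

public class JsonParseSketch {
    public static void main(String[] args) {
        // a payload in the shape the thread expects: top-level objects with
        // category/version/DocumentType plus an "articles" array
        String payload = "[{\"category\":\"news\",\"version\":\"1.0\",\"DocumentType\":\"article\","
                + "\"articles\":[{\"url\":\"http://example.com/1\",\"title\":\"t1\",\"author\":\"a1\","
                + "\"time\":\"2016-08-01 12:00:00\",\"content\":\"hello\",\"picture_url\":\"p1\"}]}]";

        JSONArray jsonArray = JSONArray.fromObject(payload); // parse the text into a JSON array
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject obj = jsonArray.getJSONObject(i);
            System.out.println("category = " + obj.getString("category"));
            JSONArray articles = obj.getJSONArray("articles");
            for (int j = 0; j < articles.size(); j++) {
                JSONObject article = articles.getJSONObject(j);
                // the thread below uses "url" as the HBase row key
                System.out.println("url = " + article.getString("url"));
                System.out.println("title = " + article.getString("title"));
            }
        }
    }
}

The full thread class follows.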
package com.yiban.datacenter.finalversion;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DealUserThread implements Runnable {

    // credential list used to verify the client's username and password
    private String testconnect = "username=chenpiao,password=123;username=liujiyu,password=123";

    private static Configuration conf = HBaseConfiguration.create();
    private static Connection connection = null;
    private String logFile = null;

    // HBase connection settings
    static {
        try {
            conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
            conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
            connection = ConnectionFactory.createConnection(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Socket s;

    public DealUserThread(Socket s) {
        this.s = s;
    }

    private String userTableName = "nihao";
    private String columnFamilyName = null;
    private String rowKey = null;

    private BufferedReader serverread = null;
    private BufferedWriter serverwrite = null;

    @Override
public void run() {
        try {
            // wrap the socket's input stream so we can read character data line by line
            serverread = new BufferedReader(new InputStreamReader(
                    s.getInputStream()));
            // the first line carries the client's connection (login) request
            String line = serverread.readLine(); // blocks until the client sends it

            // name the log file and create it if necessary
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            logFile = "/var/log/datacenter/" + "user.log";
            File destFile = new File(logFile);
            if (!destFile.exists()) {
                destFile.createNewFile();
            }
            writeByFileWrite(logFile, line + "\n" + sdf.format(System.currentTimeMillis()));

            // wrap the socket's output stream so we can write character data back to the client
            serverwrite = new BufferedWriter(new OutputStreamWriter(
                    s.getOutputStream()));

            // check the login request against the configured credential list
            String[] strArray = testconnect.split("\\;");
            boolean flag = false;
            for (String str : strArray) {
                if (str.equals(line)) {
                    printInfomationForClient("connection successful, now you can send data; before sending data, you must send the table name!");
                    flag = true;
                    break;
                }
            }
            if (!flag) {
                printInfomationForClient("username or password is wrong! connection failed!");
                s.close();
                return;
            }

            // receive the table name
            userTableName = serverread.readLine();
            writeByFileWrite(logFile, "tablename=" + userTableName); // record it in the log file
            // tell the client whether the table name was accepted
            if (TableIsExist(userTableName)) {
                printInfomationForClient("received tablename successfully!");
            } else {
                printInfomationForClient("tablename does not exist");
                s.close();
                return;
            }

            // read the JSON records and wrap them in square brackets to form a JSON array
            line = "[";
            StringBuffer temp = new StringBuffer(line);
            while ((line = serverread.readLine()) != null) {
                temp.append(line);
            }
            temp.append("]");

            // guard against malformed uploads, e.g. data sent in the wrong format
            try {
                // parse the received text as a JSON array
                JSONArray jsonArray = JSONArray.fromObject(temp.toString());
                // write the parsed records into the HBase table
                insertData(jsonArray, userTableName);
            } catch (Exception e) {
                printErrorForClient(e);
            }

            // report success to the client and release the connection
            printInfomationForClient("upload data successful!");
            s.close();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                printErrorForClient(e);
                writeByFileWrite(logFile, e.getMessage() + e.toString());
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    private void printInfomationForClient(String s) throws IOException {
        try {
            serverwrite.write(s);
            writeByFileWrite(logFile, s); // also record the message in the log file
            serverwrite.newLine();
            serverwrite.flush();
        } catch (Exception e) {
            e.printStackTrace();
            writeByFileWrite(logFile, e.getMessage() + e.toString()); // log the failure
        }
    }

    private void printErrorForClient(Exception e) throws IOException {
        try {
            serverwrite.write("found an error: " + e.getMessage() + e.toString());
            writeByFileWrite(logFile, "found an error: " + e.getMessage() + e.toString());
            serverwrite.newLine();
            serverwrite.flush();
            s.close();
        } catch (IOException e1) {
            e1.printStackTrace();
            writeByFileWrite(logFile, e1.getMessage() + e1.toString());
        }
    }

    // column name -> value pairs collected for the current record
    Map<String, String> colvalue = new TreeMap<String, String>();

    private void insertData(JSONArray jsonArray, String userTableName)
            throws IOException {
        // connect to the table
Table table = null;
try {
table = connection.getTable(TableName.valueOf(userTableName));
        } catch (IOException e1) {
            e1.printStackTrace();
            printErrorForClient(e1);
        }
        colvalue.clear();
        for (int i = 0; i < jsonArray.size(); i++) {
JSONObject obj = jsonArray.getJSONObject(i);
Set<String> keysets = obj.keySet();
for (String key : keysets) {
switch (key) {
case "category":
columnFamilyName = obj.getString(key);
break;
case "version":
colvalue.put("version", obj.getString("version"));
break;
case "DocumentType":
colvalue.put("DocumentType", obj.getString("DocumentType"));
break;
case "articles":
JSONArray articlesjars = obj.getJSONArray("articles");
dealjsonArray(table,articlesjars);
break;
default:
printErrorForClient(new Exception("send datatype is error!"));
}
            }
        }
        // close the table only after every record has been written
        table.close();
    }

    private void insertColDataToHbase(Table table) throws IOException {
        // if the table does not yet contain the target column family, add it first
        HTableDescriptor desc = table.getTableDescriptor();
        Collection<HColumnDescriptor> familys = desc.getFamilies();
        if (columnFamilyName != null
                && !familys.contains(new HColumnDescriptor(columnFamilyName))) {
            addColFamily(table, desc, columnFamilyName);
        }
        // insert the data
        List<Put> putlist = new ArrayList<Put>();
        if (!colvalue.isEmpty() && rowKey != null && columnFamilyName != null) {
            Put put = new Put(Bytes.toBytes(rowKey)); // the row key
            // collect every column of this record into the same Put
            for (Entry<String, String> col : colvalue.entrySet()) {
                put.add(Bytes.toBytes(columnFamilyName),
                        Bytes.toBytes(col.getKey()),
                        Bytes.toBytes(col.getValue()));
            }
            putlist.add(put);
        } else {
            printErrorForClient(new Exception("send datatype is error!"));
        }
        try {
            // batch put: a single call writes all the Puts in the list
            table.put(putlist);
        } catch (Exception e) {
            e.printStackTrace();
            printErrorForClient(e);
        }
    }

    private void dealjsonArray(Table table, JSONArray articlesjars) throws IOException {
        if (articlesjars.isEmpty()) {
            System.out.println("articlesjars is empty");
            printErrorForClient(new Exception("send datatype is error!"));
            return;
        }
        for (int i = 0; i < articlesjars.size(); i++) {
            JSONObject obj = articlesjars.getJSONObject(i);
            Set<String> keysets = obj.keySet();
            for (String key : keysets) {
                switch (key) {
case "content":
colvalue.put("content", obj.getString("content"));
break;
case "picture_url":
colvalue.put("picture_url", obj.getString("picture_url"));
break;
case "time":
colvalue.put("time", obj.getString("time"));
break;
case "author":
colvalue.put("author", obj.getString("author"));
break;
case "url":
rowKey=obj.getString("url");
break;
case "title":
colvalue.put("title", obj.getString("title"));
break;
default:
printErrorForClient(new Exception("send datatype is error!"));
}
}
try {
insertColDataToHbase(table);
} catch (IOException e) {
                e.printStackTrace();
                printErrorForClient(e);
            }
        }
    }

    private void addColFamily(Table table, HTableDescriptor desc,
            String colFamily) throws IOException {
        Admin ad = connection.getAdmin();
        HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes(colFamily)); // the column family
        desc.addFamily(family);
        ad.addColumn(table.getName(), family); // add the family to the live table
        ad.close();
    }

    private boolean TableIsExist(String userTableName2) throws IOException {
boolean flag = false;
try {
// Connection connection = ConnectionFactory.createConnection(conf);
Admin ad = connection.getAdmin();
if (ad.tableExists(TableName.valueOf(userTableName2))) {
                flag = true; // the table exists
            }
        } catch (Exception e) {
            e.printStackTrace();
            printErrorForClient(e);
        }
        return flag;
    }

    private void PrintJsonArray(JSONArray jsonArray) {
int size = jsonArray.size();
System.out.println("Size: " + size);
for (int i = 0; i < size; i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
Set<String> keysets = jsonObject.keySet();
for (String keyset : keysets) {
System.out.println(keyset);
}
}
    }

    private void PrintJsonArray(JSONObject jsonobject, String... keys) {
int size = jsonobject.size();
System.out.println("Size: " + size);
for (int i = 0; i < size; i++) {
for (String key : keys) {
System.out.println(key + ":" + jsonobject.get(key));
            }
        }
    }

    // append a line to the server's local log file
    public static void writeByFileWrite(String _sDestFile, String _sContent)
throws IOException {
FileWriter fw = null;
try {
fw = new FileWriter(_sDestFile,true);
fw.write(_sContent);
fw.write('\n');
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (fw != null) {
fw.close();
fw = null;
}
}
    }

    // create the table with the current column family
    private void createTable(Configuration conf) {
        try {
            Table table = connection.getTable(TableName.valueOf(userTableName));
            Admin ad = connection.getAdmin();
            HTableDescriptor desc = new HTableDescriptor(table.getName());
            HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes(columnFamilyName)); // the column family
            desc.addFamily(family);
            ad.createTable(desc);
            ad.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // list all table names in HBase
    public static List<String> getAllTables(Configuration conf)
            throws MasterNotRunningException, ZooKeeperConnectionException,
            IOException {
        HBaseAdmin ad = new HBaseAdmin(conf);
List<String> tables = null;
if (ad != null) {
try {
HTableDescriptor[] allTable = ad.listTables();
if (allTable.length > 0)
tables = new ArrayList<String>();
for (HTableDescriptor hTableDescriptor : allTable) {
tables.add(hTableDescriptor.getNameAsString());
System.out.println(hTableDescriptor.getNameAsString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
return tables;
    }
}
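The client-side test code mentioned at the beginning is not reproduced here, but given the protocol the server expects (one line of credentials, then the table name, then the JSON records, then end of stream), a minimal test client might look like the sketch below. The host address and the payload are assumptions made for illustration; the credentials and the table name "nihao" are the ones hard-coded in the server above.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;

public class HbaseClientTest {
    public static void main(String[] args) throws Exception {
        // connect to the server (the host is assumed; the port matches HbaseServer)
        Socket socket = new Socket("127.0.0.1", 11111);
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        // 1. send the credentials line
        out.write("username=liujiyu,password=123");
        out.newLine();
        out.flush();
        System.out.println(in.readLine()); // server's reply

        // 2. send the table name
        out.write("nihao");
        out.newLine();
        out.flush();
        System.out.println(in.readLine()); // server's reply

        // 3. send the JSON records (the server wraps them in [ ... ] itself)
        out.write("{\"category\":\"news\",\"version\":\"1.0\",\"DocumentType\":\"article\","
                + "\"articles\":[{\"url\":\"http://example.com/1\",\"title\":\"t1\",\"author\":\"a1\","
                + "\"time\":\"2016-08-01 12:00:00\",\"content\":\"hello\",\"picture_url\":\"p1\"}]}");
        out.newLine();
        out.flush();

        // half-close the connection so the server's readLine() loop sees end of stream,
        // then read the final reply (e.g. "upload data successful!")
        socket.shutdownOutput();
        System.out.println(in.readLine());

        socket.close();
    }
}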
Once all of this is implemented, export the project from Eclipse as a runnable jar and copy the jar onto the server. Deployment itself is simple, but one detail deserves attention: with the -jar option the JVM takes the main class from the jar's manifest, so a class name on the command line is not used to pick the class (it is only passed to main() as an argument). Either of the following works:
java -jar myserver.jar &
java -cp myserver.jar com.yiban.datacenter.finalversion.HbaseServer &
The trailing & starts the process in the background; prefix the command with nohup (or use a proper service script) if the server must keep running after the login session ends.