Requirement: store the comments collected a few days ago into HBase.
Approach:
Parse the comments with fastjson, build an RDD from them, then use Spark's Phoenix integration to write the data into HBase.
Sample data:
[
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-04-08 01:13:42",
"content": "此用户没有填写评价内容",
"label": []
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-29 11:49:36",
"content": "不错",
"label": []
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-21 21:13:07",
"content": "正品没什么毛病。信号好像是照别的差一点,但是还可以,不是特别差。分辨率不是那么好,但是也不是特别差。一般般。手机不卡。打游戏很顺畅。官方正品没有翻车。",
"label": [
{
"labelName": "功能齐全"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-22 09:56:22",
"content": "不错是正品",
"label": [
{
"labelName": "系统流畅"
},
{
"labelName": "声音大"
},
{
"labelName": "做工精致"
},
{
"labelName": "待机时间长"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-13 07:27:56",
"content": "性价比很高的手机,用习惯了ios转安卓7个月,重新回归,只能说,用苹果省心。苏宁质量有保障,送货快,价格优惠,推荐购买!",
"label": [
{
"labelName": "系统流畅"
},
{
"labelName": "功能齐全"
},
{
"labelName": "包装一般"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-02-25 22:03:18",
"content": "弟弟就想要一个苹果手机。本来打算买8p的。然后我,推荐他这款xr,是最新款。价格总体来说性价比,比8p好。买了很快就到了,第二天就。屏幕很大,还是面容id。苹果x大很多。比***x小一点",
"label": [
{
"labelName": "外观漂亮"
},
{
"labelName": "做工精致"
},
{
"labelName": "反应快"
},
{
"labelName": "系统流畅"
}
]
},
{
"referenceName": "【券后低至5388】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-02-21 12:45:22",
"content": "物流很棒,xr价格可以接受、性能稳定!",
"label": []
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-02-22 12:14:55",
"content": "很不错的手机除了有点厚,极致的体验,黑边完全没有影响",
"label": [
{
"labelName": "拍照效果好"
},
{
"labelName": "待机时间长"
},
{
"labelName": "电池耐用"
},
{
"labelName": "性价比高 "
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-02-13 00:28:03",
"content": "非常非常好的商品。很不错,下次再来",
"label": [
{
"labelName": "信号稳定"
},
{
"labelName": "反应快"
},
{
"labelName": "声音大"
},
{
"labelName": "做工精致"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G 手机",
"creationTime": "2019-04-02 17:29:43",
"content": "此用户没有填写评价内容",
"label": []
}
]
[
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-04-05 18:13:14",
"content": "满意嘻嘻",
"label": [
{
"labelName": "音质好"
},
{
"labelName": "拍照效果好"
},
{
"labelName": "功能齐全"
},
{
"labelName": "外观漂亮"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-21 00:13:17",
"content": "棒棒哒",
"label": []
},
{
"referenceName": "【双12爆款】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-01-19 10:23:57",
"content": "双十二买的正好赶上手机坏了就买了xr太贵没舍得买一直用苹果的系统用顺手了不愿意换",
"label": [
{
"labelName": "反应快"
},
{
"labelName": "做工精致"
},
{
"labelName": "信号稳定"
},
{
"labelName": "待机时间长"
},
{
"labelName": "性价比一般般"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-21 12:47:34",
"content": "用了几天感觉还可以,只是信号不是很好,总体上是可以的",
"label": [
{
"labelName": "音质好"
},
{
"labelName": "分辨率高"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-09 08:32:15",
"content": "苹果手机做工精细,手感不错,外观设计也是很有时尚感!系统运行十分流畅,操作体验不错;功能齐全,屏幕分辨率高,总体来说很满意!",
"label": [
{
"labelName": "做工精致"
},
{
"labelName": "系统流畅"
},
{
"labelName": "功能齐全"
}
]
},
{
"referenceName": "【低至5399】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-01-15 22:44:53",
"content": "真心喜欢,一直在徘徊x.还是xr.个人觉得真不错,一直信赖苏宁,新机为激活,价钱能接受,值得推荐,黑边什么的不影响。",
"label": [
{
"labelName": "拍照效果好"
},
{
"labelName": "外观漂亮"
},
{
"labelName": "屏幕清晰"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-03-08 22:12:18",
"content": "手机运行流畅,外观也漂亮,黑色彰显档次,音质也好,又是双卡双待手机,性价比不错,比起XS便宜不少。",
"label": [
{
"labelName": "外观漂亮"
},
{
"labelName": "系统流畅"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-03-01 15:54:33",
"content": "手机很好,电池非常抗用,价钱也非常美丽,值得购买。",
"label": [
{
"labelName": "系统流畅"
},
{
"labelName": "电池耐用"
},
{
"labelName": "分辨率高"
},
{
"labelName": "待机时间长"
},
{
"labelName": "包装一般"
}
]
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
"creationTime": "2019-02-19 09:37:25",
"content": "春节期间配送超快,手机没有任何问题,苏宁易购确实做到了全网最低价",
"label": []
},
{
"referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
"creationTime": "2019-02-23 13:27:15",
"content": "挺好的懒得拍照了借了几张黑边确实大",
"label": [
{
"labelName": "待机时间长"
},
{
"labelName": "反应快"
},
{
"labelName": "做工精致"
}
]
}
]
The data consists of JSON arrays, so fastjson is used for parsing. To make the final storage step easier, a Comment bean class is constructed.
Dependencies:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
</dependency>
<!-- hbase -->
<!--
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>2.0.2</version>
</dependency>
-->
<!-- phoenix -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>
<!-- phoenix-spark -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>
<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
Comment.java
package cn.tele.bean;

/**
 * @author Tele
 */
public class Comment {
    private Integer id;
    private String name;
    private String content;
    private String creationtime;
    private String label;
    private String platform;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getCreationtime() {
        return creationtime;
    }

    public void setCreationtime(String creationtime) {
        this.creationtime = creationtime;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getPlatform() {
        return platform;
    }

    public void setPlatform(String platform) {
        this.platform = platform;
    }
}
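In getCommentList below, a comment's labelName values are flattened into the single label string of this bean, since Phoenix will store them in one VARCHAR column. A minimal standalone sketch of just that flattening step (plain Java, no fastjson; the class name LabelJoin is made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class LabelJoin {
    // Flatten a comment's labelName values into the single string kept in
    // Comment.label; each label gets a trailing '#', as in the original code.
    public static String join(List<String> labelNames) {
        StringBuilder sb = new StringBuilder();
        for (String name : labelNames) {
            sb.append(name).append("#");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("系统流畅", "声音大"))); // 系统流畅#声音大#
    }
}
```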
StoreData.java
package cn.tele.spark;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import cn.tele.bean.Comment;
import scala.Tuple2;

/**
 * Store the crawled comments into HBase.
 *
 * @author Tele
 */
public class StoreData {
    private static SparkConf conf = new SparkConf().setAppName("storedata").setMaster("local")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    static {
        // Register classes with Kryo
        conf.registerKryoClasses(new Class[] { Comment.class });
    }

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        // Walk the directory tree
        Path path = Paths.get("F:\\comment\\");
        try {
            MyFileVisitor myFileVisitor = new MyFileVisitor();
            Files.walkFileTree(path, myFileVisitor);
            List<Map<String, Object>> list = myFileVisitor.getData();
            JavaRDD<Comment> commentRDD = getCommentRDD(list);
            // Store into HBase
            storeData(commentRDD);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Read the data back
        /*
         * JavaRDD<String> rdd = jsc.textFile("file:\\F:\\comment\\sn_comment\\iphonexr-2019-04-16-18-27-36\\");
         * List<Comment> commentList = getCommentList(rdd, "iphonexr");
         */
        jsc.close();
    }

    private static void storeData(JavaRDD<Comment> rdd) {
        /*
         * An explicit schema is not needed; Spark derives it from the bean class:
         * DataTypes.createStructType(Arrays.asList(
         * DataTypes.createStructField("id", DataTypes.IntegerType, false),
         * DataTypes.createStructField("name", DataTypes.StringType, false),
         * DataTypes.createStructField("content", DataTypes.StringType, false),
         * DataTypes.createStructField("creationtime", DataTypes.StringType, false),
         * DataTypes.createStructField("label", DataTypes.StringType, true),
         * DataTypes.createStructField("platform", DataTypes.StringType, false)));
         */
        Dataset<Row> ds = session.createDataFrame(rdd, Comment.class);
        ds.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "comment")
                .save();
    }

    private static JavaRDD<Comment> getCommentRDD(List<Map<String, Object>> list) {
        JavaRDD<Map<String, Object>> originalRDD = jsc.parallelize(list);
        JavaRDD<List<Comment>> listCommentRDD = originalRDD.map(new Function<Map<String, Object>, List<Comment>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public List<Comment> call(Map<String, Object> v1) throws Exception {
                // Tag every comment with its platform (taken from the map key)
                List<Comment> dataList = new ArrayList<>();
                Set<Entry<String, Object>> entrySet = v1.entrySet();
                Iterator<Entry<String, Object>> iterator = entrySet.iterator();
                while (iterator.hasNext()) {
                    Entry<String, Object> entry = iterator.next();
                    String referenceName = entry.getKey();
                    String platform = referenceName.split("#")[0];
                    @SuppressWarnings("unchecked")
                    List<Comment> commentList = (List<Comment>) entry.getValue();
                    commentList.forEach(cm -> {
                        cm.setPlatform(platform);
                        dataList.add(cm);
                    });
                    println(referenceName + " comment count------------------:" + commentList.size());
                }
                return dataList;
            }
        }).persist(StorageLevel.MEMORY_ONLY());

        JavaRDD<Comment> commentRDD = listCommentRDD.flatMap(new FlatMapFunction<List<Comment>, Comment>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Comment> call(List<Comment> t) throws Exception {
                return t.iterator();
            }
        });

        long totalSize = commentRDD.count();
        println("total comments:-----------" + totalSize);

        // Assign ids
        JavaRDD<Comment> resultRDD = commentRDD.zipWithIndex().map(new Function<Tuple2<Comment, Long>, Comment>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Comment call(Tuple2<Comment, Long> v1) throws Exception {
                v1._1.setId(v1._2.intValue());
                return v1._1;
            }
        });
        return resultRDD;
    }

    private static List<Comment> getCommentList(JavaRDD<String> rdd, String referenceName) {
        List<Comment> commentList = new ArrayList<Comment>();
        String originalStr = rdd.reduce(new Function2<String, String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(String v1, String v2) throws Exception {
                return v1.trim() + v2.trim();
            }
        });
        // The crawler wrote several JSON arrays back to back; insert a UUID at
        // every "][" boundary so we can split them into individual arrays
        String uuid = UUID.randomUUID().toString();
        originalStr = originalStr.replace("][", "]" + uuid + "[");
        // Parse the JSON
        String[] pages = originalStr.split(uuid);
        for (String page : pages) {
            try {
                JSONArray jsonArray = JSON.parseArray(page);
                for (Object obj : jsonArray) {
                    JSONObject jsonObject = (JSONObject) obj;
                    // String referenceName = jsonObject.getString("referenceName");
                    String creationTime = jsonObject.getString("creationTime");
                    String content = jsonObject.getString("content");
                    println("referenceName:" + referenceName);
                    println("creationTime:" + creationTime);
                    println("content:" + content);
                    // Wrap into a bean
                    Comment comment = new Comment();
                    comment.setName(referenceName);
                    comment.setCreationtime(creationTime);
                    comment.setContent(content);
                    JSONArray labelArray = jsonObject.getJSONArray("label");
                    if (labelArray != null) {
                        String label = "";
                        for (Object labelObj : labelArray) {
                            JSONObject labelObject = (JSONObject) labelObj;
                            label += labelObject.getString("labelName") + "#";
                        }
                        comment.setLabel(label);
                        println("label:" + label);
                    }
                    commentList.add(comment);
                }
            } catch (Exception e) {
                continue;
            }
        }
        return commentList;
    }

    private static class MyFileVisitor extends SimpleFileVisitor<Path> {
        private List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        private String platform = null;

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
            println("visiting directory------" + dir.toAbsolutePath());
            String fileName = dir.getFileName().toString();
            if (fileName.contains("_")) {
                platform = fileName.split("_")[0];
            }
            if (platform != null && fileName.contains("-")) {
                String referenceName = fileName.split("-")[0];
                JavaRDD<String> rdd = jsc.textFile(dir.toAbsolutePath().toString());
                List<Comment> commentList = getCommentList(rdd, referenceName);
                Map<String, Object> map = new HashMap<String, Object>();
                // key: platform#brand, value: its comments
                map.put(platform + "#" + referenceName, commentList);
                list.add(map);
            }
            return super.preVisitDirectory(dir, attrs);
        }

        public List<Map<String, Object>> getData() {
            return list;
        }
    }

    private static void println(Object obj) {
        System.out.println(obj);
    }
}
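The UUID trick in getCommentList is worth isolating: each crawled file holds several JSON arrays concatenated back to back, which is not valid JSON as a whole, so a random UUID is spliced into every "][" boundary and then used as a split delimiter. A minimal standalone sketch of just that step (plain string handling, no fastjson; the class name SplitConcatenatedArrays is made up for illustration):

```java
import java.util.UUID;

public class SplitConcatenatedArrays {
    // Split a string of back-to-back JSON arrays ("[...][...]") into one
    // well-formed array per slot, using a random UUID as a throwaway delimiter
    // (a UUID contains no regex metacharacters, so split() is safe).
    public static String[] split(String raw) {
        String uuid = UUID.randomUUID().toString();
        return raw.replace("][", "]" + uuid + "[").split(uuid);
    }

    public static void main(String[] args) {
        String raw = "[{\"a\":1}][{\"b\":2}][{\"c\":3}]";
        String[] pages = split(raw);
        System.out.println(pages.length); // 3
        System.out.println(pages[0]);     // [{"a":1}]
    }
}
```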
The following exception may be thrown, but it does not affect the result; it seems to be related to the HBase version I use.
Check the stored data in a terminal.
Notes:
1. StoreData can be optimized further: the RDD could be built directly while parsing the JSON instead of going through a List first, and the job could be adapted to run on a cluster. Since my data volume is small, local mode is enough.
2. SaveMode can only be SaveMode.Overwrite; Phoenix supports no other mode. In actual testing, though, the write still behaves like an append.
3. zipWithIndex is used to generate ids, but consecutive ids easily create hotspots in the cluster, so it is best to salt the table when creating it in Phoenix.
4. You can also talk to Phoenix through Spark's JDBC support; see https://blog.****.net/xiongbingcool/article/details/81458602
5. Regarding directories: Spark can read all files under a directory into one RDD, so walking down to the parent directory is enough; visiting each individual file also works.
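To make note 3 concrete, here is a sketch of a salted Phoenix DDL for the comment table. The column names mirror the Comment bean, but the layout and the bucket count of 8 are assumptions, not the table definition actually used; pick SALT_BUCKETS based on your region server count. Salting makes Phoenix prefix the row key with a hashed byte, spreading the sequential ids from zipWithIndex across regions:

```java
public class SaltedCommentTable {
    // Build the DDL for a salted comment table. SALT_BUCKETS = n prefixes the
    // row key with a hash-based salt byte so sequential ids spread over n regions.
    public static String ddl(int saltBuckets) {
        return "CREATE TABLE IF NOT EXISTS comment ("
                + "id INTEGER PRIMARY KEY, "
                + "name VARCHAR, "
                + "content VARCHAR, "
                + "creationtime VARCHAR, "
                + "label VARCHAR, "
                + "platform VARCHAR"
                + ") SALT_BUCKETS = " + saltBuckets;
    }

    public static void main(String[] args) {
        System.out.println(ddl(8));
    }
}
```

The generated statement can be run in sqlline.py before launching StoreData.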
Appendix: a test case:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * Test the Spark/Phoenix integration.
 *
 * @author Tele
 */
public class SparkPhoneix {
    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        Dataset<Row> ds = session.read().format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .load();
        ds.createOrReplaceTempView("test");
        ds.show();

        // Insert test: _SALT, ID, INFO.ITEM, INFO.CONTENT, INFO.LABEL
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("item", DataTypes.StringType, false),
                DataTypes.createStructField("content", DataTypes.StringType, false),
                DataTypes.createStructField("label", DataTypes.StringType, true)));

        // Build a row of test data
        List<Row> list = new ArrayList<Row>();
        Row row1 = RowFactory.create(3, "iphone", "不错", null);
        list.add(row1);
        Dataset<Row> dataset = session.createDataFrame(list, schema);

        // Phoenix only accepts SaveMode.Overwrite, but in practice it appends the data
        dataset.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .save();
        ds.show();

        session.stop();
        jsc.close();
    }
}