大数据离线分析平台 JavaSDK数据收集引擎编写

时间:2021-04-25 19:53:56

JavaSDK设计规则

JavaSDK提供两个事件触发方法,分别为onChargeSuccess和onChargeRefund。我们在java sdk中通过一个单独的线程来发送线程数据,这样可以减少对业务系统的延时性。

SDK测试

启动集群上的hdfs+nginx+flume进程,通过模拟数据的发送然后将数据发送到nginx服务器中,查看最终是否在hdfs中有数据的写入。

命令:

start-dfs.sh: 启动hdfs命令

su root:切换用户

service nginx restart: 启动nginx进程

启动flume进程:

进入flume安装根目录,执行命令:


flume-ng agent --conf ./conf/ --conf-file ./conf/test2.conf --name agent &


工程目录结构

大数据离线分析平台  JavaSDK数据收集引擎编写

AnalyticsEngineSDK如下:
package com.kk.ae.sdk;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger; /*
* 分析引擎sdk java服务器数据收集
* */
public class AnalyticsEngineSDK { //日志记录对象
private static final Logger log=Logger.getGlobal();
//请求url的主体部分
public static final String accessUrl="http://hadoop-001:8090/kkImg.gif";
public static final String platformName="java_server";
public static final String sdkName="jdk";
private static final String version = "1";
/**
* 触发订单支付成功事件,发送事件数据到服务器
*
* @param orderId
* 订单支付id
* @param memberIdd
* 订单支付会员id
* @return 如果发送数据成功(加入到发送队列中),那么返回true;否则返回false(参数异常&添加到发送队列失败).
* @throws InterruptedException
*/
public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
Map<String, String> map=new HashMap<String,String>();
map.put("u_mid", memberId);
map.put("oid", orderId);
map.put("c_time", String.valueOf(System.currentTimeMillis()));
map.put("ver", version);
map.put("en", "e_cs");
map.put("p1", platformName);
map.put("sdk", sdkName); //创建url
String url= buildUrl(map);
// 发送url&将url加入到队列
SendDataMonitor.addSendUrl(url);
System.out.println(url);
return true;
} else {
log.log(Level.WARNING, "订单id和会员id不能为空");
return false;
} }
/**
* 触发订单退款事件,发送退款数据到服务器
*
* @param orderId
* 退款订单id
* @param memberIdd
* 退款会员id
* @return 如果发送数据成功,返回true。否则返回false。
* @throws InterruptedException
*/
public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException {
if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
Map<String, String> map=new HashMap<String,String>();
map.put("u_mid", memberId);
map.put("oid", orderId);
map.put("c_time", String.valueOf(System.currentTimeMillis()));
map.put("ver", version);
map.put("en", "e_cr");
map.put("p1", platformName);
map.put("sdk", sdkName); //创建url
String url= buildUrl(map);
// 发送url&将url加入到队列
SendDataMonitor.addSendUrl(url);
System.out.println(url);
return true;
} else {
log.log(Level.WARNING, "订单id和会员id不能为空");
return false;
} }
private static String buildUrl(Map<String, String> map) { StringBuffer stringBuffer=new StringBuffer();
stringBuffer.append(accessUrl).append("?");
for(Map.Entry<String, String> entry:map.entrySet()) {
if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) {
{
try {
stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&");
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
} }
return stringBuffer.substring(0, stringBuffer.length() - 1);
} }
SendDataMonitor 如下:
package com.kk.ae.sdk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger; /**
* 发送url数据的监控者,用于启动一个单独的线程来发送数据
*
* @author gerry
*
*/
public class SendDataMonitor {
//收集日志
public static final Logger log=Logger.getGlobal();
// 队列,用户存储发送url
public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
//用于单例的一个类对象
private static SendDataMonitor monitor=null; private SendDataMonitor() {
// 私有构造方法,进行单列模式的创建
} public static SendDataMonitor getMonitor() {
if (monitor==null) {
synchronized (SendDataMonitor.class) {
if (monitor==null) {
monitor=new SendDataMonitor();
Thread thread=new Thread(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub
SendDataMonitor.monitor.run(); }
});
thread.start();
}
}
}
return monitor;
} protected void run() {
while (true) {
try {
String url=this.queue.take();
// 正式的发送url
HttpRequestUtil.sendData(url);
} catch (Throwable e) {
log.log(Level.WARNING, "发送url异常", e);
}
}
} public static void setMonitor(SendDataMonitor monitor) {
SendDataMonitor.monitor = monitor; } /**
* 添加一个url到队列中去
*
* @param url
* @throws InterruptedException
*/
public static void addSendUrl(String url) throws InterruptedException {
getMonitor().queue.put(url); }
/**
* 内部类,用户发送数据的http工具类
*
* @author gerry
*
*/
public static class HttpRequestUtil{
/**
* 具体发送url的方法
*
* @param url
* @throws IOException
*/
public static void sendData(String url) throws IOException {
HttpURLConnection con=null;
BufferedReader bf=null;
try {
URL obj=new URL(url);
con=(HttpURLConnection) obj.openConnection();
// 设置连接参数
con.setConnectTimeout(5000);//连接过期时间
con.setReadTimeout(5000);//读取数据过期时间
con.setRequestMethod("GET");//设置请求类型为get
System.out.println("发送url:" + url);
// 发送连接请求
bf=new BufferedReader(new InputStreamReader(con.getInputStream())); } finally {
try {
if (bf!=null) {
bf.close(); }
} catch (Throwable e) {
// TODO: handle exception }
try {
con.disconnect();
} catch (Throwable e) {
// TODO: handle exception
}
}
} } }

测试类:

package com.kk.ae.sdk;

public class Test {

public static void main(String[] args) {
try {
AnalyticsEngineSDK.chargeSuccess("order3516", "0958");
AnalyticsEngineSDK.chargeRefund("kk3", "9009");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}