Importing MySQL Data into an Elasticsearch Cluster

Posted: 2022-11-17 13:45:05

I haven't written a post for a week because my boss recently handed me a tough task that left no time for anything else: move the data in our MySQL database into our ES cluster. The MySQL database is only a staging area for data we pull from Alibaba, and now it has to live in our own cluster. At this point you might wonder what's hard about that. And indeed, if the volume were small, a few million or even tens of millions of rows, it would just take a bit more time to transfer. But what do you do with more than two billion rows? My boss gave me one week for everything: writing the code, tuning the throughput, and running the actual import. Those two billion rows put me through the wringer.

Next came the coding. I spent half a day writing the program, tested it, tuned the speed, and deployed it; it ran, and I breathed a sigh of relief. I then deployed it to the production environment, created the index in ES, created a test table in MySQL with a few hundred thousand rows, and started importing the MySQL data into the cluster. It seemed to work. By then a full day had already passed.
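For context, creating the index ahead of time with the same 2.x TransportClient might look roughly like the sketch below; the index name, type name, shard settings and mapping fields here are placeholders rather than the ones I actually used.

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.net.InetAddress;

public class CreateIndexSketch {

    public static void main(String[] args) throws Exception {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "elasticsearch-cluster").build();
        TransportClient client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("xxxxx"), 9500));

        // Placeholder index/type names and mapping; tune shards/replicas to the cluster size.
        client.admin().indices().prepareCreate("order_index")
                .setSettings(Settings.settingsBuilder()
                        .put("index.number_of_shards", 10)
                        .put("index.number_of_replicas", 1)
                        .build())
                .addMapping("order",
                        "{\"order\":{\"properties\":{"
                                + "\"dp_id\":{\"type\":\"string\",\"index\":\"not_analyzed\"},"
                                + "\"buyer_nick\":{\"type\":\"string\",\"index\":\"not_analyzed\"}}}}")
                .get();

        client.close();
    }
}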

Then I started the real import, and the problems began. Speed aside, each MySQL table holds over a hundred million rows, and the data is full of unknowns: odd formats, nulls, all kinds of strange and special characters. I added exception handling I hadn't anticipated and improved the code until the program ran through. I picked one MySQL table and decided to try inserting one million rows first. Another problem: one million rows were read from MySQL, but only a little over 990,000 made it into the cluster. Where did the other several thousand go? Even after handling every exception case I could think of, that much data was still being lost, and my boss's requirement was a loss rate below one in a million. So I improved the code again, printed more log output, and recorded every document that failed to be inserted into the cluster. After analyzing those records I revised the code once more, and this time all one million rows went in. I could finally catch my breath.
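The code posted below is still my first rough version and only prints the failure message. The sketch that follows shows roughly how the failed documents themselves could be captured with a separate BulkProcessor listener so they can be replayed later; the class name and log path are placeholders made up for illustration.

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

// Records every document that a bulk request failed to index, so it can be replayed later.
public class FailedDocListener implements BulkProcessor.Listener {

    private static final String FAILED_LOG = "/tmp/failed_docs.log"; // placeholder path

    public void beforeBulk(long executionId, BulkRequest request) {
    }

    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (!response.hasFailures()) {
            return;
        }
        BulkItemResponse[] items = response.getItems();
        List<?> actions = request.requests(); // same order as the response items
        try (PrintWriter writer = new PrintWriter(new FileWriter(FAILED_LOG, true))) {
            for (int i = 0; i < items.length; i++) {
                if (items[i].isFailed()) {
                    IndexRequest failed = (IndexRequest) actions.get(i);
                    // Keep the failure reason together with the original JSON document.
                    writer.println(items[i].getFailureMessage() + "\t" + failed.source().toUtf8());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        System.out.println("bulk request failed entirely: " + failure.getMessage());
    }
}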

Finally came the speed tuning: use multiple threads, decide how to query MySQL, and decide whether the query results go into memory and are read from there straight into the cluster, or are written to a file first and then read back and inserted. That choice is yours; I personally recommend the first approach. There are still plenty of issues to watch, such as dropped connections, recording failed documents, disk load and so on; a large import has many pitfalls. Below is roughly my initial code, for reference.

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by xxx on 2016/08/30.
*/

public class FileToEsOrderTest {

    // Queue shared between the MySQL reader (producer) and the bulk-insert threads (consumers)
    static ConcurrentLinkedQueue<String> queues = new ConcurrentLinkedQueue<String>();
    // True while the producer is still reading rows out of MySQL
    static AtomicBoolean isInsert = new AtomicBoolean(true);
    static TransportClient client = null;

    public static void main(String[] args) throws Exception {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "elasticsearch-cluster").build();
        client = TransportClient.builder().settings(settings).build();

        try {
            client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("xxxxx"), 9500));
        } catch (UnknownHostException error) {
            System.out.print(error.getMessage());
        }
        final long aa = System.currentTimeMillis();

        final ConcurrentHashMap<String, Boolean> hashMap = new ConcurrentHashMap<>();
        // 20 consumer threads, each with its own BulkProcessor, drain the queue and index into ES
        for (int i = 0; i < 20; i++) {
            new Thread(new Runnable() {
                Integer num = 1;

                public void run() {
                    hashMap.put(Thread.currentThread().getName(), Boolean.FALSE);
                    final BulkProcessor bulkProcessor = BulkProcessor.builder(
                            client,
                            new BulkProcessor.Listener() {

                                // Called after a bulk request has completed
                                public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                                    System.out.println("number of actions in this bulk request: "
                                            + bulkRequest.numberOfActions());
                                    if (bulkResponse.hasFailures()) {
                                        for (BulkItemResponse item : bulkResponse.getItems()) {
                                            if (item.isFailed()) {
                                                System.out.println("failure message: --------"
                                                        + item.getFailureMessage());
                                            }
                                        }
                                    }
                                }

                                // Called just before a bulk request is executed
                                public void beforeBulk(long executionId, BulkRequest request) {
                                }

                                // Called when a bulk request could not be executed at all
                                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                                    System.out.println("happen fail = " + failure.getMessage()
                                            + " , cause = " + failure.getCause());
                                }
                            })
                            .setBulkActions(10000)
                            .setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB))
                            .setBackoffPolicy(
                                    BackoffPolicy.exponentialBackoff(
                                            TimeValue.timeValueMillis(100), 3))
                            .setConcurrentRequests(1)
                            .build();

                    while (true) {
                        if (!queues.isEmpty()) {
                            try {
                                String json = queues.poll();
                                if (json == null) continue;

                                // Pull the "checksum" value out of the JSON string to use as the document id
                                int index1 = json.indexOf("checksum");
                                int index2 = json.indexOf("}", index1);
                                index1 += 10;
                                String id = json.substring(index1 + 1, index2 - 1);

                                // Pull the "dp_id" value out of the JSON string to use as the routing key
                                int index3 = json.indexOf("dp_id");
                                int index4 = json.indexOf(",", index3);
                                index3 += 7;
                                String routing = json.substring(index3 + 1, index4 - 1);

                                num++;
                                bulkProcessor.add(new IndexRequest("xxxx", "xxxxx")
                                        .id(id).routing(routing).source(json));
                            } catch (Exception e) {
                                System.out.print(e.getMessage());
                            }
                        }
                        // Producer is done and the queue is drained: flush, wait for the other threads,
                        // then close the processor and exit
                        if (queues.isEmpty() && !isInsert.get()) {
                            bulkProcessor.flush();
                            long jjj = System.currentTimeMillis() - aa;
                            System.out.print(" " + Thread.currentThread().getName() + ":" + jjj + " ");
                            hashMap.put(Thread.currentThread().getName(), Boolean.TRUE);
                            while (hashMap.values().contains(Boolean.FALSE)) {
                                try {
                                    Thread.sleep(1000);
                                } catch (Exception e) {
                                    e.printStackTrace(System.out);
                                }
                            }
                            bulkProcessor.close();
                            break;
                        }
                    }
                }
            }).start();
        }


        // Leftover from the file-based variant: write rows to a local file instead of the queue
        // File file = new File("/test/rfm/rfm_data.txt");
        // FileOutputStream fileOutputStream = new FileOutputStream((file));
        // OutputStreamWriter outputStreamWriter =
        //         new OutputStreamWriter(fileOutputStream);
        // bufferedWriter = new BufferedWriter(outputStreamWriter);

        // Read tables xxx2 .. xxx23 sequentially and feed the shared queue
        for (int i = 2; i <= 23; i++) {
            WriteData("xxx" + i);
        }
        // WriteData("rfm_1");
        // bufferedWriter.close();
        // outputStreamWriter.close();
        // fileOutputStream.close();

        // All tables have been read; let the consumer threads drain the queue and shut down
        isInsert.set(false);
        System.out.println("All data has been written to the queue");
    }

    // Read one MySQL table row by row and push each row onto the queue as a JSON string
    public static void WriteData(String tableName) throws IOException {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        Integer count = 1;
        // Columns to skip entirely, and date columns that must hold a parseable date to be kept
        List<String> columnName = Arrays.asList("trade_last_interval", "trade_first_interval");
        List<String> columnDateName = Arrays.asList("modify", "trade_first_time", "trade_last_time");
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        try {
            Class.forName("com.mysql.jdbc.Driver");
            String url = "jdbc:mysql://xxxxxxxxx";
            conn = DriverManager.getConnection(url, "xxxx", "xxxx");
            System.out.println("Start writing data, connected to MySQL: " + tableName);

            // Streaming result set (forward-only, read-only, fetch size Integer.MIN_VALUE)
            // so the MySQL driver does not pull the whole table into memory
            String sql = "select * from " + tableName;
            ps = conn.prepareStatement(sql,
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            ps.setFetchSize(Integer.MIN_VALUE);
            rs = ps.executeQuery();

            ResultSetMetaData rsmd = rs.getMetaData();
            int colCount = rsmd.getColumnCount();
            ObjectMapper objectMapper = new ObjectMapper()
                    .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                    .setDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
                    .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                    .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
                    .setPropertyNamingStrategy(
                            PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);

            while (rs.next()) { // iterate over the rows of the table
                Map<String, String> map = new LinkedHashMap<>();
                //StringBuilder buffer = new StringBuilder();
                for (int i = 1; i <= colCount; i++) {
                    String name = rsmd.getColumnName(i);
                    if (!columnName.contains(name)) {
                        String value = rs.getString(i);
                        boolean flag = true;
                        if (columnDateName.contains(name)) {
                            // Drop date columns whose value cannot be parsed
                            try {
                                dateFormat.parse(value);
                            } catch (Exception e) {
                                flag = false;
                            }
                        } else if ("buyer_nick".equalsIgnoreCase(name)) {
                            value = encrypt(value);
                        }
                        // Skip null or blank values so they never reach the JSON document
                        if (flag && value != null && value.trim().length() > 0) {
                            //buffer.append("\"" + name + "\":\"" + value + "\"");
                            //buffer.append(",");
                            map.put(name, value);
                        }
                    }
                }
                count++;

                if (!map.isEmpty()) {
                    queues.add(objectMapper.writeValueAsString(map));
                }

                // Back-pressure: every 200,000 rows, check the queue size and sleep until the
                // consumer threads have caught up
                if (count % 200000 == 0) {
                    int number = queues.size();
                    int jj = number / 200000;
                    System.out.println("index: " + count + ", jj: " + jj + ", number: " + number);
                    while (jj > 0) {
                        try {
                            Thread.sleep(2000 * jj);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        int number2 = queues.size();
                        jj = number2 / 200000;
                        System.out.println("index2: " + count + ", jj: " + jj + ", number2: " + number2);
                    }
                }
            }

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // Close the JDBC resources in reverse order of creation
            try {
                if (rs != null) {
                    rs.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            try {
                if (ps != null) {
                    ps.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        System.out.println(tableName + " finished, total rows read: " + count);
        // Pause between tables to give the consumer threads time to work through the queue
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
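
    // Placeholder so this snippet compiles on its own: the real encrypt(...) that masks
    // buyer_nick lives in a separate helper and is not included in this post.
    private static String encrypt(String value) {
        return value;
    }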
}