一.使用logstash同步订单数据(订单表和订单项表)到ElasticSearch:
1.到官网下载logstash:https://www.elastic.co/cn/downloads/logstash
2.安装logstash前,需要确保已经安装好Java的JDK环境
3.下载后解压;解压之后千万不要到bin目录下直接双击logstash.bat来启动,这样会报错
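4.接下来,在logstash安装目录找到config文件夹,在里面新建一个文件夹(我新建的是shop文件夹),然后在shop文件夹里添加如下文件:mysql-connector-java-5.1.6-bin.jar、logstash_order.conf、order_mast.sql、order_item.sql、last_run_order.txt、last_run_item.txt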
5.开始时,last_run_item.txt和last_run_order.txt文件是没有数据的
6.logstash_order.conf文件的配置如下:
# Logstash pipeline: MySQL (jdbc input) -> Logstash -> Elasticsearch.
# Syncs the order table and the order-item table into two separate indices.

input {
  jdbc {
    # "type" lets the output section route events per table.
    # It can be omitted when only a single table is synced.
    type => "order_mast"
    # JDBC driver jar placed in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "2000"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Change the connection string, user and password to match your environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Cron expression: run the sync once every minute.
    schedule => "* * * * *"
    # SQL file placed in the shop folder.
    statement_filepath => "../config/shop/order_mast.sql"
    record_last_run => true
    use_column_value => false
    # Stores the time of the previous run, which is what makes incremental sync possible.
    last_run_metadata_path => "../config/shop/last_run_order.txt"
    clean_run => false
    # Whether column names are converted to lower case.
    lowercase_column_names => false
  }

  jdbc {
    # "type" lets the output section route events per table.
    # It can be omitted when only a single table is synced.
    type => "order_item"
    # JDBC driver jar placed in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "2000"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Change the connection string, user and password to match your environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Cron expression: run the sync once every minute.
    schedule => "* * * * *"
    # SQL file placed in the shop folder.
    statement_filepath => "../config/shop/order_item.sql"
    record_last_run => true
    use_column_value => false
    # Stores the time of the previous run, which is what makes incremental sync possible.
    last_run_metadata_path => "../config/shop/last_run_item.txt"
    clean_run => false
    # Whether column names are converted to lower case.
    lowercase_column_names => false
  }
}

filter {
  # The jdbc input already produces structured events; no way to change that was found,
  # so the json filter below stays disabled.
  # json {
  #   source => "message"
  #   remove_field => ["message"]
  # }
  mutate {
    # Fields to strip from every event before it is indexed.
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  if [type] == "order_mast" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If the cluster requires credentials, uncomment the two lines below and fill them in.
      # user => elastic
      # password => "elastic@test.com"
      index => "shop_order_mast"
      # Mapping types no longer exist from Elasticsearch 7.0 onwards.
      document_type => "order_mast"
      document_id => "%{cod_order_id}"
    }
  }

  if [type] == "order_item" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If the cluster requires credentials, uncomment the two lines below and fill them in.
      # user => elastic
      # password => "elastic@test.com"
      index => "shop_order_item"
      document_type => "order_item"
      document_id => "%{cod_order_item_id}"
    }
  }

  stdout {
    codec => json_lines
  }
}
//如果只有一张表的时候,单表output的配置:
# Output section to use when only ONE table is synced (no routing on "type" needed).
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # If the cluster requires credentials, uncomment the two lines below and fill them in.
    # user => elastic
    # password => "elastic@test.com"
    index => "shop_order_mast"
    # Mapping types no longer exist from Elasticsearch 7.0 onwards.
    document_type => "order_mast"
    document_id => "%{cod_order_id}"
  }

  stdout {
    codec => json_lines
  }
}
//sql的写法,这里只提供orderItem
-- Incremental export of shop_order_item for the Logstash jdbc input.
--
-- cod_order_item_id is selected twice on purpose: the un-aliased column is the
-- field referenced by document_id => "%{cod_order_item_id}" in logstash_order.conf
-- (it becomes the Elasticsearch document id), while the aliased "orderItemId"
-- is the regular document field.
SELECT
    `cod_order_item_id`,
    `cod_order_item_id`    AS "orderItemId",
    `cod_order_id`         AS "orderId",
    `flg_item_type`        AS "itemType",
    `cod_market_id`        AS "marketId",
    `cod_item_id`          AS "itemId",
    `cod_item_id_main`     AS "mainItemId",
    `txt_name`             AS "itemTitle",
    `cod_item_quantity`    AS "quantity",
    `amt_item`             AS "itemPrice",
    `cod_score_total`      AS "scoreTotal",
    `amt_score`            AS "scoreAmount",
    `amt_charge`           AS "chargeAmount",
    `amt_standard_price`   AS "standardPrice",
    `amt_balance_discount` AS "balanceDiscountAmount",
    `amt_payment_total`    AS "itemTotalAmount",
    `amt_coupon_total`     AS "couponTotalAmount",
    `amt_act_discount`     AS "actDiscountAmount",
    `cod_order_parent_id`  AS "parentOrderId",
    `cod_merchant_no`      AS "shopId",
    `cod_create_user`      AS "createUserId",
    DATE_FORMAT(`dat_modify`, '%Y-%m-%d %T') AS "updateTime",
    DATE_FORMAT(`dat_create`, '%Y-%m-%d %T') AS "createTime",
    `cod_modify_user`      AS "updateUserId"
FROM
    shop_order_item
-- sql_last_value is read from last_run_item.txt in the shop folder. On the very
-- first run that file is empty, so the plugin falls back to 1970-01-01 and the
-- query effectively performs a full load.
WHERE
    dat_modify >= :sql_last_value
7.运行过一次之后,打开last_run_item.txt可以看到里面记录了上一次同步的时间(也就是下一次查询时sql_last_value的取值)
8.启动logstash:需要保证你的ES已经启动了,并创建了对应的index和type
Windows环境:在安装目录的bin文件夹下打开命令窗口(或者先打开命令窗口,再切换到该路径),然后执行: logstash -f ../config/shop/logstash_order.conf
如果是在linux环境,切换安装的bin目录执行:
nohup logstash -f ../config/shop/logstash_order.conf > ../logs/logstash.out &
9.之后打开ES查询数据
可以看到数据已经同步过来了
10.之后可以在项目中进行对应的数据操作了,因为该同步是一分钟同步一次,所以对于实时性要求特别高的,可以在代码中使用ES的crud操作也进行同步,这样就可以保证万无一失了
11.ES相关操作可以参考:https://www.cnblogs.com/yangxiaohui227/p/11237268.html
12.附上一个orderItem表的(ES版本为6.4.3)操作
/**
 * Spring configuration that builds the Elasticsearch {@link TransportClient} bean
 * from the {@code elasticsearch.cluster.name}, {@code elasticsearch.host} and
 * {@code elasticsearch.port} properties.
 */
@Configuration
public class ElasticsearchConfig implements InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);

    @Value("${elasticsearch.cluster.name}")
    private String clusterName;

    @Value("${elasticsearch.port}")
    private Integer port;

    @Value("${elasticsearch.host}")
    private String host;

    /**
     * Sets a system property before the client is created when integrating
     * Elasticsearch with Spring Boot. Without it, client initialisation can still
     * fail after the netty conflict has been resolved, with:
     * {@code java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]}
     */
    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }

    /**
     * Implements the {@code InitializingBean} callback declared on the class header.
     * Delegates to {@link #init()}; setting the property a second time is harmless.
     */
    public void afterPropertiesSet() {
        init();
    }

    /**
     * Creates the transport client and registers the configured node address.
     *
     * @return a client connected to the configured host and port
     * @throws RuntimeException if the client cannot be created or the address cannot
     *                          be resolved; the original exception is kept as the cause
     */
    @Bean
    public TransportClient getTransportClient() {
        LOGGER.info("elasticsearch init.");
        TransportClient client = null;
        try {
            Settings settings = Settings.builder()
                    .put("cluster.name", clusterName)        // cluster name
                    .put("client.transport.sniff", true)     // sniff the cluster to discover its nodes
                    .put("thread_pool.search.size", 5)       // search thread pool size
                    .build();
            client = new PreBuiltTransportClient(settings);
            client.addTransportAddresses(new TransportAddress(InetAddress.getByName(host), port));
            LOGGER.info("elasticsearch init success.");
            return client;
        } catch (Exception e) {
            // Release a half-initialised client so a failed setup does not leak it.
            if (client != null) {
                client.close();
            }
            // Keep the message unchanged but attach the cause so the stack trace survives.
            throw new RuntimeException("elasticsearch init fail." + e, e);
        }
    }
}
/**
 * Advanced-query criteria object: a plain bean whose properties carry the
 * optional filters of an order search plus the paging information.
 */
public class EsQueryObject {

    private String orderId;
    private String customerId;
    private String txtOrderTitle;
    private Integer orderStatus;
    private Integer paymentStatus;
    private String phone;
    private String recieveName;
    private String addresss;

    // NOTE(review): the _S / _E suffixes presumably mean range start / end — confirm with callers.
    private String orderSubmitTime_S;
    private String orderSubmitTime_E;
    private String payTime_S;
    private String payTime_E;

    private BigDecimal minPayAmount;
    private BigDecimal maxPayAmount;

    private String shopId;
    private String itemId;
    private String itemTile;

    private Page page;

    /** @return the order id filter */
    public String getOrderId() {
        return this.orderId;
    }

    /** @param orderId the order id filter */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @return the customer id filter */
    public String getCustomerId() {
        return this.customerId;
    }

    /** @param customerId the customer id filter */
    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    /** @return the order title filter */
    public String getTxtOrderTitle() {
        return this.txtOrderTitle;
    }

    /** @param txtOrderTitle the order title filter */
    public void setTxtOrderTitle(String txtOrderTitle) {
        this.txtOrderTitle = txtOrderTitle;
    }

    /** @return the order status filter */
    public Integer getOrderStatus() {
        return this.orderStatus;
    }

    /** @param orderStatus the order status filter */
    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }

    /** @return the payment status filter */
    public Integer getPaymentStatus() {
        return this.paymentStatus;
    }

    /** @param paymentStatus the payment status filter */
    public void setPaymentStatus(Integer paymentStatus) {
        this.paymentStatus = paymentStatus;
    }

    /** @return the phone filter */
    public String getPhone() {
        return this.phone;
    }

    /** @param phone the phone filter */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** @return the receiver name filter */
    public String getRecieveName() {
        return this.recieveName;
    }

    /** @param recieveName the receiver name filter */
    public void setRecieveName(String recieveName) {
        this.recieveName = recieveName;
    }

    /** @return the address filter */
    public String getAddresss() {
        return this.addresss;
    }

    /** @param addresss the address filter */
    public void setAddresss(String addresss) {
        this.addresss = addresss;
    }

    /** @return the {@code orderSubmitTime_S} value */
    public String getOrderSubmitTime_S() {
        return this.orderSubmitTime_S;
    }

    /** @param orderSubmitTime_S the {@code orderSubmitTime_S} value */
    public void setOrderSubmitTime_S(String orderSubmitTime_S) {
        this.orderSubmitTime_S = orderSubmitTime_S;
    }

    /** @return the {@code orderSubmitTime_E} value */
    public String getOrderSubmitTime_E() {
        return this.orderSubmitTime_E;
    }

    /** @param orderSubmitTime_E the {@code orderSubmitTime_E} value */
    public void setOrderSubmitTime_E(String orderSubmitTime_E) {
        this.orderSubmitTime_E = orderSubmitTime_E;
    }

    /** @return the {@code payTime_S} value */
    public String getPayTime_S() {
        return this.payTime_S;
    }

    /** @param payTime_S the {@code payTime_S} value */
    public void setPayTime_S(String payTime_S) {
        this.payTime_S = payTime_S;
    }

    /** @return the {@code payTime_E} value */
    public String getPayTime_E() {
        return this.payTime_E;
    }

    /** @param payTime_E the {@code payTime_E} value */
    public void setPayTime_E(String payTime_E) {
        this.payTime_E = payTime_E;
    }

    /** @return the minimum pay amount filter */
    public BigDecimal getMinPayAmount() {
        return this.minPayAmount;
    }

    /** @param minPayAmount the minimum pay amount filter */
    public void setMinPayAmount(BigDecimal minPayAmount) {
        this.minPayAmount = minPayAmount;
    }

    /** @return the maximum pay amount filter */
    public BigDecimal getMaxPayAmount() {
        return this.maxPayAmount;
    }

    /** @param maxPayAmount the maximum pay amount filter */
    public void setMaxPayAmount(BigDecimal maxPayAmount) {
        this.maxPayAmount = maxPayAmount;
    }

    /** @return the shop id filter */
    public String getShopId() {
        return this.shopId;
    }

    /** @param shopId the shop id filter */
    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    /** @return the item id filter */
    public String getItemId() {
        return this.itemId;
    }

    /** @param itemId the item id filter */
    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    /** @return the {@code itemTile} value */
    public String getItemTile() {
        return this.itemTile;
    }

    /** @param itemTile the {@code itemTile} value */
    public void setItemTile(String itemTile) {
        this.itemTile = itemTile;
    }

    /** @return the paging information */
    public Page getPage() {
        return this.page;
    }

    /** @param page the paging information */
    public void setPage(Page page) {
        this.page = page;
    }
}
package com.tft.shop.service.order; import com.alibaba.fastjson.JSON;
import com.bootcrabframework.cloud.core.common.base.GenericBaseService;
import com.bootcrabframework.cloud.core.util.CommonUtil;
import com.bootcrabframework.cloud.core.util.DateUtil;
import com.google.common.collect.Lists;
import com.tft.shop.constant.order.OrderConstant;
import com.tft.shop.entity.es.EsShopOrderItem;
import com.tft.shop.entity.es.EsShopOrderItemRequestDTO;
import com.tft.shop.entity.order.ShopOrderItem;
import com.tft.shop.util.StringUtil;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; @Service
public class EsShopOrderItemService extends GenericBaseService { @Resource
private TransportClient transportClient; //批量新增
public void batchInsert(List<EsShopOrderItem> list){
if(CommonUtil.isNull(list)){
return;
}
BulkRequest bulkRequest = new BulkRequest();
list.forEach(a->{
IndexRequest indexRequest = new IndexRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, a.getOrderItemId());
indexRequest.source(JSON.toJSONString(a), XContentType.JSON);
bulkRequest.add(indexRequest); }); ActionFuture<BulkResponse> bulk = transportClient.bulk(bulkRequest);
boolean failures = bulk.actionGet().hasFailures();
if(!failures){
return; //没有失败
}
//如果有失败,输出哪一条是失败的
try {
BulkResponse bulkItemResponses = bulk.get();
if(bulkItemResponses==null){
return;
}
if(CommonUtil.isNull(bulkItemResponses.getItems())){
return;
}
for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
boolean failed = bulkItemResponse.isFailed();
if(failed){ logger.error("订单项插入ES失败,错误信息{},对应订单项编号{}",bulkItemResponse.getId(),bulkItemResponse.getFailureMessage());
}
} } catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} }
//单条新增
public void insertOne(EsShopOrderItem item){
if(null==item){
return;
}
List<EsShopOrderItem> list =Lists.newArrayList();
list.add(item);
this.batchInsert(list); }
//单条新增
public void insertOne(ShopOrderItem orderItem){
this.insertOne(shopOrderItemChangeToEsOrderItem(orderItem)); } private EsShopOrderItem shopOrderItemChangeToEsOrderItem(ShopOrderItem orderItem){
if(null==orderItem){
return null;
}
EsShopOrderItem shopOrderItem = new EsShopOrderItem();
shopOrderItem.setOrderItemId(orderItem.getCodOrderItemId());
shopOrderItem.setOrderId(orderItem.getCodOrderId());
shopOrderItem.setItemType(orderItem.getFlgItemType());
shopOrderItem.setMarketId(orderItem.getCodMarketId());
shopOrderItem.setItemId(orderItem.getCodItemId());
shopOrderItem.setMainItemId(orderItem.getCodItemIdMain());
shopOrderItem.setItemTitle(orderItem.getTxtName());
shopOrderItem.setQuantity(orderItem.getCodItemQuantity());
shopOrderItem.setItemPrice(orderItem.getAmtItem());
shopOrderItem.setScoreTotal(orderItem.getCodScoreTotal());
shopOrderItem.setScoreAmount(orderItem.getAmtScore());
shopOrderItem.setChargeAmount(orderItem.getAmtCharge());
shopOrderItem.setStandardPrice(orderItem.getAmtStandardPrice());
shopOrderItem.setBalanceDiscountAmount(orderItem.getAmtBalanceDiscount());
shopOrderItem.setItemTotalAmount(orderItem.getAmtPaymentTotal());
shopOrderItem.setActDiscountAmount(orderItem.getAmtActDiscount());
shopOrderItem.setCouponTotalAmount(orderItem.getAmtCouponTotal());
shopOrderItem.setParentOrderId(orderItem.getCodOrderParentId());
shopOrderItem.setShopId(orderItem.getCodMerchantNo());
shopOrderItem.setCreateUserId(orderItem.getCodCreateUser());
if(null!=orderItem.getDatCreate()){
shopOrderItem.setCreateTime(DateUtil.dateFormat(orderItem.getDatCreate(),DateUtil.TIME_FORMAT_FULL));
}
if(null!=orderItem.getDatModify()){
shopOrderItem.setUpdateTime(DateUtil.dateFormat(orderItem.getDatModify(),DateUtil.TIME_FORMAT_FULL));
}
shopOrderItem.setUpdateUserId(orderItem.getCodModifyUser());
return shopOrderItem;
} //删除
public void deleteOne(String orderItemId){
if(CommonUtil.isNull(orderItemId)){
return;
}
ActionFuture<DeleteResponse> actionFuture = transportClient.delete(new DeleteRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId));
if(actionFuture==null){
return;
}
DeleteResponse deleteResponse = actionFuture.actionGet();
if(null==deleteResponse || null==deleteResponse.status()){
return;
}
if(deleteResponse.status().getStatus()!=200){ logger.error("删除ES订单项,编号为{},删除失败",orderItemId);
} } //修改
public void updateOne(EsShopOrderItem esShopOrderItem){ if(null==esShopOrderItem){
return;
}
UpdateResponse updateResponse = transportClient.prepareUpdate(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, esShopOrderItem.getOrderItemId())
.setDoc(JSON.toJSONString(esShopOrderItem), XContentType.JSON).execute().actionGet();
if(null==updateResponse || null==updateResponse.status()){
return;
}
if(updateResponse.status().getStatus()!=200){
logger.error("修改ES订单项失败,编号为{}",esShopOrderItem.getOrderItemId());
} }
//修改
public void updateOne(ShopOrderItem orderItem){
this.updateOne(this.shopOrderItemChangeToEsOrderItem(orderItem));
} //查询单个 public EsShopOrderItem selectById(String orderItemId){
if(StringUtil.isEmpty(orderItemId)){
return null;
}
GetRequestBuilder ret = transportClient.prepareGet(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId);
if(null==ret || null==ret.get()){
return null;
}
GetResponse response = ret.get();
if(StringUtil.isEmpty(response.getSourceAsString())){
return null;
}
return JSON.parseObject(response.getSourceAsString(),EsShopOrderItem.class); } /**
*
*
* @param req 高级查询对象,当用商品标题查询的时候,限制只返回最大2000条
* @return
*/
public List<EsShopOrderItem> queryAdvanced(EsShopOrderItemRequestDTO req){
if(null==req){
return null;
} SearchRequest searchRequest = new SearchRequest(OrderConstant.ES_ORDER_ITEM_INDEX);
searchRequest.types(OrderConstant.ES_ORDER_ITEM_TYPE);
// 构造查询器
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if(!StringUtils.isEmpty(req.getItemTitle())){
boolQueryBuilder.must(QueryBuilders.matchQuery("itemTitle",req.getItemTitle()));
}
if(!StringUtils.isEmpty(req.getItemId())){
boolQueryBuilder.must(QueryBuilders.termQuery("itemId",req.getItemId()));
}
if(!StringUtils.isEmpty(req.getShopId())){
boolQueryBuilder.must(QueryBuilders.termQuery("shopId",req.getShopId()));
}
if(!StringUtils.isEmpty(req.getCustomerId())){
boolQueryBuilder.must(QueryBuilders.termQuery("createUserId",req.getCustomerId()));
}
if(!StringUtils.isEmpty(req.getParentOrderId())){
boolQueryBuilder.must(QueryBuilders.termQuery("parentOrderId",req.getParentOrderId()));
}
if(!StringUtils.isEmpty(req.getOrderId())){
boolQueryBuilder.must(QueryBuilders.termQuery("orderId",req.getOrderId()));
}
if(null!=req.getItemType() && req.getItemType()>=0){
boolQueryBuilder.must(QueryBuilders.termQuery("itemType",req.getItemType()));
}
if(!StringUtils.isEmpty(req.getCreateStartTime())){
boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte(req.getCreateStartTime()));
}
if(!StringUtils.isEmpty(req.getCreateEndTime())){
boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").lte(req.getCreateEndTime()));
}
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
if(!StringUtils.isEmpty(req.getItemTitle())){
//注意分页from()的参数并不是页码,而是偏移量,如页数为num时,偏移量=(num-1)* pageSize
sourceBuilder.from(0).size(2000);
}
sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC));
searchRequest.source(sourceBuilder);
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
SearchResponse searchResponse = transportClient.search(searchRequest).actionGet();
if(null==searchResponse || null==searchResponse.getHits() || searchResponse.getHits().totalHits<=0){
return null;
}
List<EsShopOrderItem> list = new ArrayList<>();
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
EsShopOrderItem orderItem = JSON.parseObject(sourceAsString, EsShopOrderItem.class);
list.add(orderItem);
}
return list; } public List<String> queryOrderIdList(EsShopOrderItemRequestDTO req){
if(null==req){
return null;
}
List<EsShopOrderItem> shopOrderItems = this.queryAdvanced(req);
if(CommonUtil.isNull(shopOrderItems)){
return null;
}
return shopOrderItems.stream().map(a->a.getOrderId()).collect(Collectors.toList()); } }
//附上shop_order_item的mapping配置:
put shop_order_item
{
"settings": {
"analysis": {
"analyzer": {
"thai_analyzer": {
"type": "custom",
"tokenizer": "thai",
"filter": [
"lowercase",
"asciifolding"
]
},
"caseSensitive": {
"filter": "lowercase",
"type": "custom",
"tokenizer": "keyword"
}
}
}
},
"mappings": {
"order_item": {
"properties": {
"orderId": {
"type": "keyword"
},
"parentOrderId": {
"type": "keyword"
},
"shopId": {
"type": "keyword"
},
"orderItemId": {
"type": "keyword"
},
"itemTitle": {
"type": "text",
"analyzer": "thai_analyzer",
"search_analyzer": "thai_analyzer"
},
"itemId": {
"type": "keyword"
},
"mainItemId": {
"type": "keyword"
},
"marketId": {
"type": "keyword"
},
"itemType": {
"type": "integer"
},
"quantity": {
"type": "integer"
},
"scoreTotal": {
"type": "integer"
},
"scoreAmount": {
"type": "double"
},
"chargeAmount": {
"type": "double"
},
"itemPrice": {
"type": "double"
},
"standardPrice": {
"type": "double"
},
"itemTotalAmount": {
"type": "double"
},
"couponTotalAmount": {
"type": "double"
},
"balanceDiscountAmount": {
"type": "double"
},
"actDiscountAmount": {
"type": "double"
},
"createTime": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
},
"updateTime": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
},
"createUserId": {
"type": "keyword"
},
"updateUserId": {
"type": "keyword"
}
}
}
}
}