1、如何增量导入MYSQL数据库中的数据?
2、我们怎样测试增量导入?
(一)引言:
前面我的文章 DIH全量导入 中已经学会了如何全量导入Oralce和MySQL的数据,大家都知道全量导入在数据量大的时候代价非常大,一般来说都会适用增量的方式来导入数据,下面介绍如何增量导入MYSQL数据库中的数据,以及如何设置 定时来做。
下面介绍的所有操作都是基于前面已经完成的全量导入的基础上来做的。
(一)DIH增量从MYSQL数据库导入数据:
1、数据库表的更改:
表,这里为了能够进行增量导入,需要新增一个字段,类型为TIMESTAMP,默认值为CURRENT_TIMESTAMP。
有了这样一个字段,Solr才能判断增量导入的时候,哪些数据是新的。
因为Solr本身有一个默认值last_index_time,记录最后一次做full import或者是delta import(增量导入)的时间,这个值存储在文件conf目录的dataimport.properties文件中。
2、data-config.xml中必要属性的设置:
<!-- transformer 格式转化:HTMLStripTransformer 索引中忽略HTML标签 --->
<!-- query:查询数据库表符合记录数据 --->
<!-- deltaQuery:增量索引查询主键ID ---> 注意这个只能返回ID字段
<!-- deltaImportQuery:增量索引查询导入的数据 --->
<!-- deletedPkQuery:增量索引删除主键ID查询 ---> 注意这个只能返回ID字段
有关“query”,“deltaImportQuery”, “deltaQuery”的解释,引用官网说明,如下所示:
The query gives the data needed to populate fields of the Solr document in full-import
The deltaImportQuery gives the data needed to populate fields when running a delta-import
The deltaQuery gives the primary keys of the current entity which have changes since the last index time
最终针对步骤一中创建的表,我们的data-config.xml文件的配置内容如下:
- <dataConfig>
<dataSource name="jdbcDataSource" type="JdbcDataSource" driver="com.mysql.jdbc.Driver"
url="jdbc:mysql://127.0.0.1:3306/b?useUnicode=true&characterEncoding=utf-8" user="root" password="root"/>
<document name="book_publishing">
<entity dataSource="jdbcDataSource" name="resource"
query="select resourceid,resourceName,author,keyword,intro,bookid,bookName,path,resourceType,classifyCode,price from resource">
<field column="resourceid" name="id" />
<field column="resourceName" name="title" />
<field column="resourceType" name="resourcetype" />
<field column="author" name="author" />
<field column="keyword" name="keyword" />
<field column="intro" name="content" stripHTML="true"/>
<field column="bookid" name="bookid" />
<field column="bookName" name="bookname" />
<field column="path" name="url" />
<field column="classifyCode" name="classifycode" />
<field column="price" name="price" />
</entity>
</document>
</dataConfig>
<dataConfig>
<dataSource name="jdbcDataSource" type="JdbcDataSource" driver="com.mysql.jdbc.Driver"
url="jdbc:mysql://127.0.0.1:3306/b?useUnicode=true&characterEncoding=utf-8" user="root" password="root"/>
<document name="book_publishing">
<entity dataSource="jdbcDataSource" name="resource"
query="select resourceid,resourceName,author,keyword,intro,bookid,bookName,path,resourceType,classifyCode,price from resource"
deltaImportQuery="select resourceid,resourceName,author,keyword,intro,bookid,bookName,path,resourceType,classifyCode,price from resource where resourceid='${dih.delta.resourceid}'"
deltaQuery="select resourceid,resourceName,author,keyword,intro,bookid,bookName,path,resourceType,classifyCode,price from resource where updateTime > '${dih.last_index_time}'"
>
<field column="resourceid" name="id" />
<field column="resourceName" name="title" />
<field column="resourceType" name="resourcetype" />
<field column="author" name="author" />
<field column="keyword" name="keyword" />
<field column="intro" name="content" stripHTML="true"/>
<field column="bookid" name="bookid" />
<field column="bookName" name="bookname" />
<field column="path" name="url" />
<field column="classifyCode" name="classifycode" />
<field column="price" name="price" />
</entity>
</document>
</dataConfig>
注意:刚新加上的UpdateTime字段也要在field属性中配置,同时也要在schema.xml文件中配置:<field name="updateTime" type="date" indexed="true" stored="true" />
deltaImportQuery="select resourceid,resourceName,author,keyword,intro,bookid,bookName,path,resourceType,classifyCode,price from resource where resourceid='${dih.delta.resourceid}'" 注意最后的resourceid,这里容易出错
solrconfig.xml配置如下:
<requestHandler name="/dataimport" class="solr.DataImportHandler">
<lst name="defaults">
<str name="config">data-config.xml</str>
</lst>
</requestHandler>
<requestHandler name="/deltaimport" class="solr.DataImportHandler">
<lst name="defaults">
<str name="config">delta-data-config.xml</str>
</lst>
</requestHandler>
注意:看清楚requestHandler的name属性。
schema.xml的代码就不贴了,如果不会,请不要看这篇文章了,先看基础
3、测试增量导入:
public static HttpSolrServer server = new HttpSolrServer("http://localhost:8080/solr/");
public static void main(String[] args) {
//创建索引
//createIndex();
//增量更新索引
//deltaImportIndex();
}
public static void createIndex() {
String status = "0";
SolrQuery sq=new SolrQuery();
sq.set("qt","/dataimport");
sq.set("command", "full-import");
try {
server.query(sq);
//server.commit();
status = "1";
} catch (SolrServerException e) {
System.out.println("索引创建失败...");
e.printStackTrace();
}
System.out.println(status);
}
public static void deltaImportIndex() {
String status = "0";
SolrQuery sq=new SolrQuery();
sq.set("qt","/deltaimport");
sq.set("command", "delta-import");
try {
server.query(sq);
server.commit();
status = "1";
} catch (Exception e) {
System.out.println("索引创建失败...");
e.printStackTrace();
}
System.out.println(status);
}
4、spring的定时任务:
当然我用java代码测试时候懒得调用框架了,就直接main函数执行了。
<bean id="startQuertz" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="createIndexId" />
</list>
</property>
</bean>
<!-- 定义触发时间 -->
<bean id="createIndexId" class="org.springframework.scheduling.quartz.CronTriggerBean">
<!-- 调度任务 -->
<property name="jobDetail" ref="createIndexJob" />
<!-- 每隔2秒执行一次 -->
<!-- <property name="cronExpression" value="0/2 * * * * ?"/> -->
<!-- "0 0 1 * * ?" 每天凌晨1点触发 -->
<property name="cronExpression" value="0 0 1 * * ?" />
</bean>
<!-- 定义调用对象和调用对象的方法 -->
<bean id="createIndexJob" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<!-- 调用的类 -->
<property name="targetObject" ref="createIndexClass" />
<!-- 调用类中的方法名称 -->
<property name="targetMethod">
<value>createIndex</value>
</property>
</bean>
<!-- 要调用的工作类 -->
<bean id="createIndexClass" class="com.book.controller.fg.solr.SolrController" />