转载自 http://martin3000.iteye.com/blog/1328833
使用DataImportHandler（DIH）进行简单的数据导入还是比较有效的。特别是针对简单的数据库表，可以把完全导入和增量导入合并成一条查询语句，非常方便：请求参数clean不为false时导入全部记录（完全导入）；以clean=false调用时，只导入timestamp晚于上次导入时间（last_index_time）的记录（增量导入）。我的使用方式如下所示：
1. 配置solrconfig.xml（注册DataImportHandler）
Xml代码
- <requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">
- <lst name="defaults">
- <str name="config">/home/tomcat/bin/solr/conf/data-config.xml</str>
- </lst>
- </requestHandler>
2.添加data-config文件
data-config.xml
Xml代码
- <dataConfig>
- <dataSource type="JdbcDataSource"
- driver="com.mysql.jdbc.Driver"
- url="jdbc:mysql://127.0.0.1/db"
- user="root"
- password="pass"
- batchSize="-1"/>
- <document>
- <entity name="id" pk="id"
- query="select id,username,text,cat from hot where '${dataimporter.request.clean}' != 'false' OR timestamp > '${dataimporter.last_index_time}'">
- <field column="id" name="id"/>
- <field column="text" name="text"/>
- <field column="username" name="username_s"/>
- <field column="cat" name="cat_t"/>
- </entity>
- </document>
- </dataConfig>
3.让DIH周期性的运行
修改solr/conf下的dataimport.properties文件（该文件由DIH自动生成），添加以下参数：
interval：执行间隔，单位为分钟
syncEnabled=1：开启周期运行
params：实际调用的URL路径及参数。所谓周期运行，就是定时访问 http://server:port/webapp + params 这个URL
Properties代码
- #Wed Dec 28 09:29:42 UTC 2011
- port=8983
- interval=5
- last_index_time=2011-12-28 09\:29\:26
- syncEnabled=1
- webapp=solr
- id.last_index_time=2011-12-28 09\:29\:26
- server=127.0.0.1
- params=/select?qt\=/dataimport&command\=full-import&clean\=false&commit\=true&optimize\=false
到此还不能周期运行。Solr wiki中有一段实现该功能的代码，但并没有加入到Solr的发行包中，因此需要自行编译这段代码，打包放到webapp/solr/WEB-INF/lib中，并在webapp/solr/WEB-INF/web.xml中注册下面的监听器：
Xml代码
- <web-app>
- <listener>
- <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
- </listener>
- ...
- </web-app>
以下是solr wiki上周期运行的代码,我已打好包,放在附件里。
Java代码
- package org.apache.solr.handler.dataimport.scheduler;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.util.Properties;
- import org.apache.solr.core.SolrResourceLoader;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class SolrDataImportProperties {
- private Properties properties;
- public static final String SYNC_ENABLED = "syncEnabled";
- public static final String SYNC_CORES = "syncCores";
- public static final String SERVER = "server";
- public static final String PORT = "port";
- public static final String WEBAPP = "webapp";
- public static final String PARAMS = "params";
- public static final String INTERVAL = "interval";
- private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);
- public SolrDataImportProperties(){
- // loadProperties(true);
- }
- public void loadProperties(boolean force){
- try{
- SolrResourceLoader loader = new SolrResourceLoader(null);
- logger.info("Instance dir = " + loader.getInstanceDir());
- String configDir = loader.getConfigDir();
- configDir = SolrResourceLoader.normalizeDir(configDir);
- if(force || properties == null){
- properties = new Properties();
- String dataImportPropertiesPath = configDir + "\\dataimport.properties";
- FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
- properties.load(fis);
- }
- }catch(FileNotFoundException fnfe){
- logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
- }catch(IOException ioe){
- logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
- }catch(Exception e){
- logger.error("Error loading DataImportScheduler properties", e);
- }
- }
- public String getProperty(String key){
- return properties.getProperty(key);
- }
- }
Java代码
- package org.apache.solr.handler.dataimport.scheduler;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.Timer;
- import javax.servlet.ServletContext;
- import javax.servlet.ServletContextEvent;
- import javax.servlet.ServletContextListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class ApplicationListener implements ServletContextListener {
- private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);
- @Override
- public void contextDestroyed(ServletContextEvent servletContextEvent) {
- ServletContext servletContext = servletContextEvent.getServletContext();
- // get our timer from the context
- Timer timer = (Timer)servletContext.getAttribute("timer");
- // cancel all active tasks in the timers queue
- if (timer != null)
- timer.cancel();
- // remove the timer from the context
- servletContext.removeAttribute("timer");
- }
- @Override
- public void contextInitialized(ServletContextEvent servletContextEvent) {
- ServletContext servletContext = servletContextEvent.getServletContext();
- try{
- // create the timer and timer task objects
- Timer timer = new Timer();
- HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);
- // get our interval from HTTPPostScheduler
- int interval = task.getIntervalInt();
- // get a calendar to set the start time (first run)
- Calendar calendar = Calendar.getInstance();
- // set the first run to now + interval (to avoid fireing while the app/server is starting)
- calendar.add(Calendar.MINUTE, interval);
- Date startTime = calendar.getTime();
- // schedule the task
- timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);
- // save the timer in context
- servletContext.setAttribute("timer", timer);
- } catch (Exception e) {
- if(e.getMessage().endsWith("disabled")){
- logger.info("Schedule disabled");
- }else{
- logger.error("Problem initializing the scheduled task: ", e);
- }
- }
- }
- }
Java代码
- package org.apache.solr.handler.dataimport.scheduler;
- import java.io.IOException;
- import java.net.HttpURLConnection;
- import java.net.MalformedURLException;
- import java.net.URL;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Timer;
- import java.util.TimerTask;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class HTTPPostScheduler extends TimerTask {
- private String syncEnabled;
- private String[] syncCores;
- private String server;
- private String port;
- private String webapp;
- private String params;
- private String interval;
- private String cores;
- private SolrDataImportProperties p;
- private boolean singleCore;
- private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);
- public HTTPPostScheduler(String webAppName, Timer t) throws Exception{
- //load properties from global dataimport.properties
- p = new SolrDataImportProperties();
- reloadParams();
- fixParams(webAppName);
- if(!syncEnabled.equals("1")) throw new Exception("Schedule disabled");
- if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){
- singleCore = true;
- logger.info("<index update process> Single core identified in dataimport.properties");
- }else{
- singleCore = false;
- logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores);
- }
- }
- private void reloadParams(){
- p.loadProperties(true);
- syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
- cores = p.getProperty(SolrDataImportProperties.SYNC_CORES);
- server = p.getProperty(SolrDataImportProperties.SERVER);
- port = p.getProperty(SolrDataImportProperties.PORT);
- webapp = p.getProperty(SolrDataImportProperties.WEBAPP);
- params = p.getProperty(SolrDataImportProperties.PARAMS);
- interval = p.getProperty(SolrDataImportProperties.INTERVAL);
- syncCores = cores != null ? cores.split(",") : null;
- }
- private void fixParams(String webAppName){
- if(server == null || server.isEmpty()) server = "localhost";
- if(port == null || port.isEmpty()) port = "8080";
- if(webapp == null || webapp.isEmpty()) webapp = webAppName;
- if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";
- }
- public void run() {
- try{
- // check mandatory params
- if(server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()){
- logger.warn("<index update process> Insuficient info provided for data import");
- logger.info("<index update process> Reloading global dataimport.properties");
- reloadParams();
- // single-core
- }else if(singleCore){
- prepUrlSendHttpPost();
- // multi-core
- }else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){
- logger.warn("<index update process> No cores scheduled for data import");
- logger.info("<index update process> Reloading global dataimport.properties");
- reloadParams();
- }else{
- for(String core : syncCores){
- prepUrlSendHttpPost(core);
- }
- }
- }catch(Exception e){
- logger.error("Failed to prepare for sendHttpPost", e);
- reloadParams();
- }
- }
- private void prepUrlSendHttpPost(){
- String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;
- sendHttpPost(coreUrl, null);
- }
- private void prepUrlSendHttpPost(String coreName){
- String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;
- sendHttpPost(coreUrl, coreName);
- }
- private void sendHttpPost(String completeUrl, String coreName){
- DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
- Date startTime = new Date();
- // prepare the core var
- String core = coreName == null ? "" : "[" + coreName + "] ";
- logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));
- try{
- URL url = new URL(completeUrl);
- HttpURLConnection conn = (HttpURLConnection)url.openConnection();
- conn.setRequestMethod("POST");
- conn.setRequestProperty("type", "submit");
- conn.setDoOutput(true);
- // Send HTTP POST
- conn.connect();
- logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());
- logger.info(core + "<index update process> Succesfully connected to server\t" + server);
- logger.info(core + "<index update process> Using port\t\t\t" + port);
- logger.info(core + "<index update process> Application name\t\t\t" + webapp);
- logger.info(core + "<index update process> URL params\t\t\t" + params);
- logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
- logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
- logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());
- //listen for change in properties file if an error occurs
- if(conn.getResponseCode() != 200){
- reloadParams();
- }
- conn.disconnect();
- logger.info(core + "<index update process> Disconnected from server\t\t" + server);
- Date endTime = new Date();
- logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));
- }catch(MalformedURLException mue){
- logger.error("Failed to assemble URL for HTTP POST", mue);
- }catch(IOException ioe){
- logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
- }catch(Exception e){
- logger.error("Failed to send HTTP POST", e);
- }
- }
- public int getIntervalInt() {
- try{
- return Integer.parseInt(interval);
- }catch(NumberFormatException e){
- logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
- return 30; //return default in case of error
- }
- }
- }