Solr 学习(3) —-Solr 数据导入 DIH简单使用

时间:2022-08-17 07:58:22

转载自 http://martin3000.iteye.com/blog/1328833 
 

使用DataImportHandler进行简单数据导入还是比较有效的,特别是DIH中针对简单的数据库表,可以把完全导入和增量导入合并成一个语句,非常方便。我的使用方式如下所示

1。配置schema

 

 

Xml代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. <requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">  
  2.     <lst name="defaults">  
  3.       <str name="config">/home/tomcat/bin/solr/conf/data-config.xml</str>  
  4.     </lst>  
  5.   </requestHandler>  
 

 

2.添加data-config文件

data-config.xml

 

 

Xml代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. <dataConfig>  
  2.   <dataSource type="JdbcDataSource"   
  3.               driver="com.mysql.jdbc.Driver"  
  4.               url="jdbc:mysql://127.0.0.1/db"   
  5.               user="root"   
  6.               password="pass"  
  7.               batchSize="-1"/>  
  8.   <document>  
  9.     <entity name="id" pk="id"    
  10.             query="select id,username,text,cat  from hot where '${dataimporter.request.clean}' != 'false' OR timestamp > '${dataimporter.last_index_time}'">  
  11.          <field column="id" name="id"/>  
  12.          <field column="text" name="text"/>  
  13.          <field column="username" name="username_s"/>  
  14.          <field column="cat" name="cat_t"/>  
  15.     </entity>  
  16.   </document>  
  17. </dataConfig>  

 

3.让DIH周期性的运行

 修改dataimport.properties文件,这个是自动生成的,同在solr/conf下,添加参数

interval 间隔时间 单位 分钟

syncEnabled=1 打开周期运行

params 其实就是具体调用的url,周期运行就是周期性的访问一个url

 

 

Java代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. #Wed Dec 28 09:29:42 UTC 2011  
  2. port=8983  
  3. interval=5  
  4. last_index_time=2011-12-28 09\:29\:26  
  5. syncEnabled=1  
  6. webapp=solr  
  7. id.last_index_time=2011-12-28 09\:29\:26  
  8. server=127.0.0.1  
  9. params=/select?qt\=/dataimport&command\=full-import&clean\=false&commit\=true&optimize\=false  
 

 

到此还并不能周期运行,在solr的wiki中有一段实现这个功能的代码,但并没有加入到solr的发行包中,于是我们需要重新编译这段代码,打包放到webapp/solr/WEB-INF/lib中才行

 

Xml代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. <web-app>  
  2.    <listener>  
  3.        <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>  
  4.   </listener>  
  5.   ...  
  6. </web-app>  
 

 

 

以下是solr wiki上周期运行的代码,我已打好包,放在附件里。

 

 

Java代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. package org.apache.solr.handler.dataimport.scheduler;  
  2.   
  3. import java.io.FileInputStream;  
  4. import java.io.FileNotFoundException;  
  5. import java.io.IOException;  
  6. import java.util.Properties;  
  7.   
  8. import org.apache.solr.core.SolrResourceLoader;  
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. public class SolrDataImportProperties {  
  13.         private Properties properties;  
  14.   
  15.         public static final String SYNC_ENABLED         = "syncEnabled";  
  16.         public static final String SYNC_CORES           = "syncCores";  
  17.         public static final String SERVER               = "server";  
  18.         public static final String PORT                 = "port";  
  19.         public static final String WEBAPP               = "webapp";  
  20.         public static final String PARAMS               = "params";  
  21.         public static final String INTERVAL             = "interval";  
  22.   
  23.         private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);  
  24.   
  25.         public SolrDataImportProperties(){  
  26. //              loadProperties(true);  
  27.         }  
  28.   
  29.         public void loadProperties(boolean force){  
  30.                 try{  
  31.                         SolrResourceLoader loader = new SolrResourceLoader(null);  
  32.                         logger.info("Instance dir = " + loader.getInstanceDir());  
  33.   
  34.                         String configDir = loader.getConfigDir();  
  35.                         configDir = SolrResourceLoader.normalizeDir(configDir);  
  36.                         if(force || properties == null){  
  37.                                 properties = new Properties();  
  38.   
  39.                                 String dataImportPropertiesPath = configDir + "\\dataimport.properties";  
  40.   
  41.                                 FileInputStream fis = new FileInputStream(dataImportPropertiesPath);  
  42.                                 properties.load(fis);  
  43.                         }  
  44.                 }catch(FileNotFoundException fnfe){  
  45.                         logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);  
  46.                 }catch(IOException ioe){  
  47.                         logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);  
  48.                 }catch(Exception e){  
  49.                         logger.error("Error loading DataImportScheduler properties", e);  
  50.                 }  
  51.         }  
  52.   
  53.         public String getProperty(String key){  
  54.                 return properties.getProperty(key);  
  55.         }  
  56. }  
 

 

 

 

Java代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. package org.apache.solr.handler.dataimport.scheduler;  
  2.   
  3. import java.util.Calendar;  
  4. import java.util.Date;  
  5. import java.util.Timer;  
  6.   
  7. import javax.servlet.ServletContext;  
  8. import javax.servlet.ServletContextEvent;  
  9. import javax.servlet.ServletContextListener;  
  10.   
  11. import org.slf4j.Logger;  
  12. import org.slf4j.LoggerFactory;  
  13.   
  14. public class ApplicationListener implements ServletContextListener {  
  15.   
  16.         private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);  
  17.   
  18.         @Override  
  19.         public void contextDestroyed(ServletContextEvent servletContextEvent) {  
  20.                 ServletContext servletContext = servletContextEvent.getServletContext();  
  21.   
  22.                 // get our timer from the context  
  23.                 Timer timer = (Timer)servletContext.getAttribute("timer");  
  24.   
  25.                 // cancel all active tasks in the timers queue  
  26.                 if (timer != null)  
  27.                         timer.cancel();  
  28.   
  29.                 // remove the timer from the context  
  30.                 servletContext.removeAttribute("timer");  
  31.   
  32.         }  
  33.   
  34.         @Override  
  35.         public void contextInitialized(ServletContextEvent servletContextEvent) {  
  36.                 ServletContext servletContext = servletContextEvent.getServletContext();  
  37.                 try{  
  38.                         // create the timer and timer task objects  
  39.                         Timer timer = new Timer();  
  40.                         HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);  
  41.   
  42.                         // get our interval from HTTPPostScheduler  
  43.                         int interval = task.getIntervalInt();  
  44.   
  45.                         // get a calendar to set the start time (first run)  
  46.                         Calendar calendar = Calendar.getInstance();  
  47.   
  48.                         // set the first run to now + interval (to avoid fireing while the app/server is starting)  
  49.                         calendar.add(Calendar.MINUTE, interval);  
  50.                         Date startTime = calendar.getTime();  
  51.   
  52.                         // schedule the task  
  53.                         timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);  
  54.   
  55.                         // save the timer in context  
  56.                         servletContext.setAttribute("timer", timer);  
  57.   
  58.                 } catch (Exception e) {  
  59.                         if(e.getMessage().endsWith("disabled")){  
  60.                                 logger.info("Schedule disabled");  
  61.                         }else{  
  62.                                 logger.error("Problem initializing the scheduled task: ", e);  
  63.                         }  
  64.                 }  
  65.         }  
  66.   
  67. }  
 

 

Java代码    Solr 学习(3) —-Solr 数据导入 DIH简单使用
  1. package org.apache.solr.handler.dataimport.scheduler;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.HttpURLConnection;  
  5. import java.net.MalformedURLException;  
  6. import java.net.URL;  
  7. import java.text.DateFormat;  
  8. import java.text.SimpleDateFormat;  
  9. import java.util.Date;  
  10. import java.util.Timer;  
  11. import java.util.TimerTask;  
  12.   
  13. import org.slf4j.Logger;  
  14. import org.slf4j.LoggerFactory;  
  15.   
  16.   
  17. public class HTTPPostScheduler extends TimerTask {  
  18.         private String syncEnabled;  
  19.         private String[] syncCores;  
  20.         private String server;  
  21.         private String port;  
  22.         private String webapp;  
  23.         private String params;  
  24.         private String interval;  
  25.         private String cores;  
  26.         private SolrDataImportProperties p;  
  27.         private boolean singleCore;  
  28.   
  29.         private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);  
  30.   
  31.         public HTTPPostScheduler(String webAppName, Timer t) throws Exception{  
  32.                 //load properties from global dataimport.properties  
  33.                 p = new SolrDataImportProperties();  
  34.                 reloadParams();  
  35.                 fixParams(webAppName);  
  36.   
  37.                 if(!syncEnabled.equals("1")) throw new Exception("Schedule disabled");  
  38.   
  39.                 if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){  
  40.                         singleCore = true;  
  41.                         logger.info("<index update process> Single core identified in dataimport.properties");  
  42.                 }else{  
  43.                         singleCore = false;  
  44.                         logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores);  
  45.                 }  
  46.         }  
  47.   
  48.         private void reloadParams(){  
  49.                 p.loadProperties(true);  
  50.                 syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);  
  51.                 cores           = p.getProperty(SolrDataImportProperties.SYNC_CORES);  
  52.                 server          = p.getProperty(SolrDataImportProperties.SERVER);  
  53.                 port            = p.getProperty(SolrDataImportProperties.PORT);  
  54.                 webapp          = p.getProperty(SolrDataImportProperties.WEBAPP);  
  55.                 params          = p.getProperty(SolrDataImportProperties.PARAMS);  
  56.                 interval        = p.getProperty(SolrDataImportProperties.INTERVAL);  
  57.                 syncCores       = cores != null ? cores.split(",") : null;  
  58.         }  
  59.   
  60.         private void fixParams(String webAppName){  
  61.                 if(server == null || server.isEmpty())  server = "localhost";  
  62.                 if(port == null || port.isEmpty())              port = "8080";  
  63.                 if(webapp == null || webapp.isEmpty())  webapp = webAppName;  
  64.                 if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";  
  65.         }  
  66.   
  67.         public void run() {  
  68.                 try{  
  69.                         // check mandatory params  
  70.                         if(server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()){  
  71.                                 logger.warn("<index update process> Insuficient info provided for data import");  
  72.                                 logger.info("<index update process> Reloading global dataimport.properties");  
  73.                                 reloadParams();  
  74.   
  75.                         // single-core  
  76.                         }else if(singleCore){  
  77.                                 prepUrlSendHttpPost();  
  78.   
  79.                         // multi-core  
  80.                         }else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){  
  81.                                 logger.warn("<index update process> No cores scheduled for data import");  
  82.                                 logger.info("<index update process> Reloading global dataimport.properties");  
  83.                                 reloadParams();  
  84.   
  85.                         }else{  
  86.                                 for(String core : syncCores){  
  87.                                         prepUrlSendHttpPost(core);  
  88.                                 }  
  89.                         }  
  90.                 }catch(Exception e){  
  91.                         logger.error("Failed to prepare for sendHttpPost", e);  
  92.                         reloadParams();  
  93.                 }  
  94.         }  
  95.   
  96.   
  97.         private void prepUrlSendHttpPost(){  
  98.                 String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;  
  99.                 sendHttpPost(coreUrl, null);  
  100.         }  
  101.   
  102.         private void prepUrlSendHttpPost(String coreName){  
  103.                 String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;  
  104.                 sendHttpPost(coreUrl, coreName);  
  105.         }  
  106.   
  107.   
  108.         private void sendHttpPost(String completeUrl, String coreName){  
  109.                 DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");  
  110.                 Date startTime = new Date();  
  111.   
  112.                 // prepare the core var  
  113.                 String core = coreName == null ? "" : "[" + coreName + "] ";  
  114.   
  115.                 logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));  
  116.   
  117.                 try{  
  118.   
  119.                     URL url = new URL(completeUrl);  
  120.                     HttpURLConnection conn = (HttpURLConnection)url.openConnection();  
  121.   
  122.                     conn.setRequestMethod("POST");  
  123.                     conn.setRequestProperty("type", "submit");  
  124.                     conn.setDoOutput(true);  
  125.   
  126.                         // Send HTTP POST  
  127.                     conn.connect();  
  128.   
  129.                     logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());  
  130.                     logger.info(core + "<index update process> Succesfully connected to server\t" + server);  
  131.                     logger.info(core + "<index update process> Using port\t\t\t" + port);  
  132.                     logger.info(core + "<index update process> Application name\t\t\t" + webapp);  
  133.                     logger.info(core + "<index update process> URL params\t\t\t" + params);  
  134.                     logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());  
  135.                     logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());  
  136.                     logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());  
  137.   
  138.                     //listen for change in properties file if an error occurs  
  139.                     if(conn.getResponseCode() != 200){  
  140.                         reloadParams();  
  141.                     }  
  142.   
  143.                     conn.disconnect();  
  144.                     logger.info(core + "<index update process> Disconnected from server\t\t" + server);  
  145.                     Date endTime = new Date();  
  146.                     logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));  
  147.                 }catch(MalformedURLException mue){  
  148.                         logger.error("Failed to assemble URL for HTTP POST", mue);  
  149.                 }catch(IOException ioe){  
  150.                         logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);  
  151.                 }catch(Exception e){  
  152.                         logger.error("Failed to send HTTP POST", e);  
  153.                 }  
  154.         }  
  155.   
  156.         public int getIntervalInt() {  
  157.                 try{  
  158.                         return Integer.parseInt(interval);  
  159.                 }catch(NumberFormatException e){  
  160.                         logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);  
  161.                         return 30; //return default in case of error  
  162.                 }  
  163.         }  
  164. }