1. 执行流程
2. Solr Cloud实现
http://blog.csdn.net/u011462328/article/details/53008344
3. HBase实现
1) 自定义Observer
① 代码
- package cn.bfire.coprocessor;
- import com.typesafe.config.Config;
- import com.typesafe.config.ConfigFactory;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.CellUtil;
- import org.apache.hadoop.hbase.client.Delete;
- import org.apache.hadoop.hbase.client.Durability;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
- import org.apache.hadoop.hbase.coprocessor.ObserverContext;
- import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
- import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.solr.common.SolrInputDocument;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.util.List;
- /**
- * 为hbase提供二级索引的协处理器 Coprocesser
- */
- public class UserDevPiSolrObserver extends BaseRegionObserver {
- //加载配置文件属性
- static Config config = ConfigFactory.load("userdev_pi_solr.properties");
- //log记录
- private static final Logger logger = LoggerFactory.getLogger(UserDevPiSolrObserver.class);
- @Override
- public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
- // 获取行键值
- String rowkey = Bytes.toString(put.getRow());
- //实例化 SolrDoc
- SolrInputDocument doc = new SolrInputDocument();
- //添加Solr uniqueKey值
- doc.addField("rowkey", rowkey);
- // 获取需要索引的列
- String[] hbase_columns = config.getString("hbase_column").split(",");
- // 获取需要索引的列的值并将其添加到SolrDoc
- for (int i = 0; i < hbase_columns.length; i++) {
- String colName = hbase_columns[i];
- String colValue = "";
- // 获取指定列
- List<Cell> cells = put.get("cf".getBytes(), colName.getBytes());
- if (cells != null) {
- try {
- colValue = Bytes.toString(CellUtil.cloneValue(cells.get(0)));
- } catch (Exception ex) {
- logger.error("添加solrdoc错误", ex);
- }
- }
- doc.addField(colName, colValue);
- }
- //发送数据到本地缓存
- SolrIndexTools.addDoc(doc);
- }
- @Override
- public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
- //得到rowkey
- String rowkey = Bytes.toString(delete.getRow());
- //发送数据本地缓存
- String solr_collection = config.getString("solr_collection");
- SolrIndexTools.delDoc(rowkey);
- }
- }
- package cn.bfire.coprocessor;
- import com.typesafe.config.Config;
- import com.typesafe.config.ConfigFactory;
- import org.apache.solr.client.solrj.impl.CloudSolrClient;
- import org.apache.solr.common.SolrInputDocument;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.Semaphore;
- /**
- * solr索引处理客户端
- * 注意问题,并发提交时,需要线程协作资源
- */
- public class SolrIndexTools {
- //加载配置文件属性
- static Config config = ConfigFactory.load("userdev_pi_solr.properties");
- //log记录
- private static final Logger logger = LoggerFactory.getLogger(SolrIndexTools.class);
- //实例化solr的client
- static CloudSolrClient client = null;
- //添加批处理阈值
- static int add_batchCount = config.getInt("add_batchCount");
- //删除的批处理阈值
- static int del_batchCount = config.getInt("del_batchCount");
- //添加的集合缓冲
- static List<SolrInputDocument> add_docs = new ArrayList<SolrInputDocument>();
- //删除的集合缓冲
- static List<String> del_docs = new ArrayList<String>();
- static final List<String> zkHosts = new ArrayList<String>();
- static {
- logger.info("初始化索引调度........");
- String zk_host = config.getString("zk_host");
- String[] data = zk_host.split(",");
- for (String zkHost : data) {
- zkHosts.add(zkHost);
- }
- client = new CloudSolrClient.Builder().withZkHost(zkHosts).build();
- // 获取Solr collection
- String solr_collection = config.getString("solr_collection");
- client.setDefaultCollection(solr_collection);
- client.setZkClientTimeout(10000);
- client.setZkConnectTimeout(10000);
- //启动定时任务,第一次延迟1s执行,之后每隔指定时间30S执行一次
- Timer timer = new Timer();
- timer.schedule(new SolrCommit(), config.getInt("first_delay") * 1000, config.getInt("interval_commit_index") * 1000);
- }
- public static class SolrCommit extends TimerTask {
- @Override
- public void run() {
- logger.info("索引线程运行中........");
- //只有等于true时才执行下面的提交代码
- try {
- semp.acquire();//获取信号量
- if (add_docs.size() > 0) {
- client.add(add_docs);//添加
- }
- if (del_docs.size() > 0) {
- client.deleteById(del_docs);//删除
- }
- //确保都有数据才提交
- if (add_docs.size() > 0 || del_docs.size() > 0) {
- client.commit();//共用一个提交策略
- //清空缓冲区的添加和删除数据
- add_docs.clear();
- del_docs.clear();
- } else {
- logger.info("暂无索引数据,跳过commit,继续监听......");
- }
- } catch (Exception e) {
- logger.error("间隔提交索引数据出错!", e);
- } finally {
- semp.release();//释放信号量
- }
- }
- }
- /**
- * 添加数据到临时存储中,如果
- * 大于等于batchCount时,就提交一次,
- * 再清空集合,其他情况下走对应的时间间隔提交
- *
- * @param doc 单个document对象
- */
- public static void addDoc(SolrInputDocument doc) {
- commitIndex(add_docs, add_batchCount, doc, true);
- }
- /***
- * 删除的数据添加到临时存储中,如果大于
- * 对应的批处理就直接提交,再清空集合,
- * 其他情况下走对应的时间间隔提交
- *
- * @param rowkey 删除的rowkey
- */
- public static void delDoc(String rowkey) {
- commitIndex(del_docs, del_batchCount, rowkey, false);
- }
- // 任何时候,保证只能有一个线程在提交索引,并清空集合
- final static Semaphore semp = new Semaphore(1);
- /***
- * 此方法需要加锁,并且提交索引时,与时间间隔提交是互斥的
- * 百分百确保不会丢失数据
- *
- * @param datas 用来提交的数据集合
- * @param count 对应的集合提交数量
- * @param doc 添加的单个doc
- * @param isAdd 是否为添加动作
- */
- public synchronized static void commitIndex(List datas, int count, Object doc, boolean isAdd) {
- try {
- semp.acquire();//获取信号量
- if (datas.size() >= count) {
- if (isAdd) {
- client.add(datas);//添加数据到服务端中
- } else {
- client.deleteById(datas);//删除数据
- }
- client.commit();//提交数据
- datas.clear();//清空临时集合
- }
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("按阈值" + (isAdd == true ? "添加" : "删除") + "操作索引数据出错!", e);
- } finally {
- datas.add(doc);//添加单条数据
- semp.release();//释放信号量
- }
- }
- }
- <pre code_snippet_id="1962705" snippet_file_name="blog_20161102_1_8333418" style="font-family: Consolas; font-size: 11.3pt; background-color: rgb(255, 255, 255);">pom文件配置</pre>
- <pre style="font-family:Consolas; font-size:11.3pt; background-color:rgb(255,255,255)"><pre code_snippet_id="1962705" snippet_file_name="blog_20161227_4_7977704" name="code" class="html"><?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>cn.gcks</groupId>
- <artifactId>hbase</artifactId>
- <version>1.0-SNAPSHOT</version>
- <dependencies>
- <!-- https://mvnrepository.com/artifact/org.apache.solr/solr-solrj -->
- <dependency>
- <groupId>org.apache.solr</groupId>
- <artifactId>solr-solrj</artifactId>
- <version>6.2.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>1.1.2</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>1.1.2</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- https://mvnrepository.com/artifact/com.typesafe/config -->
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- <version>1.3.1</version>
- </dependency>
- </dependencies>
- </project></pre></pre>
- <pre style="font-family:Consolas; font-size:11.3pt; background-color:rgb(255,255,255)"><p>
- </p><p><span style="font-weight:bold; color:rgb(0,128,0); font-size:11.3pt; background-color:rgb(228,228,255)">userdev_pi_solr.properties</span></p><p></p><pre code_snippet_id="1962705" snippet_file_name="blog_20161227_5_7563783" name="code" class="plain">#需要建索引的列
- hbase_column=oid,pi_id,statdate
- # solr的collection名称
- solr_collection=userdev_pi_day
- #定义solr的url地址,如果是cloud模式,可以配置多个以逗号分隔
- zk_host=1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181
- #调度第一次开始时,延迟多少秒执行
- first_delay=10
- #后台线程多久提交一次索引,单位秒
- interval_commit_index=30
- #添加索引的批处理数量
- add_batchCount=10000
- #删除索引的批处理数量
- del_batchCount=2000</pre><br><br><p></p><p></p><p>② 打包代码并上传到<span style="font-family:Calibri">hdfs</span><span style="font-family:宋体">目录</span></p><p>③ 修改<span style="font-family:Calibri">HBase</span><span style="font-family:宋体">表(设置自定义</span><span style="font-family:Calibri">observer</span><span style="font-family:宋体">所在</span><span style="font-family:Calibri">hdfs</span><span style="font-family:宋体">位置,以及指定自定义</span><span style="font-family:Calibri">Observer</span><span style="font-family:宋体">全类名)</span></p><p>alter 'radius:raduserlog', 'coprocessor' => 'hdfs://<span style="color:rgb(0,112,192)">/apps/hbase/jars/hbase_solr.jar</span>|cn.bfire.coprocessor.UserDevPiSolrObserver|'</p><p>2) 数据查询代码</p><p></p><pre code_snippet_id="1962705" snippet_file_name="blog_20161102_4_5934630" name="code" class="java">package cn.bfire.solr;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.CellUtil;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.solr.client.solrj.SolrQuery;
- import org.apache.solr.client.solrj.impl.CloudSolrClient;
- import org.apache.solr.client.solrj.response.QueryResponse;
- import org.apache.solr.common.SolrDocument;
- import org.apache.solr.common.SolrDocumentList;
- import org.apache.solr.common.SolrInputDocument;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.List;
- public class SolrCloudTest {
- public static final Log LOG = LogFactory.getLog(SolrCloudTest.class);
- private static CloudSolrClient cloudSolrClient;
- private static Connection connection;
- private static Table table;
- private static Get get;
- private static String defaultCollection = "userdev_pi_day";
- private static String hbaseTable = "<span style="font-family: Arial, Helvetica, sans-serif;">userdev_pi_day</span><span style="font-family: Arial, Helvetica, sans-serif;">";</span>
- List<Get> list = new ArrayList<Get>();
- static {
- final List<String> zkHosts = new ArrayList<String>();
- zkHosts.add("1.1.1.1:2181");
- zkHosts.add("1.1.1.2:2181");
- zkHosts.add("1.1.1.3:2181");
- cloudSolrClient = new CloudSolrClient.Builder().withZkHost(zkHosts).build();
- final int zkClientTimeout = 10000;
- final int zkConnectTimeout = 10000;
- cloudSolrClient.setDefaultCollection(defaultCollection);
- cloudSolrClient.setZkClientTimeout(zkClientTimeout);
- cloudSolrClient.setZkConnectTimeout(zkConnectTimeout);
- try {
- connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
- table = connection.getTable(TableName.valueOf(hbaseTable));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- private void addIndex(CloudSolrClient cloudSolrClient) throws Exception {
- Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
- for (int i = 0; i <= 100; i++) {
- SolrInputDocument doc = new SolrInputDocument();
- String key = "";
- key = String.valueOf(i);
- doc.addField("rowkey", key);
- doc.addField("usermac", key + "usermac");
- doc.addField("userid", key + "userid");
- doc.addField("usertype", key + "usertype");
- doc.addField("city_id", key + "city_id");
- docs.add(doc);
- }
- LOG.info("docs info:" + docs + "\n");
- cloudSolrClient.add(docs);
- cloudSolrClient.commit();
- }
- public void search(CloudSolrClient cloudSolrClient, String Str) throws Exception {
- SolrQuery query = new SolrQuery();
- query.setRows(100);
- query.setQuery(Str);
- LOG.info("query string: " + Str);
- QueryResponse response = cloudSolrClient.query(query);
- SolrDocumentList docs = response.getResults();
- System.out.println("文档个数:" + docs.getNumFound()); //数据总条数也可轻易获取
- System.out.println("查询时间:" + response.getQTime());
- System.out.println("查询总时间:" + response.getElapsedTime());
- for (SolrDocument doc : docs) {
- String rowkey = (String) doc.getFieldValue("rowkey");
- get = new Get(Bytes.toBytes(rowkey));
- list.add(get);
- }
- Result[] res = table.get(list);
- for (Result rs : res) {
- Cell[] cells = rs.rawCells();
- for (Cell cell : cells) {
- System.out.println("============");
- System.out.println(new String(CellUtil.cloneRow(cell)));
- System.out.println(new String(CellUtil.cloneFamily(cell)));
- System.out.println(new String(CellUtil.cloneQualifier(cell)));
- System.out.println(new String(CellUtil.cloneValue(cell)));
- System.out.println("============");
- break;
- }
- }
- table.close();
- }
- public static void main(String[] args) throws Exception {
- cloudSolrClient.connect();
- SolrCloudTest solrt = new SolrCloudTest();
- // solrt.addIndex(cloudSolrClient);
- solrt.search(cloudSolrClient, "userid:11111");
- cloudSolrClient.close();
- }
- }
- </pre><br><br><p></p><p></p><pre></pre><pre></pre></pre>
- <pre></pre>
- <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=2.0">
HBase + Solr Cloud实现HBase二级索引的更多相关文章
-
使用ElasticSearch赋能HBase二级索引 | 实践一年后总结
前言:还记得那是2018年的一个夏天,天气特别热,我一边擦汗一边听领导大刀阔斧的讲述自己未来的改革蓝图.会议开完了,核心思想就是:我们要搞一个数据大池子,要把公司能灌的数据都灌入这个大池子,然后让别人 ...
-
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
Phoenix是构建在HBase上的一个SQL层,能让我们用标准的JDBC APIs对HBase数据进行增删改查,构建二级索引.当然,开源产品嘛,自然需要注意“避坑”啦,阿丸会把使用方式和最佳实践都告 ...
-
基于Solr实现HBase的二级索引
文章来源:http://www.open-open.com/lib/view/open1421501717312.html 实现目的: 由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想 ...
-
hbase基于solr配置二级索引
一.概述 Hbase适用于大表的存储,通过单一的RowKey查询虽然能快速查询,但是对于复杂查询,尤其分页.查询总数等,实现方案浪费计算资源,所以可以针对hbase数据创建二级索引(Hbase Sec ...
-
CDH使用Solr实现HBase二级索引
一.为什么要使用Solr做二级索引二.实时查询方案三.部署流程3.1 安装HBase.Solr3.2 增加HBase复制功能3.3创建相应的 SolrCloud 集合3.4 创建 Lily HBa ...
-
HBase协处理器同步二级索引到Solr(续)
一. 已知的问题和不足二.解决思路三.代码3.1 读取config文件内容3.2 封装SolrServer的获取方式3.3 编写提交数据到Solr的代码3.4 拦截HBase的Put和Delete操作 ...
-
HBase协处理器同步二级索引到Solr
一. 背景二. 什么是HBase的协处理器三. HBase协处理器同步数据到Solr四. 添加协处理器五. 测试六. 协处理器动态加载 一. 背景 在实际生产中,HBase往往不能满足多维度分析,我们 ...
-
Lily HBase Indexer同步HBase二级索引到Solr丢失数据的问题分析
一.问题描述二.分析步骤2.1 查看日志2.2 修改Solr的硬提交2.3 寻求*帮助2.4 修改了read-row="never"后,丢失部分字段2.5 ...
-
HBase协处理器的使用(添加Solr二级索引)
给HBase添加一二级索引,HBase协处理器结合solr 代码如下 package com.hbase.coprocessor; import java.io.IOException; import ...
随机推荐
-
nginx安装及配置为简单的文件服务器
centos 6.5 直接yum安装即可 yum install nginx -y 配置文件位于:/etc/nginx/nginx.conf,里面可以修改处理器数量.日志路径.pid文件路径等,默认的 ...
-
Irrlicht 鬼火
1.下载引擎 2.引入头文件 在VS2010下新建项目,项目->属性->配置属性->VC++目录 在包含目录中:添加 引擎安装目录\include\ 在库目录中:添加 引擎安装目录\ ...
-
CPU informition
tar jxvf util-linux-ng-2.18.bz2cd util-linux-ng-2.18/./configure --enable-arch --enable-partx --enab ...
-
编写who命令:文件操作,缓冲区与联机帮助
最近阅读UULP(Understanding Unix/Linux Programming),按照书中介绍对Unix/Linux系统编程进行学习梳理,总结如下. 1. who命令能做什么 who命令用 ...
-
微信开发第3章 通过accesstoken获取用户分组
上一章我们获取到了access_token,那么我们可以试着拿token获取用户粉丝分组,调用接口地址为: http请求方式: GET(请使用https协议) https://api.weixin.q ...
-
Java字符串之String与StringBuilder
String与SringBuiler的一些比较 在Java中,我们会大量使用字符串,但是String究竟是怎样工作的我们可能没有想过太多,其实在String类中,每一个看起来会修改String值的 ...
-
JS中this到底指向谁?
关于this的指向,是一个令人很头疼的问题.但是,你运气好,碰到了我.老夫这儿有本祖传秘籍,看懂这个,妈妈再也不用担心你的this指向不对啦! 归根结底,this指向就一句话:谁最终调用函数,this ...
-
Eclipse 扩展activiti-desinger 安装
activiti-desinger 工作流画图工具分为在线安装.离线安装两种方式:下图提供当前所用eclipse版本信息 1.1 在线安装 打开Eclipse -> Help -& ...
-
基于 IJKPlayer-concat 协议的视频无缝拼接技术实现
一.前言 Hi,大家好,我是承香墨影! 开门见山,开篇名义.今天来聊聊如何将多段视频,拼接成一个完整而连续的视频,然后无缝进行播放. 这样的需求应该不算偏门吧? 最简单的就是一些视频 App,会将大段 ...
-
Android应用程序类型和进程状态
来自<Android4高级编程> Android应用程序不能控制自己的生命周期,应用程序组件(Activity.Service等其他组件)必须监听应用程序状态的变化并做出适当的反应,而且特 ...