上一章完成了 C# 访问 HBase 的 SDK 封装,接下来用一个具体的 Demo 对 SDK 进行测试验证。场景:每 5 秒抓取指定股票列表的实时行情,数据下载后,一方面实时刷新 UI 界面,另一方面将数据放入在内存中模拟的 MQ(实际生产环境可用 Kafka 等集群代替),再由 MQ 存入 HBase 数据库;同时提供按指定时间范围查询股票价格数据的功能。
目录:
- 示例说明
- 示例效果图
- rest server运行状态检查
- 获取股票实时数据代码
- 数据持久化至 HBase 代码
- 从HBase读取数据代码
示例说明:
- 在Hbase 中创建两个表,分别为:
- StocksInfo (股票信息表,用来存储设置的股票代码、股票名称)
- StockRealInfo (股票实时行情数据,包含开盘价、当前价、最高价、最低价、五档竞买、卖单价和数量、成交单价、数量、涨跌幅等)
- 每 5 秒钟抓取 StocksInfo 表中所有股票的数据,自动更新 UI,并持久化到 HBase;支持增加、删除要监控的股票。
- 提供按指定时间范围从hbase中查询历史数据
示例效果图:
- 历史数据查询:
rest server运行状态检查:
- 在 HDP2.4安装(五):集群及组件安装 章节,Hbase 主机安装在 hdp4 192.168.2.21 上,使用xshell 工具连接到hbase master(hdp4)
- 查看8080端口是否正常,也可从 ambari UI 界面查看HBase状态,如图:
获取股票实时数据代码:
-
很多网站都提供股票实时交易数据的下载,我选择从 hq.sina 下载。注意抓取数据的频率不要设置得太高,否则你的 IP 可能会被封掉。代码如下:
/// <summary>
/// Downloads real-time quotes from the Sina quote endpoint (<c>hq.sinajs.cn</c>) and
/// turns each non-empty response line into a <c>StockRealInfo</c>.
/// </summary>
public class SnatchFormSina
{
    #region SnatchFormSina

    private const string dataurl = "http://hq.sinajs.cn/list={0}";

    // Shared by every instance: Current hands out a new SnatchFormSina on each access,
    // and creating an HttpClient per access can exhaust sockets under a 5-second poll.
    private static readonly HttpClient sharedClient = new HttpClient();

    private readonly HttpClient client;

    /// <summary>
    /// Creates a downloader that issues its requests through the shared HTTP client.
    /// </summary>
    public SnatchFormSina()
    {
        this.client = sharedClient;
    }

    /// <summary>
    /// Returns a new downloader on every access (all of them share one HTTP client).
    /// </summary>
    public static SnatchFormSina Current
    {
        get
        {
            return new SnatchFormSina();
        }
    }

    #endregion

    #region GetCurrentInfos

    /// <summary>
    /// Downloads the current quote for each stock code, resolves its ID through
    /// <c>SimulatorCache.StockAccount</c> (keyed by stock name) and stores the quote in
    /// <c>SimulatorCache.StockInfos</c> (keyed by that ID).
    /// </summary>
    /// <param name="stockIDs">Stock codes to query.</param>
    /// <returns>The downloaded quotes; an empty list when <paramref name="stockIDs"/> is empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stockIDs"/> is null.</exception>
    public async Task<List<StockRealInfo>> GetCurrentInfosAsync(List<string> stockIDs)
    {
        if (stockIDs == null)
        {
            throw new ArgumentNullException("stockIDs");
        }

        List<StockRealInfo> list = new List<StockRealInfo>();
        if (stockIDs.Count == 0)
        {
            // Nothing to ask for; skip the HTTP round trip.
            return list;
        }

        List<StockRealInfo> quotes = await this.DownloadAsync(stockIDs);
        foreach (StockRealInfo stockInfo in quotes)
        {
            stockInfo.ID = SimulatorCache.StockAccount[stockInfo.Name];
            SimulatorCache.StockInfos[stockInfo.ID] = stockInfo;
            list.Add(stockInfo);
        }

        return list;
    }

    #endregion

    #region ParseStockIDs

    /// <summary>
    /// Builds the request URL: every code is prefixed with its market ("sh" or "sz") and
    /// the prefixed codes are joined with commas, without a trailing comma.
    /// </summary>
    /// <param name="stockIDs">Stock codes; null or empty entries are skipped.</param>
    /// <returns>The request URL, or null when no usable code was supplied.</returns>
    private string ParseStockIDs(List<string> stockIDs)
    {
        List<string> symbols = new List<string>(stockIDs.Count);
        foreach (string id in stockIDs)
        {
            if (string.IsNullOrEmpty(id))
            {
                continue;
            }

            symbols.Add(GetMarketPrefix(id) + id);
        }

        if (symbols.Count == 0)
        {
            return null;
        }

        return string.Format(dataurl, string.Join(",", symbols));
    }

    /// <summary>
    /// Chooses the market prefix for a stock code.
    /// </summary>
    /// <param name="id">A non-empty stock code.</param>
    /// <returns>"sh" for Shanghai codes, otherwise "sz".</returns>
    private static string GetMarketPrefix(string id)
    {
        // NOTE(review): the two prefix literals were blank in the published listing.
        // "60" follows the original comment "Shanghai codes start with 600"; "51" is the
        // presumed Shanghai fund prefix. Confirm both against the original project.
        // StartsWith (unlike Substring(0, 2)) does not throw on codes shorter than two characters.
        if (id.StartsWith("60", StringComparison.Ordinal) ||
            id.StartsWith("51", StringComparison.Ordinal))
        {
            return "sh";
        }

        // Everything else is treated as Shenzhen, as in the original fallback branch.
        return "sz";
    }

    /// <summary>
    /// Requests the quotes for the given codes and parses one <c>StockRealInfo</c> per
    /// non-empty line of the response.
    /// </summary>
    /// <param name="stockIDs">Stock codes to query.</param>
    /// <returns>The parsed quotes, in response order.</returns>
    private async Task<List<StockRealInfo>> DownloadAsync(List<string> stockIDs)
    {
        List<StockRealInfo> parsed = new List<StockRealInfo>();

        string requestUrl = this.ParseStockIDs(stockIDs);
        if (requestUrl == null)
        {
            return parsed;
        }

        string realInfo = await this.client.GetStringAsync(requestUrl);
        foreach (string info in realInfo.Split('\n'))
        {
            if (string.IsNullOrEmpty(info))
            {
                continue;
            }

            parsed.Add(new StockRealInfo(info));
        }

        return parsed;
    }

    #endregion

    #region ValiateStockID

    /// <summary>
    /// Looks a single stock code up at the quote endpoint.
    /// </summary>
    /// <param name="stockID">Stock code to check.</param>
    /// <returns>
    /// The <c>Name</c> parsed from the last non-empty response line, or an empty string
    /// when <paramref name="stockID"/> is null/empty or the response has no such line.
    /// </returns>
    public async Task<string> ValiateStockID(string stockID)
    {
        string name = string.Empty;
        if (string.IsNullOrEmpty(stockID))
        {
            return name;
        }

        List<StockRealInfo> quotes = await this.DownloadAsync(new List<string> { stockID });
        foreach (StockRealInfo quote in quotes)
        {
            name = quote.Name;
        }

        return name;
    }

    #endregion
}
数据持久化到 HBase 代码示例:
- 代码中Utils.HBaseClient 是在一个工具类里面创建一个HBaseClient实例
/// <summary>
/// Buffers quotes in an in-memory queue and writes them in batches to the HBase table
/// "StockRealInfo" from a background task started by the constructor.
/// </summary>
public class StockRealWriter : IDisposable
{
    #region StockRealWriter

    const string HBASESTOCKTBLNAME = "StockRealInfo";

    // NOTE(review): the numeric literals were blank in the published listing.
    // 100 ms is the presumed polling interval - confirm against the original project.
    const int PollIntervalMs = 100;

    // NOTE(review): restored as five bid and five ask levels, matching the "five-level
    // quotes" mentioned in the article text - confirm.
    const int DepthLevels = 5;

    readonly Queue<StockRealInfo> queue = new Queue<StockRealInfo>();

    // Written by Dispose, read by the writer loop on another thread.
    volatile bool threadRunning = true;

    /// <summary>
    /// Starts the background writer loop.
    /// </summary>
    public StockRealWriter()
    {
        // LongRunning: the loop lives as long as the writer, so it should not occupy a
        // thread-pool worker.
        Task.Factory.StartNew(
            WriterThreadFunction,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Asks the writer loop to stop. The loop flushes whatever is still queued before it
    /// exits; this method does not wait for that flush.
    /// </summary>
    /// <remarks>
    /// Replaces the former finalizer, which could not run while the loop was alive
    /// because the running delegate keeps this instance reachable.
    /// </remarks>
    public void Dispose()
    {
        threadRunning = false;
    }

    #endregion

    #region WriterThreadFunction

    /// <summary>
    /// Writer loop: flushes the queue to HBase, sleeps for the polling interval, and
    /// repeats until <see cref="Dispose"/> is called; then flushes one last time.
    /// </summary>
    public void WriterThreadFunction()
    {
        while (threadRunning)
        {
            this.FlushPending();
            Thread.Sleep(PollIntervalMs);
        }

        this.FlushPending();
    }

    /// <summary>
    /// Takes everything currently queued and stores it as one CellSet.
    /// </summary>
    private void FlushPending()
    {
        List<StockRealInfo> batch;
        lock (queue)
        {
            if (queue.Count == 0)
            {
                return;
            }

            // Copy in FIFO order and release the lock before any conversion or I/O, so
            // WriteStock callers are not blocked for the duration of the HBase request.
            batch = new List<StockRealInfo>(queue);
            queue.Clear();
        }

        try
        {
            CellSet set = new CellSet();
            foreach (StockRealInfo stock in batch)
            {
                this.CreateStockByRealInfos(set, stock);
            }

            // Blocking is intentional: this runs on the dedicated writer thread, and
            // waiting surfaces store failures here instead of leaving them unobserved.
            Utils.HBaseClient.StoreCellsAsync(HBASESTOCKTBLNAME, set).Wait();
        }
        catch (Exception ex)
        {
            // Keep the writer alive; the failed batch is dropped and reported.
            System.Diagnostics.Trace.TraceError(
                "StockRealWriter: failed to store {0} quote(s) to {1}: {2}",
                batch.Count,
                HBASESTOCKTBLNAME,
                ex);
        }
    }

    #endregion

    #region CreateStockByRealInfos

    /// <summary>
    /// Converts one quote into a row keyed "{ID}_{Date}_{Time}" and appends it to
    /// <paramref name="set"/>. Every value is stored UTF-8 encoded in column family "d".
    /// </summary>
    /// <param name="set">Cell set that receives the new row.</param>
    /// <param name="info">Quote to convert.</param>
    private void CreateStockByRealInfos(CellSet set, StockRealInfo info)
    {
        string key = string.Format("{0}_{1}_{2}", info.ID, info.Date, info.Time);
        var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(key) };

        AddCell(row, "d:ID", info.ID);
        AddCell(row, "d:Name", info.Name);
        AddCell(row, "d:TodayOpen", info.TodayOpen);            // today's opening price
        AddCell(row, "d:YesterdayClose", info.YesterdayClose);  // previous close
        AddCell(row, "d:Current", info.Current);                // current price
        AddCell(row, "d:High", info.High);                      // today's high
        AddCell(row, "d:Low", info.Low);                        // today's low
        AddCell(row, "d:Buy", info.Buy);                        // best bid (buy 1)
        AddCell(row, "d:Sell", info.Sell);                      // best ask (sell 1)
        AddCell(row, "d:VolAmount", info.VolAmount);            // traded volume in shares
        AddCell(row, "d:VolMoney", info.VolMoney);              // traded amount in yuan
        AddCell(row, "d:Date", info.Date);                      // date
        AddCell(row, "d:Time", info.Time);                      // time
        AddCell(row, "d:Diff", info.Diff);                      // price change
        AddCell(row, "d:DiffPrec", info.DiffPrec);              // change in percent

        // Bid levels go to d:Price0{i} / d:Amount0{i}. The upper bound is clamped to the
        // rows actually present so a short table cannot throw.
        int buyLevels = Math.Min(DepthLevels, info.BuyList.Rows.Count);
        for (int i = 0; i < buyLevels; i++)
        {
            DataRow buyInfo = info.BuyList.Rows[i];
            AddCell(row, string.Format("d:Price0{0}", i), Convert.ToString(buyInfo["Price"]));
            AddCell(row, string.Format("d:Amount0{0}", i), Convert.ToString(buyInfo["Amount"]));
        }

        // Ask levels go to d:Price1{i} / d:Amount1{i}.
        int sellLevels = Math.Min(DepthLevels, info.SellList.Rows.Count);
        for (int i = 0; i < sellLevels; i++)
        {
            DataRow sellInfo = info.SellList.Rows[i];
            AddCell(row, string.Format("d:Price1{0}", i), Convert.ToString(sellInfo["Price"]));
            AddCell(row, string.Format("d:Amount1{0}", i), Convert.ToString(sellInfo["Amount"]));
        }

        set.rows.Add(row);
    }

    /// <summary>
    /// Appends one UTF-8 encoded cell to <paramref name="row"/>.
    /// </summary>
    /// <param name="row">Row that receives the cell.</param>
    /// <param name="column">Fully qualified column name, e.g. "d:Current".</param>
    /// <param name="value">Cell value; null is stored as an empty value.</param>
    private static void AddCell(CellSet.Row row, string column, string value)
    {
        row.values.Add(new Cell
        {
            column = Encoding.UTF8.GetBytes(column),
            data = Encoding.UTF8.GetBytes(value ?? string.Empty)
        });
    }

    #endregion

    #region WriteStock

    /// <summary>
    /// Queues quotes for the background writer; returns without waiting for HBase.
    /// </summary>
    /// <param name="stockInfos">Quotes to persist.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stockInfos"/> is null.</exception>
    public void WriteStock(List<StockRealInfo> stockInfos)
    {
        if (stockInfos == null)
        {
            throw new ArgumentNullException("stockInfos");
        }

        lock (queue)
        {
            foreach (var stockInfo in stockInfos)
            {
                queue.Enqueue(stockInfo);
            }
        }
    }

    #endregion
}
从HBase读取数据代码:
- 代码中 Scanner 参数是指设置的查询范围 (设置StartRow、EndRow、Batch等参数)
/// <summary>
/// Reads quotes back from the HBase table "StockRealInfo".
/// </summary>
public class StockRealReader
{
    #region StockRealReader

    const string HBASESTOCKTBLNAME = "StockRealInfo";

    public StockRealReader()
    {
    }

    #endregion

    #region QueryStockRealAsync

    /// <summary>
    /// Creates a scanner from <paramref name="query"/> and converts every row it returns
    /// into a <c>StockRealInfo</c>.
    /// </summary>
    /// <param name="query">Scanner settings that define the rows to read.</param>
    /// <returns>One quote per scanned row, in scan order.</returns>
    public async Task<List<StockRealInfo>> QueryStockRealAsync(Scanner query)
    {
        List<StockRealInfo> list = new List<StockRealInfo>();

        // NOTE(review): the scanner is never released after the loop; check whether the
        // client exposes a delete/close call for it.
        ScannerInformation info = await Utils.HBaseClient.CreateScannerAsync(HBASESTOCKTBLNAME, query);

        CellSet next;
        while ((next = await Utils.HBaseClient.ScannerGetNextAsync(info)) != null)
        {
            foreach (CellSet.Row row in next.rows)
            {
                list.Add(ToStockRealInfo(row));
            }
        }

        return list;
    }

    /// <summary>
    /// Maps the "d:*" price columns of one row onto a new <c>StockRealInfo</c>.
    /// A column that is absent from the row yields an empty string instead of throwing.
    /// </summary>
    /// <param name="row">Row returned by the scanner.</param>
    /// <returns>The populated quote.</returns>
    private static StockRealInfo ToStockRealInfo(CellSet.Row row)
    {
        // Decode each column name once per row instead of once per cell per lookup.
        Dictionary<string, byte[]> cells = new Dictionary<string, byte[]>(StringComparer.Ordinal);
        foreach (var cell in row.values)
        {
            string column = Encoding.UTF8.GetString(cell.column);
            if (!cells.ContainsKey(column))
            {
                // Keep the first occurrence, which is what List.Find returned before.
                cells.Add(column, cell.data);
            }
        }

        // NOTE(review): StockRealWriter in this file also stores d:ID, d:Name and the
        // d:Price*/d:Amount* depth columns; they are not read back here.
        StockRealInfo realInfo = new StockRealInfo();
        realInfo.TodayOpen = ReadText(cells, "d:TodayOpen");            // opening price
        realInfo.YesterdayClose = ReadText(cells, "d:YesterdayClose");  // previous close
        realInfo.Current = ReadText(cells, "d:Current");                // current price
        realInfo.High = ReadText(cells, "d:High");                      // today's high
        realInfo.Low = ReadText(cells, "d:Low");                        // today's low
        realInfo.Buy = ReadText(cells, "d:Buy");                        // best bid (buy 1)
        realInfo.Sell = ReadText(cells, "d:Sell");                      // best ask (sell 1)
        realInfo.VolAmount = ReadText(cells, "d:VolAmount");            // traded volume in shares
        realInfo.VolMoney = ReadText(cells, "d:VolMoney");              // traded amount in yuan
        realInfo.Date = ReadText(cells, "d:Date");                      // date
        realInfo.Time = ReadText(cells, "d:Time");                      // time
        realInfo.Diff = ReadText(cells, "d:Diff");                      // price change
        realInfo.DiffPrec = ReadText(cells, "d:DiffPrec");              // change in percent
        return realInfo;
    }

    /// <summary>
    /// Returns the UTF-8 text stored under <paramref name="column"/>.
    /// </summary>
    /// <param name="cells">Raw cell data of one row, keyed by column name.</param>
    /// <param name="column">Fully qualified column name, e.g. "d:Current".</param>
    /// <returns>The decoded value, or an empty string when the column is missing or has no data.</returns>
    private static string ReadText(Dictionary<string, byte[]> cells, string column)
    {
        byte[] data;
        if (cells.TryGetValue(column, out data) && data != null)
        {
            return Encoding.UTF8.GetString(data);
        }

        return string.Empty;
    }

    #endregion
}