grafana将自己的数据库(hbase)设置为数据源

时间:2022-12-11 00:55:45

全栈工程师开发手册 (作者:栾鹏)
​​ 架构系列文章​​


官方文档
​​​ http://docs.grafana.org/plugins/developing/development/​

grafana的插件分为 三种:application,panel,datasource,

本次我只写其中的datasource

要与grafana的其余部分进行交互,插件模块文件可以导出5个不同的组件。

  • Datasource(必填)
  • QueryCtrl(必填)
  • ConfigCtrl(必填)
  • AnnotationsQueryCtrl

json插件

plugin.json有两个特定于数据源的设置

"metrics": true,
"annotations": false,

这些设置指示插件可以提供哪种数据。至少其中一个必须是真的

数据源

与数据库通信并将数据转换为时间序列的javascript对象。

数据源应包含以下功能:

query(options) //used by panels to get data
testDatasource() //used by datasource configuration page to make sure the connection is working
annotationQuery(options) // used by dashboards to get annotations
metricFindQuery(options) // used by query editor to get metric suggestions.

testDatasource

当用户在添加新数据源时单击“ 保存并测试”按钮时,详细信息将首先保存到数据库中,然后将testDatasource调用数据源插件中定义的函数。建议此函数对数据源进行查询,该数据源还将测试身份验证详细信息是否正确。这样,当用户尝试在新仪表板中编写查询时,会正确配置数据源。

Query

传递给datasource.query函数的请求对象:

{
"range": { "from": "2015-12-22T03:06:13.851Z", "to": "2015-12-22T06:48:24.137Z" },
"interval": "5s",
"targets": [
{ "refId": "B", "target": "upper_75" },
{ "refId": "A", "target": "upper_90" }
],
"format": "json",
"maxDataPoints": 2495 //decided by the panel
}

数据源有两种不同的结果; 时间序列和表格。时间序列是最常见的格式,并且受所有数据源和面板支持。表格格式仅受InfluxDB数据源和表格面板支持。但是我们将来可能会看到更多。

下面是来自datasource.query的时间序列响应。

[
{
"target":"upper_75",
"datapoints":[
[622, 1450754160000],
[365, 1450754220000]
]
},
{
"target":"upper_90",
"datapoints":[
[861, 1450754160000],
[767, 1450754220000]
]
}
]

下面是来自datasource.query的表响应。

[
{
"columns": [
{
"text": "Time",
"type": "time",
"sort": true,
"desc": true,
},
{
"text": "mean",
},
{
"text": "sum",
}
],
"rows": [
[
1457425380000,
null,
null
],
[
1457425370000,
1002.76215352,
1002.76215352
],
],
"type": "table"
}
]

Annotation Query

请求对象传递给datasource.annotationQuery函数:

{
"range": { "from": "2016-03-04T04:07:55.144Z", "to": "2016-03-04T07:07:55.144Z" },
"rangeRaw": { "from": "now-3h", to: "now" },
"annotation": {
"datasource": "generic datasource",
"enable": true,
"name": "annotation name"
}
}

datasource.annotationQuery的预期结果:

[
{
"annotation": {
"name": "annotation name", //should match the annotation name in grafana
"enabled": true,
"datasource": "generic datasource",
},
"title": "Cluster outage",
"time": 1457075272576,
"text": "Joe causes brain split",
"tags": "joe, cluster, failure"
}
]

QueryCtrl

一个JavaScript类,当用户在面板中编辑度量标准时,将对其进行实例化并将其视为Angular控制器。该类必须继承app / plugins / sdk.QueryCtrl类。

需要一个静态模板或templateUrl变量,该变量将呈现为此控制器的视图。

ConfigCtrl

当用户尝试编辑或创建此类型的新数据源时,将实例化并视为Angular控制器的JavaScript类。

需要一个静态模板或templateUrl变量,该变量将呈现为此控制器的视图。

AnnotationsQueryCtrl

当用户在仪表板的模板菜单中选择此类型的数据源时,将实例化并视为Angular控制器的JavaScript类。

需要一个静态模板或templateUrl变量,该变量将呈现为此控制器的视图。然后将绑定到此控制器的字段发送到Database对象annotationQuery函数。

将hbase设置为数据源

使用数据源,grafana中的插件需要向后台发送请求数据,后台或者数据库返回时间序列数据。 所以我们要有前端插件的请求和后台的响应接口。前端我们直接使用simple-json-datasource的插件。后端我们需要自己开发一个接口,接收前端请求,解析请求,查询hbase数据,然后封装成时间序列数据。

simple_json的前端请求

请求的代码在/simple-json-datasource/src/datasource.js中

其中主要的包含constructor构造函数,query查询数据函数,testDatasource请求测试,annotationQuery注释查询,metricFindQuery查询函数提示名。

其中包含每个函数请求的内容,请求的地址。我们可以在后端接口中实现这些接口,解析接收这些请求函数。

python后端接口

我这里使用python后台实现接口,解析请求参数,查询hbase,返回时间系列数据。这里写的不详细,读者可以按自己的业务逻辑实现

import sys
import os
dir_common = os.path.split(os.path.realpath(__file__))[0] + '/../'
sys.path.append(dir_common) # 将根目录添加到系统目录,才能正常引用common文件夹
# 使用hbase向数据库中添加数据,使用前需要启动hbase和thrift服务器
# 启动hbase /usr/local/hbase/bin/start-hbase.sh 默认端口为60000,信息端口为60010
# 启动thrift服务器 /usr/local/hbase/bin/hbase-daemon.sh start thrift 默认端口为9090

import logging
import time,datetime

import argparse
from aiohttp import web
import aiohttp
import asyncio
import base64
import logging
import uvloop
import time,datetime
import json
import requests,random


routes = web.RouteTableDef()

@routes.get('/')
async def hello(request):
return web.Response(text="Hello, world")


# 查询
@routes.post('/query')
async def query(request): # 异步监听,只要一有握手就开始触发
try:
data = await request.json() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
# print('query:\n', data)
except Exception as e:
logging.error("image file too large")
return web.json_response(write_response(1,"image file too large",8,{}))
logging.info('query request start %s' % datetime.datetime.now())
range_from = datetime.datetime.strptime(data['range']['from'], "%Y-%m-%dT%H:%M:%S.%fZ")
range_to = datetime.datetime.strptime(data['range']['to'], "%Y-%m-%dT%H:%M:%S.%fZ")
intervalMs = data['intervalMs']
print(range_from,range_to,intervalMs)
querys = data['targets']
results = []
for query in querys:
querystr = query['target'] # 查询语句
if querystr[0]!="{":
querystr = '{'+querystr+"}"


result = {
"target": querystr,
"datapoints": []
}
for timetamp in range(int(range_from.timestamp()*1000),int(range_to.timestamp()*1000),intervalMs):
habse数据= 你的业务逻辑查询出来的hbase数据
result['datapoints'].append([habse数据,timetamp])
results.append(result)

logging.info('query request finish %s, %s' % (datetime.datetime.now(),str(results)))
header = {"Access-Control-Allow-Origin": "*", 'Access-Control-Allow-Methods': 'GET,POST'}
# print(results)

return web.json_response(results,headers=header)


# 查询
@routes.post('/annotations')
async def annotations(request): # 异步监听,只要一有握手就开始触发
try:
data = await request.json() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
print('annotations:\n',data)
except Exception as e:
logging.error("image file too large")
return web.json_response(write_response(1,"image file too large",8,{}))
logging.info('annotations request start %s' % datetime.datetime.now())
header = {"Access-Control-Allow-Origin": "*", 'Access-Control-Allow-Methods': 'GET,POST'}
results=[
{
"annotation": data['annotations'],
"title": "hbase traffic",
"time": int(time.time()*1000),
"text": "Joe causes brain split",
"tags": "joe, cluster, failure"
}
]

return web.json_response(results,headers=header)


# 搜索,就是自动提示功能,当用户输入一个字符,返回可以补齐的内容
@routes.post('/search')
async def search(request): # 异步监听,只要一有握手就开始触发
try:
data = await request.json() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
searchstr=data['target'] # 搜索字符串
print('search:\n',data)
except Exception as e:
logging.error("image file too large")
return web.json_response(write_response(1,"image file too large",8,{}))
logging.info('face_det dect request start %s' % datetime.datetime.now())
result = ['提示内容1','提示内容2']
logging.info('face_det dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))
header = {"Access-Control-Allow-Origin": "*", 'Access-Control-Allow-Methods': 'GET,POST'}
return web.json_response(result,headers=header)




# 查询
@routes.post('/tag-keys')
async def tagkeys(request): # 异步监听,只要一有握手就开始触发
try:
data = await request.json() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
print('tag-keys:\n',data)
except Exception as e:
logging.error("image file too large")
return web.json_response(write_response(1,"image file too large",8,{}))
logging.info('face_det dect request start %s' % datetime.datetime.now())
result = {'data': 1}
logging.info('face_det dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))
header = {"Access-Control-Allow-Origin": "*", 'Access-Control-Allow-Methods': 'GET,POST'}

return web.json_response(result,headers=header)


# 查询
@routes.post('/tag-values')
async def tagvalues(request): # 异步监听,只要一有握手就开始触发
try:
data = await request.json() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
print('tag-values:\n',data)
except Exception as e:
logging.error("image file too large")
return web.json_response(write_response(1,"image file too large",8,{}))
result = {'data':1}
logging.info('face_det dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))
header = {"Access-Control-Allow-Origin": "*", 'Access-Control-Allow-Methods': 'GET,POST'}

return web.json_response(result,headers=header)



if __name__ == '__main__':

app = web.Application(client_max_size=2*1024**2) # 创建app,设置最大接收图片大小为2M
app.add_routes(routes) # 添加路由映射
web.run_app(app,host='0.0.0.0',port=4567) # 启动app
logging.info('server close:%s'% datetime.datetime.now())

这样就可以了