dremio 实际上添加了支持基于opentelemetry 的监控处理,代码处理上官方是已经支持了metrics ,当然也是支持trace的
参考处理机制
处理上基于了配置,同时利用了jackson 多太类型处理,以及基于spi模块加载机制
包装了TelemetryConfigurator 类,同时还支持reload
public class TelemetryConfigurator implements AutoCloseable {
public class TelemetryConfigurator implements AutoCloseable {
private final List<MetricsConfigurator> metricsConfigs;
private final RefreshConfiguration refreshConfig;
private final TracerConfigurator traceConfig;
// 可以看出同时支持metrics 以及trace
@JsonCreator
public TelemetryConfigurator(
@JsonProperty("auto-reload") RefreshConfiguration refreshConfig,
@JsonProperty("metrics") List<MetricsConfigurator> metricsConfigs,
@JsonProperty("tracing") TracerConfigurator traceConfig
) {
super();
this.refreshConfig = refreshConfig;
this.metricsConfigs = metricsConfigs;
this.traceConfig = traceConfig;
}
我们一般metrics 的配置如下
metrics:
- name: prometheus_reporter
comment: >
Publish metrics on prometheus
reporter:
type: prometheus // 多太类型处理基于type 区分
port: 12543
services/telemetry-api 此模块包装了标准的接口定义,包含了配置,metrics,trace,对于metrics 的开发官方开发了一个ReporterConfigurator 抽象定义
方便扩展,默认实现如下(prometheus 的也是扩展了此抽象类,可以参考github 源码),核心是需要实现configureAndStart
同时dremio 为了方便使用自己包装了不少metrics 定义,如下
trace 处理上主要还是对于http 的处理,支持了opentelemetry 以及jaeger
比如prometheus 的处理
// prometheus 就是上边配置type 的名称
@JsonTypeName("prometheus")
public class PrometheusConfigurator extends ReporterConfigurator {
private final int port;
private volatile HTTPServer server;
@JsonCreator
public PrometheusConfigurator(@JsonProperty("port") int port) {
super();
this.port = (port > 0) ? port : Integer.parseInt(System.getProperty("dremio.prometheus.port", "12543"));
}
// 此处进行prometheus 的服务启动
@Override
public void configureAndStart(String name, MetricRegistry registry, MetricFilter filter) {
CollectorRegistry.defaultRegistry.register(new DropwizardExports(registry));//todo filter and rates maybe switch to pushgateway
try {
server = new HTTPServer(port);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PrometheusConfigurator that = (PrometheusConfigurator) o;
return port == that.port;
}
@Override
public int hashCode() {
return Objects.hashCode(port);
}
@Override
public void close() {
if (server != null) {
server.stop();
}
}
/**
* Module that may be added to a jackson object mapper
* so it can parse jmx config. jackson 模块,通过spi 加载
*/
public static class Module extends ConfigModule {
@Override
public void setupModule(com.fasterxml.jackson.databind.Module.SetupContext context) {
context.registerSubtypes(PrometheusConfigurator.class);
}
}
}
spi 配置定义(方便jackson 的模块注册处理)
spi 加载模块
Telemetry 类中,核心是ObjectMapper 的构建在startTelemetry 方法中
final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
ImmutableList.Builder<ConfigModule> modulesBuilder = ImmutableList.builder();
// spi 加载多太类型配置
ServiceLoader.load(ConfigModule.class).forEach(modulesBuilder::add);
ImmutableList<ConfigModule> modules = modulesBuilder.build();
if (modules.isEmpty()) {
logger.warn("Unable to discover any modules for telemetry config. Will not refresh config.");
return;
}
mapper.registerModules(modules);
metrics 任务的启动
dremio 在处理这部分的时候核心还是放在了Metrics 类中,使用了ReporterManager 通过onChange 方法对于metrics 的运行
public synchronized void onChange(Collection<MetricsConfigurator> newConfs) {
public synchronized void onChange(Collection<MetricsConfigurator> newConfs) {
try {
final Set<MetricsConfigurator> current = new HashSet<>(configurators);
final Set<MetricsConfigurator> toStart = new HashSet<>();
final Set<MetricsConfigurator> totalSet = new HashSet<>();
for(MetricsConfigurator c : newConfs) {
if (current.remove(c)) {
totalSet.add(c);
} else {
toStart.add(c);
totalSet.add(c);
}
}
for(MetricsConfigurator c : toStart) {
try {
// 此处启动metrics 服务模块
c.start(registry);
} catch (Exception ex) {
logger.error("Failure while starting configuration {} metric reporter.", c.getName(), ex);
// don't save this as something running. instead, try again on next refresh.
totalSet.remove(c);
}
}
Telemetry 服务的启动
在BootstrapContext 中处理的
如下
// 构造函数中就进行Telemetry服务的启动
public BootStrapContext(DremioConfig config, ScanResult classpathScan, SingletonRegistry registry) {
registry.bind(Tracer.class, TracerFacade.INSTANCE);
registry.bind(GrpcTracerFacade.class, new GrpcTracerFacade(TracerFacade.INSTANCE));
Telemetry.startTelemetry();
this.config = config.getSabotConfig();
this.classpathScan = classpathScan;
this.allocator = RootAllocatorFactory.newRoot(config);
this.executor = new ContextMigratingCloseableExecutorService<>(new CloseableThreadPool("dremio-general-"), registry.lookup(Tracer.class));
this.lpPersistance = new LogicalPlanPersistence(config.getSabotConfig(), classpathScan);
this.dremioConfig = config;
registerMetrics();
this.nodeDebugContextProvider = (allocator instanceof DremioRootAllocator)
? new NodeDebugContextProviderImpl((DremioRootAllocator) allocator)
: NodeDebugContextProvider.NOOP;
}
实际BootstrapContext 的处理
DACDaemonModule 类的bootstrap 方法中
final BootStrapContext bootStrapContext = new BootStrapContext(config, scanResult, bootstrapRegistry);
说明
dremio metrics 部分还是比较完整的,而且官方提供了文档说明,但是关于trace 就比较少了,但是通过源码阅读我们还是可以了解集成的模式以及如果进行配置
实际上与metrics 是类似的,具体可以参考实现的配置类,dremio 基于jackson 多太处理模式使用是比较多的,是一个很不错的实践,很值得学习参考
参考资料
services/telemetry-impl
services/telemetry-api/src/main/java/com/dremio/telemetry/api/Telemetry.java
services/telemetry-api/src/main/java/com/dremio/telemetry/api/Telemetry.java
sabot/kernel/src/main/java/com/dremio/exec/server/BootStrapContext.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
dac/backend/src/main/java/com/dremio/dac/util/JSONUtil.java
services/telemetry-api/src/main/java/com/dremio/telemetry/api/metrics/Metrics.java
services/telemetry-impl/src/main/java/com/dremio/telemetry/impl/config/tracing/OpenTelemetryConfigurator.java
services/telemetry-impl/src/main/java/com/dremio/telemetry/impl/config/tracing/JaegerConfigurator.java
https://docs.dremio.com/software/advanced-administration/telemetry/
https://github.com/dremio-hub/dremio-prometheus-exporter