消费阿里云日志服务SLS

时间:2023-11-21 13:22:56
此文档只关心消费接入,不关心日志接入,只关心消费如何接入,可直接跳转到【sdk消费接入]

SLS简介

  • 日志服务:
    • 日志服务(Log Service,简称 LOG)是针对日志类数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。您无需开发就能快捷完成日志数据采集、消费、投递以及查询分析等功能,提升运维、运营效率,建立 DT 时代海量日志处理能力
  • 功能
    • 实时采集与消费(LogHub)
    • 投递数仓(LogShipper)
    • 查询与实时分析(Search/Analytics)

接入消费流程

账号问题
  • 如果消费自己的日志,直接使用自己阿里云账号的key
  • 如果消费他人提供的日志,需要别人创建的子账号或者账号(推荐子账号,无安全问题)中的key,使用自己账号无法连接通
接入点EndPoint

消费接入(java)

概念
对象 明细
Log 日志、日志组表示等基本概念
Project 项目
Config 配置
LogStore 日志库
Index 索引
Shard 分区
ConsumerGroup 消费组
配置

就如同使用 API 和日志服务服务端交互一样,使用 SDK 也需要指定一些基本配置。目前,所有语言的 SDK 都定义了一个 Client 类作为入口类,这些基本配置信息在该入口类的构造时指定。

具体包括如下几项:

  • 服务入口(Endpoint):确认 Client 需要访问的服务入口
    • 当使用 SDK 时,首先需要明确访问的日志服务 Project 所在 Region(如“华东 1 (杭州)”、“华北 1 (青岛)”等),然后选择与其匹配的日志服务入口初始化 Client。该服务入口与 API 中的 服务入口 定义一致
    • 当选择 Client 的 Endpoint 时,必须要保证您需要访问的 Project 的 Region 和 Endpoint 对应的 Region 一致,否则 SDK 将无法访问您指定的 Project
    • 由于 Client 实例只能在构造时指定该服务入口,如果需要访问不同 Region 里的 Project,则需要用不同的 Endpoint 构建不同的 Client 实例
    • 目前,所有 API 的服务入口仅支持 HTTP 协议。
    • 如果在阿里云 ECS 虚拟机内使用 SDK,您还可以使用内网 Endpoint 避免公网带宽开销,具体请参考 服务入口
  • 阿里云访问秘钥(AccessKeyId/AccessKeySecret):指定 Client 访问日志服务时使用的访问秘钥
skd消费接入
原始接入
  • 参见参考文档,要消费日志服务中的数据,请尽量不要直接使用SDK的拉数据接口,我们提供了一个高级消费库消费组消费,该库屏蔽了日志服务的实现细节,并且提供了负载均衡、按序消费等高级功能
消费组接入
  • 同一个消费组下面的消费者名称必须不同,否则相同的消费者会同时消费logstore同份数据,造成数据重复

  • 协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心

  • 消费组(ConsumerGroup)

    • 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据
  • 消费组(Consumer)

    • 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同
  • shared消费组、消费组关系

    • 一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者
      • 每个shard只会分配到一个消费者
      • 一个消费者可以同时拥有多个shard
      • 新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明
  • maven
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.15</version>
</dependency>

阿里云client依赖log4j,如果项目中使用的logback,需要增加转换log4j到logback的转换

 <dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.25</version>
</dependency>
java文件
  • main
public class Main {
// 日志服务域名,根据实际情况填写
private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
// 日志服务项目名称,根据实际情况填写
private static String sProject = "ali-cn-hangzhou-sls-admin";
// 日志库名称,根据实际情况填写
private static String sLogstore = "sls_operation_log";
// 消费组名称,根据实际情况填写
private static String sConsumerGroup = "consumerGroupX";
// 消费数据的ak,根据实际情况填写
private static String sAccessKeyId = "";
private static String sAccessKey = "";
public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
{
// 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不同,可以使用相同的消费组名称,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,这个时候消费者名称可以使用机器ip来区分。第9个参数(maxFetchLogGroupSize)是每次从服务端获取的LogGroup数目,使用默认值即可,如有调整请注意取值范围(0,1000]
LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//Thread运行之后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
//调用worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
//ClientWorker运行过程中会生成多个异步的Task,Shutdown之后最好等待还在执行的Task安全退出,建议sleep 30s。
Thread.sleep(30 * 1000);
}
}
  • SampleLogHubProcessor
public class SampleLogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 记录上次持久化 check point 的时间
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// 消费数据的主逻辑,这里面的所有异常都需要捕获,不能抛出去。
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker)
{
// 这里简单的将获取到的数据打印出来
for(LogGroupData logGroup: logGroups){
FastLogGroup flg = logGroup.GetFastLogGroup();
System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
System.out.println("Tags");
for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
FastLogTag logtag = flg.getLogTags(tagIdx);
System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
}
for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
FastLog log = flg.getLogs(lIdx);
System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
FastLogContent content = log.getContents(cIdx);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 每隔 30 秒,写一次 check point 到服务端,如果 30 秒内,worker crash,
// 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有少量的重复数据
if (curTime - mLastCheckTime > 30 * 1000)
{
try
{
//参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
return null;
}
// 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
//将消费断点保存到服务端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 生成一个消费实例
return new SampleLogHubProcessor();
}
}
  • 上述代码,工厂类可以用lambda替换
  • client继承Runnable,必须以thread方式启动
  • client原理:是启动线程,底层定时发送心跳给服务端,拿到要消费的必要信息,异步提交http请求任务(线程池),请求处理数据。所以调用client.shutdown,方法并不能立马把所有任务关闭,最好有个时间差,同时client中运行线程标记是否关闭的变量不是线程安全的,所以关闭的时候,依然有可能提交请求任务处理
错误处理
  • SDK 可能出现的异常错误可以分成如下几类:

    • 由日志服务端返回的错误。这类错误由日志服务端返回并由 SDK 处理。关于这类错误的详细细节可以参考日志服务 API 的通用错误码和各个 API 接口的具体说明。
    • 由 SDK 在向服务端发出请求时出现的网络错误。这类错误包括网络连接不通,服务端返回超时等。日志服务内部并未对此做任何重试逻辑,所以,您在使用 SDK 时需要自己定义相应的处理逻辑(重试请求或者直接报错等)
    • 由 SDK 自身产生的、与平台及语言相关的错误,如内存溢出等。
  • 目前,各个语言 SDK 的实现都采取抛出异常的方式处理错误。具体原则如下:

    • 由如上第一或者第二类错误将会被 SDK 处理并包装在统一的 LogException 类抛出给用户处理
    • 由如上第三类错误不会被 SDK 处理,而是直接抛出平台及语言的 Native Exception 类给用户处理
  • API错误重试

    • 在ILogHubProcessor的process方法中,方法返回空表示正常处理数据, 如果需要回滚到上个check point的点进行重试的话,可以return checkPointTracker.getCheckpoint(),但是这里有可能会造成重复消费
    • 自己增加重试策略,避免重复消费
参考文档