关于数据上传阿里云MaxCompute调研

时间:2022-09-06 19:15:14

1.背景

当前的数据存储基于mysql库表存储形式,目前已经无法满足愈加增大的数据存储需求,新项目基于Maxcompute数据仓库架构,需要将统计日志上传Maxcompute,本文对Maxcompute系统数据上传进行调研,测试,包括基于LogStash收集的DataHub实时数据通道和批量数据通道SDK等。

2技术调研及测试

2.1数据通道SDK

2.1.1 UploadSession

2.1.1.1 说明

MaxCompute Tunnel是MaxCompute的数据通道,可通过提供接口上传数据;

接口定义如下:

public class UploadSession {

UploadSession(Configuration conf, String projectName, String tableName,

String partitionSpec) throws TunnelException;

UploadSession(Configuration conf, String projectName, String tableName,

String partitionSpec, String uploadId) throws TunnelException;

public void commit(Long[] blocks);

public Long[] getBlockList();

public String getId();

public TableSchema getSchema();

public UploadSession.Status getStatus();

public Record newRecord();

public RecordWriter openRecordWriter(long blockId);

public RecordWriter openRecordWriter(long blockId, boolean compress);

public RecordWriter openBufferedWriter();

public RecordWriter openBufferedWriter(boolean compress);

}

接口说明如下

生命周期:从创建Upload实例到结束上传。

创建Upload实例:通过调用构造方法和TableTunnel两种方式进行创建。

请求方式:同步。

Server端会为该Upload创建一个session, 生成唯一UploadId标识该Upload,客户端可以通过getId获取。

上传数据

请求方式:同步。

调用openRecordWriter方法,生成RecordWriter实例,其中参数blockId用于标识此次上传的数据,也描述了数据在整个表中的位置,取值范围为[0,20000],当数据上传失败,可以根据blockId重新上传。

查看上传

请求方式:同步。

调用getStatus可以获取当前Upload状态。

调用getBlockList可以获取成功上传的blockid list,可以和上传的blockid list对比,对失败的blockId重新上传。

结束上传

请求方式:同步。

调用commit(Long[] blocks)方法,参数blocks列表表示已经成功上传的block列表,Server端会对该列表进行验证。

该功能是加强对数据正确性的校验,如果提供的block列表与Server端存在的block列表不一致抛出异常。

Commit失败可以进行重试。

状态如下

UNKNOWN:Server端刚创建一个Session时设置的初始值。

NORMAL:创建Upload对象成功。

CLOSING:当调用complete方法(结束上传)时,服务端会先把状态置为CLOSING。

CLOSED:完成结束上传(即把数据移动到结果表所在目录)后。

EXPIRED:上传超时。

CRITICAL:服务出错。

同一个Upload中Session的blockId不能重复。也就是说,对于同一个UploadSession,用一个blockId打开RecordWriter,写入一批数据后,调用close,然后再commit完成后,不可以重新再用该blockId打开另一个RecordWriter写入数据。

一个block大小上限100GB,建议大于64M的数据。

每个Session在服务端的生命周期为24小时。

上传数据时,Writer每写入8KB数据,便会触发一次网络动作,如果120秒内没有网络动作,服务端将主动关闭连接,此时Writer将不可用,重新打开一个新的Writer写入。

官方建议使用openBufferedWriter接口上传数据,屏蔽了blockId的细节,并且内部带有数据缓存区,会自动进行失败重试;

2.1.1.2示例代码

public class UploadSample {

private static String accessId = “******";

private static String accessKey = "******";

private static String odpsUrl = "http://service.odps.aliyun.com/api";

private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun.com";

//设置tunnelUrl,若需要走内网时必须设置”aliyun-inc”,否则默认公网。

private static String project = "BaseDatawarehouse";

private static String table = "origin_data";

private static String partition = "2018-07-02";

public static void main(String args[]) {

Account account = new AliyunAccount(accessId, accessKey);

Odps odps = new Odps(account);

odps.setEndpoint(odpsUrl);

odps.setDefaultProject(project);

try {

TableTunnel tunnel = new TableTunnel(odps);

tunnel.setEndpoint(tunnelUrl);//tunnelUrl设置

PartitionSpec partitionSpec = new PartitionSpec(partition);

UploadSession uploadSession = tunnel.createUploadSession(project,

table, partitionSpec);

TableSchema schema = uploadSession.getSchema();

// 准备数据后打开Writer开始写入数据,准备数据后写入一个Block

RecordWriter recordWriter = uploadSession.openRecordWriter(0);

Record record = uploadSession.newRecord();

for (int i = 0; i < schema.getColumns().size(); i++) {

Column column = schema.getColumn(i);

switch (column.getType()) {

case BIGINT:

record.setBigint(i, 1L);

break;

case BOOLEAN:

record.setBoolean(i, true);

break;

case DATETIME:

record.setDatetime(i, new Date());

break;

case DOUBLE:

record.setDouble(i, 0.0);

break;

case STRING:

record.setString(i, "sample");

break;

default:

throw new RuntimeException("Unknown column type: "

+ column.getType());

}

}

for (int i = 0; i < 10; i++)

recordWriter.write(record);

}

recordWriter.close();

uploadSession.commit(new Long[]{0L});

System.out.println("upload success!");

} catch (TunnelException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

2.1.2 TunnelBufferedWriter

2.1.2.1 说明

TunnelBufferedWriter是官方推荐的接口,也是本次测试所用的接口;

一次完整的上传流程通常包括以下步骤:

1、先对数据进行划分。

2、为每个数据块指定block Id,即调用openRecordWriter(id)。

3、用一个或多个线程分别将这些block上传上去,并在某个block上传失败以后,需要对整个block进行重传。

4、在所有block都上传以后,向服务端提供上传成功的blockid list进行校验,即调用session.commit([1,2,3,…])。

5、由于服务端对block管理,连接超时等的一些限制,上传过程逻辑变得比较复杂,为了简化上传过程,SDK提供了更高级的一种RecordWriter—TunnelBufferWriter。

接口定义如下:

public class TunnelBufferedWriter implements RecordWriter {

public TunnelBufferedWriter(TableTunnel.UploadSession session, CompressOption option) throws IOException;

public long getTotalBytes();

public void setBufferSize(long bufferSize);

public void setRetryStrategy(RetryStrategy strategy);

public void write(Record r) throws IOException;

public void close() throws IOException;

}

TunnelBufferedWriter 对象:

生命周期:从创建RecordWriter到数据上传结束。

创建TunnelBufferedWriter实例:通过调用UploadSession的openBufferedWriter接口创建。

数据上传:调用Write接口,数据会先写入本地缓存区,缓存区满后会批量提交到服务端,避免了连接超时,同时,如果上传失败会自动进行重试。

结束上传:调用close接口,最后再调用UploadSession的commit接口,即可完成上传。

缓冲区控制:可以通过setBufferSize接口修改缓冲区占内存的字节数(bytes),建议设置64M以上的大小,避免服务端产生过多小文件,影响性能,一般无须设置,维持默认值即可。

重试策略设置:可以选择三种重试回避策略:指数回避(EXPONENTIAL_BACKOFF)、线性时间回避(LINEAR_BACKOFF)、常数时间回避(CONSTANT_BACKOFF)。例如:下面这段代码可以将Write的重试次数调整为6,每一次重试之前先分别回避4s、8s、16s、32s、64s和128s(从4开始的指数递增的序列),这个也是默认的行为,一般情况不建议调整。

2.1.2.2 测试代码

public class uplodsession {
    private static String accessId = "******";
    private static String accessKey = "******";
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    private static String tunnelUrl = "http://dt.cn-beijing.maxcompute.aliyun.com";
    //设置tunnelUrl,若需要走内网时必须设置,否则默认公网。
    private static String project = "******";
    private static String table = "******";

public static void main(String[] args) {

Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);

TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(tunnelUrl);//tunnelUrl
        TableTunnel.UploadSession uploadSession = null;
        //获取开始时间
        long startTime = System.currentTimeMillis();
        try {
            uploadSession = tunnel.createUploadSession(project,table);
            System.out.println("Session Status is : "
                    + uploadSession.getStatus().toString());
            RecordWriter recordWriter = uploadSession.openBufferedWriter();
            Record product = uploadSession.newRecord();
            String message = ""
            for (int i=0;i<1000000;i++){
                product.setString("log",message);
                recordWriter.write(product);
                count=count+1;
                if(count>=10000){
                    recordWriter.close();
                    uploadSession.commit();
                    System.out.println("開始提交");
                    break;
                }
            }

System.out.println("upload success!");
            //获取结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("程序运行时间:" + (endTime - startTime) + "ms");    //输出程序运行时间
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.1.3 消费kafka数据写入Maxcompute
2.1.3.1 示例代码 

public class up {
    public static void main(String[] args) throws IOException, TunnelException {
        ConsumerConnector consumer;
        // 连接的Zookeeper接口, 不同业务模块使用的数据是相同的
        String zookeeper = "10.172.217.86:2181,10.44.143.42:2181,10.252.0.171:2181";
        // Kafka消费者订阅的主题,临时测试主
        String topic = "testReceivelog";
        String groupId = "testConsumer";
        String accessId = "******";
        String accessKey = "******";
        String odpsUrl = "http://service.odps.aliyun.com/api";
        String tunnelUrl = "http://dt.cn-beijing.maxcompute.aliyun.com";
        //设置tunnelUrl,若需要走内网时必须设置,否则默认公网。此处给的是华东2经典网络Tunnel Endpoint,其他region可以参考文档《访问域名和数据中心》。
        String project = "BaseDatawarehouse";
        String table = "origin_data";
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);

TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(tunnelUrl);//tunnelUrl
        TableTunnel.UploadSession uploadSession = null;
        RecordWriter recordWriter = null;
        try {
            uploadSession = tunnel.createUploadSession(project, table);
            System.out.println("Session Status is : "
                    + uploadSession.getStatus().toString());
            recordWriter = uploadSession.openBufferedWriter();
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        Record product = uploadSession.newRecord();
        Map<String, Integer> topicCount = new HashMap<>();
        //3线程消费3partition数据
        topicCount.put(topic, 3);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int count = 0;
            while (it.hasNext()) {
                String message = new String(it.next().message());
                //写入
                product.setString("log", message);
                recordWriter.write(product);
                count = count + 1;
                if (count >= 10000) {
                    recordWriter.close();
                    uploadSession.commit();
                    count = 0;
                    uploadSession = tunnel.createUploadSession(project, table);
                    recordWriter = uploadSession.openBufferedWriter();
                    product = uploadSession.newRecord();
                }
            }
            if (consumer != null) {
                consumer.shutdown();
            }
            if(recordWriter!=null){
                recordWriter.close();
            }
        }
    }

优点:

批量导入,效率高,数据无丢失;

缺点:

1、批量倒入,数据无法精确归档(根据统计需求,需要将日志按照天、小时归档,如果对每条数据处理则更不适合,需要实时处理);

2、批量导入,上传效率依赖于消费kafka;

3、Kafka适合实时处理,不适合批量操作;

4、批量导入,数据归档有误差,每批次中含有除当前分区外的其他分区数据,造成统计指标失准;

2.2 DataHub实时数据通道

2.2.1 说明

产品优势:

1、高吞吐

最高支持单主题(Topic)每日T级别的数据量写入,每个分片(Shard)支持最高每日8000万Record级别的写入量。

2、实时性

通过DataHub ,您可以实时的收集各种方式生成的数据并进行实时的处理,对您的业务产生快速的响应。

3、易用性

DataHub提供丰富的SDK包,包括C++, JAVA, Pyhon, Ruby, Go等语言;DataHub服务也提供Restful API规范,您可以用自己的方式实现访问接口。除了SDK以外,DataHub还提供一些常用的客户端插件,包括:Fluentd,LogStash,Flume等,可以使用这些客户端工具往DataHub中写入流式数据。

DataHub同时支持强Schema的结构化数据和无类型的非结构化数据。

4、高可用

服务可用性不低于99.999%。

规模自动扩展,不影响对外服务。

数据持久性不低于99.999%。

数据自动多重冗余备份。

5、动态伸缩

每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量。

6、高安全性

提供企业级多层次安全防护,多用户资源隔离机制。

提供多种鉴权和授权机制及白名单、主子账号功能。

7、DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,可以轻松使用SQL进行流数据分析。

DataHub服务也提供分发流式数据到各种云产品的功能,目前支持分发到MaxCompute(原ODPS),OSS等。

功能图:

2.2.1.1 流式数据同步DataConnector

DataHub DataConnector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(ODPS)、OSS、ElasticSearch、RDS Mysql、ADS、TableStore中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在各个云产品中使用这份数据。数据同步支持at least once语义,在网络服务异常等小概率场景下可能会导致目的端的数据产生重复。

DataConnector支持系统

目标系统

时效性

描述

MaxCompute(Odps)

准实时,通常情况5分钟延迟

同步Topic中流式数据到离线MaxCompute表,字段类型名称需一一对应,且DataHub中必须包含一列(或多列)MaxCompute表中分区列对应的字段

OSS

实时

同步数据到对象存储OSS指定Bucket的文件中,将以csv格式保存

ElasticSearch

实时

同步数据到ElasticSearch指定Index中,Shard之间数据同步不保证时序,所以需将同样ID的数据写入相同的Shard中

Mysql

实时

同步数据到指定的Rds Mysql表中

ADS

实时

同步数据到指定的ADS表中

TableStore

实时

同步数据到指定的TableStore表中

如何创建

创建Connector主要需要如下前置条件:

准备对应的MaxCompute表,该表字段类型、名称、顺序必须与DataHub Topic字段完全一致,如果三个条件中的任意一个不满足,则归档Connector无法创建。字段类型对应表见后表。

访问MaxCompute账号的设置,该账号必须具备该MaxCompute的Project的CreateInstance权限和归档MaxCompute表的Desc、Alter、Update权限,建议使用一个特殊最小权限的账号。官方建议使用RAM用户账号。

DataHub Topic的Owner/Creator账号, 才有相应的权限操作Connector,包括创建,删除等。

只支持将TUPLE类型的DataHub Topic同步到MaxCompute表中

操作流程: Project列表->Project查看->Topic查看->点击归档MaxCompute->填写配置,点击创建

配置说明:

名称

是否必须

描述

MaxCompute Project

yes

MaxComputeProject名称

MaxCompute Table

yes

MaxCompute表名称

AccessId

yes

访问MaxCompute的阿里云账号AccessId

AccessKey

yes

访问MaxCompute的阿里云账号AccessKey

分区选项

yes

SYSTEM_TIME、EVENT_TIME、USER_DEFINE三种模式,SystemTime模式会使用写入时间转化为字符串进行分区,EventTime模式会根据topic中固定的event_time字段时间进行分区(需要在创建Topic时增加一个TIMESTAMP类型名称为event_time的字段,并且写入数据时向这个字段写入其微秒时间),UserDefine模式将会直接使用用户自定义的分区字段字符串分区

分区范围

yes

划分分区的时间间隔,在SYSTEM_TIME、EVENT_TIME两种模式下生效,最少为15分钟

分区格式定义

yes

目前仅支持固定格式,未来将会开放为自定义格式,目前格式下,若为15分钟的分区范围,则会产生ds=20170704,hh=01,mm=15这样的分区

注意:

支持MaxCompute分区表

分区选项选择SYSTEM_TIME模式,表结构对应如下:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, hh string, mm string)

对应Topic应为如下的Schema:

Topic: topic_test(f1 string, f2 string, f3 double)

数据同步时根据写入时间确定ds/hh/mm的值写入MaxCompute

分区选项选择EVENT_TIME模式,表结构对应如下:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, hh string,mm string)

对应Topic应为如下的Schema:

Topic: topic_test(f1 string, f2 string, f3 double, event_time timestamp)

数据同步时根据event_time字段时间戳确定ds/hh/mm的值写入MaxCompute

分区选项选择USER_DEFINE模式,表结构对应如下:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)

对应Topic应为如下的Schema:

Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)

数据同步时根据ds、pt的数据值,写入对应分区

USER_DEFINE模式下MaxCompute分区字段内容必须非空UTF8字符串,否则归档任务将无法正常运行或数据无效被丢弃。

USER_DEFINE模式下MaxCompute的分区字段的值必须符合MaxCompute对分区字段值的范围要求,否则归档任务将无法正常运行。

分区选项为EventTime模式时,若event_time字段的数据值是null或非法,会降级使用数据写入DataHub的SystemTime写入对应分区。

数据归档的频率为每个Shard每5分钟或者Shard中新写入的数据量达到64MB,DataConnector服务会批量进行一次数据归档进入MaxCompute表的操作。所以数据写入DataHub Topic后至多5分钟后在MaxCompute可以被查询到。

DataHub与MaxCompute字段类型对应表:

MaxCompute表中的类型

DataHub Topic中的类型

STRING

STRING

DOUBLE

DOUBLE

BIGINT

BIGINT

DATETIME

TIMESTAMP

BOOLEAN

BOOLEAN

DECIMAL

DECIMAL

MAP

不支持

ARRAY

不支持

最高支持单主题(Topic)每日T级别的数据量写入,每个分片(Shard)支持最高每日8000万Record级别的写入量,每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量,目前1个分片就足以支撑我们的数据量。

3. 对比与结论

3.1数据通道SDK

数据通道SDK上传流程如图:图略

说明:

延续用已有的数据采集架构,新建groupid 消费kafka的统一topic,然后springboot后台实时消费kafka数据,每10000条上传一次Maxcompute。

优点:

可以不用log重新采集,使用数据通道SDK可以解决DataHub的使用成本(SDK上传MaxCOmpute目前只有杭州能走内网,华北2北京要走公网,会流量费用)

不足:

1、批量上传,数据归档存在问题,造成统计结果存在误差。

2、数据上传效率依赖卡夫卡吞吐,存在单点故障隐患。

3.2 DataHub实时数据通道

DataHub 上传数据流程图:图略

说明:

从新搭建数据采集,与原有采集系统隔离,通过logstash直接上传数据到DataHub,数据实时归档到MaxCompute

优点:

1、以后实时计算直接可在DataHub接StreamCompute复用性扩展性比较好。

2、数据可实时归档,保证统计结果正确。

3、链路较短易维护,同时使用DataHub零维护,可靠性好。

4、吞吐量大,完全支撑日益增长的数据量。

3.3 结论

对比表:

DataHub

数据通道SDK

吞吐量

最高支持单主题(Topic)每日T级别的数据量写入,每个分片(Shard)支持最高每日8000万Record级别的写入量每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量。

单个block上传的数据限制为100G,最多可20000个block

实时性

实时数据通道数据直接由logstash收集不用读kafka数据实时写入,每topic数据量达64M,或者当前数据超过五分钟会自动写入Maxcompute归档,实时性好

因为需要实时读取kafka数据,每读64m的数据会提交一次,需要http访问,目前华北二只能走公网,所以延迟很大

可靠性

服务可用性不低于99.999%

规模自动扩展,不影响对外服务

数据持久性不低于99.999%

数据自动多重冗余备份

存在单点故障问题,无法保证续传

数据归档

通过创建DataHub Connector,指定相关配置,即可创建将Datahub中流式数据定期归档的同步任务,数据产生时间与归档时间误差极小,目前测试按照天,小时,分钟对数据进行归档,以便统计和存储

批量上传无法准确归档,如准确归档需要每一条数据处理一次,上传一次,那么吞吐十分低,满足不了我们日益增长的数据量

维护成本

云服务零维护成本

需要根据需求开发上传代码,监控任务以及上传的数据完整性,维护成本高

费用成本

因为数据最终存储MaxCompute,datahub作为其一组件不产生费用

共享已有服务器资源无费用

通过两种上传方式调研、测试,以及上传数据流程对比,DataHub实时数据通道,有更好的可扩展性,能准确的对数据进行归档,以及良好的可靠性和零维护,所以采用这种方式作为数据上传比较合适。