[Apache Atlas] Atlas 架构设计及源代码简单分析

时间:2022-08-30 09:56:59

Apache Atlas 架构图

[Apache Atlas] Atlas 架构设计及源代码简单分析

Atlas 支持多数据源接入:Hive、HBase、Storm等

Type System

Type

Atlas 中定义了一些元数据类型

── AtlasBaseTypeDef
│ ├── AtlasEnumDef
│ └── AtlasStructDef
│ ├── AtlasBusinessMetadataDef
│ ├── AtlasClassificationDef
│ ├── AtlasEntityDef
│ └── AtlasRelationshipDef
├── AtlasStructType
│ ├── AtlasBusinessMetadataType
│ ├── AtlasClassificationType
│ ├── AtlasRelationshipType
│ └── AtlasEntityType
│ └── AtlasRootEntityType
├── AtlasType
│ ├── AtlasArrayType
│ ├── AtlasBigDecimalType
│ ├── AtlasBigIntegerType
│ ├── AtlasByteType
│ ├── AtlasDateType
│ ├── AtlasDoubleType
│ ├── AtlasEnumType
│ ├── AtlasFloatType
│ ├── AtlasIntType
│ ├── AtlasLongType
│ ├── AtlasMapType
│ ├── AtlasObjectIdType
│ ├── AtlasShortType
│ ├── AtlasStringType
│ └── AtlasStructType
│ ├── AtlasBusinessMetadataType
│ ├── AtlasClassificationType
│ ├── AtlasEntityType
│ └── AtlasRelationshipType
├── AtlasTypeDefStore
│ └── AtlasTypeDefGraphStore
│ └── AtlasTypeDefGraphStoreV2
└── StructTypeDefinition
└── HierarchicalTypeDefinition
├── ClassTypeDefinition
└── TraitTypeDefinition

Entity

Entity 是基于类型的具体实现

AtlasEntity
├── AtlasEntityExtInfo
│ ├── AtlasEntitiesWithExtInfo
│ └── AtlasEntityWithExtInfo
├── AtlasEntityStore
│ └── AtlasEntityStoreV2
├── AtlasEntityStream
│ └── AtlasEntityStreamForImport
├── AtlasEntityType
│ └── AtlasRootEntityType
└── IAtlasEntityChangeNotifier
├── AtlasEntityChangeNotifier
└── EntityChangeNotifierNop

Attributes

针对模型定义属性

AtlasAttributeDef
└── AtlasRelationshipAttributeDef

AtlasAttributeDef 属性字段:

private String                   name;
private String typeName;
private boolean isOptional;
private Cardinality cardinality;
private int valuesMinCount;
private int valuesMaxCount;
private boolean isUnique;
private boolean isIndexable;
private boolean includeInNotification;
private String defaultValue;
private String description;
private int searchWeight = DEFAULT_SEARCHWEIGHT;
private IndexType indexType = null;
private List<AtlasConstraintDef> constraints;
private Map<String, String> options;
private String displayName; 具体实现: db:
"name": "db",
"typeName": "hive_db",
"isOptional": false,
"isIndexable": true,
"isUnique": false,
"cardinality": "SINGLE" columns:
"name": "columns",
"typeName": "array<hive_column>",
"isOptional": optional,
"isIndexable": true,
“isUnique": false,
"constraints": [ { "type": "ownedRef" } ]
  • isComposite - 是否复合
  • isIndexable - 是否索引
  • isUnique - 是否唯一
  • multiplicity - 指示此属性是(必需的/可选的/还是可以是多值)的

System specific types and their significance

Referenceable

This type represents all entities that can be searched for using a unique attribute called qualifiedName.

  ├── Referenceable
├── ReferenceableDeserializer
├── ReferenceableSerializer
└── V1SearchReferenceableSerializer

Hooks

以Hive元信息采集为例分析采集过程:

全量导入

import-hive.sh

"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}"
org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS
importTables
└── importDatabases [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +295]
└── importHiveMetadata [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +289]

上面是调用过程:

importTables -> importTable --> registerInstances

AtlasEntitiesWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.createEntities(entities);
List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); if (CollectionUtils.isNotEmpty(createdEntities)) {
ret = new AtlasEntitiesWithExtInfo(); for (AtlasEntityHeader createdEntity : createdEntities) {
AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid()); ret.addEntity(entity.getEntity()); if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
ret.addReferredEntity(entry.getKey(), entry.getValue());
}
} LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
}
}

通过Http Post 的请求将库表数据更新至Atlas

atlasClientV2有很多Http接口

Atlas HTTP 客户端API:

[Apache Atlas] Atlas 架构设计及源代码简单分析

实时监听

HiveHook implements ExecuteWithHookContext

ExecuteWithHookContext is a new interface that the Pre/Post Execute Hook can run with the HookContext.

实现run()方法来对Hive 相关事件做处理

Hive相关事件:

BaseHiveEvent
├── AlterTableRename
├── CreateHiveProcess
├── DropDatabase
├── DropTable
├── CreateDatabase
│ └── AlterDatabase
└── CreateTable
└── AlterTable
└── AlterTableRenameCol

以create database 为例分析流程:

//处理Hook 上下文信息
AtlasHiveHookContext context =
new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects(), isSkipTempTables()); //建库事件处理,提取相关库信息
event = new CreateDatabase(context); if (event != null) {
final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();
super.notifyEntities(ActiveEntityFilter.apply(event.getNotificationMessages()), ugi);
} public enum HookNotificationType {
TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE,
ENTITY_CREATE_V2, ENTITY_PARTIAL_UPDATE_V2, ENTITY_FULL_UPDATE_V2, ENTITY_DELETE_V2
} //操作用户获取
if (context.isMetastoreHook()) {
try {
ugi = SecurityUtils.getUGI();
} catch (Exception e) {
//do nothing
}
} else {
ret = getHiveUserName(); if (StringUtils.isEmpty(ret)) {
ugi = getUgi();
}
} if (ugi != null) {
ret = ugi.getShortUserName();
} if (StringUtils.isEmpty(ret)) {
try {
ret = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e); ret = System.getProperty("user.name");
}
}

主要:

获取实体信息, 传递Hook message的类型、操作用户

notifyEntities 可以看出其他组件HBase、impala也会调用该方法进行消息的发送

[Apache Atlas] Atlas 架构设计及源代码简单分析

public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
if (executor == null) { // send synchronously
notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
}
});
}
}

消息通知框架:

NotificationInterface
├── AtlasFileSpool
└── AbstractNotification
├── KafkaNotification
└── Spooler

数据写入Kafka中:

@Override
public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException {
KafkaProducer producer = getOrCreateProducer(notificationType); sendInternalToProducer(producer, notificationType, messages);
}

根据NotificationType写入指定topic 中:

private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}
}; NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),

数据主要写入两个Topic中: ATLAS_ENTITIES、ATLAS_HOOK

ATLAS_HOOK是写入Hook事件消息, 创建库的事件元数据信息会写入该Topic中

如何唯一确定一个库:

public String getQualifiedName(Database db) {
return getDatabaseName(db) + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace();
}

dbName@clusterName 确定唯一性

外延应用

一个基于Hive hook 实现Impala 元数据刷新的用例:

AutoRefreshImpala:https://github.com/Observe-secretly/AutoRefreshImpala

参考

[1] Apache Atlas – Data Governance and Metadata framework for Hadoop

[2] Apache Atlas 源码

[Apache Atlas] Atlas 架构设计及源代码简单分析的更多相关文章

  1. FFmpeg的HEVC解码器源代码简单分析:环路滤波(Loop Filter)

    ===================================================== HEVC源代码分析文章列表: [解码 -libavcodec HEVC 解码器] FFmpe ...

  2. FFmpeg的HEVC解码器源代码简单分析:CTU解码(CTU Decode)部分-TU

    ===================================================== HEVC源代码分析文章列表: [解码 -libavcodec HEVC 解码器] FFmpe ...

  3. FFmpeg的HEVC解码器源代码简单分析:CTU解码(CTU Decode)部分-PU

    ===================================================== HEVC源代码分析文章列表: [解码 -libavcodec HEVC 解码器] FFmpe ...

  4. FFmpeg源代码简单分析:libavdevice的gdigrab

    ===================================================== FFmpeg的库函数源代码分析文章列表: [架构图] FFmpeg源代码结构图 - 解码 F ...

  5. FFmpeg源代码简单分析:libavdevice的avdevice&lowbar;register&lowbar;all&lpar;&rpar;

    ===================================================== FFmpeg的库函数源代码分析文章列表: [架构图] FFmpeg源代码结构图 - 解码 F ...

  6. FFmpeg源代码简单分析:configure

    ===================================================== FFmpeg的库函数源代码分析文章列表: [架构图] FFmpeg源代码结构图 - 解码 F ...

  7. FFmpeg源代码简单分析:makefile

    ===================================================== FFmpeg的库函数源代码分析文章列表: [架构图] FFmpeg源代码结构图 - 解码 F ...

  8. FFmpeg源代码简单分析:libswscale的sws&lowbar;scale&lpar;&rpar;

    ===================================================== FFmpeg的库函数源代码分析文章列表: [架构图] FFmpeg源代码结构图 - 解码 F ...

  9. FFmpeg源代码简单分析:libswscale的sws&lowbar;getContext&lpar;&rpar;

    ===================================================== FFmpeg的库函数源代码分析文章列表: [架构图] FFmpeg源代码结构图 - 解码 F ...

随机推荐

  1. csharp&colon; ODP&period;NET&comma;System&period;Data&period;OracleClient&lpar;&period;net 4&period;0&rpar; and System&period;Data&period;OleDb读取Oracle g 11&period;2&period;0的区别

    ODP.NET: 引用: using Oracle.DataAccess; //Oracle g 11.2.0 using Oracle.DataAccess.Client; using Oracle ...

  2. C&num;&period;NET 大型通用信息化系统集成快速开发平台 4&period;1 版本 - 网络订单提醒功能,网点区域功能增强

    客户端会提醒是否有网络订单来了,这样及时处理网络上的用户下单,当然也会有手机短信系统,全国几千个网点就可以协同作战了,竟然有序的处理海量用户的下单.网络订单提醒功能增强效果如下: 系统每5分钟会检查一 ...

  3. 利用TaskCompletionSource将EAP转换成TAP

        1.原始的异步方法的调用   我们来看个简单的例子,在这里演示调用 WebClient.DownloadStringAsync 方法(这个方法不是 TAP),然后由 WebClient.Dow ...

  4. Repository模式介绍汇总

    1.Linq To Sql中Repository模式应用场景 http://www.cnblogs.com/zhijianliutang/archive/2012/02/24/2367305.html ...

  5. IE6 浏览器常见兼容问题 大汇总(23个)

    IE6以及各个浏览器常见兼容问题 大汇总 综述:虽然说IE6在2014年4月将被停止支持,但是不得不说的是,IE6的市场并不会随着支持的停止而立刻消散下去,对于WEB前端开发工程师来说,兼容IE6 兼 ...

  6. Bower —— 一个Web的包管理工具

    作者:江剑锋   github地址:https://github.com/bower/bower Bower为何物 Bower是一个Web开发的包管理软件.前端开发中,或多或少,都会以来于现成的fra ...

  7. Emacs经常使用快捷键的注意事项

    一直用VIM,尝试了好几次Emacs都被它"多得像天上的星星"一样的快捷键给吓倒了.这几天最终下定决心再次尝试. 将它的Tutor练习了一下,顺便对经常使用快捷键做了一下笔记,方便 ...

  8. css3变形动画

    transform:变形 rotate:旋转 translate:移动 scale:缩放 skew:扭曲 一切变化都是“形变”引起的变化,所以transform就是老大,大家都围着他转 1.trans ...

  9. python3和grpc的微服务探索实践

    对于微服务的实践,一般都是基于Java和Golang的,博主最近研究了下基于Python的微服务实践,现在通过一个简单的服务来分析Python技术栈的微服务实践 技术栈:Python3 + grpc ...

  10. ABP入门系列之1——ABP总体介绍

    ABP是“ASP.NET Boilerplate Project (ASP.NET样板项目)”的简称. ASP.NET Boilerplate是一个用最佳实践和流行技术开发现代WEB应用程序的新起点, ...