您当前的位置:首页 > 互联网百科 > 大数据

如何轻松做数据治理?开源技术栈告诉你答案

时间:2022-12-27 13:39:46  来源:今日头条  作者:NebulaGraph


 

搭建一套数据治理体系耗时耗力,但或许我们没有必要从头开始搞自己的数据血缘项目。本文分享如何用开源、现代的 DataOps、ETL、Dashboard、元数据、数据血缘管理系统构建大数据治理基础设施。

元数据治理系统

元数据治理系统是一个提供了所有数据在哪、格式化方式、生成、转换、依赖、呈现和所属的一站式视图

元数据治理系统是所有数据仓库、数据库、表、仪表板、ETL 作业等的目录接口(catalog),有了它,我们就不用在群里喊 “大家好,我可以更改这个表的 schema 吗?”、 “请问谁知道我如何找到 table-view-foo-bar 的原始数据?”… 一个成熟的数据治理方案中的元数据治理系统,对数据团队来说非常必要。

数据血缘则是元数据治理系统众多需要管理的元数据之一,例如,某些 Dashboard 是某一个 Table View 的下游,而这个 Table View 又是从另外两个上游表 JOIN 而来。显然,应该清晰地掌握、管理这些信息,去构建一个可信、可控的系统和数据质量控制体系。

数据治理的可行方案数据治理方案设计

元数据和数据血缘本质上非常适合采用图数据建模、图数据库。因为数据治理涉及的典型查询便是面向图关系的查询,像 “查找指定组件(即表)的所有 n 度(深度)的数据血缘” 就是图查询语句 FIND ALL PATH 跑起来的事。从日常大家在论坛、微信群里讨论的查询和图建模来看,NebulaGraph 社区很多人在从零开始搭建数据血缘系统,而这些工作看起来大多是在重复造轮子,而且还是不容易造的轮子。

既然如此,前人种树后人乘凉,这里我决定搭建一个完备、端到端(不只有元数据管理)的数据系统,供大家参考解决数据血缘、数据治理问题。这个套数据系统会采用市面上优秀的开源项目,而图数据库这块还是采用大家的老朋友 ——NebulaGraph。希望对大家能有所启发,在此基础之上拥有一个相对完善的图模型,以及设计精巧、开箱即用的元数据治理系统。

下面,来看看元数据治理系统的轮子都需要哪些功能组件:

 

  • 元数据抽取这部分需要从不同的数据栈拉 / 推数据,像是从数据库、数仓、Dashboard,甚至是 ETL Pipeline 和应用、服务中搞数据。
  • 元数据存储可以存在数据库、图数据库里,甚至存成超大的 JSON manifest 文件都行
  • 元数据目录接口系统 Catalog提供 API / GUI 来读写元数据和数据血缘系统

 

下图是整个方案的简单示意图:

其中,上面的虚线框是元数据的来源与导入、下面的虚线框是元数据的存储与展示、发现。


 


开源技术栈

下面,介绍下数据治理系统的每个部分。

数据库和数仓

为了处理和使用原始和中间数据,这里一定涉及至少一个数据库或者数仓。它可以是 Hive、Apache Delta、TiDB、Cassandra、MySQL 或 Postgres。

在这个参考项目中,我们选一个简单、流行的 Postgres。

✓ 数据仓库:Postgres

数据运维 DataOps

我们应该有某种 DataOps 的方案,让 Pipeline 和环境具有可重复性、可测试性和版本控制性。

在这里,我们使用了 GitLab 创建的 Meltano。

Meltano 是一个 just-work 的 DataOps 平台,它可以用巧妙且优雅的方式将 Singer 作为 EL 和 dbt 作为 T 连接起来。此外,它还连接到其他一些 datAInfra 实用程序,例如 Apache Superset 和 Apache Airflow 等。

至此,我们又纳入了一个成员:

✓ GitOps:Meltano https://gitlab.com/meltano/meltano

ETL 工具

上面我们提到过组合 Singer 与 Meltano 将来自许多不同数据源的数据 E(提取)和 L(加载)数据目标,并使用 dbt 作为 Transform 的平台。于是我们得到:

✓ EL:Singer ✓ T: dbt

数据可视化

在数据之上创建 Dashboard、图表和表格得到数据的洞察是很符合直觉的,类似大数据之上的 Excel 图标功能。


 

Apache Superset 是我很喜欢的开源数据可视化项目,我准备用它来作为被治理管理的目标之一。同时,还会利用它实现可视化功能来完成元数据洞察。于是,

✓ Dashboard:Apache Superset

任务编排(DAG Job Orchestration)

在大多数情况下,我们的 DataOps 作业、任务会演变成需要编排系统的规模,我们可以用 Apache Airflow 来负责这一块。

✓ DAG:Apache Airflow https://airflow.apache.org/

元数据治理

随着越来越多的组件和数据被引入数据基础设施,在数据库、表、数据建模(schema)、Dashboard、DAG(编排系统中的有向无环图)、应用与服务的各个生命周期阶段中都将存着海量的元数据,需要对它们的管理员和团队进行协同管理、连接和发现。

linux Foundation Amundsen 是解决该问题的最佳项目之一。 Amundsen 用图数据库为事实源(single source of truth)以加速多跳查询,Elasticsearch 为全文搜索引擎。它在顺滑地处理所有元数据及其血缘之余,还提供了优雅的 UI 和 API。 Amundsen 支持多种图数据库为后端,这里咱们用 NebulaGraph。


 

现在的技术栈:

✓ 数据发现:Linux Foundation Amundsen ✓ 全文搜索:Elasticsearch ✓ 图数据库:NebulaGraph

好的,所有组件都齐正了,开始组装它们吧。

环境搭建与各组件初识

本次实践的项目方案已开源,你可以访问 https://Github.com/wey-gu/data-lineage-ref-solution 来获得对应的代码。

整个实践过程,我遵循了尽量干净、鼓励共建的原则。项目预设在一个 unix-like 系统上运行,且联网和装有 Docker-Compose。

这里,我将在 Ubuntu 20.04 LTS X86_64 上运行它,当然在其他发行版或 Linux 版本上应该也没有问题。

运行一个数仓、数据库

首先,安装 Postgres 作为我们的数仓。

这个单行命令会创建一个使用 Docker 在后台运行的 Postgres,进程关闭之后容器不会残留而是被清理掉(因为参数 --rm)。

docker run --rm --name postgres -e POSTGRES_PASSword=lineage_ref -e POSTGRES_USER=lineage_ref -e POSTGRES_DB=warehouse -d -p 5432:5432 postgres

我们可以用 Postgres CLI 或 GUI 客户端来验证命令是否执行成功。

DataOps 工具链部署

接下来,安装有机结合了 Singer 和 dbt 的 Meltano。

Meltano 帮助我们管理 ETL 工具(作为插件)及其所有配置和 pipeline。这些元信息位于 Meltano 配置及其系统数据库中,其中配置是基于文件的(可以使用 GitOps 管理),它的默认系统数据库是 SQLite。

安装 Meltano

使用 Meltano 的工作流是启动一个 “meltano 项目” 并开始将 E、L 和 T 添加到配置文件中。Meltano 项目的启动只需要一个 CLI 命令 meltano init yourprojectname。不过,在那之前,先用 Python/ target=_blank class=infotextkey>Python 的包管理器 pip 或者 Docker 镜像安装 Meltano,像我示范的这样:

在 Python 虚拟环境中使用 pip 安装 Meltano:

mkdir .venv# example in a debian flavor Linux distrosudo apt-get install python3-dev python3-pip python3-venv python3-wheel -ypython3 -m venv .venv/meltanosource .venv/meltano/bin/activatepython3 -m pip install wheelpython3 -m pip install meltano# init a projectmkdir meltano_projects && cd meltano_projects# replace with your own .NETouch .envmeltano init

或者,用 Docker 容器安装 Meltano:

docker pull meltano/meltano:latestdocker run --rm meltano/meltano --version# init a projectmkdir meltano_projects && cd meltano_projects# replace with your own onetouch .envdocker run --rm -v "$(pwd)":/projects -w /projects --env-file .env meltano/meltano init

除了知晓 meltano init 之外,最好掌握 Meltano 部分命令,例如 meltano etl 表示 ETL 的执行,meltano invoke 来调用插件命令。详细可以参考它的速查表 https://docs.meltano.com/reference/command-line-interface。

Meltano Gui 界面

Meltano 自带一个基于 Web 的 UI,执行 ui 子命令就能启动它:

meltano ui

它默认会跑在 http://localhost:5000 上。

针对 Docker 的运行环境,在暴露 5000 端口的情况下运行容器即可。由于容器的默认命令已经是 meltano ui,所以 run 的命令只需:

docker run -v "$(pwd)":/project -w /project -p 5000:5000 meltano/meltanoMeltano 项目示例

gitHub 用户 Pat Nadolny 在开源项目 singer_dbt_jaffle 中做了很好的示例。他采用 dbt 的 Meltano 示例数据集,利用 Airflow 编排 ETL 任务。

不只这样,他还有利用 Superset 的例子,见 jaffle_superset。

前人种树我们来吃果,按照 Pat Nadolny 的实践,我们可以这样地运行数据管道(pipeline):

 

  • tap-CSV(Singer)从 CSV 文件中提取数据
  • target-postgres(Singer) 将数据加载到 Postgres
  • dbt 将数据转换为聚合表或视图

 

注意,上面我们已经启动了 Postgres,可以跳过容器启动 Postgres 这步。

操作过程是:

git clone https://github.com/pnadolny13/meltano_example_implementations.gitcd meltano_example_implementations/meltano_projects/singer_dbt_jaffle/meltano installtouch .envecho PG_PASSWORD="lineage_ref" >> .envecho PG_USERNAME="lineage_ref" >> .env# Extract and Load(with Singer)meltano run tap-CSV target-postgres# Trasnform(with dbt)meltano run dbt:run# Generate dbt docsmeltano invoke dbt docs generate# Serve generated dbt docsmeltano invoke dbt docs to serve# Then visit http://localhost:8080

现在,我们可以连接到 Postgres 来查看加载和转换后的数据预览。如下所示,截图来自 VS Code 的 SQLTool。

payments 表里长这样子:


 

搭一个 BI Dashboard 系统

现在,我们的数据仓库有数据了。接下来,可以试着用下这些数据。

像仪表盘 Dashbaord 这样的 BI 工具能帮我们从数据中获得有用的洞察。使用可视化工具 Apache Superset 可以很容易地创建和管理这些基于数据源的 Dashboard 和各式各样的图表。

本章的重点不在于 Apache Superset 本身,所以,咱们还是复用 Pat Nadolny 的 jaffle_superset 例子。

Bootstrap Meltano 和 Superset

创建一个安装了 Meltano 的 Python VENV:

mkdir .venvpython3 -m venv .venv/meltanosource .venv/meltano/bin/activatepython3 -m pip install wheelpython3 -m pip install meltano

参考 Pat 的小抄,做一些细微的调整:

克隆 repo,进入 jaffle_superset 项目:

git clone https://github.com/pnadolny13/meltano_example_implementations.gitcd meltano_example_implementations/meltano_projects/jaffle_superset/

修改 meltano 配置文件,让 Superset 连接到我们创建的 Postgres:

vim meltano_projects/jaffle_superset/meltano.yml

这里,我将主机名更改为 “10.1.1.111”,这是我当前主机的 IP。如果你是 windows 或者 macOS 上的 Docker Desktop,这里不要修改主机名,否则就要和我一样手动改成实际地址:

--- a/meltano_projects/jaffle_superset/meltano.yml+++ b/meltano_projects/jaffle_superset/meltano.yml plugins:A list of database driver dependencies can be found here https://superset.apache.org/docs/databases/installing-database-driversconfig:database_name: my_postgres- sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@host.docker.internal:${PG_PORT}/${PG_DATABASE}+ sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@10.1.1.168:${PG_PORT}/${PG_DATABASE}tables:- model.my_meltano_project.customers- model.my_meltano_project.orders

添加 Postgres 登录的信息到 .env 文件:

echo PG_USERNAME=lineage_ref >> .envecho PG_PASSWORD=lineage_ref >> .env

安装 Meltano 项目,运行 ETL 任务:

meltano installmeltano run tap-csv target-postgres dbt:run

调用、启动 Superset,这里注意 ui 不是 meltano 的内部命令,而是一个配置进去的自定义行为(user-defined action):

meltano invoke superset:ui

在另一个命令行终端执行自定义的命令 load_datasources:

meltano invoke superset:load_datasources

通过浏览器访问 http://localhost:8088/ 就是 Superset 的图形界面了:


 

创建一个 Dashboard

现在,我们站在 Meltano、Postgres 的肩膀上,用 ETL 数据建一个 Dashboard 吧:

点击 + DASHBOARD,填写仪表盘名称,再先后点击 SAVE 和 + CREATE A NEW CHART:


 

在新图表(Create a new chart)视图中,选择图表类型和数据集。在这里,我选择了 orders 表作为数据源和 Pie Chart 图表类型:


 

点击 CREATE NEW CHART 后,在图表定义视图中选择 “status” 的 “Query” 为 “DIMENSIONS”,“COUNT (amount)” 为 “METRIC”。至此,咱们就可以看到每个订单状态分布的饼图了。


 

点击 SAVE,系统会询问应该将此图表添加到哪个 Dashboard。选择后,单击 SAVE & GO TO DASHBOARD。


 

在 Dashboard 中,我们可以看到所有的图表。这不,你可以看到我额外添加的、用来显示客户订单数量分布的图表:


 

点 ··· 能看到刷新率设置、下载渲染图等其他的功能。


 

现在,我们有一个简单但典型的 HomeLAB 数据技术栈了,并且所有东西都是开源的!

想象一下,我们在 CSV 中有 100 个数据集,在数据仓库中有 200 个表,并且有几个数据工程师在运行不同的项目,这些项目使用、生成不同的应用与服务、Dashbaord 和数据库。当有人想要查找、发现或者修改其中的一些表、数据集、Dashbaord 和管道,在沟通和工程方面可能都是非常不好管理的。

上面我们提到,这个示例项目的主要功能是元数据发现系统

元数据发现系统

现在,需要我们部署一个带有 NebulaGraph 和 Elasticsearch 的 Amundsen。有了 Amundsen,我们可以在一个地方发现和管理整个数据栈中的所有元数据。

Amundsen 主要有两个部分组成:

 

  • 元数据导入 Metadata IngestionAmundsen Databuilder
  • 元数据目录服务 Metadata CatalogAmundsen Frontend ServiceAmundsen Metadata ServiceAmundsen Search Service

 

它的工作原理是:利用 Databuilder 提取不同数据源的元数据,并将元数据持久化到 Metadata Service 和 Search Service 中,用户从 Frontend Service 或 Metadata Service 的 API 获取数据。

部署 Amundsen元数据服务 Metadata Service

我们用 docker-compose 部署一个 Amundsen 集群。由于 Amundsen 对 NebulaGraph 后端的支持 pr#1817 尚未合并,还不能用官方的代码。这里,先用我的 fork 版本。

先克隆包含所有子模块的 repo:

git clone -b amundsen_nebula_graph --recursive git@github.com:wey-gu/amundsen.gitcd amundsen

启动所有目录服务 catalog services 及其后端存储:

docker-compose -f docker-amundsen-nebula.yml up

由于这个 docker-compose 文件是供开发人员试玩、调试 Amundsen 用的,而不是给生产部署准备的,它在启动的时候会从代码库构建镜像,第一次跑的时候启动会慢一些。

部署好了之后,我们使用 Databuilder 将一些示例、虚构的数据加载存储里。

抓取元数据 Databuilder

Amundsen Databuilder 就像 Meltano 系统一样,只不过是用在元数据的上的 ETL ,它把元数据加载到 Metadata Service 和 Search Service 的后端存储:NebulaGraph 和 Elasticsearch 里。这里的 Databuilder 只是一个 Python 模块,所有的元数据 ETL 作业可以作为脚本运行,也可以用 Apache Airflow 等 DAG 平台进行编排。

安装 Amundsen Databuilder:

cd databuilderpython3 -m venv .venvsource .venv/bin/activatepython3 -m pip install wheelpython3 -m pip install -r requirements.txtpython3 setup.py install

调用这个示例数据构建器 ETL 脚本来把示例的虚拟数据导进去。

python3 example/scripts/sample_data_loader_nebula.py验证一下 Amundsen

在访问 Amundsen 之前,我们需要创建一个测试用户:

# run a container with curl attached to amundsenfrontenddocker run -it --rm --net container:amundsenfrontend nicolaka/netshoot# Create a user with id test_user_idcurl -X PUT -v http://amundsenmetadata:5002/user -H "Content-Type: Application/json" --data '{"user_id":"test_user_id","first_name":"test","last_name":"user", "email":"test_user_id@mail.com"}'exit

然后我们可以在 http://localhost:5000 查看 UI 并尝试搜索 test,它应该会返回一些结果。


 

然后,可以单击并浏览在 sample_data_loader_nebula.py 期间加载到 Amundsen 的那些示例元数据。

此外,我们还可以通过 NebulaGraph Studio 的地址 http://localhost:7001 访问 NebulaGraph 里的这些数据。

下图显示了有关 Amundsen 组件的更多详细信息:

┌────────────────────────┐ ┌────────────────────────────────────────┐│ Frontend:5000 │ │ Metadata Sources │├────────────────────────┤ │ ┌────────┐ ┌─────────┐ ┌─────────────┐ ││ Metaservice:5001 │ │ │ │ │ │ │ │ ││ ┌──────────────┐ │ │ │ Foo DB │ │ Bar App │ │ X Dashboard │ │┌────┼─┤ Nebula Proxy │ │ │ │ │ │ │ │ │ ││ │ └──────────────┘ │ │ │ │ │ │ │ │ ││ ├────────────────────────┤ │ └────────┘ └─────┬───┘ └─────────────┘ │┌─┼────┤ Search searvice:5002 │ │ │ ││ │ └────────────────────────┘ └──────────────────┼─────────────────────┘│ │ ┌─────────────────────────────────────────────┼───────────────────────┐│ │ │ │ ││ │ │ Databuilder ┌───────────────────────────┘ ││ │ │ │ ││ │ │ ┌───────────────▼────────────────┐ ┌──────────────────────────────┐ ││ │ ┌──┼─► Extractor of Sources ├─► nebula_search_data_extractor │ ││ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ ││ │ │ │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ ││ │ │ │ │ Loader filesystem_csv_nebula │ │ Loader Elastic FS loader │ ││ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ ││ │ │ │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ ││ │ │ │ │ Publisher nebula_csv_publisher │ │ Publisher Elasticsearch │ ││ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ ││ │ │ └─────────────────┼─────────────────────────────────┼─────────────────┘│ │ └────────────────┐ │ ││ │ ┌─────────────┼───►─────────────────────────┐ ┌─────▼─────┐│ │ │ NebulaGraph │ │ │ │ ││ └────┼─────┬───────┴───┼───────────┐ ┌─────┐ │ │ ││ │ │ │ │ │MetaD│ │ │ ││ │ ┌───▼──┐ ┌───▼──┐ ┌───▼──┐ └─────┘ │ │ ││ ┌────┼─►GraphD│ │GraphD│ │GraphD│ │ │ ││ │ │ └──────┘ └──────┘ └──────┘ ┌─────┐ │ │ ││ │ │ :9669 │MetaD│ │ │ Elastic ││ │ │ ┌────────┐ ┌────────┐ ┌────────┐ └─────┘ │ │ Search ││ │ │ │ │ │ │ │ │ │ │ Cluster ││ │ │ │StorageD│ │StorageD│ │StorageD│ ┌─────┐ │ │ :9200 ││ │ │ │ │ │ │ │ │ │MetaD│ │ │ ││ │ │ └────────┘ └────────┘ └────────┘ └─────┘ │ │ ││ │ ├───────────────────────────────────────────┤ │ ││ └────┤ Nebula Studio:7001 │ │ ││ └───────────────────────────────────────────┘ └─────▲─────┘└──────────────────────────────────────────────────────────┘穿针引线:元数据发现

设置好基本环境后,让我们把所有东西穿起来。还记得我们有 ELT 一些数据到 PostgreSQL 吗?


 

那么,我们如何让 Amundsen 发现这些数据和 ETL 的元数据呢?

提取 Postgres 元数据

我们从数据源开始:首先是 Postgres。

我们为 Python3 安装 Postgres 客户端:

sudo apt-get install libpq-devpip3 install Psycopg2执行 Postgres 元数据 ETL

运行一个脚本来解析 Postgres 元数据:

export CREDENTIALS_POSTGRES_USER=lineage_refexport CREDENTIALS_POSTGRES_PASSWORD=lineage_refexport CREDENTIALS_POSTGRES_DATABASE=warehousepython3 example/scripts/sample_postgres_loader_nebula.py

我们看看把 Postgres 元数据加载到 NebulaGraph 的示例脚本的代码,非常简单直接:

# part 1: PostgresMetadata --> CSV --> NebulaGraphjob = DefaultJob(conf=job_config,task=DefaultTask(extractor=PostgresMetadataExtractor(),loader=FsNebulaCSVLoader()),publisher=NebulaCsvPublisher())# part 2: Metadata stored in NebulaGraph --> Elasticsearchextractor = NebulaSearchDataExtractor()task = SearchMetadatatoElasticasearchTask(extractor=extractor)job = DefaultJob(conf=job_config, task=task)

第一个工作路径是:PostgresMetadata --> CSV --> NebulaGraph

 

  • PostgresMetadataExtractor 用于从 Postgres 中提取元数据,可以参考文档 https://www.amundsen.io/amundsen/databuilder/#postgresmetadataextractor。
  • FsNebulaCSVLoader 用于将提取的数据转为 CSV 文件
  • NebulaCsvPublisher 用于将元数据以 CSV 格式发布到 NebulaGraph

 

第二个工作路径是:Metadata stored in NebulaGraph --> Elasticsearch

 

  • NebulaSearchDataExtractor 用于获取存储在 NebulaGraph 中的元数据
  • SearchMetadatatoElasticasearchTask 用于使 Elasticsearch 对元数据进行索引。

 

请注意,在生产环境中,我们可以在脚本中或使用 Apache Airflow 等编排平台触发这些作业。

验证 Postgres 中元数据的获取

搜索 payments 或者直接访问 http://localhost:5000/table_detail/warehouse/postgres/public/payments,你可以看到我们 Postgres 的元数据,比如:


 

像上面的屏幕截图一样,我们可以轻松完成元数据管理操作,如:添加标签、所有者和描述。

提取 dbt 元数据

其实,我们也可以从 dbt 本身提取元数据。

Amundsen DbtExtractor 会解析 catalog.json 或 manifest.json 文件并将元数据加载到 Amundsen 存储,这里当然指的是 NebulaGraph 和 Elasticsearch。

在上面的 Meltano 章节中,我们已经使用 meltano invoke dbt docs generate 生成了这个文件:

14:23:15 Done.14:23:15 Building catalog14:23:15 Catalog written to /home/ubuntu/ref-data-lineage/meltano_example_implementations/meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target/catalog.jsondbt 元数据 ETL 的执行

我们试着解析示例 dbt 文件中的元数据吧:

$ ls -l example/sample_data/dbt/total 184-rw-rw-r-- 1 w w 5320 May 15 07:17 catalog.json-rw-rw-r-- 1 w w 177163 May 15 07:17 manifest.json

我写的这个示例的加载例子如下:

python3 example/scripts/sample_dbt_loader_nebula.py

其中主要的代码如下:

# part 1: dbt manifest --> CSV --> NebulaGraphjob = DefaultJob(conf=job_config,task=DefaultTask(extractor=DbtExtractor(),loader=FsNebulaCSVLoader()),publisher=NebulaCsvPublisher())# part 2: Metadata stored in NebulaGraph --> Elasticsearchextractor = NebulaSearchDataExtractor()task = SearchMetadatatoElasticasearchTask(extractor=extractor)job = DefaultJob(conf=job_config, task=task)

它和 Postgres 元数据 ETL 的唯一区别是 extractor=DbtExtractor(),它带有以下配置以获取有关 dbt 项目的以下信息:

 

  • 数据库名称
  • 目录_json
  • manifest_json
job_config = ConfigFactory.from_dict({'extractor.dbt.database_name': database_name,'extractor.dbt.catalog_json': catalog_file_loc, # File'extractor.dbt.manifest_json': json.dumps(manifest_data), # JSON Dumped objecy'extractor.dbt.source_url': source_url})验证 dbt 抓取结果

 

搜索 dbt_demo 或者直接访问 http://localhost:5000/table_detail/dbt_demo/snowflake/public/raw_inventory_value,可以看到


 

这里给一个小提示,其实,我们可以选择启用 DEBUG log 级别去看已发送到 Elasticsearch 和 NebulaGraph 的内容。

- logging.basicConfig(level=logging.INFO)+ logging.basicConfig(level=logging.DEBUG)

或者,在 NebulaGraph Studio 中探索导入的数据:

先点击 Start with Vertices,并填写顶点 vid:snowflake://dbt_demo.public/fact_warehouse_inventory


 

我们可以看到顶点显示为粉红色的点。再让我们修改下 Expand / ” 拓展 “选项:

 

  • 方向:双向
  • 步数:单向、三步

 


 

并双击顶点(点),它将双向拓展 3 步:


 

像截图展示的那般,在可视化之后的图数据库中,这些元数据可以很容易被查看、分析,并从中获得洞察。

而且,我们在 NebulaGraph Studio 中看到的同 Amundsen 元数据服务的数据模型相呼应:


 

最后,请记住我们曾利用 dbt 来转换 Meltano 中的一些数据,并且清单文件路径是 .meltano/transformers/dbt/target/catalog.json,你可以尝试创建一个数据构建器作业来导入它。

提取 Superset 中的元数据

Amundsen 的 Superset Extractor 可以获取

 

  • Dashboard 元数据抽取,见 apache_superset_metadata_extractor.py
  • 图表元数据抽取,见 apache_superset_chart_extractor.py
  • Superset 元素与数据源(表)的关系抽取,见 apache_superset_table_extractor.py

 

来,现在试试提取之前创建的 Superset Dashboard 的元数据。

Superset 元数据 ETL 的执行

下边执行的示例 Superset 提取脚本可以获取数据并将元数据加载到 NebulaGraph 和 Elasticsearch 中。

python3 sample_superset_data_loader_nebula.py

如果我们将日志记录级别设置为 DEBUG,我们实际上可以看到这些中间的过程日志:

# fetching metadata from supersetDEBUG:urllib3.connectionpool:http://localhost:8088 "POST /api/v1/security/login HTTP/1.1" 200 280INFO:databuilder.task.task:Running a taskDEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 308 374DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard/?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 200 1058# insert DashboardDEBUG:databuilder.publisher.nebula_csv_publisher:Query: INSERT VERTEX `Dashboard` (`dashboard_url`, `name`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3":("http://localhost:8088/superset/dashboard/3/","my_dashboard","unique_tag",timestamp());# insert a DASHBOARD_WITH_TABLE relationship/edgeINFO:databuilder.publisher.nebula_csv_publisher:Importing data in edge files: ['/tmp/amundsen/dashboard/relationships/Dashboard_Table_DASHBOARD_WITH_TABLE.csv']DEBUG:databuilder.publisher.nebula_csv_publisher:Query:INSERT edge `DASHBOARD_WITH_TABLE` (`END_LABEL`, `START_LABEL`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/orders":("Table","Dashboard","unique_tag", timestamp()), "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/customers":("Table","Dashboard","unique_tag", timestamp());验证 Superset Dashboard 元数据

通过在 Amundsen 中搜索它,我们现在可以获得 Dashboard 信息。

我们也可以从 NebulaGraph Studio 进行验证。


 

注:可以参阅 Dashboard 抓取指南中的 Amundsen Dashboard 图建模:


 

用 Superset 预览数据

Superset 可以用来预览表格数据,文档可以参考 https://www.amundsen.io/amundsen/frontend/docs/configuration/#preview-client,其中 /superset/sql_json/ 的 API 被 Amundsen Frontend Service 调用,取得预览信息。


 

开启数据血缘信息

默认情况下,数据血缘是关闭的,我们可以通过以下方式启用它:

第一步,cd 到 Amundsen 代码仓库下,这也是我们运行 docker-compose -f docker-amundsen-nebula.yml up 命令的地方:

cd amundsen

第二步,修改 frontend 下的 TypeScript 配置

--- a/frontend/amundsen_application/static/js/config/config-default.ts+++ b/frontend/amundsen_application/static/js/config/config-default.tstableLineage: {- inAppListEnabled: false,- inAppPageEnabled: false,+ inAppListEnabled: true,+ inAppPageEnabled: true,externalEnabled: false,iconPath: 'PATH_TO_ICON',isBeta: false,

第三步,重新构建 Docker 镜像,其中将重建前端图像。

docker-compose -f docker-amundsen-nebula.yml build

第四步,重新运行 up -d 以确保前端用新的配置:

docker-compose -f docker-amundsen-nebula.yml up -d

结果大概长这样子:

$ docker-compose -f docker-amundsen-nebula.yml up -dRecreating amundsenfrontend ... done

之后,我们可以访问 http://localhost:5000/lineage/table/gold/hive/test_schema/test_table1 看到 Lineage (beta) 血缘按钮已经显示出来了:


 

我们可以点击 Downstream 查看该表的下游资源:


 

或者点击血缘按钮查看血缘的图表式:


 

也有用于血缘查询的 API。

下面这个例子中,我们用 cURL 调用下这个 API:

docker run -it --rm --net container:amundsenfrontend nicolaka/netshootcurl "http://amundsenmetadata:5002/table/snowflake://dbt_demo.public/raw_inventory_value/lineage?depth=3&direction=both"

上面的 API 调用是查询上游和下游方向的 linage,表 snowflake://dbt_demo.public/raw_inventory_value 的深度为 3。

结果应该是这样的:

"depth": 3,"downstream_entities": ["level": 2,"usage": 0,"key": "snowflake://dbt_demo.public/fact_daily_expenses","parent": "snowflake://dbt_demo.public/fact_warehouse_inventory","badges": [],"source": "snowflake"},"level": 1,"usage": 0,"key": "snowflake://dbt_demo.public/fact_warehouse_inventory","parent": "snowflake://dbt_demo.public/raw_inventory_value","badges": [],"source": "snowflake"],"key": "snowflake://dbt_demo.public/raw_inventory_value","direction": "both","upstream_entities": []

实际上,这个血缘数据就是在我们的 dbtExtractor 执行期间提取和加载的,其中 extractor .dbt.{DbtExtractor.EXTRACT_LINEAGE} 默认为 true,因此,创建了血缘元数据并将其加载到了 Amundsen。

在 NebulaGraph 中洞察血缘

使用图数据库作为元数据存储的两个优点是:

图查询本身是一个灵活的 DSL for lineage API,例如,这个查询帮助我们执行 Amundsen 元数据 API 的等价的查询:

MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x)WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p

来,在 NebulaGraph Studio 或者 Explorer 的控制台中查询下:


 

渲染下这个结果:


 

提取数据血缘

这些血缘信息是需要我们明确指定、获取的,获取的方式可以是自己写 Extractor,也可以是用已有的方式。比如:dbt 的 Extractor 和 Open Lineage 项目的 Amundsen Extractor。

通过 dbt

这个在刚才已经展示过了,dbt 的 Extractor 会从表级别获取血缘同其他 dbt 中产生的元数据信息一起被拿到。

通过 Open Lineage

Amundsen 中的另一个开箱即用的血缘 Extractor 是 OpenLineageTableLineageExtractor。

Open Lineage 是一个开放的框架,可以将不同来源的血统数据收集到一个地方,它可以将血统信息输出为 JSON 文件,参见文档 https://www.amundsen.io/amundsen/databuilder/#openlineagetablelineageextractor。

下边是它的 Amundsen Databuilder 例子:

dict_config = {f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.CLUSTER_NAME}': 'datalab',f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.OL_DATASET_NAMESPACE_OVERRIDE}': 'hive_table',f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION}': 'input_dir/openlineage_nd.json',task = DefaultTask(extractor=OpenLineageTableLineageExtractor(),loader=FsNebulaCSVLoader())回顾

整套元数据治理 / 发现的方案思路如下:

 

  • 将整个数据技术栈中的组件作为元数据源(从任何数据库、数仓,到 dbt、Airflow、Openlineage、Superset 等各级项目)
  • 使用 Databuilder(作为脚本或 DAG)运行元数据 ETL,以使用 NebulaGraph 和 Elasticsearch 存储和索引
  • 从前端 UI(使用 Superset 预览)或 API 去使用、消费、管理和发现元数据
  • 通过查询和 UI 对 NebulaGraph,我们可以获得更多的可能性、灵活性和数据、血缘的洞察

 


 

涉及到的开源

此参考项目中使用的所有项目都按字典顺序在下面列出。

 

  • Amundsen
  • Apache Airflow
  • Apache Superset
  • dbt
  • Elasticsearch
  • meltano
  • NebulaGraph
  • Open Lineage
  • Singer

 

谢谢你读完本文(///▽///)

要来近距离体验一把图数据库吗?现在可以用用 NebulaGraph Cloud 来搭建自己的图数据系统哟,快来节省大量的部署安装时间来搞定业务吧~ NebulaGraph 阿里云计算巢现 30 天免费使用中~

想看源码的小伙伴可以前往 GitHub 阅读、使用、(^з^)-☆ star 它 -> GitHub:https://github.com/vesoft-inc/nebula



Tags:数据治理   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
向新手解释数据治理的一个最佳方式
数据管理和数据治理可能很难向新手解释。它们涵盖了复杂的数据能力领域,例如元数据管理、数据质量、数据架构、数据编目、数据隐私、数据科学和数据集成。我发现自己在为客户...【详细内容】
2023-09-17  Search: 数据治理  点击:(85)  评论:(0)  加入收藏
到底什么是数据治理?包含哪些内容?
一、数据治理的定义关于数据治理,国际数据治理研究所(DGI)给出的定义是:“数据治理是一个通过一系列信息相关的过程来实现决策权和职责分工的系统,这些过程按照达成共识的模型来...【详细内容】
2023-07-27  Search: 数据治理  点击:(81)  评论:(0)  加入收藏
框架结构:非侵入式数据治理框架
以下文章是描述非侵入式数据治理框架的三部分系列文章中的第一篇。该框架由 KIK Consulting & Educational Services ( KIKconsulting.com ) 和 The Data Administration Ne...【详细内容】
2023-07-08  Search: 数据治理  点击:(163)  评论:(0)  加入收藏
24张架构图把数据治理核心内容讲透了
前言随着信息革命和信息化的飞速发展,计算机数据量的急剧增长,数据利用和管理的重要性与日俱增,数据逐渐在信息化这个大舞台上扮演着越来越重要的角色。数据治理是企业大数据基...【详细内容】
2023-06-20  Search: 数据治理  点击:(186)  评论:(0)  加入收藏
如何轻松做数据治理?开源技术栈告诉你答案
搭建一套数据治理体系耗时耗力,但或许我们没有必要从头开始搞自己的数据血缘项目。本文分享如何用开源、现代的 DataOps、ETL、Dashboard、元数据、数据血缘管理系统构建大...【详细内容】
2022-12-27  Search: 数据治理  点击:(194)  评论:(0)  加入收藏
详解非结构化数据治理
随着互联网技术的日新月异,内容数据逐渐在各行业的业务中占据更重要的地位。日常的业务过程中,需要处理的大量电子文档、图片、音频、视频等,都属于内容数据范畴。例如,某银行的...【详细内容】
2022-08-24  Search: 数据治理  点击:(230)  评论:(0)  加入收藏
系统谈数据治理,具体案例来分析 宋懵懵的数据生活
为什么要做数据治理进入到大数据时代,数据领域里的工程师、分析师和科学家们可以很轻易的使用开源世界的各种技术(比如离线处理有MapReduce、Spark,实时处理有Flink、Spark Str...【详细内容】
2020-06-21  Search: 数据治理  点击:(283)  评论:(0)  加入收藏
基于大数据+AI体系的数据治理实践
过去十年,农业银行信息化建设积累了海量的数据。为了盘活这些数据资源,充分发挥数据价值,在大数据平台和AI建设过程中,农业银行采取“统筹规划、顶层设计、分步实施”的策略,不断...【详细内容】
2020-06-16  Search: 数据治理  点击:(344)  评论:(0)  加入收藏
▌简易百科推荐
大数据杀熟何时告别“人人喊打却无可奈何”?
2月7日郑州飞往珠海的航班,不同手机、不同账号搜索该航班显示出不同价格。图源网络有网友近日分享在某平台的购票经历,引发社会广泛关注——用3个账号买同一航班同...【详细内容】
2024-01-30    中国青年网  Tags:大数据杀熟   点击:(34)  评论:(0)  加入收藏
简易百科:到底什么是大数据?
随着互联网的快速发展,大数据已经成为了当今社会最热门的话题之一。那么,到底什么是大数据呢?首先,我们需要明确大数据的定义。大数据是指数据量极大、类型繁多、处理难度高的数...【详细内容】
2024-01-30    简易百科  Tags:大数据   点击:(40)  评论:(0)  加入收藏
数据采集新篇章:AI与大模型的融合应用
开篇在AIGC(人工智能与通用计算)应用中,大型语言模型(LLM)占据着举足轻重的地位。这些模型,如GPT和BERT系列,通过处理和分析庞大的数据集,已经极大地推动了自然语言理解和生成的边界...【详细内容】
2024-01-17  崔皓  51CTO  Tags:数据采集   点击:(52)  评论:(0)  加入收藏
挑战 Spark 和 Flink?大数据技术栈的突围和战争
十年的轮回,正如大数据的发展一般,它既是一个轮回的结束,也是崭新的起点。大数据在过去的二十年中蓬勃发展,从无到有,崛起为最具爆炸性的技术领域之一,逐渐演变成为每个企业不可或...【详细内容】
2024-01-17  InfoQ    Tags:大数据   点击:(40)  评论:(0)  加入收藏
分布式存储系统在大数据处理中扮演着怎样的角色?
如果存储节点本身可以定制,则通常会让其支持部分计算能力,以利用数据的亲和性,将部分计算下推到相关的存储节点上。如果存储是云上的 S3 等对象存储,无法定制,则通常会将数据在计...【详细内容】
2023-12-19  木鸟杂记  微信公众号  Tags:大数据   点击:(48)  评论:(0)  加入收藏
大数据如何实时拯救生命:车联网的数据分析有助预防交通事故
译者 | 李睿审校 | 重楼车联网(IoV)是汽车行业与物联网相结合的产物。预计车联网数据规模将越来越大,尤其是当电动汽车成为汽车市场新的增长引擎。问题是:用户的数据平台准备...【详细内容】
2023-12-19    51CTO  Tags:大数据   点击:(41)  评论:(0)  加入收藏
利用生成对抗网络进行匿名化数据处理
在互联网时代,数据日益成为人们的生产资料。然而,在某些情况下,我们需要分享数据,但又需要保护个人隐私。这时,匿名化技术就显得尤为重要。本文将介绍利用生成对抗网络进行匿名化...【详细内容】
2023-12-18  技巧达人小影    Tags:数据处理   点击:(57)  评论:(0)  加入收藏
盘点那些常见的数据中心类型,你知道几个?
在数字化潮流的浪潮下,数据中心如同企业的神经系统,关系到业务的稳健运转。而在这个巨大的网络中,各种数据中心类型如雨后春笋般崭露头角。从企业级的个性至云数据中心的虚拟化...【详细内容】
2023-12-07  数据中心之家  微信公众号  Tags:数据中心   点击:(67)  评论:(0)  加入收藏
数据中心的七个关键特征
随着信息技术的不断演进,数据中心的可靠性、可扩展性、高效性、安全性、灵活性、管理性和可持续性成为业界探讨的焦点。下面让我们一同深入剖析这些关键特征,了解它们是如何影...【详细内容】
2023-12-06  数据中心之家  微信公众号  Tags:数据   点击:(64)  评论:(0)  加入收藏
什么是数据解析?将数据转化为更好的决策
什么是数据解析?数据解析是一门专注于从数据中获取洞察力的学科。它包含数据分析(data analysis)和管理的流程、工具和技术,包括数据的收集、组织和存储。数据解析的主要目的是...【详细内容】
2023-12-06  计算机世界    Tags:数据解析   点击:(64)  评论:(0)  加入收藏
站内最新
站内热门
站内头条