type
status
date
slug
summary
tags
category
icon
password

架构

首先,我们先了解一下元数据平台架构,主流程是:SQL采集 --》 SQL解析 --》应用层。
  1. SQL采集:针对各种SQL查询引擎,编写相应的钩子函数进行SQL收集,收集内容有执行时间、执行耗时、执行用户、执行引擎、jobId和执行SQL等等,最后把SQL信息入Kafka。Hive是实现ExecuteWithHookContext接口,Presto是实现EventListener接口,Spark是实现SparkListner接口。
  1. SQL解析:Flink实时消费Kafka数据,进行SQL解析。解析SQL的过程为:定义词法规则和语法规则文件 --》 使用Antlr实现SQL词法和语法解析 --》生成AST语法树 --》遍历AST语法树,考虑到Presto和Spark的SQL语法类似,因此直接参考Hive底层源码实现SQL解析。解析完成后,把血缘信息和元数据信息分别入JanusGraph和ElasticSearch。
  1. 应用层:数据录入JanusGraph和ElasticSearch后,就可以进行血缘查询和元数据查询。然后通过graph节点的出度入度进行热点分析。
notion image

SQL采集

本文以Hive的SQL采集为例,编写自定义Hive Hook类,实现SQL采集。此Hook类可通过hive-site.xml配置实现SQL信息入Kafka或JDBC存储引擎。

SqlCollectionHook

SqlCollectionStrategy

KafkaSqlCollectionStrategy

JdbcSqlCollectionStrategy

hive-site.xml

添加自定义Hook类配置,有两种SQL存储策略:Kafka和JDBC,选择一种配置。
重启Hive metastore使Hive Hook生效

SQL解析

参考Hive源码模仿编写SQL解析工具,先经过类进行语法分析,再根据Schema生成执行计划QueryPlan。字段血缘可根据LineageLogger类进行实现,表血缘根据字段血缘获取。

SQLParse2

注意事项
  • 开启血缘信息:查看hive源码发现hive的hive.exec.post.hooks配置需要包含org.apache.hadoop.hive.ql.hooks.PostExecutePrinter、org.apache.hadoop.hive.ql.hooks.LineageLogger或org.apache.atlas.hive.hook.HiveHook中任意一个Hook函数,SessionState中才会有血缘关系。因此可以在配置中加入其中任意一个Hook配置:conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")。
相关代码可以参见SemanticAnalyzer类:
  • 过滤表/视图存在与否检测:修改SemanticAnalyzer中检查表/视图存在与否的代码,重新打包,使SQL解析工具使用这个hive-exec-2.3.7.jar这个包。不然SQL解析过程中会报错。
  • 开启动态分区非严格模式:conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict")

Flink任务

有了SQL解析工具后,就可以编写Flink程序消费Kafka数据解析SQL,然后录入JanusGraph。本文只提供录入JanusGraph方案,录入ElasticSearch请自行实现。

LineageTask

SqlEntity

LineageProcessWindowFunction

LineageBuildMapFunction

LineageToGraphSinkFunction

remote.yaml

数据血缘查询

数据录入JanusGraph后就可以通过连接GremlinServer来查询血缘数据。本文编写一个spark程序例子,查询GremlinServer生成供Echarts使用的图json文件。

GraphToJsonTask

GraphDao

生成json文件后,Echarts就可以直接使用

Echarts文件

效果图(https://xyueji.github.io/traffic/graph.html):

参考:
 
精准去重-字典编码+BitmapSpark源码分析01 - 2.1.0版本源码编译及阅读环境搭建