男人狂躁进女人下面免费视频
男人狂躁进女人下面免费视频
你的位置:男人狂躁进女人下面免费视频 > 东北女人毛多水多牲交视频 > Flink SQL 知其是以然:在 Flink 中还能使用 Hive Udf?(附源码)

Flink SQL 知其是以然:在 Flink 中还能使用 Hive Udf?(附源码)

发布日期:2022-06-18 17:11    点击次数:199

Flink SQL 知其是以然:在 Flink 中还能使用 Hive Udf?(附源码)

 1.序篇

谎话未几说,我们先径直上本文的目次和论断,小伙伴不错先看论断快速了解博主渴望本文能给小伙伴们带来什么匡助:

布景及应用场景先容:博主渴望你能了解到,其实许多场景下及时数仓的成立都是跟着离线数仓而成立的(疏导的逻辑在及时数仓中从头齐全一遍),因此大约在 flink sql 中复用 hive udf 是大约大大提能手效的。 flink 彭胀守旧 hive 内置 udf:flink sql 提供了彭胀 udf 的智商,即 module,况兼 flink sql 也内置了 HiveModule(需要你主动加载进环境),来守旧一些 hive 内置的 udf (比如 get_json_object)给小伙伴们使用。 flink 彭胀守旧用户自界说的 hive udf:主要先容 flink sql 流任务中,不可使用 create temporary function 去引入一个用户自界说的 hive udf。因此博主只可通过 flink sql 提供的 module 插件智商,自界说了 module,来守旧引入用户自界说的 hive udf。 2.布景及应用场景先容

其实大多量公司都是从离线数仓滥觞成立的。敬佩群众势必在我方的坐褥环境中开采了颠倒多的 hive udf。跟着需求对于时效性条件的增高,越来越多的公司也滥觞成立起及时数仓。许多场景下及时数仓的成立都是跟着离线数仓而成立的。及时数据使用 flink 产出,离线数据使用 hive\spark 产出。

那么回到我们著述标题的问题:为什么需要 flink 守旧 hive udf 呢?

博主分析了下,论断如下:

站在数据需求的角度来说,一般会有以下两种情况:

已往也曾有了离线数据链路,需求方也想要及时数据。如若径直能用也曾开采好的 hive udf,则无须将疏导的逻辑搬动到 flink udf 中,况兼后续无需费时起劲体恤两个 udf 的逻辑一致性。 及时和离线的需求都是新的,需要新开采。如若只开采一套 udf,则渔人之利。

因此在 flink 中守旧 hive udf 这件事对开采人员提效来说黑白常有公道的。

3.在彭胀前,你需要披露一些基本看法 flink 守旧 hive udf 这件事分为两个部分。 flink 彭胀守旧 hive 内置 udf

flink 彭胀守旧用户自界说 hive udf

第一部分:flink 彭胀守旧 hive 内置 udf,比如 get_json_object,rlike 等等。

有同常识了,这样基本的 udf,flink 都莫得吗?

如实莫得。对于 flink sql 内置的 udf 见如下筹商,群众不错望望 flink 守旧了哪些 udf:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/

那么如若我如若强运用用 get_json_object 这个 udf,会发生啥呢?恶果如下图。

径直报错找不到 udf。

第二部分:flink 彭胀守旧用户自界说 hive udf。

内置函数惩处不了用户的复杂需求,用户就需要我方写 hive udf,况兼这部分自界说 udf 也想在 flink sql 中使用。

底下望望怎么在 flink sql 中进行这两种彭胀。

4.hive udf 彭胀守旧 4.1.flink sql module

触及到彭胀 udf 就不得不提到 flink 提供的 module。见官网下图。

从第一句话就不错看到,东北女人毛多水多牲交视频module 的作用便是让用户去彭胀 udf 的。

flink 自己也曾内置了一个 module,名字叫 CoreModule,其中也曾包含了一些 udf。

那我们要怎么使用 module 这玩意去彭胀我们的 hive udf 呢?

4.2.flink 彭胀守旧 hive 内置 udf

神情如下:

引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。在 HiveModule 中包含了 hive 内置的 udf。

<dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>     <version>${flink.version}</version> </dependency

在 StreamTableEnvironment 中加载 HiveModule。

String name = "default"; String version = "3.1.2"; tEnv.loadModule(name, new HiveModule(version)); 

然后在规章台打印一下咫尺有的 module。

String[] modules = tEnv.listModules(); Arrays.stream(modules).forEach(System.out::println); 

然后不错看到除了 core module,还有我们刚刚加载进去的 default module。

default core 

检察通盘 module 的通盘 udf。在规章台打印一下。

String[] functions = tEnv.listFunctions(); Arrays.stream(functions).forEach(System.out::println); 

就会将 default 和 core module 中的通盘包含的 udf 给列举出来,天然也就包含了 hive module 中的 get_json_object。

然后我们再去在 flink sql 中使用 get_json_object 这个 udf,就莫得报错,能平素输出恶果了。

使用 flink hive connector 自带的 HiveModule,也曾大约惩处很大一部分常见 udf 使用的问题了。

4.2.flink 彭胀守旧用户自界说 hive udf

蓝本博主是径直想要使用 flink sql 中的 create temporary function 去引申引入自界说 hive udf 的。

例如如下:

CREATE TEMPORARY FUNCTION test_hive_udf as 'flink.examples.sql._09.udf._02_stream_hive_udf.TestGenericUDF'; 

发咫尺引申这句 sql 时,是不错引申得手,将 udf 注册进去的。

然则在后续 udf 开动化时就报错了。具体造作如下图。径直报错 ClassCastException。

看了下源码,flink 流环境下(未销毁 hive catalog 时)在创建 udf 时会合计这个 udf 是 flink 生态体系中的 udf。

是以在开动化我们引入的 TestGenericUDF 时,默许会按照 flink 的 UserDefinedFunction 强转,因此才会报强转造作。

那么我们就不可使用 hive udf 了吗?

造作,小伙伴萌岂敢有这种主见。博主都把这个标题列出来了(过劲都吹出去了),还能给不出惩处决策嘛。

思绪见下一章节。

4.3.flink 彭胀守旧用户自界说 hive udf 的增强 module

其实思绪很简便。

使用 flink sql 中的 create temporary function 固然不可引申,然则 flink 提供了插件化的自界说 module。

我们不错彭胀一个守旧用户自界说 hive udf 的 module,使用这个 module 来守旧自界说的 hive udf。

齐全的代码也颠倒简便。简便的把 flink hive connector 提供的 HiveModule 做一个增强即可,即下图中的 HiveModuleV2。

使用花式如下图所示:

然后圭臬就平素跑起来了。

肥肠滴好用!

5.回归与预测

本文主要先容了如若在 flink sql 使用 hive 内置 udf 及用户自界说 hive udf,回归如下:

布景及应用场景先容:博主渴望你能了解到,其实许多场景下及时数仓的成立都是跟着离线数仓而成立的(疏导的逻辑在及时数仓中从头齐全一遍),因此大约在 flink sql 中复用 hive udf 是大约大大提能手效的。 flink 彭胀守旧 hive 内置 udf:flink sql 提供了彭胀 udf 的智商,即 module,况兼 flink sql 也内置了 HiveModule(需要你主动加载进环境),来守旧一些 hive 内置的 udf (比如 get_json_object)给小伙伴们使用。 flink 彭胀守旧用户自界说的 hive udf:主要先容 flink sql 流任务中,不可使用 create temporary function 去引入一个用户自界说的 hive udf。因此博主只可通过 flink sql 提供的 module 插件智商,自界说了 module,来守旧引入用户自界说的 hive udf。