
User-Defined Functions


User-defined functions are an important feature, because they significantly extend the expressiveness of queries.

Registering User-Defined Functions

In most cases, a user-defined function must be registered before it can be used in a query. It is not necessary to register functions for the Scala Table API.

Functions are registered at the TableEnvironment by calling a registerFunction() method. When a user-defined function is registered, it is inserted into the function catalog of the TableEnvironment, so that the Table API or SQL parser can recognize and properly translate it.
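For orientation, registration of each function type looks roughly as follows (a minimal sketch; the class names are hypothetical placeholders for the implementations developed in the sub-sections below):

// assuming tableEnv is a BatchTableEnvironment or StreamTableEnvironment
tableEnv.registerFunction("myHashCode", new MyHashCode());       // a ScalarFunction
tableEnv.registerFunction("mySplit", new MySplit());             // a TableFunction
tableEnv.registerFunction("myWeightedAvg", new MyWeightedAvg()); // an AggregateFunction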

Please find detailed examples of how to register and how to call each type of user-defined function (ScalarFunction, TableFunction, and AggregateFunction) in the following sub-sections.

Scalar Functions

If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.

In order to define a scalar function, one has to extend the base class ScalarFunction in org.apache.flink.table.functions and implement one (or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named eval. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can be overloaded by implementing multiple methods named eval. Evaluation methods can also support variable arguments, such as eval(String... strs).
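For instance, an overloaded and variadic eval could look like this (a minimal sketch; ConcatFunction is a hypothetical name used purely for illustration):

public class ConcatFunction extends ScalarFunction {

  // fixed-arity overload: concatenates exactly two strings
  public String eval(String a, String b) {
    return a + b;
  }

  // variadic overload: concatenates any number of strings
  public String eval(String... strs) {
    return String.join("", strs);
  }
}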

The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:

public class HashCode extends ScalarFunction {
  private int factor = 12;

  public HashCode(int factor) {
    this.factor = factor;
  }

  public int eval(String s) {
    return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");

// must be defined in static/object context
class HashCode(factor: Int) extends ScalarFunction {
  def eval(s: String): Int = {
    s.hashCode() * factor
  }
}

val tableEnv = TableEnvironment.getTableEnvironment(env)

// use the function in Scala Table API
val hashCode = new HashCode(10)
myTable.select('string, hashCode('string))

// register and use the function in SQL
tableEnv.registerFunction("hashCode", new HashCode(10))
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")

By default, the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases, the TypeInformation of the result type can be manually defined by overriding ScalarFunction#getResultType().

The following shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding ScalarFunction#getResultType(), we define that the returned long value should be interpreted as a Types.TIMESTAMP by the code generation.

public static class TimestampModifier extends ScalarFunction {
  public long eval(long t) {
    return t % 1000;
  }

  public TypeInformation<?> getResultType(Class<?>[] signature) {
    return Types.TIMESTAMP;
  }
}

object TimestampModifier extends ScalarFunction {
  def eval(t: Long): Long = {
    t % 1000
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    Types.TIMESTAMP
  }
}

Table Functions

Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input arguments. However, in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns.

In order to define a table function, one has to extend the base class TableFunction in org.apache.flink.table.functions and implement one (or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared public and named eval. The TableFunction can be overloaded by implementing multiple methods named eval. The parameter types of the evaluation methods determine all valid parameters of the table function. Evaluation methods can also support variable arguments, such as eval(String... strs). The type of the returned table is determined by the generic type of TableFunction. Evaluation methods emit output rows using the protected collect(T) method.

In the Table API, a table function is used with .join(Expression) or .leftOuterJoin(Expression) for Scala users and .join(String) or .leftOuterJoin(String) for Java users. The join operator (cross) joins each row from the outer table (the table on the left side of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The leftOuterJoin operator joins each row from the outer table with all rows produced by the table-valued function and preserves outer rows for which the table function returns an empty table. In SQL, use LATERAL TABLE(<TableFunction>) with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see the examples below).

The following example shows how to define a table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered:

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
  private String separator = " ";

  public Split(String separator) {
    this.separator = separator;
  }

  public void eval(String str) {
    for (String s : str.split(separator)) {
      // use collect(...) to emit a row
      collect(new Tuple2<String, Integer>(s, s.length()));
    }
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...     // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join("split(a) as (word, length)").select("a, word, length");
myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer). class Split(separator: String) extends TableFunction[(String, Int)] {
  def eval(str: String): Unit = {
  // use collect(...) to emit a row.
  str.split(separator).foreach(x -> collect((x, x.length))
  }
}

val tableEnv = TableEnvironment.getTableEnvironment(env)
val myTable = ...     // table schema: [a: String] 
// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
val split = new Split("#")
// "as" specifies the field names of the generated table. myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length)
myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length)

// Register the table function to use it in SQL queries.
tableEnv.registerFunction("split", new Split("#"))

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")

IMPORTANT: Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.

Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of a POJO returned by a table function using AS.

By default, the result type of a TableFunction is determined by Flink's automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding TableFunction#getResultType(), which returns its TypeInformation.

The following example shows a TableFunction that returns a Row type, which requires explicit type information. We define that the returned table type should be RowTypeInfo(String, Integer) by overriding TableFunction#getResultType().

public class CustomTypeSplit extends TableFunction<Row> {
  public void eval(String str) {
    for (String s : str.split(" ")) {
      Row row = new Row(2);
      row.setField(0, s);
      row.setField(1, s.length());
      collect(row);
    }
  }

  @Override
  public TypeInformation<Row> getResultType() {
    return Types.ROW(Types.STRING(), Types.INT());
  }
}

class CustomTypeSplit extends TableFunction[Row] {
  def eval(str: String): Unit = {
    str.split(" ").foreach({ s =>
      val row = new Row(2)
      row.setField(0, s)
      row.setField(1, s.length)
      collect(row)
    })
  }

  override def getResultType: TypeInformation[Row] = {
    Types.ROW(Types.STRING, Types.INT)
  }
}

Aggregation Functions

User-defined aggregation functions (UDAGGs) aggregate a table (one or more rows with one or more attributes) to a scalar value.

![UDAGG mechanism](img/udagg-mechanism.png)

The above figure shows an example of an aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, id, name, and price, and 5 rows. Imagine you need to find the highest price of all beverages in the table, i.e., perform a max() aggregation. You would need to check each of the 5 rows, and the result would be a single numeric value.

A user-defined aggregation function is implemented by extending the AggregateFunction class. An AggregateFunction works as follows. First, it needs an accumulator, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the createAccumulator() method of the AggregateFunction. Subsequently, the accumulate() method of the function is called for each input row to update the accumulator. Once all rows have been processed, the getValue() method of the function is called to compute and return the final result.
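To make this concrete, the max() aggregation from the figure above could be sketched as a user-defined function like this (illustrative only; the class and field names are hypothetical, and the optional methods discussed further below are omitted):

public static class MaxPriceAccum {
  public long max = Long.MIN_VALUE;
}

public static class MaxPrice extends AggregateFunction<Long, MaxPriceAccum> {

  @Override
  public MaxPriceAccum createAccumulator() {
    return new MaxPriceAccum();
  }

  // called once per input row with the price column
  public void accumulate(MaxPriceAccum acc, long price) {
    acc.max = Math.max(acc.max, price);
  }

  @Override
  public Long getValue(MaxPriceAccum acc) {
    return acc.max;
  }
}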

The following methods are mandatory for each AggregateFunction (a conceptual call-sequence sketch follows the list):

  • createAccumulator()
  • accumulate()
  • getValue()
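
Conceptually, the runtime drives these three methods as sketched below (a simplified illustration, not Flink's actual execution code; it uses the WeightedAvg function defined later in this section):

// 1. create an empty accumulator
WeightedAvg agg = new WeightedAvg();
WeightedAvgAccum acc = agg.createAccumulator();

// 2. update the accumulator once per input row
agg.accumulate(acc, 10L, 1);
agg.accumulate(acc, 20L, 3);

// 3. compute the final result after all rows have been processed
Long result = agg.getValue(acc);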

Flink's type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So, similar to ScalarFunction and TableFunction, AggregateFunction provides methods to specify the TypeInformation of the result type (via AggregateFunction#getResultType()) and the type of the accumulator (via AggregateFunction#getAccumulatorType()).

Besides the above methods, there are a few contracted methods that can be optionally implemented. While some of these methods allow the system to execute the query more efficiently, others are mandatory for certain use cases. For instance, the merge() method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them).

Depending on the use case, the following methods of AggregateFunction are required:

  • retract() is required for aggregations on bounded OVER windows.
  • merge() is required for many batch aggregations and session window aggregations.
  • resetAccumulator() is required for many batch aggregations.

All methods of AggregateFunction must be declared as public, not static, and named exactly as the names mentioned above. The methods createAccumulator, getValue, getResultType, and getAccumulatorType are defined in the AggregateFunction abstract class, while the others are contracted methods. In order to define an aggregation function, one has to extend the base class org.apache.flink.table.functions.AggregateFunction and implement one (or more) accumulate methods. The accumulate method can be overloaded with different parameter types and supports variable arguments.
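For example, overloading accumulate might look like this (a minimal sketch for illustration; SumAgg and its accumulator are hypothetical names):

public static class SumAcc {
  public long sum = 0L;
}

public static class SumAgg extends AggregateFunction<Long, SumAcc> {

  @Override
  public SumAcc createAccumulator() {
    return new SumAcc();
  }

  // fixed-arity accumulate
  public void accumulate(SumAcc acc, long value) {
    acc.sum += value;
  }

  // variadic overload of accumulate
  public void accumulate(SumAcc acc, long... values) {
    for (long v : values) {
      acc.sum += v;
    }
  }

  @Override
  public Long getValue(SumAcc acc) {
    return acc.sum;
  }
}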

Detailed documentation for all methods of AggregateFunction is given below.

/**
  * Base class for aggregation functions.
  *
  * @param <T>   the type of the aggregation result
  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
  *       aggregated values which are needed to compute an aggregation result.
  *       AggregateFunction represents its state using accumulator, thereby the state of the
  *       AggregateFunction must be put into the accumulator.
  */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {

  /**
  * Creates and init the Accumulator for this [[AggregateFunction]].
  *
  * @return the accumulator with the initial value
  */
  public ACC createAccumulator(); // MANDATORY

  /** Processes the input values and update the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
  * @param accumulator       the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  */
  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY

  /**
  * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
  * datastream bounded over aggregate.
  *
  * @param accumulator       the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  */
  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL

  /**
  * Merges a group of accumulator instances into one accumulator instance. This function must be
  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which will keep the merged aggregate results. It should
  *           be noted that the accumulator may contain the previous aggregated
  *           results. Therefore user should not replace or clean this instance in the
  *           custom merge method.
  * @param its      an [[java.lang.Iterable]] pointed to a group of accumulators that will be
  *           merged.
  */
  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL

  /**
  * Called every time when an aggregation result should be materialized.
  * The returned value could be either an early and incomplete result
  * (periodically emitted as data arrive) or the final result of the
  * aggregation.
  *
  * @param accumulator the accumulator which contains the current
  *          aggregated results
  * @return the aggregation result
  */
  public T getValue(ACC accumulator); // MANDATORY

  /**
  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
  * dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which needs to be reset
  */
  public void resetAccumulator(ACC accumulator); // OPTIONAL

  /**
  * Returns true if this AggregateFunction can only be applied in an OVER window.
  *
  * @return true if the AggregateFunction requires an OVER window, false otherwise.
  */
  public Boolean requiresOver = false; // PRE-DEFINED

  /**
  * Returns the TypeInformation of the AggregateFunction's result.
  *
  * @return The TypeInformation of the AggregateFunction's result or null if the result type
  *     should be automatically inferred.
  */
  public TypeInformation<T> getResultType = null; // PRE-DEFINED

  /**
  * Returns the TypeInformation of the AggregateFunction's accumulator.
  *
  * @return The TypeInformation of the AggregateFunction's accumulator or null if the
  *     accumulator type should be automatically inferred.
  */
  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}
/**
  * Base class for aggregation functions.
  *
  * @tparam T   the type of the aggregation result
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *       aggregated values which are needed to compute an aggregation result.
  *       AggregateFunction represents its state using accumulator, thereby the state of the
  *       AggregateFunction must be put into the accumulator.
  */
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  /**
  * Creates and init the Accumulator for this [[AggregateFunction]].
  *
  * @return the accumulator with the initial value
  */
  def createAccumulator(): ACC // MANDATORY 
  /**
  * Processes the input values and update the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
  * @param accumulator       the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  */
  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY 
  /**
  * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
  * datastream bounded over aggregate.
  *
  * @param accumulator       the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  */
  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL 
  /**
  * Merges a group of accumulator instances into one accumulator instance. This function must be
  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which will keep the merged aggregate results. It should
  *           be noted that the accumulator may contain the previous aggregated
  *           results. Therefore user should not replace or clean this instance in the
  *           custom merge method.
  * @param its      an [[java.lang.Iterable]] pointed to a group of accumulators that will be
  *           merged.
  */
  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL 
  /**
  * Called every time when an aggregation result should be materialized.
  * The returned value could be either an early and incomplete result
  * (periodically emitted as data arrive) or the final result of the
  * aggregation.
  *
  * @param accumulator the accumulator which contains the current
  *          aggregated results
  * @return the aggregation result
  */
  def getValue(accumulator: ACC): T // MANDATORY 
  /**
  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
  * dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which needs to be reset
  */
  def resetAccumulator(accumulator: ACC): Unit // OPTIONAL 
  /**
  * Returns true if this AggregateFunction can only be applied in an OVER window.
  *
  * @return true if the AggregateFunction requires an OVER window, false otherwise.
  */
  def requiresOver: Boolean = false // PRE-DEFINED 
  /**
  * Returns the TypeInformation of the AggregateFunction's result.
  *
  * @return The TypeInformation of the AggregateFunction's result or null if the result type
  *     should be automatically inferred.
  */
  def getResultType: TypeInformation[T] = null // PRE-DEFINED 
  /**
  * Returns the TypeInformation of the AggregateFunction's accumulator.
  *
  * @return The TypeInformation of the AggregateFunction's accumulator or null if the
  *     accumulator type should be automatically inferred.
  */
  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
}

The following example shows how to:

  • define an AggregateFunction that calculates the weighted average on a given column,
  • register the function in the TableEnvironment, and
  • use the function in a query.

To calculate a weighted average value, the accumulator needs to store the weighted sum and count of all the data that has been accumulated. In our example, we define a class WeightedAvgAccum to be the accumulator. Accumulators are automatically backed up by Flink's checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.

The accumulate() method of our WeightedAvg AggregateFunction has three inputs. The first one is the WeightedAvgAccum accumulator; the other two are user-defined inputs: the input value ivalue and the weight of the input iweight. Although the retract(), merge(), and resetAccumulator() methods are not mandatory for most aggregation types, we provide them below as examples. Please note that we use Java primitive types and define getResultType() and getAccumulatorType() methods in the Scala example, because Flink type extraction does not work very well for Scala types.

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
  public long sum = 0;
  public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

  @Override
  public WeightedAvgAccum createAccumulator() {
    return new WeightedAvgAccum();
  }

  @Override
  public Long getValue(WeightedAvgAccum acc) {
    if (acc.count == 0) {
      return null;
    } else {
      return acc.sum / acc.count;
    }
  }

  public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
    acc.sum += iValue * iWeight;
    acc.count += iWeight;
  }

  public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
    acc.sum -= iValue * iWeight;
    acc.count -= iWeight;
  }

  public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
    Iterator<WeightedAvgAccum> iter = it.iterator();
    while (iter.hasNext()) {
      WeightedAvgAccum a = iter.next();
      acc.count += a.count;
      acc.sum += a.sum;
    }
  }

  public void resetAccumulator(WeightedAvgAccum acc) {
    acc.count = 0;
    acc.sum = 0L;
  }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

import java.lang.{Long => JLong, Integer => JInteger}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.AggregateFunction

/**
 * Accumulator for WeightedAvg.
 * The Java tuple fields are used directly: f0 holds the weighted sum, f1 holds the count.
 */
class WeightedAvgAccum extends JTuple2[JLong, JInteger] {
  f0 = 0L // sum
  f1 = 0  // count
}

/**
 * Weighted Average user-defined aggregate function.
 */
class WeightedAvg extends AggregateFunction[JLong, WeightedAvgAccum] {

  override def createAccumulator(): WeightedAvgAccum = {
    new WeightedAvgAccum
  }

  override def getValue(acc: WeightedAvgAccum): JLong = {
    if (acc.f1 == 0) {
      null
    } else {
      acc.f0 / acc.f1
    }
  }

  def accumulate(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = {
    acc.f0 += iValue * iWeight
    acc.f1 += iWeight
  }

  def retract(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = {
    acc.f0 -= iValue * iWeight
    acc.f1 -= iWeight
  }

  def merge(acc: WeightedAvgAccum, it: java.lang.Iterable[WeightedAvgAccum]): Unit = {
    val iter = it.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      acc.f1 += a.f1
      acc.f0 += a.f0
    }
  }

  def resetAccumulator(acc: WeightedAvgAccum): Unit = {
    acc.f1 = 0
    acc.f0 = 0L
  }

  override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
    new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
  }

  override def getResultType: TypeInformation[JLong] = Types.LONG
}

// register function
val tEnv: StreamTableEnvironment = ???
tEnv.registerFunction("wAvg", new WeightedAvg())

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")

Best Practices for Implementing UDFs

The Table API and SQL code generation internally tries to work with primitive values as much as possible. A user-defined function can introduce significant overhead through object creation, casting, and (un)boxing. Therefore, it is highly recommended to declare parameters and result types as primitive types instead of their boxed classes. Types.DATE and Types.TIME can also be represented as int. Types.TIMESTAMP can be represented as long.
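For example, a timestamp-difference function can be declared entirely on primitives (a sketch only; the function name is illustrative, and it relies on the statement above that Types.TIMESTAMP can be represented as a long value internally):

public static class MillisBetween extends ScalarFunction {
  // long parameters instead of java.lang.Long avoid (un)boxing on every call;
  // the long inputs carry the internal Types.TIMESTAMP representation
  public long eval(long timestamp1, long timestamp2) {
    return Math.abs(timestamp1 - timestamp2);
  }
}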

We recommend writing user-defined functions in Java instead of Scala, as Scala types pose a challenge for Flink's type extractor.

Integrating UDFs with the Runtime

Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide open() and close() methods that can be overridden and provide similar functionality as the methods in RichFunction of the DataSet or DataStream API.

The open() method is called once before the evaluation method. The close() method is called after the last call to the evaluation method.

The open() method provides a FunctionContext that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.

The following information can be obtained by calling the corresponding methods of FunctionContext:

| Method | Description |
| --- | --- |
| getMetricGroup() | Metric group for this parallel subtask. |
| getCachedFile(name) | Local temporary file copy of a distributed cache file. |
| getJobParameter(name, defaultValue) | Global job parameter value associated with the given key. |
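
For instance, getMetricGroup() can be used to register a custom metric in open() (a brief sketch; CountedHashCode and the counter name "evalCount" are hypothetical, chosen only for illustration):

import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class CountedHashCode extends ScalarFunction {

  private Counter evalCounter;

  @Override
  public void open(FunctionContext context) throws Exception {
    // register a counter that tracks how often eval() is called
    evalCounter = context.getMetricGroup().counter("evalCount");
  }

  public int eval(String s) {
    evalCounter.inc();
    return s.hashCode();
  }
}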

The following example snippet shows how to use FunctionContext in a scalar function to access a global job parameter:

public class HashCode extends ScalarFunction {

  private int factor = 0;

  @Override
  public void open(FunctionContext context) throws Exception {
    // access "hashcode_factor" parameter
    // "12" would be the default value if parameter does not exist
    factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12"));
  }

  public int eval(String s) {
    return s.hashCode() * factor;
  }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
object hashCode extends ScalarFunction {

  var hashcode_factor = 12

  override def open(context: FunctionContext): Unit = {
  // access "hashcode_factor" parameter
  // "12" would be the default value if parameter does not exist
  hashcode_factor = context.getJobParameter("hashcode_factor", "12").toInt
  }

  def eval(s: String): Int = {
  s.hashCode() * hashcode_factor
  }
}

val tableEnv = TableEnvironment.getTableEnvironment(env)

// use the function in Scala Table API
myTable.select('string, hashCode('string))

// register and use the function in SQL
tableEnv.registerFunction("hashCode", hashCode)
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
