返回介绍

命令行界面

发布于 2025-05-02 18:19:19 字数 16263 浏览 0 评论 0 收藏

Flink 提供命令行界面(CLI)来运行打包为 JAR 文件的程序,并控制它们的执行。CLI 是任何 Flink 设置的一部分,可在本地单节点设置和分布式设置中使用。它位于 <flink-home>/bin/flink 默认情况下,并连接到从同一安装目录启动的正在运行的 Flink 主服务器(JobManager)。

使用命令行界面的先决条件是 Flink 主机(JobManager)已启动(通过 <flink-home>/bin/start-cluster.sh )或 YARN 环境可用。

命令行可用于

例子

  • 运行没有参数的示例程序:
    ./bin/flink run ./examples/batch/WordCount.jar
    
  • 使用输入和结果文件的参数运行示例程序:
    ./bin/flink run ./examples/batch/WordCount.jar \
               --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 运行带有并行性的示例程序 16 以及输入和结果文件的参数:
    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
               --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 运行禁用 flink log 输出的示例程序:
     ./bin/flink run -q ./examples/batch/WordCount.jar
    
  • 以分离模式运行示例程序:
     ./bin/flink run -d ./examples/batch/WordCount.jar
    
  • 在特定的 JobManager 上运行示例程序:
    ./bin/flink run -m myJMHost:8081 \
                 ./examples/batch/WordCount.jar \
                 --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 以特定类作为入口点运行示例程序:
    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
                 ./examples/batch/WordCount.jar \
                 --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 使用具有 2 个 TaskManagers 的 每作业 YARN 群集 运行示例程序:
    ./bin/flink run -m yarn-cluster -yn 2 \
                 ./examples/batch/WordCount.jar \
                 --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
    
  • 以 Word 为单位显示 WordCount 示例程序的优化执行计划:
    ./bin/flink info ./examples/batch/WordCount.jar \
                --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 列出计划和正在运行的作业(包括其 JobID):
    ./bin/flink list
    
  • 列出预定作业(包括其作业 ID):
    ./bin/flink list -s
    
  • 列出正在运行的作业(包括其作业 ID):
    ./bin/flink list -r
    
  • 列出所有现有工作(包括其作业 ID):
    ./bin/flink list -a
    
  • 列出在 Flink YARN 会话中运行 Flink 作业:
    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
    
  • 取消工作:
    ./bin/flink cancel <jobID>
    
  • 使用保存点取消作业:
    ./bin/flink cancel -s [targetDirectory] <jobID>
    
  • 停止工作(仅限流处理工作):
    ./bin/flink stop <jobID>
    
  • 修改正在运行的作业(仅限流式处理作业):. / bin/flink modify <jobid>-p<newparallelism></newparallelism></jobid>

注意 :取消和停止(流处理)作业的区别如下:

在取消呼叫中,作业中的算子立即接收 cancel() 方法调用以尽快取消它们。如果算子在取消呼叫后没有停止,Flink 将开始定期中断线程,直到它停止。

“停止”呼叫是一种更优雅的方式来停止正在运行的流处理作业。Stop 仅适用于使用实现 StoppableFunction 接口的源的作业。当用户请求停止作业时,所有源都将接收 stop() 方法调用。该工作将继续运行,直到所有资源正常关闭。这允许作业完成处理所有飞行数据。

保存点

保存点 通过命令行客户端控制:

触发保存点

./bin/flink savepoint <jobId> [savepointDirectory]

这将触发具有 ID 的作业的保存点 jobId ,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

此外,您可以选择指定目标文件系统目录以存储保存点。该目录需要可由 JobManager 访问。

如果未指定目标目录,则需要 配置默认目录 。否则,触发保存点将失败。

使用 YARN 触发保存点

./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

这将触发具有 ID jobId 和 YARN 应用程序 ID 的作业的保存点 yarnAppId ,并返回创建的保存点的路径。

其他所有内容与上面 触发保存点 部分中描述的相同。

使用保存点取消

您可以自动触发保存点并取消作业。

./bin/flink cancel -s [savepointDirectory] <jobID>

如果未配置保存点目录,则需要为 Flink 安装配置默认保存点目录(请参阅 保存点 )。

只有保存点成功,才会取消该作业。

恢复保存点

./bin/flink run -s <savepointPath> ...

run 命令有一个保存点标志来提交作业,该作业从保存点恢复其状态。savepoint trigger 命令返回保存点路径。

默认情况下,我们尝试将所有保存点状态与正在提交的作业进行匹配。如果要允许跳过无法使用新作业恢复的保存点状态,可以设置 allowNonRestoredState 标志。如果在触发保存点并且仍想使用保存点时从程序中删除了作为程序一部分的 算子,则需要允许此 算子操作。

./bin/flink run -s <savepointPath> -n ...

如果您的程序删除了属于保存点的 算子,这将非常有用。

配置保存点

./bin/flink savepoint -d <savepointPath>

在给定路径处理保存点。savepoint trigger 命令返回保存点路径。

如果使用自定义状态实例(例如自定义还原状态或 RocksDB 状态),则必须指定触发保存点的程序 JAR 的路径,以便使用用户代码类加载器处理保存点:

./bin/flink savepoint -d <savepointPath> -j <jarFile>

否则,你会遇到一个 ClassNotFoundException

用法

命令行语法如下:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
   -c,--class <classname>         Class with the program entry point
                      ("main" method or "getPlan()" method.
                      Only needed if the JAR file does not
                      specify the class in its manifest.
   -C,--classpath <url>         Adds a URL to each user code
                      classloader  on all nodes in the
                      cluster. The paths must specify a
                      protocol (e.g. file://) and be
                      accessible on all nodes (e.g. by means
                      of a NFS share). You can use this
                      option multiple times for specifying
                      more than one URL. The protocol must
                      be supported by the {@link
                      java.net.URLClassLoader}.
   -d,--detached            If present, runs the job in detached
                      mode
   -n,--allowNonRestoredState       Allow to skip savepoint state that
                      cannot be restored. You need to allow
                      this if you removed an operator from
                      your program that was part of the
                      program when the savepoint was
                      triggered.
   -p,--parallelism <parallelism>     The parallelism with which to run the
                      program. Optional flag to override the
                      default value specified in the
                      configuration.
   -q,--sysoutLogging           If present, suppress logging output to
                      standard out.
   -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                      from (for example
                      hdfs:///flink/savepoint-1537).
  Options for yarn-cluster mode:
   -d,--detached            If present, runs the job in detached
                      mode
   -m,--jobmanager <arg>        Address of the JobManager (master) to
                      which to connect. Use this flag to
                      connect to a different JobManager than
                      the one specified in the
                      configuration.
   -yD <property=value>         use value for given property
   -yd,--yarndetached           If present, runs the job in detached
                      mode (deprecated; use non-YARN
                      specific option instead)
   -yh,--yarnhelp             Help for the Yarn session CLI.
   -yid,--yarnapplicationId <arg>     Attach to running YARN session
   -yj,--yarnjar <arg>          Path to Flink jar file
   -yjm,--yarnjobManagerMemory <arg>  Memory for JobManager Container
                      with optional unit (default: MB)
   -yn,--yarncontainer <arg>      Number of YARN container to allocate
                      (=Number of Task Managers)
   -ynm,--yarnname <arg>        Set a custom name for the application
                      on YARN
   -yq,--yarnquery            Display available YARN resources
                      (memory, cores)
   -yqu,--yarnqueue <arg>         Specify YARN queue.
   -ys,--yarnslots <arg>        Number of slots per TaskManager
   -yst,--yarnstreaming         Start Flink in streaming mode
   -yt,--yarnship <arg>         Ship files in the specified directory
                      (t for transfer)
   -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container
                      with optional unit (default: MB)
   -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                      sub-paths for high availability mode
   -ynl,--yarnnodeLabel <arg>       Specify YARN node label for the YARN application
   -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                      sub-paths for high availability mode

  Options for default mode:
   -m,--jobmanager <arg>       Address of the JobManager (master) to which
                   to connect. Use this flag to connect to a
                   different JobManager than the one specified
                   in the configuration.
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                   for high availability mode

Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
   -c,--class <classname>       Class with the program entry point ("main"
                    method or "getPlan()" method. Only needed
                    if the JAR file does not specify the class
                    in its manifest.
   -p,--parallelism <parallelism>   The parallelism with which to run the
                    program. Optional flag to override the
                    default value specified in the
                    configuration.

Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
   -r,--running   Show only running programs and their JobIDs
   -s,--scheduled   Show only scheduled programs and their JobIDs
  Options for yarn-cluster mode:
   -m,--jobmanager <arg>      Address of the JobManager (master) to
                    which to connect. Use this flag to connect
                    to a different JobManager than the one
                    specified in the configuration.
   -yid,--yarnapplicationId <arg>   Attach to running YARN session
   -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper
                    sub-paths for high availability mode

  Options for default mode:
   -m,--jobmanager <arg>       Address of the JobManager (master) to which
                   to connect. Use this flag to connect to a
                   different JobManager than the one specified
                   in the configuration.
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                   for high availability mode

Action "stop" stops a running program (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:

  Options for yarn-cluster mode:
   -m,--jobmanager <arg>      Address of the JobManager (master) to
                    which to connect. Use this flag to connect
                    to a different JobManager than the one
                    specified in the configuration.
   -yid,--yarnapplicationId <arg>   Attach to running YARN session
   -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper
                    sub-paths for high availability mode

  Options for default mode:
   -m,--jobmanager <arg>       Address of the JobManager (master) to which
                   to connect. Use this flag to connect to a
                   different JobManager than the one specified
                   in the configuration.
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                   for high availability mode

Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
   -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                      The target directory is optional. If
                      no directory is specified, the
                      configured default directory
                      (state.savepoints.dir) is used.
  Options for yarn-cluster mode:
   -m,--jobmanager <arg>      Address of the JobManager (master) to
                    which to connect. Use this flag to connect
                    to a different JobManager than the one
                    specified in the configuration.
   -yid,--yarnapplicationId <arg>   Attach to running YARN session
   -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper
                    sub-paths for high availability mode

  Options for default mode:
   -m,--jobmanager <arg>       Address of the JobManager (master) to which
                   to connect. Use this flag to connect to a
                   different JobManager than the one specified
                   in the configuration.
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                   for high availability mode

Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
   -d,--dispose <arg>     Path of savepoint to dispose.
   -j,--jarfile <jarfile>   Flink program JAR file.
  Options for yarn-cluster mode:
   -m,--jobmanager <arg>      Address of the JobManager (master) to
                    which to connect. Use this flag to connect
                    to a different JobManager than the one
                    specified in the configuration.
   -yid,--yarnapplicationId <arg>   Attach to running YARN session
   -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper
                    sub-paths for high availability mode

  Options for default mode:
   -m,--jobmanager <arg>       Address of the JobManager (master) to which
                   to connect. Use this flag to connect to a
                   different JobManager than the one specified
                   in the configuration.
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                   for high availability mode

Action "modify" modifies a running job (e.g. change of parallelism).

  Syntax: modify <Job ID> [OPTIONS]
  "modify" action options:
   -h,--help               Show the help message for the CLI
                     Frontend or the action.
   -p,--parallelism <newParallelism>   New parallelism for the specified job.
   -v,--verbose            This option is deprecated.
  Options for yarn-cluster mode:
   -m,--jobmanager <arg>      Address of the JobManager (master) to
                    which to connect. Use this flag to connect
                    to a different JobManager than the one
                    specified in the configuration.
   -yid,--yarnapplicationId <arg>   Attach to running YARN session
   -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper
                    sub-paths for high availability mode

  Options for default mode:
   -m,--jobmanager <arg>       Address of the JobManager (master) to which
                   to connect. Use this flag to connect to a
                   different JobManager than the one specified
                   in the configuration.
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                   for high availability mode

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。