Flink Quick-Start Development (Scala-based)
Import the dependencies in pom.xml
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.7.6</hadoop.version>
    <flink.version>1.6.1</flink.version>
    <mysql.version>5.1.48</mysql.version>
    <fastjson.version>1.2.51</fastjson.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!-- <arg>-make:transitive</arg> -->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Entry class of this project's shaded jar -->
                                <mainClass>com.ylb.WorldCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
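With the pom in place, build the job jar that the submit commands later in this guide reference. The artifact file name below is hypothetical; use whatever your pom's artifactId/version actually produce:

# The shade plugin is bound to the package phase, so this yields a submittable fat jar
mvn clean package
# Copy the shaded artifact to the path used in the submit examples below
cp target/flink-demo-1.0-SNAPSHOT.jar /tmp/flink-demo.jar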
Batch processing (file -> word count)
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._   // brings the implicit TypeInformation instances into scope

/**
 * @author yanglibin
 * @create 2020-02-28 11:17
 */
object WorldCount {
  def wc1(filePath: String) = {
    // Batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    env.readTextFile(filePath)   // read the file line by line
      .flatMap(_.split(" "))     // split each line into words
      .map((_, 1))               // word -> (word, 1)
      .groupBy(0)                // group by the word (tuple position 0)
      .sum(1)                    // sum the counts (tuple position 1)
      .print()                   // print() triggers execution for batch jobs
  }
}
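A minimal entry point to try wc1 out (the input path below is hypothetical; any plain-text file works):

object WorldCountApp {
  def main(args: Array[String]): Unit = {
    // Hypothetical input file
    WorldCount.wc1("/tmp/words.txt")
  }
}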
Common issues
Error:(15, 15) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
    .flatMap(_.split(" "))

Fix: import org.apache.flink.api.scala._
Stream processing
Flink is a stream processing framework, and moreover an event-driven one.
file
// Requires: import org.apache.flink.streaming.api.scala._
def wc2(filePath: String) = {
  // Get the streaming execution environment
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the default parallelism (the number of parallel subtasks, not CPU cores per se)
  environment.setParallelism(4)
  environment.readTextFile(filePath)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)   // key by the word (tuple position 0)
    .sum(1)
    .print()
  // Streaming jobs only run once execute() is called
  environment.execute()
}
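As a side note, keyBy(0) keys by tuple position; the Scala API also accepts a key-selector function, which reads more clearly. A sketch of the equivalent pipeline under the same imports:

environment.readTextFile(filePath)
  .flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1)   // select the word field as the key
  .sum(1)
  .print()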
socket
/**
 * Unbounded stream
 * @param hostname
 * @param port
 * @return
 */
def wc3(hostname: String, port: Int) = {
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setParallelism(4)
  // Read network data from a socket
  environment.socketTextStream(hostname, port)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)
    .sum(1)
    .print()
  environment.execute()
}
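To try wc3, first start a plain-text server on the target port (for example nc -lk 9999), then run an entry point like the sketch below. The host and port are assumptions, and wc3 is assumed to live in the WorldCount object shown earlier:

object SocketWordCountApp {
  def main(args: Array[String]): Unit = {
    // Assumes a socket source is already listening, e.g. started with `nc -lk 9999`
    WorldCount.wc3("localhost", 9999)
  }
}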
Setting up a Flink cluster
Standalone mode
# Download: https://flink.apache.org/downloads.html
# Mind the Scala version when choosing a download
tar -xvf /usr/local/src/flink-1.9.2-bin-scala_2.11.tgz -C /usr/local/
ln -sv /usr/local/flink-1.9.2 /usr/local/flink

### Flink configuration changes ###
# conf/flink-conf.yaml
jobmanager.rpc.address: flink-master
# conf/masters
flink-master
# conf/slaves
flink-node-01
flink-node-02

### Start the Flink cluster ###
bin/start-cluster.sh
YARN mode
1) Adding the Hadoop classpath to Flink
############ Hadoop environment ############
# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if it's a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # It's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # It's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi

# Try and set HADOOP_CONF_DIR to some common default if it's not set
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi
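Newer Flink releases document a one-line alternative that achieves the same goal, assuming the hadoop command is on the PATH:

# Make Hadoop's jars and config visible to Flink
export HADOOP_CLASSPATH=`hadoop classpath`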
2) Putting the required jar files into the /lib directory of the Flink distribution
# To run Flink on YARN, connect to HDFS, connect to HBase,
# or use a Hadoop-based file system connector,
# download the matching dependency jar (Pre-bundled Hadoop 2.7.5) into Flink's lib directory
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar \
    -O /usr/local/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
Running Flink jobs
In standalone mode
Via the web UI (omitted here)
Via the command line
bin/flink run -c com.ylb.WorldCount /tmp/flink-demo.jar
# -c : the entry class to run inside the jar
In YARN mode
session-cluster
Here "session" means the resources are not released: the YARN application keeps running and is shared by all jobs submitted to it.
Usage:
- First start a session with yarn-session.sh to host Flink jobs
- Then submit Flink jobs to it
# 1. Start a session cluster instance named "test"
bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
# After startup, a file named .yarn-properties-(username) is created in the local
# /tmp directory (on this machine only, not on the others)

# 2. Submit a job (it goes to the YARN application created above)
bin/flink run -c com.ylb.WorldCount /tmp/flink-demo.jar

# 3. Kill the YARN application
yarn application -kill application_1583028240987_0002   # use your own app id

# Option reference
Required
  -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
Optional
  -at,--applicationType <arg>     Set a custom application type for the application on YARN
  -D <property=value>             use value for given property
  -d,--detached                   If present, runs the job in detached mode
  -h,--help                       Help for the Yarn session CLI.
  -id,--applicationId <arg>       Attach to running YARN session
  -j,--jar <arg>                  Path to Flink jar file
  -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
  -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this
                                  flag to connect to a different JobManager than the one specified
                                  in the configuration.
  -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
  -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
  -nm,--name <arg>                Set a custom name for the application on YARN
  -q,--query                      Display available YARN resources (memory, cores)
  -qu,--queue <arg>               Specify YARN queue.
  -s,--slots <arg>                Number of slots per TaskManager
  -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort
                                  cluster shutdown when the CLI is terminated abruptly, e.g., in
                                  response to a user interrupt, such as typing Ctrl + C.
  -st,--streaming                 Start Flink in streaming mode
  -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
  -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
  -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use
                                  non-YARN specific option instead)
  -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
Per Job Cluster
Each Flink job requests its own separate YARN application.
bin/flink run -m yarn-cluster -c com.ylb.WorldCount /tmp/flink-demo.jar
# Note: delete /tmp/.yarn-properties-(username) first, otherwise the client
# attaches to the existing session instead of starting a per-job cluster
The difference between the two:
whether resources stay occupied long-term without being released (session mode holds them; per-job mode releases them when the job finishes)
Operator chains: operators with the same parallelism connected one-to-one are fused into a single task for execution.
Disabling chaining: env.disableOperatorChaining() stops operators from being chained together into one task, so every subtask becomes its own task.
startNewChain(): starts a new chain beginning with the current operator.
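A minimal sketch putting these two chaining knobs together (the host, port, and choice of operators are only for illustration):

import org.apache.flink.streaming.api.scala._

object ChainingDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.disableOperatorChaining()  // global switch: nothing chains, each subtask becomes a task

    env.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .startNewChain()   // break here: map starts a fresh chain instead of fusing with flatMap
      .keyBy(0)          // keyBy always breaks the chain anyway (it forces a network shuffle)
      .sum(1)
      .print()

    env.execute("chaining demo")
  }
}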
