安装并启动 Flink
- 下载 Flink 1.14.4 (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.14.4
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgztar -xvf flink-1.14.4-bin-scala_2.12.tgz
- 打开配置文件
vi /etc/profile
- 添加环境变量 HADOOP_CLASSPATH(需要 Hadoop 类 )
# HADOOP_HOME is your hadoop root directory after unpack the binary package.export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
如果未配置会报错
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
- 修改 flink-conf.yaml,默认是 child-first,添加
classloader.resolve-order: parent-first
vim conf/flink-conf.yamlclassloader.resolve-order: parent-first
- 启动 Flink 集群
cd flink-1.14.4bin/start-cluster.sh
如果未修改会报错
[ERROR] Could not execute SQL statement. Reason:**
**java.io.StreamCorruptedException: unexpected block data
启动 Flink SQL Client
在 Flink 中包含了 FileSystem SQL Connector,不需要添加额外的依赖
从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format,有些 format 需要下载自己的依赖
文件系统连接器允许从本地或分布式文件系统进行读写
- 下载 flink-sql-parquet 依赖包(flink-1.14.4/lib 目录下)
cd libwget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet_2.12/1.14.4/flink-sql-parquet_2.12-1.14.4.jar
- 启动 Flink SQL Client
./bin/sql-client.sh
- 创建表
CREATE TABLE cluster (cluster_id STRING,name STRING,app_id STRING) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/root/cluster/','format' = 'parquet');
path 属性指定的是目录,而不是文件,该目录下的文件也不是可读的
- 读取 Parquet 文件
select * from cluster;
参考文档
