Overview
Basics
1. Using Hudi from Spark
Currently only the DataFrame API is supported.
// Imports as in the Hudi quickstart; `spark` is the active SparkSession,
// and `tableName` / `basePath` (target table name and storage path) are assumed to be defined.
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq(
  (1, "foo", "2020/04/04", System.currentTimeMillis()),
  (2, "bar Value", "2020/04/04", System.currentTimeMillis())
).toDF("id", "name", "dt", "ts")

df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(RECORDKEY_FIELD_OPT_KEY, "id")            // record key
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")        // partition column
  .option(TABLE_NAME, tableName)
  .option(HIVE_SYNC_ENABLED_OPT_KEY, true)          // enable Hive sync
  .option(HIVE_DATABASE_OPT_KEY, "luna_hudi")
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(HIVE_URL_OPT_KEY, "jdbc:hive2://foo:10000")
  .option(HIVE_USER_OPT_KEY, "admin")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
  .mode(SaveMode.Overwrite)
  .save(basePath)
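For completeness, a minimal snapshot-read sketch under the same assumptions (the same `spark` and `basePath` as above; the glob depth has to match however many directory levels the partition path produces, so adjust it to your layout):

// Snapshot read of the table written above; adjust the glob to the partition directory depth.
val readDf = spark.read
  .format("hudi")
  .load(basePath + "/*/*/*/*")

readDf.select("id", "name", "dt", "ts").show(false)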
Spark dependencies
<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark-bundle_2.11</artifactId>
    <version>0.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.6</version>
</dependency>
Spark configuration
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet=false
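A sketch of applying the same two settings when building the SparkSession programmatically (the app name is a placeholder, and enableHiveSupport() is included on the assumption that Hive sync is wanted; they can equally be passed via spark-submit --conf):

import org.apache.spark.sql.SparkSession

// Same two settings set on the SparkSession builder.
val spark = SparkSession.builder()
  .appName("hudi-demo")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.hive.convertMetastoreParquet", "false")
  .enableHiveSupport()
  .getOrCreate()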
Hive sync options
Hive sync works by creating an external table over the Hudi dataset.
HIVE_SYNC_ENABLED_OPT_KEY=true
HIVE_DATABASE_OPT_KEY=luna_hudi
HIVE_TABLE_OPT_KEY=luna_ods
HIVE_URL_OPT_KEY=jdbc:hive2://foo:10000
2. Integrating with Hive
With the Hive auto-sync option enabled, Spark creates the external table directly through HiveServer2.
External tables
The hudi-hadoop-mr-bundle JAR must be placed in Hive's lib directory (for HiveServer2) or in the Spark ThriftServer's jars directory, depending on which of the two serves the queries.
https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr-bundle/0.7.0
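The corresponding Maven coordinates (taken from the link above), in case the bundle is pulled in by a build rather than copied into the directory by hand:

<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr-bundle -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-hadoop-mr-bundle</artifactId>
    <version>0.7.0</version>
</dependency>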
Spark ThriftServer does not support ALTER TABLE ... REPLACE COLUMNS
org.apache.hudi.hive.HoodieHiveSyncException: Failed in executing SQL ALTER TABLE luna_hudi.luna_ods_cow REPLACE COLUMNS(_hoodie_commit_time string, _hoodie_commit_seqno string, _hoodie_record_key string, _hoodie_partition_path string, _hoodie_file_name string, id int, name string, dt string, ts bigint)
  at org.apache.hudi.hive.HoodieHiveClient.updateHiveSQL(HoodieHiveClient.java:369)
  at org.apache.hudi.hive.HoodieHiveClient.updateTableDefinition(HoodieHiveClient.java:251)
  at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:188)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:136)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:94)
  at org.apache.hudi.HoodieSparkSqlWriter$.org$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:355)
  at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:403)
  at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:399)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:399)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:460)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:217)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
  at HudiEntry$.main(HudiEntry.scala:44)
  at HudiEntry.main(HudiEntry.scala)
Caused by: java.sql.SQLException: org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: ALTER TABLE REPLACE COLUMNS(line 1, pos 0)

== SQL ==
ALTER TABLE luna_hudi.luna_ods_cow REPLACE COLUMNS(_hoodie_commit_time string, _hoodie_commit_seqno string, _hoodie_record_key string, _hoodie_partition_path string, _hoodie_file_name string, id int, name string, dt string, ts bigint)
^^^
  at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:296)
  at org.apache.hudi.hive.HoodieHiveClient.updateHiveSQL(HoodieHiveClient.java:367)
  ... 35 more
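A workaround sketch (my own assumption, not something the Hudi docs prescribe): point the sync JDBC URL at a real HiveServer2 endpoint, which does accept ALTER TABLE ... REPLACE COLUMNS, instead of the Spark ThriftServer. `df`, `tableName` and `basePath` are the ones from the write example above; the host name is a placeholder.

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.spark.sql.SaveMode

// Sync through HiveServer2 (placeholder host) rather than the Spark ThriftServer,
// whose SQL parser rejects ALTER TABLE ... REPLACE COLUMNS.
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(TABLE_NAME, tableName)
  .option(HIVE_SYNC_ENABLED_OPT_KEY, true)
  .option(HIVE_DATABASE_OPT_KEY, "luna_hudi")
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver2-host:10000") // HiveServer2, not Spark ThriftServer
  .option(HIVE_USER_OPT_KEY, "admin")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
  .mode(SaveMode.Append)
  .save(basePath)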
The officially provided Hive metadata sync tool:
https://hudi.apache.org/docs/writing_data.html#syncing-to-hive
3. Using Hudi from PrestoDB
Configuring Presto's hive.properties as follows is enough to read Hudi tables.
connector.name=hive-hadoop2
hive.metastore.uri=thrift://foo:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hive.parquet.use-column-names=true
hive.parquet.use-column-names=true is the important one: without it Presto resolves Parquet columns by position rather than by name and fails with errors like "The column ... of table ... is declared as type bigint, but the Parquet file declares the column as type BINARY".
https://stackoverflow.com/questions/60183579/presto-fails-with-type-mismatch-errors
4. Using Hudi from Flink
Flink cannot yet read Hudi in real time. Officially there is only a utility JAR that consumes Kafka and writes to Hudi; anything beyond that, e.g. Flink SQL support, has to be built yourself on top of the source code.
https://mp.weixin.qq.com/s/d1GI1AYHUpKwz_VHd41CeA
Data migration
Existing historical data files cannot be used as-is; they must go through a one-time conversion that carries real compute cost. Two approaches:
Keep history as-is and let Hudi manage only new Hive partitions
Migrate the full dataset to Hudi (see the sketch below)
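For the full-migration route, a minimal sketch of the conversion step, assuming the history sits in Parquet and using bulk_insert as a cheaper one-off load than upsert; source/target paths, table name and the key/partition/precombine columns are placeholders, and `spark` is an active SparkSession:

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.spark.sql.SaveMode

// Read the existing Parquet history and rewrite it once as a Hudi table.
val historyDf = spark.read.parquet("hdfs:///warehouse/luna_ods_history")

historyDf.write.format("hudi")
  .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL) // one-off load, cheaper than upsert
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(TABLE_NAME, "luna_ods")
  .mode(SaveMode.Overwrite)
  .save("hdfs:///warehouse/luna_hudi/luna_ods")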
References
https://hudi.apache.org/docs/migration_guide.html
