Skip to content

Commit

Permalink
Merge pull request #2 from baisui1981/tis-release-0.10.1
Browse files Browse the repository at this point in the history
Tis release 0.10.1
  • Loading branch information
baisui1981 authored Jan 29, 2024
2 parents 8541805 + 0145014 commit 496e8ad
Show file tree
Hide file tree
Showing 12 changed files with 2,167 additions and 2,070 deletions.
57 changes: 39 additions & 18 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -95,31 +96,51 @@ public static Configuration prepareHadoopConf(Configuration conf) {
return conf;
}

/**
 * SPI loader that resolves a Hadoop {@link FileSystem} through externally registered
 * {@link IExtraHadoopFileSystemGetter} implementations. Lazily initialised on the
 * first call to {@link #getFs(String, Configuration)} and cached for reuse.
 */
static ServiceLoader<IExtraHadoopFileSystemGetter> extraFileSystemLoader;

/**
 * Gets the {@link FileSystem} for {@code path} via the {@link IExtraHadoopFileSystemGetter}
 * service extension instead of the default {@code new Path(path).getFileSystem(conf)} lookup.
 *
 * <p>NOTE(review): the captured diff interleaved the removed {@code Path#getFileSystem}
 * body with this added ServiceLoader body, leaving unreachable statements after
 * {@code return}; this reconstruction keeps only the added (ServiceLoader) version.
 *
 * @param path path whose file system shall be resolved
 * @param conf Hadoop configuration; not consulted by the SPI lookup here, kept for
 *             caller compatibility — TODO confirm getters do not need it
 * @return the file system supplied by the first discovered getter
 * @throws IllegalStateException when no {@link IExtraHadoopFileSystemGetter} service
 *                               is registered on the classpath
 */
public static FileSystem getFs(String path, Configuration conf) {

    // Lazy one-time initialisation; loaded with this class's loader so providers
    // packaged alongside hudi-common are visible.
    if (extraFileSystemLoader == null) {
      extraFileSystemLoader
          = ServiceLoader.load(IExtraHadoopFileSystemGetter.class, FSUtils.class.getClassLoader());
    }

    // First registered getter wins; runtime log string kept verbatim (typo included).
    for (IExtraHadoopFileSystemGetter loader : extraFileSystemLoader) {
      LOG.info("load hdfs of " + path + " from extrnal System");
      return loader.getHadoopFileSystem(path);
    }

    // No provider found: fail fast rather than silently falling back.
    throw new IllegalStateException("has not find any extraFileSystemLoader");
  }
//
// ServiceLoader<IExtraHadoopFileSystemGetter> svcLoader
// = ServiceLoader.load(IExtraHadoopFileSystemGetter.class, FSUtils.class.getClassLoader());
// Iterator<IExtraHadoopFileSystemGetter> it = svcLoader.iterator();
// while (it.hasNext()) {
// return it.next();
// }
//
// if (retryCount < 3) {
// return getExtraFileSystemLoader(conf, retryCount + 1);
// } else {
// throw new IllegalStateException("has not find any extraFileSystemLoader,retryCount:" + retryCount);
// }
// }

public static FileSystem getFs(String path, Configuration conf, boolean localByDefault) {
if (localByDefault) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ public static void delete(FileSystem fs, Path metadataFolder, Set<String> delete
*/
public static void create(FileSystem fs, Path metadataFolder, Properties properties)
throws IOException {
// if(1==1){
// throw new IllegalStateException("");
// }
if (!fs.exists(metadataFolder)) {
fs.mkdirs(metadataFolder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.sink.utils;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
Expand Down Expand Up @@ -75,7 +76,8 @@ private static HiveSyncConfig buildSyncConfig(Configuration conf) {
hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf));
// hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf));
hiveSyncConfig.partitionFields = Arrays.asList(org.apache.hadoop.util.StringUtils.split(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)));
hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
Expand Down
Loading

0 comments on commit 496e8ad

Please sign in to comment.