package com.zdjizhi.utils; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.CommonConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class HdfsUtils { private static final Log logger = LogFactory.get(); private static FileSystem fileSystem; static { Configuration configuration = new Configuration(); try { //指定用户 //配置hdfs相关信息 // configuration.set("fs.defaultFS","hdfs://ns1"); // configuration.set("hadoop.proxyuser.root.hosts","*"); // configuration.set("hadoop.proxyuser.root.groups","*"); // configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); // configuration.set("dfs.nameservices","ns1"); // configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); // configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1); // configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2); // configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); // //创建fileSystem,用于连接hdfs //// fileSystem = FileSystem.get(configuration); System.setProperty("HADOOP_USER_NAME", CommonConfig.HDFS_USER); //创建fileSystem,用于连接hdfs fileSystem = FileSystem.get(new URI(CommonConfig.HDFS_URI_NS1),configuration); } catch (IOException e) { throw new RuntimeException(e); } catch (URISyntaxException e) { e.printStackTrace(); } } public static boolean isExists(String filePath) throws IOException { return fileSystem.exists(new Path(filePath)); } public static byte[] getFileBytes(String filePath) throws IOException { try (FSDataInputStream open = fileSystem.open(new Path(filePath))) { byte[] bytes = new byte[open.available()]; open.read(0, bytes, 0, open.available()); return bytes; } catch (IOException e) { logger.error("An I/O exception when files are download from HDFS. Message is :" + e.getMessage()); } return null; } public static void uploadFileByBytes(String filePath,byte[] bytes) throws IOException { try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) { fsDataOutputStream.write(bytes); fsDataOutputStream.flush(); } catch (RuntimeException e) { logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage()); } catch (IOException e) { logger.error("An I/O exception when files are uploaded to HDFS. Message is :" + e.getMessage()); } } public static void rename(String src, String dst) throws IOException { fileSystem.rename(new Path(src),new Path(dst)); } }