
Reference blog: SpringBoot Integration with Hadoop, Part 1: File Operations on HDFS
1. Import the dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>3.3.3</version>
    </dependency>
    <dependency>
        <groupId>cn.bestwu</groupId>
        <artifactId>ik-analyzers</artifactId>
        <version>5.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.46</version>
    </dependency>
    <!-- This dependency is related to MapReduce -->
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Core code
application.properties
# Tomcat worker thread pool size
server.tomcat.max-threads=1000
# Tomcat port
server.port=8900
# session timeout (minutes)
server.session-timeout=60
spring.application.name=hadoop
spring.servlet.multipart.max-file-size=50MB
spring.servlet.multipart.max-request-size=50MB
hdfs.path=hdfs://hadoop0:9000
hdfs.username=root
HdfsConfig.java
package com.zym.config; // package not shown in the original; inferred from the project layout

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * @author HappyDragon1994
 */
@Configuration
public class HdfsConfig {

    @Value("${hdfs.path}")
    private String path;

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
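HdfsConfig simply exposes the hdfs.path property as a bean. The service further down actually injects the property directly with @Value, but the config bean could equally be injected wherever the base path is needed. A hypothetical consumer, just to illustrate constructor injection (HdfsPathPrinter is invented for this sketch, and it assumes HdfsConfig sits in com.zym.config as above):

package com.zym.config;

import org.springframework.stereotype.Component;

// Hypothetical consumer, not part of the original post
@Component
public class HdfsPathPrinter {

    private final HdfsConfig hdfsConfig;

    public HdfsPathPrinter(HdfsConfig hdfsConfig) {
        // Constructor injection; Spring supplies the HdfsConfig bean
        this.hdfsConfig = hdfsConfig;
    }

    public void print() {
        System.out.println("HDFS base path: " + hdfsConfig.getPath());
    }
}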
HdfsController.java
package com.zym.controller;

import com.zym.entity.User;
import com.zym.result.Result;
import com.zym.service.HdfsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/hadoop/hdfs")
@Slf4j
public class HdfsController {

    /**
     * Create a directory.
     */
    @RequestMapping(value = "mkdir", method = RequestMethod.POST)
    @ResponseBody
    public Result mkdir(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            log.debug("Request parameter is empty");
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        // Create an empty directory
        boolean isOk = HdfsService.mkdir(path);
        if (isOk) {
            log.debug("Directory created successfully");
            return new Result(Result.SUCCESS, "Directory created successfully");
        } else {
            log.debug("Failed to create directory");
            return new Result(Result.FAILURE, "Failed to create directory");
        }
    }

    /**
     * Read information about an HDFS directory.
     */
    @PostMapping("/readPathInfo")
    public Result readPathInfo(@RequestParam("path") String path) throws Exception {
        List<Map<String, Object>> list = HdfsService.readPathInfo(path);
        return new Result(Result.SUCCESS, "Read HDFS directory info successfully", list);
    }

    /**
     * Get the locations of an HDFS file's blocks in the cluster.
     */
    @PostMapping("/getFileBlockLocations")
    public Result getFileBlockLocations(@RequestParam("path") String path) throws Exception {
        BlockLocation[] blockLocations = HdfsService.getFileBlockLocations(path);
        return new Result(Result.SUCCESS, "Got HDFS file block locations", blockLocations);
    }

    /**
     * Create a file.
     */
    @PostMapping("/createFile")
    public Result createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file)
            throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        HdfsService.createFile(path, file);
        return new Result(Result.SUCCESS, "File created successfully");
    }

    /**
     * Read the contents of an HDFS file.
     */
    @PostMapping("/readFile")
    public Result readFile(@RequestParam("path") String path) throws Exception {
        String targetPath = HdfsService.readFile(path);
        return new Result(Result.SUCCESS, "Read HDFS file contents", targetPath);
    }

    /**
     * Read an HDFS file into a byte array.
     */
    @PostMapping("/openFileToBytes")
    public Result openFileToBytes(@RequestParam("path") String path) throws Exception {
        byte[] files = HdfsService.openFileToBytes(path);
        return new Result(Result.SUCCESS, "Read HDFS file into bytes", files);
    }

    /**
     * Read an HDFS file into a User object.
     */
    @PostMapping("/openFileToUser")
    public Result openFileToUser(@RequestParam("path") String path) throws Exception {
        User user = HdfsService.openFileToObject(path, User.class);
        return new Result(Result.SUCCESS, "Read HDFS file into a User object", user);
    }

    /**
     * List files under a path.
     */
    @PostMapping("/listFile")
    public Result listFile(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        List<Map<String, String>> returnList = HdfsService.listFile(path);
        return new Result(Result.SUCCESS, "Listed files successfully", returnList);
    }

    /**
     * Rename a file.
     */
    @PostMapping("/renameFile")
    public Result renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName)
            throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        boolean isOk = HdfsService.renameFile(oldName, newName);
        if (isOk) {
            return new Result(Result.SUCCESS, "File renamed successfully");
        } else {
            return new Result(Result.FAILURE, "Failed to rename file");
        }
    }

    /**
     * Delete a file.
     */
    @PostMapping("/deleteFile")
    public Result deleteFile(@RequestParam("path") String path) throws Exception {
        boolean isOk = HdfsService.deleteFile(path);
        if (isOk) {
            return new Result(Result.SUCCESS, "delete file success");
        } else {
            return new Result(Result.FAILURE, "delete file fail");
        }
    }

    /**
     * Upload a file.
     */
    @PostMapping("/uploadFile")
    public Result uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath)
            throws Exception {
        HdfsService.uploadFile(path, uploadPath);
        return new Result(Result.SUCCESS, "upload file success");
    }

    /**
     * Download a file.
     */
    @PostMapping("/downloadFile")
    public Result downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath)
            throws Exception {
        HdfsService.downloadFile(path, downloadPath);
        return new Result(Result.SUCCESS, "download file success");
    }

    /**
     * Copy a file within HDFS.
     */
    @PostMapping("/copyFile")
    public Result copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath)
            throws Exception {
        HdfsService.copyFile(sourcePath, targetPath);
        return new Result(Result.SUCCESS, "copy file success");
    }

    /**
     * Check whether a file already exists.
     */
    @PostMapping("/existFile")
    public Result existFile(@RequestParam("path") String path) throws Exception {
        boolean isExist = HdfsService.existFile(path);
        return new Result(Result.SUCCESS, "file isExist: " + isExist);
    }
}
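The endpoints are easiest to exercise from Postman, but any form-encoded POST works. Below is a minimal sketch using Java 11's built-in java.net.http.HttpClient against the mkdir endpoint; the class name MkdirClientDemo and the /mydir path are invented for illustration, while port 8900 comes from application.properties above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MkdirClientDemo {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // The controller reads "path" via @RequestParam, so a
        // form-encoded body works; "/mydir" is just a sample path.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8900/hadoop/hdfs/mkdir"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString("path=/mydir"))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // a JSON-serialized Result
    }
}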
User.java
package com.zym.entity;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author HappyDragon1994
 */
public class User implements Writable {

    private String username;
    private Integer age;
    private String address;

    public User() {
        super();
    }

    public User(String username, Integer age, String address) {
        super();
        this.username = username;
        this.age = age;
        this.address = address;
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialize the object. Use writeUTF so the fields can be read back
        // with readUTF in readFields (writeChars has no length prefix and
        // would not round-trip).
        output.writeUTF(username);
        output.writeInt(age);
        output.writeUTF(address);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Read the serialized object back into memory
        username = input.readUTF();
        age = input.readInt();
        address = input.readUTF();
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User [username=" + username + ", age=" + age + ", address=" + address + "]";
    }
}
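Since User implements Hadoop's Writable, it is worth sanity-checking that write and readFields round-trip cleanly; this is exactly why writeUTF/readUTF must be paired. A standalone sketch, with UserWritableDemo and the sample values invented for illustration (it assumes User from com.zym.entity is on the classpath):

import com.zym.entity.User;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class UserWritableDemo {
    public static void main(String[] args) throws Exception {
        User original = new User("alice", 30, "Beijing");

        // Serialize with write(DataOutput)
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        // Deserialize with readFields(DataInput)
        User copy = new User();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy); // User [username=alice, age=30, address=Beijing]
    }
}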
Result.java
package com.zym.result;

import lombok.Data;

@Data
public class Result {

    public static final String FAILURE = "sys-00-01";
    public static final String SUCCESS = "SUCCESS";

    private String resCode;
    private String resDes;
    private Object data;

    public Result(String resCode, String resDes, Object data) {
        this.resCode = resCode;
        this.resDes = resDes;
        this.data = data;
    }

    public Result(String resCode, String resDes) {
        this.resCode = resCode;
        this.resDes = resDes;
    }
}
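With Lombok's @Data generating the getters that Jackson needs, a successful mkdir call should produce a response shaped roughly like the following (an illustrative sample; whether a null data field is included depends on your Jackson configuration):

{"resCode":"SUCCESS","resDes":"Directory created successfully","data":null}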
HdfsService.java
package com.zym.service;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class HdfsService {

    @Value("${hdfs.path}")
    private String path;

    @Value("${hdfs.username}")
    private String username;

    private static String hdfsPath;
    private static String hdfsName;
    private static final int bufferSize = 1024 * 1024 * 64;

    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    @PostConstruct
    public void getName() {
        hdfsName = this.username;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }

    public String getUsername() {
        return username;
    }

    /**
     * Build the HDFS configuration.
     */
    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * Get an HDFS FileSystem object.
     */
    public static FileSystem getFileSystem() throws Exception {
        // An HDFS client always operates under some user identity. By default the
        // client API reads it from a JVM parameter (-DHADOOP_USER_NAME=hadoop);
        // it can also be passed in explicitly when constructing the client, as here.
        FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
        return fileSystem;
    }

    /**
     * Create a directory on HDFS.
     */
    public static boolean mkdir(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        boolean isOk = fs.mkdirs(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Check whether an HDFS file exists.
     */
    public static boolean existFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        boolean isExists = fs.exists(srcPath);
        fs.close();
        return isExists;
    }

    /**
     * Read information about an HDFS directory.
     */
    public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path newPath = new Path(path);
        FileStatus[] statusList = fs.listStatus(newPath);
        List<Map<String, Object>> list = new ArrayList<>();
        if (null != statusList && statusList.length > 0) {
            for (FileStatus fileStatus : statusList) {
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath());
                map.put("fileStatus", fileStatus.toString());
                list.add(map);
            }
            return list;
        } else {
            return null;
        }
    }

    /**
     * Create a file on HDFS.
     */
    public static void createFile(String path, MultipartFile file) throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return;
        }
        String fileName = file.getOriginalFilename();
        FileSystem fs = getFileSystem();
        // The original file name is appended to the target directory
        Path newPath = new Path(path + "/" + fileName);
        // Open an output stream
        FSDataOutputStream outputStream = fs.create(newPath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
    }

    /**
     * Read the contents of an HDFS file.
     */
    public static String readFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(srcPath);
            // Read as UTF-8 text to avoid garbled Chinese characters
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
            String lineTxt;
            StringBuffer sb = new StringBuffer();
            while ((lineTxt = reader.readLine()) != null) {
                sb.append(lineTxt);
            }
            return sb.toString();
        } finally {
            // closeStream is null-safe in case fs.open threw
            IOUtils.closeStream(inputStream);
            fs.close();
        }
    }

    /**
     * List HDFS files under a path.
     */
    public static List<Map<String, String>> listFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        // Recursively find all files
        RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
        List<Map<String, String>> returnList = new ArrayList<>();
        while (filesList.hasNext()) {
            LocatedFileStatus next = filesList.next();
            String fileName = next.getPath().getName();
            Path filePath = next.getPath();
            Map<String, String> map = new HashMap<>();
            map.put("fileName", fileName);
            map.put("filePath", filePath.toString());
            returnList.add(map);
        }
        fs.close();
        return returnList;
    }

    /**
     * Rename an HDFS file.
     */
    public static boolean renameFile(String oldName, String newName) throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        // Original file path
        Path oldPath = new Path(oldName);
        // New path
        Path newPath = new Path(newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        return isOk;
    }

    /**
     * Delete an HDFS file.
     */
    public static boolean deleteFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        // deleteOnExit marks the path for deletion; the fs.close() below triggers it
        boolean isOk = fs.deleteOnExit(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Upload a local file to HDFS.
     */
    public static void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // Local source path
        Path clientPath = new Path(path);
        // HDFS target path
        Path serverPath = new Path(uploadPath);
        // The first parameter controls whether the source is deleted; false keeps it
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Download an HDFS file to the local file system.
     */
    public static void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // HDFS source path
        Path clientPath = new Path(path);
        // Local target path
        Path serverPath = new Path(downloadPath);
        // The first parameter controls whether the source is deleted; false keeps it
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Copy a file within HDFS.
     */
    public static void copyFile(String sourcePath, String targetPath) throws Exception {
        if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // Source path
        Path oldPath = new Path(sourcePath);
        // Target path
        Path newPath = new Path(targetPath);
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            inputStream = fs.open(oldPath);
            outputStream = fs.create(newPath);
            IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
        } finally {
            // null-safe closing in case open() or create() threw
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
    }

    /**
     * Open an HDFS file and return its contents as a byte array.
     */
    public static byte[] openFileToBytes(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(srcPath);
            return IOUtils.readFullyToByteArray(inputStream);
        } finally {
            IOUtils.closeStream(inputStream);
            fs.close();
        }
    }

    /**
     * Open an HDFS file and deserialize it into a Java object.
     */
    public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        String jsonStr = readFile(path);
        return JSONObject.parseObject(jsonStr, clazz);
    }

    /**
     * Get the locations of a file's blocks in the HDFS cluster.
     */
    public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        FileStatus fileStatus = fs.getFileStatus(srcPath);
        BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        fs.close();
        return blockLocations;
    }
}
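The core pattern every method above repeats is: build a Configuration pointing at fs.defaultFS, open a FileSystem as a named user, do the operation, close. A minimal standalone sketch of that pattern, reusing the hdfs://hadoop0:9000 address and root user from application.properties (HdfsQuickStart and the /demo path are invented for illustration):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsQuickStart {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop0:9000"); // same address as hdfs.path

        // try-with-resources closes the FileSystem automatically;
        // "root" is the hdfs.username from application.properties
        try (FileSystem fs = FileSystem.get(new URI("hdfs://hadoop0:9000"), conf, "root")) {
            Path dir = new Path("/demo"); // sample path
            System.out.println("mkdir: " + fs.mkdirs(dir));
            System.out.println("exists: " + fs.exists(dir));
        }
    }
}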
Code repository: ???
... Ah well, the road to open source is long: Gitee now requires a review before a repository can be made public... Next time I'll switch to GitHub. All the code for this post is pasted above, so just copy and paste~
Some Postman test screenshots (images omitted).