zoukankan      html  css  js  c++  java
  • 12 hdfs常用文件、目录拷贝操作、删除操作

    package com.da.hbase.tool.utils;
    
    import com.da.hbase.tool.common.Const;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * hdfs操作常用方法类
     */
    public class HdfsUtils {
        public static final Logger LOG= LoggerFactory.getLogger(HdfsUtils.class);
        /**
         * 通过ip直接连接hdfs
         * @param ip
         * @return
         */
        public static FileSystem getFsFromIp(String ip){
            FileSystem fs = null;
            try {
                fs=FileSystem.get(URI.create("hdfs://"+ip),new Configuration());
            } catch (IOException e) {
                LOG.error("此ip:{} 连接出现异常", ip);
            }
            return  fs;
        }
    
        /**
         * 检查该fs是否可用
         * @param fs
         * @return
         */
        public static Boolean checkFs(FileSystem fs){
            Boolean success=true;
            if(null==fs){
                return false;
            }
            Path path=new Path("/");
            try {
                RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
                success=true;
            } catch (IOException e) {
                success=false;
            }
            return success;
        }
    
        /**
         * 从ips中获取一个可用的fs
         * @param ips
         * @return
         */
        public static FileSystem getAndCheckFs(String ips){
            return getAndCheckFs(ips,",");
        }
        /**
         * 从ips中获取一个可用的fs
         * @param ips
         * @param separator
         * @return
         */
        public static FileSystem getAndCheckFs(String ips,String separator){
            String [] ipArr=ips.split(separator);
            FileSystem fs=null;
            for (String ip : ipArr) {
                fs=getFsFromIp(ip);
                if(checkFs(fs)){
                    LOG.info("此Ip:{}可连接hdfs",ip);
                    break;
                }else{
                    fs=null;
                }
            }
            if(null==fs){
                LOG.error("无法连接hdfs环境,请检查网络是否可用或者ip配置是否正确,配置ips:{}",ips);
            }
            return fs;
        }
    
        /**
         * 测试getAndCheckFs方法
         */
        private static void testConnectFs(){
            String ips="10.17.139.126,10.17.139.127,10.17.139.125";
            FileSystem fs=getAndCheckFs(ips);
            String path1="/hbase/data/default/";
            Path path=new Path(path1);
            try {
                RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
                while(remoteIterator.hasNext()){
                    System.out.println(remoteIterator.next().getPath());
                }
            } catch (IOException e) {
            }
        }
    
        /**
         * 查看当前路径是否存在
         * @param fs
         * @param path
         * @return
         */
        public static Boolean checkPathExist(FileSystem fs,String path){
            Boolean isExist=true;
            try {
                isExist=fs.exists(new Path(path));
            } catch (IOException e) {
                isExist=false;
                e.printStackTrace();
            }
            return  isExist;
        }
    
        /**
         * 递归遍历找到所有目录和文件存储在map中,文件,key:路径,value:FILE ;目录,key:路径,value:DIR
         * @param fs
         * @param src
         */
        public static void recureScanDir(FileSystem fs,Path src, Map<Path,String> map){
            try{
                if(fs.isFile(src)) {
                    map.put(src, Const.FILE_STATUS);
                }else{
                    map.remove(src);
                    RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(src);
                    if(!remoteIterator.hasNext()){
                        map.put(src, Const.DIR_STATUS);
                    }else {
                        while (remoteIterator.hasNext()){
                            recureScanDir(fs,remoteIterator.next().getPath(),map);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            }
    
        /**
         * 目录从本地拷贝到hdfs上
         * @param fs
         * @param src
         * @param dst
         * @return
         */
        public static Boolean copyFromLocal(FileSystem fs,Path src,Path dst){
            Boolean success=true;
            try {
                if(fs.exists(dst)){
                    fs.delete(dst,true);
                }
                fs.copyFromLocalFile(false,true,src,dst);
                success=true;
            } catch (IOException e) {
                success=false;
                LOG.error("文件从本地拷贝到hdfs上,出现Io异常,导致拷贝文件失败,src:{},dst:{}",src,dst);
                e.printStackTrace();
            }
            return success;
        }
    
            /**
         *目录从hdfs上拷贝到本地
         * @param fs
         * @param src
         * @param dst
         * @return
         */
        public static Boolean copyToLocal(FileSystem fs,Path src,Path dst){
            Boolean success=true;
            try {
                if(new File(dst.toString()).exists()){
                    Utils.deletNotEmptyDir(new File(dst.toString()));
                }
                fs.copyToLocalFile(false, src, dst, true);
                success=true;
            } catch (IOException e) {
                success=false;
                LOG.error("文件从hdfs拷贝到本地,出现Io异常,导致拷贝文件失败");
                e.printStackTrace();
            }
            return success;
        }
    
        private static void testCopyFileToLocal(){
            String ips="10.17.139.126,10.17.139.127,10.17.139.125";
            FileSystem fs=getAndCheckFs(ips);
            String path1="/hbase/data/default/";
            Path path=new Path(path1);
            try {
                RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
                while(remoteIterator.hasNext()){
                    System.out.println(remoteIterator.next().getPath());
                }
            } catch (IOException e) {
                LOG.error(e.getMessage());
            }
        }
    
        /**
         * 获取目录path下所有的文件名
         * @param fs
         * @param path
         * @return
         */
        public static List<String> scanDir(FileSystem fs,Path path){
            List<String> list=new ArrayList<>();
            try {
                RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
                while(remoteIterator.hasNext()){
                    list.add(remoteIterator.next().getPath().getName());
                }
            } catch (IOException e) {
                LOG.error(e.getMessage());
            }
            return list;
        }
    
        public static void main(String[] args) {
            //testConnectFs();
            testCopyFileToLocal();
    
        }
    }
  • 相关阅读:
    Qt中实现单例模式(SingleTon)
    毕设开发手记(二)
    Qt中由表中单元格的QModelIndex获取Global Pos的正确方法
    Qt的槽可以使用默认参数
    C#结构的了解
    dos命令集江南技术联盟
    C#中的abstract与virtual的用法
    [转]六种删除数据库重复行的方法
    PL/SQL的TO_CHAR()与TO_DATE()
    索引索引索引(转)
  • 原文地址:https://www.cnblogs.com/yangh2016/p/6076984.html
Copyright © 2011-2022 走看看