Calling HDFS from C++

Author: zlcook | Published 2020-08-07 20:31
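    • The hdfs.h header declares every interface that HDFS exposes to C++ callers. The file is located at $HADOOP_HDFS_HOME/include/hdfs.h. Its API is a subset of the Hadoop FileSystem APIs, with additional operations for connecting and disconnecting.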

    Connecting

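    void HdfsFileReadProcessor::connect() {
        struct hdfsBuilder* builder = hdfsNewBuilder();
        // The NameNode is set by host only, so the port must be set separately.
        hdfsBuilderSetNameNode(builder, host_.c_str());
        hdfsBuilderSetNameNodePort(builder, port_);
        // Uncomment the next line to get a new hdfsFS instance on every call;
        // otherwise hdfsBuilderConnect returns an existing instance from the cache.
        // hdfsBuilderSetForceNewInstance(builder);
        // hdfsBuilderConnect frees the builder and returns NULL on failure.
        hdfs_fs_ = hdfsBuilderConnect(builder);
        CHECK(hdfs_fs_ != nullptr)
            << fmt::format("Connect to hdfs {0}:{1} failed.", host_, port_);
    }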
    
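    • Possible pitfall: unless hdfsBuilderSetForceNewInstance(builder); is called, the FileSystem returned by hdfsBuilderConnect is a cached singleton. If several hdfsFS handles are obtained this way and one of them is disconnected, all the others are disconnected too. A later close or disconnect through another handle then fails with an error like the following: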
    FSDataInputStream#close error:
    java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:817)
        at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:702)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
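
The pitfall can be reproduced without a cluster using a toy model. The sketch below is not libhdfs: `FakeFs`, `FakeConnector`, `disconnect`, and the key "namenode:8020" are hypothetical stand-ins, and caching by a single string key is a simplifying assumption. It only illustrates why two handles taken from a cache die together, while a handle created with the force-new-instance option does not.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Toy stand-in for a file-system handle; NOT a libhdfs type.
struct FakeFs {
    bool open = true;
};

// Toy stand-in for the connection cache (assumption: keyed by one string).
class FakeConnector {
public:
    std::shared_ptr<FakeFs> connect(const std::string& key, bool force_new) {
        if (force_new) {
            return std::make_shared<FakeFs>();  // bypasses the cache entirely
        }
        std::shared_ptr<FakeFs>& slot = cache_[key];
        if (!slot) {
            slot = std::make_shared<FakeFs>();  // first caller creates the instance
        }
        return slot;  // later callers get the same instance
    }

private:
    std::map<std::string, std::shared_ptr<FakeFs>> cache_;
};

// Closing marks the underlying instance closed for every holder.
void disconnect(FakeFs& fs) { fs.open = false; }

void demoSharedVersusFresh() {
    FakeConnector connector;
    auto a = connector.connect("namenode:8020", false);
    auto b = connector.connect("namenode:8020", false);
    auto c = connector.connect("namenode:8020", true);
    disconnect(*a);
    assert(!b->open);  // b shares a's instance, so it is closed as well
    assert(c->open);   // c was created outside the cache and is unaffected
}
```

In the real code, the equivalent of `force_new = true` is calling hdfsBuilderSetForceNewInstance(builder) before hdfsBuilderConnect.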
    

    Opening a file

    
    void HdfsFileReadProcessor::openFile() {
        // hdfsExists returns 0 when the path exists.
        int32_t flag = hdfsExists(hdfs_fs_, path_.c_str());
        if (flag == 0) {
            hdfsFileInfo* path_info = hdfsGetPathInfo(hdfs_fs_, path_.c_str());
            CHECK(path_info != nullptr)
                << fmt::format("Get info of hdfs path {0} failed.", path_);
            tObjectKind fkind = path_info->mKind;
            hdfsFreeFileInfo(path_info, 1);
            if (fkind == tObjectKind::kObjectKindDirectory) {
                CHECK(false) << fmt::format("Hdfs path {0} is not a regular file.",
                                            path_);
            }
            // The trailing zeros select the default replication and block size.
            hdfs_file_ =
                hdfsOpenFile(hdfs_fs_, path_.c_str(), O_RDONLY, READ_BUFFER, 0, 0);
        } else {
            CHECK(false) << fmt::format("Hdfs file {0} does not exist.", path_);
        }
        // hdfsOpenFile returns NULL on failure; hdfsFileIsOpenForRead returns 1
        // when the handle is open for reading.
        CHECK(hdfs_file_ != nullptr && hdfsFileIsOpenForRead(hdfs_file_) == 1)
            << fmt::format("Open hdfs file {0} for read failed.", path_);
    }
    

    Reading

        // hdfsRead may return fewer bytes than requested; 0 means end of file
        // and -1 means an error.
        buffer_.resize(READ_BUFFER);
        tSize read_size = hdfsRead(hdfs_fs_, hdfs_file_, &(buffer_[left_size_]),
                                   READ_BUFFER - left_size_);
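
Because a single read can return fewer bytes than requested, callers that need a full buffer usually loop. The following is a minimal sketch of such a loop, not the author's code: `ReadFn` and `readFully` are hypothetical names, and the reader is passed in as a callable so the loop can be exercised without a cluster. It assumes the callable follows the same return convention as hdfsRead (bytes read, 0 at end of file, -1 on error).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Reads up to `len` bytes into `dst`; returns bytes read, 0 at EOF, -1 on error.
using ReadFn = std::function<int32_t(char* dst, int32_t len)>;

// Calls `read_once` repeatedly until `want` bytes have arrived, EOF is hit,
// or an error occurs. Returns the number of bytes stored in `out`, or -1.
int64_t readFully(const ReadFn& read_once, std::vector<char>& out, int32_t want) {
    assert(want >= 0);
    out.resize(static_cast<std::size_t>(want));
    int32_t got = 0;
    while (got < want) {
        int32_t n = read_once(out.data() + got, want - got);
        if (n < 0) {
            return -1;  // propagate the error to the caller
        }
        if (n == 0) {
            break;  // end of file: return what was collected so far
        }
        got += n;
    }
    out.resize(static_cast<std::size_t>(got));  // trim to the bytes actually read
    return got;
}
```

With libhdfs, `read_once` would be a lambda that forwards to hdfsRead(hdfs_fs_, hdfs_file_, dst, len).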
    

    Closing the file and the HDFS file system

    
    void HdfsFileReadProcessor::disConnect() {
        if (hdfs_file_ != nullptr) {
            hdfsCloseFile(hdfs_fs_, hdfs_file_);
            hdfs_file_ = nullptr;
        }
    
        // If hdfs_fs_ was not created as a new instance, it is a process-wide
        // singleton: disconnecting here also invalidates it everywhere else.
        if (hdfs_fs_ != nullptr) {
            hdfsDisconnect(hdfs_fs_);
            hdfs_fs_ = nullptr;
        }
        }
    }
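
The close order above (file first, then file system) can also be enforced with RAII so that it holds on every exit path. The sketch below is one possible arrangement under stated assumptions: `Owned`, `Session`, `FakeFs`, `FakeFile`, and `demoCloseOrder` are hypothetical names, and the fake types stand in for the libhdfs handles so the ordering can be checked without a cluster.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Owner that runs a caller-supplied closer once when it goes out of scope.
template <typename Handle>
using Owned = std::unique_ptr<Handle, std::function<void(Handle*)>>;

// Toy stand-ins for the file-system and file handles; NOT libhdfs types.
struct FakeFs {};
struct FakeFile {};

class Session {
public:
    Session(FakeFs* fs, FakeFile* file, std::vector<std::string>& log)
        : fs_(fs, [&log](FakeFs*) { log.push_back("disconnect"); }),
          file_(file, [&log](FakeFile*) { log.push_back("close file"); }) {}

private:
    // Members are destroyed in reverse declaration order, so file_ is
    // closed before fs_ is disconnected.
    Owned<FakeFs> fs_;
    Owned<FakeFile> file_;
};

// Returns the order in which the closers ran.
std::vector<std::string> demoCloseOrder() {
    std::vector<std::string> log;
    FakeFs fs;
    FakeFile file;
    {
        Session session(&fs, &file, log);
    }  // session leaves scope here and both closers run
    return log;
}
```

In real code the two closers would wrap hdfsCloseFile and hdfsDisconnect. The singleton caveat still applies: if the file system handle came from the cache, disconnecting it affects every other holder.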
    
