
Migrating Zeppelin Notebooks to Databricks, Part 2

Author: FredricZhu | Published 2022-04-15 20:03

This example first uses Spark to dump all notebooks under the Zeppelin notebook directory into a Snowflake database.
C++ code then restores the Zeppelin notebooks from the database and converts them into Jupyter notebooks.
Finally, the Databricks API is used to upload the Jupyter notebooks to the Databricks workspace.

The PySpark code that dumps the Zeppelin notebooks from S3 is as follows:

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
TEST_SF_OPTIONS = {
        "sfURL": "{SNOWFLAKE_URL}",
        "sfUser": "{SF_USER}",
        "sfPassword": "{SF_PASSWORD}",
        "sfDatabase": "{SF_DATABASE}",
        "sfWarehouse": "{SF_WAREHOUSE}",
        "dbtable": "zeppelin_notes"
}
## wholeTextFiles reads every text file as a (path, content) pair: _1 is the path, _2 is the content
rdd = sc.wholeTextFiles("{NOTEBOOK_S3_PATH}")
rdd.count()
columns = ["path", "note"]
df = rdd.toDF(columns)
df.show(10, True)

df.write.format(SNOWFLAKE_SOURCE_NAME).options(**TEST_SF_OPTIONS).mode("overwrite").save()

The C++ code is organized into the files listed below.



test/CMakeLists.txt

cmake_minimum_required(VERSION 3.10)

if(APPLE)
    message(STATUS "This is Apple, do nothing.")
    set(CMAKE_MACOSX_RPATH 1)
    set(CMAKE_PREFIX_PATH /Users/aabjfzhu/software/vcpkg/ports/cppwork/vcpkg_installed/x64-osx/share )
elseif(UNIX)
    message(STATUS "This is linux, set CMAKE_PREFIX_PATH.")
    set(CMAKE_PREFIX_PATH /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/share)
endif(APPLE)

project(zepplin_mig)

set(CMAKE_CXX_STANDARD 17)

add_definitions(-g)

find_package(ZLIB)

find_package(OpenCV REQUIRED )
find_package(Arrow CONFIG REQUIRED)

find_package(unofficial-brotli REQUIRED)
find_package(unofficial-utf8proc CONFIG REQUIRED)
find_package(Thrift CONFIG REQUIRED)

find_package(glog REQUIRED)

find_package(OpenSSL REQUIRED)

find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
    )

find_package(DataFrame REQUIRED)

if(APPLE)
    MESSAGE(STATUS "This is APPLE, set INCLUDE_DIRS")
set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../../include)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include ${CMAKE_CURRENT_SOURCE_DIR}/../include/   ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/)
endif(APPLE)


if(APPLE)
    MESSAGE(STATUS "This is APPLE, set LINK_DIRS")
    set(LINK_DIRS /usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set LINK_DIRS")
    set(LINK_DIRS /usr/local/lib /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/lib)
endif(APPLE)

if(APPLE)
    MESSAGE(STATUS "This is APPLE, set ODBC_LIBS")
    set(ODBC_LIBS iodbc iodbcinst)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set LINK_DIRS")
    set(ODBC_LIBS odbc odbcinst ltdl)
endif(APPLE)

include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})

file( GLOB test_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp) 

file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../impl/utils/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../include/utils/*.h ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/arr_/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/http/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/yaml/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/df/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/death_handler/impl/*.cpp  ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/api_accuracy/utils/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/api_accuracy/impl/*.cpp)

add_library(${PROJECT_NAME}_lib SHARED ${APP_SOURCES})
target_link_libraries(${PROJECT_NAME}_lib ${Boost_LIBRARIES} ZLIB::ZLIB glog::glog DataFrame::DataFrame ${OpenCV_LIBS})
target_link_libraries(${PROJECT_NAME}_lib OpenSSL::SSL OpenSSL::Crypto libgtest.a pystring libyaml-cpp.a libgmock.a ${ODBC_LIBS} libnanodbc.a pthread dl backtrace libzstd.a libbz2.a libsnappy.a re2::re2 parquet lz4 unofficial::brotli::brotlidec-static unofficial::brotli::brotlienc-static unofficial::brotli::brotlicommon-static utf8proc thrift::thrift  arrow arrow_dataset)

foreach( test_file ${test_file_list} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${test_file})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file}  ${test_file})
    target_link_libraries(${file} ${PROJECT_NAME}_lib)
endforeach( test_file ${test_file_list})

test/conv_nb_2_py_test.cpp


#include "death_handler/death_handler.h"
#include "json/json.hpp"
#include <glog/logging.h>
#include "utils/conv_util.h"
#include "utils/import_util.h"
#include "utils/zepplin_mig_cfg.h"

#include <gtest/gtest.h>
#include "df/df.h"

using json = nlohmann::json;

int main(int argc, char** argv) {
    FLAGS_log_dir = "./";
    FLAGS_alsologtostderr = true;
    // Log levels INFO, WARNING, ERROR, FATAL correspond to 0, 1, 2, 3
    FLAGS_minloglevel = 0;

    Debug::DeathHandler dh;

    google::InitGoogleLogging("./logs.log");
    testing::InitGoogleTest(&argc, argv);
    int ret = RUN_ALL_TESTS();
    return ret;
}

GTEST_TEST(ConvNB2PyTests, NB2PyLocal) {
    auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::LOCAL);
    auto conv_res = ConvUtil::conv_notebook_2_py(res);
    ASSERT_TRUE(conv_res);
}

GTEST_TEST(ConvNB2PyTests, NB2IPyNBLocal) {
    auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::LOCAL);
    auto conv_res = ConvUtil::conv_notebook_2_ipynb(res);
    ASSERT_TRUE(conv_res);
}

GTEST_TEST(ConvNB2PyTests, NB2PyDB) {
    auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::DB);
    auto conv_res = ConvUtil::conv_notebook_2_py(res);
    ASSERT_TRUE(conv_res);
}

GTEST_TEST(ConvNB2PyTests, NB2IPyNBDB) {
    auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::DB);
    auto conv_res = ConvUtil::conv_notebook_2_ipynb(res);
    ASSERT_TRUE(conv_res);
}

GTEST_TEST(ConvNB2PyTests, IPyNBImport) {
    auto conv_res = ImportUtil::import_note_books(ipynbs_test_path);
    ASSERT_TRUE(conv_res);
}

include/utils/conv_util.h

#ifndef _FREDRIC_ZEPPLIN_MIG_CONV_UTIL_H_
#define _FREDRIC_ZEPPLIN_MIG_CONV_UTIL_H_

#include <vector>
#include <string>

enum class DataSource {
    LOCAL,
    DB
};

struct Notebook {
    std::string nb_content;
    std::string py_name;
    std::string ipy_name;
};

struct ConvUtil {
    static std::vector<Notebook> get_all_note_books(std::string const& note_path, DataSource const& source);

    static bool conv_notebook_2_py(std::vector<Notebook> const& note_books);

    static bool conv_notebook_2_ipynb(std::vector<Notebook> const& note_books);
};

#endif

impl/utils/conv_util.cpp

#include "utils/conv_util.h"
#include "utils/ipy_consts.h"
#include "utils/zepplin_mig_cfg.h"

#include "pystring/pystring.h"
#include "sf_db2/sf_db2.h"

#include "json/json.hpp"
#include "api_accuracy/utils/io_util.h"

#include <filesystem>
#include <iostream>
#include <sstream>

namespace fs = std::filesystem;
using json = nlohmann::json;

Notebook make_a_notebook(std::string const& nb_content) {
    auto nb_js = json::parse(nb_content);
    auto name_ = nb_js["name"].get<std::string>();
    
    std::vector<std::string> name_list;
    pystring::split(name_, name_list, "/");

    std::stringstream ss_name {};
    for(auto& name_seg: name_list) {
        if(!name_seg.empty()) {
            ss_name << name_seg;
            ss_name << "#";
        }
    }

    auto final_name = ss_name.str();
    final_name = final_name.substr(0, final_name.size()-1);

    auto book_name = final_name  + ".py";
    auto ipy_book_name = final_name  + ".ipynb";
    
    return Notebook{nb_content, book_name, ipy_book_name};
}
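
// Illustration (hypothetical note name): a note whose "name" field is
// "/team/etl/daily_job" is flattened to py_name "team#etl#daily_job.py"
// and ipy_name "team#etl#daily_job.ipynb". The '#' stands in for '/', so
// the directory hierarchy can be rebuilt under /Shared/ at import time
// (see get_dbs_path in import_util.cpp).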


std::vector<Notebook> get_all_note_books_from_local_dir(std::string const& note_path) {
    std::vector<Notebook> ret_books{}; 
    for(auto const& entry: fs::directory_iterator(note_path)) {
        std::string path_ = entry.path();
        // Skip files that are not JSON
        if(!pystring::endswith(path_, ".json")) {
            continue;
        }
        auto nb_content = IOUtil::read_file(path_);
        ret_books.emplace_back(std::move(make_a_notebook(nb_content)));
    }
    return ret_books;
}

std::vector<Notebook> get_all_note_books_from_db() {
    std::vector<Notebook> ret_books{};
    sf_connection sf{conn_str};
    std::string raw_query = "select NOTE from ONE_SERVICE_TEST_DB_AABJFZHU_DATA.PUBLIC.zeppelin_notes;";
    auto res = sf.exec_raw_query(raw_query);
    const std::string null_value = "null";
    const auto columns = res.columns();

    while (res.next()) {
        auto const value = res.get<std::string>("NOTE", null_value);
        std::cout << value << "\n";
        ret_books.emplace_back(std::move(make_a_notebook(value)));
    }
    return ret_books;
}

std::vector<Notebook> ConvUtil::get_all_note_books(std::string const& note_path, DataSource const& source) {   
    if(source == DataSource::LOCAL) {
        return get_all_note_books_from_local_dir(note_path);
    } else {
        return get_all_note_books_from_db();
    }
}  


bool ConvUtil::conv_notebook_2_py(std::vector<Notebook> const& note_books) {
    for(auto const& nb: note_books) {
        auto nb_content = nb.nb_content;
        auto nb_js = json::parse(nb_content);
        auto nb_paragraphs= nb_js["paragraphs"];
        std::stringstream ss;
        auto i {0};
        for(auto const& nb_para: nb_paragraphs) {
            if(nb_para.contains("text")) {
                auto un_format_para = nb_para["text"].get<std::string>();
                if(!pystring::startswith(un_format_para, "%sh")) {
                    un_format_para = pystring::replace(un_format_para, "%pyspark", "");
                    ss << "## Section " << ++i << " : \r\n" << un_format_para << "\r\n\r\n";
                }
            }
        }
        std::string out_file_name = "../pybooks/" + nb.py_name;
        IOUtil::write_file(out_file_name, ss.str());
    }
    return true;
}

bool ConvUtil::conv_notebook_2_ipynb(std::vector<Notebook> const& note_books) {
    for(auto const& nb: note_books) {
        auto ipynb_temp = json::parse(ipynb_template);
        auto meta_temp = json::parse(meta_template);
        ipynb_temp["metadata"] = meta_temp;

        auto nb_content = nb.nb_content;
        auto nb_js = json::parse(nb_content);
        auto nb_paragraphs= nb_js["paragraphs"];
        auto i {0};
        for(auto const& nb_para: nb_paragraphs) {
            if(nb_para.find("text") == nb_para.end()) {
                continue;
            }
            
            auto un_format_para = nb_para["text"].get<std::string>();
          
            std::stringstream ss;
            auto ipy_nb_cell_temp = json::parse(cell_template);
            auto id = nb_para["id"].get<std::string>();
            ipy_nb_cell_temp["id"] = id;

            if(pystring::startswith(un_format_para, "%sh")) {
                un_format_para = pystring::replace(un_format_para, "%sh", "%%sh");
            } else {
                un_format_para = pystring::replace(un_format_para, "%pyspark", "");
            }
            
            ss << un_format_para;

            ipy_nb_cell_temp["source"].emplace_back(std::move(ss.str()));
            ipynb_temp["cells"].emplace_back(std::move(ipy_nb_cell_temp));
        }
        std::string out_file_name = "../ipynbs/" + nb.ipy_name;
        IOUtil::write_file(out_file_name, ipynb_temp.dump());
    }
    return true;
}
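
As a quick illustration of both converters, here is a minimal sketch (the note JSON is invented for the example); it assumes the file-local make_a_notebook helper is visible and that the ../pybooks and ../ipynbs output directories exist:

void conv_example() {
    std::string nb_content = R"({
        "name": "/demo",
        "paragraphs": [
            {"id": "p1", "text": "%pyspark\ndf = spark.read.json(path)"},
            {"id": "p2", "text": "%sh\nls /tmp"}
        ]
    })";
    std::vector<Notebook> books{make_a_notebook(nb_content)};
    // demo.py keeps only the %pyspark paragraph, emitted as "## Section 1 :".
    ConvUtil::conv_notebook_2_py(books);
    // demo.ipynb keeps both paragraphs; "%sh" is rewritten to Jupyter's "%%sh" magic.
    ConvUtil::conv_notebook_2_ipynb(books);
}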

include/utils/import_util.h

#ifndef _FREDRIC_ZEPPLIN_MIG_IMPORT_UTIL_H_
#define _FREDRIC_ZEPPLIN_MIG_IMPORT_UTIL_H_

#include <string>

struct DbsPath {
    std::string path;
    std::string dir;
    bool is_valid;
};

struct ImportUtil {
    static bool import_note_books(std::string const& note_path);
};

#endif

impl/utils/import_util.cpp

#include "utils/import_util.h"

#include <boost/beast/core/detail/base64.hpp>
#include <filesystem>
#include <iostream>  // std::cerr
#include <set>       // std::set in import_note_books
#include <sstream>   // std::stringstream in get_dbs_path

#include "api_accuracy/utils/io_util.h"
#include "http/http_util.h"
#include "json/json.hpp"
#include "pystring/pystring.h"
#include "utils/zepplin_mig_cfg.h"

namespace fs = std::filesystem;
using json = nlohmann::json;

std::string b64_encode(std::string const& raw_content) {
    auto len_ = raw_content.size();
    size_t encoded_size = boost::beast::detail::base64::encoded_size(len_);
    std::string base64_data;
    base64_data.resize(encoded_size);
    boost::beast::detail::base64::encode(&base64_data[0], &raw_content[0],
                                         len_);
    return base64_data;
}
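
// Illustration: b64_encode("abc") returns "YWJj". Note that
// boost::beast::detail::base64 lives in a detail:: namespace, i.e. it is
// an internal Beast API whose signatures may change between Boost releases.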

DbsPath get_dbs_path(std::string const& path_) {
    std::vector<std::string> paths_arr;
    pystring::split(path_, paths_arr, "/");
    auto file_name = paths_arr[paths_arr.size() - 1];

    std::vector<std::string> file_names_arr;
    pystring::split(file_name, file_names_arr, "#");
    // Skip notebooks that live in the Zeppelin trash (~Trash)
    if (file_names_arr[0] == "~Trash") {
        return DbsPath{"", "", false};
    }
    std::string dst_file_name{};
    std::stringstream ss;
    ss << "/Shared/";
    for (auto tmp_file_name_ : file_names_arr) {
        ss << tmp_file_name_ << "/";
    }

    auto final_name = ss.str();
    dst_file_name = final_name.substr(0, final_name.size() - 1);
    // Create dir
    auto pos = dst_file_name.find_last_of("/");
    auto dbs_path_ = dst_file_name.substr(0, pos);
    return DbsPath{dst_file_name, dbs_path_, true};
}
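
// Illustration (hypothetical file): "../ipynbs/team#etl#daily_job.ipynb"
// maps to path "/Shared/team/etl/daily_job.ipynb" and dir
// "/Shared/team/etl"; anything whose first segment is "~Trash" comes
// back with is_valid == false and is skipped by the caller.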

bool create_dbs_ws_dir(DbsPath const& dbs_path) {
    json create_dir_body;
    create_dir_body["path"] = dbs_path.dir;
    auto create_dir_body_str = create_dir_body.dump();
    std::string create_dir_post_res{};
    auto create_dir_res = HttpUtil::post_and_get_str(
        databricks_ws_host, ws_create_dir_url_path, headers,
        create_dir_body_str, create_dir_post_res);

    if (!create_dir_res) {
        std::cerr << create_dir_post_res << "\n";
        return false;
    }
    return true;
}

bool create_dbs_ws_file(DbsPath const& dbs_path, std::string const& base64_data) {
    // Create workspace file
    json body;

    body["path"] = dbs_path.path;
    body["content"] = base64_data;
    body["language"] = "PYTHON";
    body["overwrite"] = true;
    body["format"] = "JUPYTER";
    auto body_str = body.dump();

    std::string post_res{};
    auto import_res = HttpUtil::post_and_get_str(
        databricks_ws_host, ws_import_url_path, headers, body_str, post_res);

    if (!import_res) {
        std::cerr << post_res << "\n";
        return false;
    }
    return true;
}

bool ImportUtil::import_note_books(std::string const& note_path) {
    std::set<std::string> exist_paths{};
    for (auto const& entry : fs::directory_iterator(note_path)) {
        std::string path_ = entry.path();
        auto dbs_path = get_dbs_path(path_);
        if (!dbs_path.is_valid) {
            continue;
        }
        auto raw_content = IOUtil::read_file(path_);
        // Encode raw content to base64 encoded data
        auto base64_data = b64_encode(raw_content);
        // Create the Databricks workspace directory only the first time
        // this dir is seen (exist_paths tracks directories already created)
        if (exist_paths.find(dbs_path.dir) == exist_paths.end()) {
            bool res = create_dbs_ws_dir(dbs_path);
            exist_paths.insert(dbs_path.dir);
            if (!res) {
                return false;
            }
        }
        // Upload the notebook to the Databricks workspace
        bool create_file_res = create_dbs_ws_file(dbs_path, base64_data);
        if(!create_file_res) {
            return false;
        }
    }
    return true;
}
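
For reference, the JSON body built in create_dbs_ws_file follows the Databricks Workspace Import API (POST /api/2.0/workspace/import); "format": "JUPYTER" tells Databricks to decode the base64 content as an .ipynb file. A standalone sketch of the payload, with a hypothetical path and content:

void import_body_example() {
    json body = {
        {"path", "/Shared/team/etl/daily_job.ipynb"},  // hypothetical target path
        {"content", b64_encode(R"({"cells": []})")},   // base64 of the notebook JSON
        {"language", "PYTHON"},
        {"overwrite", true},
        {"format", "JUPYTER"}
    };
    std::cout << body.dump(2) << "\n";
}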

include/utils/ipy_consts.h

#ifndef _FREDRIC_IPY_CONSTS_H_
#define _FREDRIC_IPY_CONSTS_H_
#include <string>
extern std::string ipynb_template;
extern std::string cell_template;
extern std::string meta_template;
#endif

impl/utils/ipy_consts.cpp

#include "utils/ipy_consts.h"

std::string ipynb_template = R"(
{
 "cells": [],
 "metadata": {},
 "nbformat": 4,
 "nbformat_minor": 5
})";

std::string cell_template =  R"({
   "cell_type": "code",
   "execution_count": 0,
   "id": "20210811-090428_1531045589",
   "metadata": {},
   "outputs": [
   ],
   "source": [
   ]
  })";

std::string meta_template = R"(
{
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
}
)";

include/utils/zepplin_mig_cfg.h

#ifndef _FREDRIC_ZEPPLIN_MIG_CFG_H_
#define _FREDRIC_ZEPPLIN_MIG_CFG_H_
#include <map>
#include <string>

extern std::string zepplin_js_path;
extern std::string ipynbs_test_path;
extern std::string conn_str;
extern std::string databricks_ws_host;
extern std::string ws_create_dir_url_path;
extern std::string ws_import_url_path;

extern std::map<std::string, std::string> headers;
#endif

impl/utils/zepplin_mig_cfg.cpp

#include "utils/zepplin_mig_cfg.h"

std::string zepplin_js_path = "../nbooks";

std::string ipynbs_test_path = "../ipynbs";

std::string conn_str = "dsn=test_odbc;pwd={ODBC_PASSWORD}";
std::string databricks_ws_host = "{DATABRICKS_WORKSPACE_HOST_NAME}";

std::string ws_create_dir_url_path = "/api/2.0/workspace/mkdirs";
std::string ws_import_url_path = "/api/2.0/workspace/import";

std::map<std::string, std::string> headers {{"Authorization", "Bearer {DBS_TOKEN_NAME}"}};

The program completes the conversion and uploads the notebooks to the Databricks workspace.
