本例先用Spark把Zeppelin Notebook目录下的所有notebook信息都dump到Snowflake数据库。
再使用C++代码从数据库中还原zeppelin notebook,并将Zeppelin Notebook转换为Jupyter Notebook。
最后使用Databricks API将Jupyter Notebook上传到Databricks Workspace。
从S3 dump Zeppelin Notebook的pyspark代码如下,
# Spark-Snowflake connector data source name.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Snowflake connection options; the {PLACEHOLDER} values are substituted
# per environment before the job runs.
TEST_SF_OPTIONS = {
    "sfURL": "{SNOWFLAKE_URL}",
    "sfUser": "{SF_USER}",
    "sfPassword": "{SF_PASSWORD}",
    "sfDatabase": "{SF_DATABASE}",
    "sfWarehouse": "{SF_WAREHOUSE}",
    "dbtable": "zeppelin_notes",
}

# wholeTextFiles reads every file under the prefix as a (path, content) pair:
# _1 is the file path, _2 is the whole file body.
rdd = sc.wholeTextFiles("{NOTEBOOK_S3_PATH}")
rdd.count()

columns = ["path", "note"]
df = rdd.toDF(columns)
df.show(10, True)

# Overwrite the zeppelin_notes table with the freshly dumped notebooks.
df.write.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**TEST_SF_OPTIONS) \
    .mode("overwrite") \
    .save()
C++侧代码结构如下,
![](https://img.haomeiwen.com/i8982195/8121f564be1ad2ad.png)
test/CMakeLists.txt
cmake_minimum_required(VERSION 2.6)

# Platform-specific vcpkg prefix so find_package() locates the ports below.
if(APPLE)
    message(STATUS "This is Apple, do nothing.")
    set(CMAKE_MACOSX_RPATH 1)
    set(CMAKE_PREFIX_PATH /Users/aabjfzhu/software/vcpkg/ports/cppwork/vcpkg_installed/x64-osx/share )
elseif(UNIX)
    message(STATUS "This is linux, set CMAKE_PREFIX_PATH.")
    set(CMAKE_PREFIX_PATH /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/share)
endif(APPLE)

project(zepplin_mig)

set(CMAKE_CXX_STANDARD 17)
add_definitions(-g)

find_package(ZLIB)
find_package(OpenCV REQUIRED )
find_package(Arrow CONFIG REQUIRED)
find_package(unofficial-brotli REQUIRED)
find_package(unofficial-utf8proc CONFIG REQUIRED)
find_package(Thrift CONFIG REQUIRED)
find_package(glog REQUIRED)
find_package(OpenSSL REQUIRED)
find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
    )
find_package(DataFrame REQUIRED)

# Header search paths (macOS additionally needs iODBC and the Snowflake ODBC SDK).
if(APPLE)
    MESSAGE(STATUS "This is APPLE, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../../include)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include ${CMAKE_CURRENT_SOURCE_DIR}/../include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/)
endif(APPLE)

if(APPLE)
    MESSAGE(STATUS "This is APPLE, set LINK_DIRS")
    set(LINK_DIRS /usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set LINK_DIRS")
    set(LINK_DIRS ${Boost_INCLUDE_DIRS} /usr/local/lib /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/lib)
endif(APPLE)

# ODBC driver-manager libraries differ per platform: iODBC on macOS, unixODBC on linux.
if(APPLE)
    MESSAGE(STATUS "This is APPLE, set ODBC_LIBS")
    set(ODBC_LIBS iodbc iodbcinst)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set ODBC_LIBS")
    set(ODBC_LIBS odbc odbcinst ltdl)
endif(APPLE)

include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})

# Every *.cpp in this directory becomes one test executable below.
file( GLOB test_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../impl/utils/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../include/utils/*.h ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/arr_/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/http/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/yaml/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/df/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/death_handler/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/api_accuracy/utils/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/api_accuracy/impl/*.cpp)

# The shared library holds only the application sources; the test sources are
# compiled into their own executables below. (The original referenced the
# undefined variable ${test_file} here, which expanded to nothing.)
add_library(${PROJECT_NAME}_lib SHARED ${APP_SOURCES})
target_link_libraries(${PROJECT_NAME}_lib ${Boost_LIBRARIES} ZLIB::ZLIB glog::glog DataFrame::DataFrame ${OpenCV_LIBS})
target_link_libraries(${PROJECT_NAME}_lib OpenSSL::SSL OpenSSL::Crypto libgtest.a pystring libyaml-cpp.a libgmock.a ${ODBC_LIBS} libnanodbc.a pthread dl backtrace libzstd.a libbz2.a libsnappy.a re2::re2 parquet lz4 unofficial::brotli::brotlidec-static unofficial::brotli::brotlienc-static unofficial::brotli::brotlicommon-static utf8proc thrift::thrift arrow arrow_dataset)

foreach( test_file ${test_file_list} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${test_file})
    # Strip the .cpp suffix to get the executable name.
    # (Fixed: the original passed the garbled token $(unknown) here.)
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file} ${test_file})
    target_link_libraries(${file} ${PROJECT_NAME}_lib)
endforeach( test_file ${test_file_list})
test/conv_nb_2_py_test.cpp
#include "death_handler/death_handler.h"
#include "json/json.hpp"
#include <glog/logging.h>
#include "utils/conv_util.h"
#include "utils/import_util.h"
#include "utils/zepplin_mig_cfg.h"
#include <gtest/gtest.h>
#include "df/df.h"
using json = nlohmann::json;
// Test runner entry point: configure glog, install the crash handler,
// then delegate to googletest.
int main(int argc, char** argv) {
    // Write glog files to the working directory and mirror them to stderr.
    FLAGS_log_dir = "./";
    FLAGS_alsologtostderr = true;
    // Log levels INFO, WARNING, ERROR, FATAL map to 0, 1, 2, 3.
    FLAGS_minloglevel = 0;
    Debug::DeathHandler dh;  // prints a stack trace on fatal signals
    // glog expects the program name here (argv[0]), not a log-file path;
    // the original passed "./logs.log", which only mislabels log lines.
    google::InitGoogleLogging(argv[0]);
    testing::InitGoogleTest(&argc, argv);
    int ret = RUN_ALL_TESTS();
    return ret;
}
// Convert notebooks read from the local JSON export directory into .py files.
GTEST_TEST(ConvNB2PyTests, NB2PyLocal) {
auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::LOCAL);
auto conv_res = ConvUtil::conv_notebook_2_py(res);
ASSERT_TRUE(conv_res);
}
// Convert locally loaded notebooks into Jupyter .ipynb files.
GTEST_TEST(ConvNB2PyTests, NB2IPyNBLocal) {
auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::LOCAL);
auto conv_res = ConvUtil::conv_notebook_2_ipynb(res);
ASSERT_TRUE(conv_res);
}
// Same .py conversion, but notebooks come from the Snowflake table
// (note_path is ignored for DataSource::DB).
GTEST_TEST(ConvNB2PyTests, NB2PyDB) {
auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::DB);
auto conv_res = ConvUtil::conv_notebook_2_py(res);
ASSERT_TRUE(conv_res);
}
// Same .ipynb conversion with the Snowflake-backed source.
GTEST_TEST(ConvNB2PyTests, NB2IPyNBDB) {
auto res = ConvUtil::get_all_note_books(zepplin_js_path, DataSource::DB);
auto conv_res = ConvUtil::conv_notebook_2_ipynb(res);
ASSERT_TRUE(conv_res);
}
// Upload the generated .ipynb files to the Databricks workspace
// (hits the live workspace API; requires valid host/token config).
GTEST_TEST(ConvNB2PyTests, IPyNBImport) {
auto conv_res = ImportUtil::import_note_books(ipynbs_test_path);
ASSERT_TRUE(conv_res);
}
include/utils/conv_util.h
#ifndef _FREDRIC_ZEPPLIN_MIG_CONV_UTIL_H_
#define _FREDRIC_ZEPPLIN_MIG_CONV_UTIL_H_
#include <vector>
#include <string>
// Where notebooks are loaded from: local JSON exports or the Snowflake DB.
enum class DataSource {
LOCAL,
DB
};
// One Zeppelin note plus the output file names derived from its "name" path.
struct Notebook {
// Raw note JSON as exported from Zeppelin.
std::string nb_content;
// Output file name for the .py conversion.
std::string py_name;
// Output file name for the .ipynb conversion.
std::string ipy_name;
};
// Stateless helpers: load Zeppelin notes and emit .py / .ipynb files.
struct ConvUtil {
static std::vector<Notebook> get_all_note_books(std::string const& note_path, DataSource const& source);
static bool conv_notebook_2_py(std::vector<Notebook> const& note_books);
static bool conv_notebook_2_ipynb(std::vector<Notebook> const& note_books);
};
#endif
impl/utils/conv_util.cpp
#include "utils/conv_util.h"
#include "utils/ipy_consts.h"
#include "utils/zepplin_mig_cfg.h"
#include "pystring/pystring.h"
#include "sf_db2/sf_db2.h"
#include "json/json.hpp"
#include "api_accuracy/utils/io_util.h"
#include <filesystem>
#include <iostream>
#include <sstream>
namespace fs = std::filesystem;
using json = nlohmann::json;
// Build a Notebook record from one Zeppelin note JSON document.
// The note's "name" field is a slash-separated path; its non-empty segments
// are joined with '#' to produce flat .py / .ipynb output file names.
Notebook make_a_notebook(std::string const& nb_content) {
    auto nb_js = json::parse(nb_content);
    auto name_ = nb_js["name"].get<std::string>();
    std::vector<std::string> name_list;
    pystring::split(name_, name_list, "/");
    std::stringstream ss_name{};
    for (auto const& name_seg : name_list) {
        if (!name_seg.empty()) {
            ss_name << name_seg << "#";
        }
    }
    auto final_name = ss_name.str();
    // Drop the trailing '#'. Guarding on empty() fixes the original
    // size()-1 underflow when the note name has no usable segments.
    if (!final_name.empty()) {
        final_name.pop_back();
    }
    auto book_name = final_name + ".py";
    auto ipy_book_name = final_name + ".ipynb";
    return Notebook{nb_content, book_name, ipy_book_name};
}
std::vector<Notebook> get_all_note_books_from_local_dir(std::string const& note_path) {
std::vector<Notebook> ret_books{};
for(auto const& entry: fs::directory_iterator(note_path)) {
std::string path_ = entry.path();
// 不是json文件
if(!pystring::endswith(path_, ".json")) {
continue;
}
auto nb_content = IOUtil::read_file(path_);
ret_books.emplace_back(std::move(make_a_notebook(nb_content)));
}
return ret_books;
}
std::vector<Notebook> get_all_note_books_from_db() {
std::vector<Notebook> ret_books{};
sf_connection sf{conn_str};
std::string raw_query = "select NOTE from ONE_SERVICE_TEST_DB_AABJFZHU_DATA.PUBLIC.zeppelin_notes;";
auto res = sf.exec_raw_query(raw_query);
const std::string null_value = "null";
const auto columns = res.columns();
while (res.next()) {
auto const value = res.get<std::string>("NOTE", null_value);
std::cout << value << "\n";
ret_books.emplace_back(std::move(make_a_notebook(value)));
}
return ret_books;
}
// Dispatch to the local-directory or Snowflake loader depending on source.
// note_path is only consulted for DataSource::LOCAL.
std::vector<Notebook> ConvUtil::get_all_note_books(std::string const& note_path, DataSource const& source) {
    switch (source) {
        case DataSource::LOCAL:
            return get_all_note_books_from_local_dir(note_path);
        default:
            return get_all_note_books_from_db();
    }
}
// Render each notebook's paragraphs into one .py script under ../pybooks/.
// %sh paragraphs are dropped (shell, not Python); the %pyspark marker is
// stripped from the rest. Always returns true (IOUtil::write_file's result
// is not surfaced here).
bool ConvUtil::conv_notebook_2_py(std::vector<Notebook> const& note_books) {
    for (auto const& nb : note_books) {
        // Parse directly from the stored content; the original copied the
        // whole JSON string and the paragraphs array first.
        auto nb_js = json::parse(nb.nb_content);
        auto const& nb_paragraphs = nb_js["paragraphs"];
        std::stringstream ss;
        auto i{0};
        for (auto const& nb_para : nb_paragraphs) {
            // Paragraphs without a "text" field carry no code.
            if (!nb_para.contains("text")) {
                continue;
            }
            auto un_format_para = nb_para["text"].get<std::string>();
            if (pystring::startswith(un_format_para, "%sh")) {
                continue;  // shell paragraph — not valid Python
            }
            un_format_para = pystring::replace(un_format_para, "%pyspark", "");
            ss << "## Section " << ++i << " : \r\n" << un_format_para << "\r\n\r\n";
        }
        std::string out_file_name = "../pybooks/" + nb.py_name;
        IOUtil::write_file(out_file_name, ss.str());
    }
    return true;
}
bool ConvUtil::conv_notebook_2_ipynb(std::vector<Notebook> const& note_books) {
for(auto const& nb: note_books) {
auto ipynb_temp = json::parse(ipynb_template);
auto meta_temp = json::parse(meta_template);
ipynb_temp["metadata"] = meta_temp;
auto nb_content = nb.nb_content;
auto nb_js = json::parse(nb_content);
auto nb_paragraphs= nb_js["paragraphs"];
auto i {0};
for(auto const& nb_para: nb_paragraphs) {
if(nb_para.find("text") == nb_para.end()) {
continue;
}
auto un_format_para = nb_para["text"].get<std::string>();
std::stringstream ss;
auto ipy_nb_cell_temp = json::parse(cell_template);
auto id = nb_para["id"].get<std::string>();
ipy_nb_cell_temp["id"] = id;
if(pystring::startswith(un_format_para, "%sh")) {
un_format_para = pystring::replace(un_format_para, "%sh", "%%sh");
} else {
un_format_para = pystring::replace(un_format_para, "%pyspark", "");
}
ss << un_format_para;
ipy_nb_cell_temp["source"].emplace_back(std::move(ss.str()));
ipynb_temp["cells"].emplace_back(std::move(ipy_nb_cell_temp));
}
std::string out_file_name = "../ipynbs/" + nb.ipy_name;
IOUtil::write_file(out_file_name, ipynb_temp.dump());
}
return true;
}
include/utils/import_util.h
#ifndef _FREDRIC_ZEPPLIN_MIG_IMPORT_UTIL_H_
#define _FREDRIC_ZEPPLIN_MIG_IMPORT_UTIL_H_
#include <string>
// Target location of one notebook inside the Databricks workspace.
struct DbsPath {
// Full workspace path of the notebook file.
std::string path;
// Parent workspace directory (everything up to the last '/').
std::string dir;
// False for entries that should be skipped (e.g. trashed notes).
bool is_valid;
};
// Uploads converted .ipynb files to the Databricks workspace.
struct ImportUtil {
static bool import_note_books(std::string const& note_path);
};
#endif
impl/utils/import_util.cpp
#include "utils/import_util.h"

#include <filesystem>
#include <iostream>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include <boost/beast/core/detail/base64.hpp>

#include "api_accuracy/utils/io_util.h"
#include "http/http_util.h"
#include "json/json.hpp"
#include "pystring/pystring.h"
#include "utils/zepplin_mig_cfg.h"
namespace fs = std::filesystem;
using json = nlohmann::json;
std::string b64_encode(std::string const& raw_content) {
auto len_ = raw_content.size();
size_t encoded_size = boost::beast::detail::base64::encoded_size(len_);
std::string base64_data;
base64_data.resize(encoded_size);
boost::beast::detail::base64::encode(&base64_data[0], &raw_content[0],
len_);
return base64_data;
}
// Map a local notebook file path to its Databricks workspace target.
// The local file name encodes the original Zeppelin folder hierarchy with
// '#' separators (see make_a_notebook); it is expanded back into
// "/Shared/<seg>/<seg>/...". Returns is_valid == false for notes that came
// from Zeppelin's trash folder.
DbsPath get_dbs_path(std::string const& path_) {
    std::vector<std::string> paths_arr;
    pystring::split(path_, paths_arr, "/");
    auto file_name = paths_arr.back();
    std::vector<std::string> file_names_arr;
    pystring::split(file_name, file_names_arr, "#");
    // Trashed note — skip it. (The original comment here claimed the
    // opposite of what the branch does.)
    if (file_names_arr[0] == "~Trash") {
        return DbsPath{"", "", false};
    }
    std::stringstream ss;
    ss << "/Shared/";
    for (auto const& seg : file_names_arr) {
        ss << seg << "/";
    }
    auto final_name = ss.str();
    // Drop the trailing '/'.
    std::string dst_file_name = final_name.substr(0, final_name.size() - 1);
    // Parent directory is everything up to the last '/'.
    auto pos = dst_file_name.find_last_of("/");
    auto dbs_path_ = dst_file_name.substr(0, pos);
    return DbsPath{dst_file_name, dbs_path_, true};
}
// Create dbs_path.dir in the Databricks workspace via the mkdirs API.
// On failure the API response body is dumped to stderr.
bool create_dbs_ws_dir(DbsPath const& dbs_path) {
    json create_dir_body;
    create_dir_body["path"] = dbs_path.dir;
    auto create_dir_body_str = create_dir_body.dump();
    std::string create_dir_post_res{};
    bool ok = HttpUtil::post_and_get_str(databricks_ws_host, ws_create_dir_url_path,
                                         headers, create_dir_body_str,
                                         create_dir_post_res);
    if (ok) {
        return true;
    }
    std::cerr << create_dir_post_res << "\n";
    return false;
}
// Import one notebook into the Databricks workspace: JUPYTER format, PYTHON
// language, overwriting any existing file at dbs_path.path. base64_data is
// the base64-encoded .ipynb body. Dumps the API response to stderr on failure.
bool create_dbs_ws_file(DbsPath const& dbs_path, std::string const& base64_data) {
    json body;
    body["path"] = dbs_path.path;
    body["content"] = base64_data;
    body["language"] = "PYTHON";
    body["overwrite"] = true;
    body["format"] = "JUPYTER";
    std::string post_res{};
    bool ok = HttpUtil::post_and_get_str(databricks_ws_host, ws_import_url_path,
                                         headers, body.dump(), post_res);
    if (!ok) {
        std::cerr << post_res << "\n";
    }
    return ok;
}
// Walk note_path, base64-encode every valid notebook and upload it to the
// Databricks workspace. Each target directory is created at most once per
// run (tracked in exist_paths). Returns false on the first failed API call.
bool ImportUtil::import_note_books(std::string const& note_path) {
    std::set<std::string> exist_paths{};
    for (auto const& entry : fs::directory_iterator(note_path)) {
        std::string path_ = entry.path();
        auto dbs_path = get_dbs_path(path_);
        if (!dbs_path.is_valid) {
            continue;  // e.g. notes from Zeppelin's trash folder
        }
        auto raw_content = IOUtil::read_file(path_);
        // The workspace import API expects base64-encoded content.
        auto base64_data = b64_encode(raw_content);
        if (exist_paths.find(dbs_path.dir) == exist_paths.end()) {
            // Fixed ordering: only cache the directory after it was actually
            // created (the original inserted before checking the result).
            if (!create_dbs_ws_dir(dbs_path)) {
                return false;
            }
            exist_paths.insert(dbs_path.dir);
        }
        // Upload the notebook itself.
        if (!create_dbs_ws_file(dbs_path, base64_data)) {
            return false;
        }
    }
    return true;
}
include/utils/ipy_consts.h
#ifndef _FREDRIC_IPY_CONSTS_H_
#define _FREDRIC_IPY_CONSTS_H_
#include <string>
// JSON skeletons for generated Jupyter notebooks (defined in ipy_consts.cpp).
// Empty nbformat-4 notebook shell ("cells": [], "metadata": {}).
extern std::string ipynb_template;
// Single empty code-cell skeleton.
extern std::string cell_template;
// python3 kernelspec / language_info metadata.
extern std::string meta_template;
#endif
impl/utils/ipy_consts.cpp
#include "utils/ipy_consts.h"
// Empty nbformat-4 notebook skeleton; conv_notebook_2_ipynb fills in
// "cells" and "metadata". The raw strings below are runtime data and are
// parsed with nlohmann::json — do not edit their contents casually.
std::string ipynb_template = R"(
{
    "cells": [],
    "metadata": {},
    "nbformat": 4,
    "nbformat_minor": 5
})";
// One empty code-cell skeleton; "id" and "source" are overwritten per
// Zeppelin paragraph (the sample id here is just placeholder data).
std::string cell_template = R"({
      "cell_type": "code",
      "execution_count": 0,
      "id": "20210811-090428_1531045589",
      "metadata": {},
      "outputs": [
      ],
      "source": [
      ]
})";
// Standard python3 kernelspec / language_info block attached to every
// generated notebook's "metadata".
std::string meta_template = R"(
{
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.8"
    }
}
)";
include/utils/zepplin_mig_cfg.h
#ifndef _FREDRIC_ZEPPLIN_MIG_CFG_H_
#define _FREDRIC_ZEPPLIN_MIG_CFG_H_
#include <map>
#include <string>
// Runtime configuration (defined in zepplin_mig_cfg.cpp).
// Local directory holding the exported Zeppelin note JSON files.
extern std::string zepplin_js_path;
// Local directory holding the generated .ipynb files to upload.
extern std::string ipynbs_test_path;
// ODBC connection string for the Snowflake database.
extern std::string conn_str;
// Databricks workspace host plus REST API paths used by ImportUtil.
extern std::string databricks_ws_host;
extern std::string ws_create_dir_url_path;
extern std::string ws_import_url_path;
// HTTP headers for the Databricks API (bearer-token auth).
extern std::map<std::string, std::string> headers;
#endif
impl/utils/zepplin_mig_cfg.cpp
#include "utils/zepplin_mig_cfg.h"
// Local input/output directories (relative to the test working directory).
std::string zepplin_js_path = "../nbooks";
std::string ipynbs_test_path = "../ipynbs";
// NOTE(review): hard-coded ODBC credential checked into source — this
// password should be rotated and moved to an environment variable/secret.
std::string conn_str = "dsn=test_odbc;pwd=Lily870104";
// {PLACEHOLDER} values are substituted per environment before building.
std::string databricks_ws_host = "{DATABRICKS_WORKSPACE_HOST_NAME}";
// Databricks workspace REST API endpoints.
std::string ws_create_dir_url_path = "/api/2.0/workspace/mkdirs";
std::string ws_import_url_path = "/api/2.0/workspace/import";
std::map<std::string, std::string> headers {{"Authorization", "Bearer {DBS_TOKEN_NAME}"}};
程序最终会完成转换并上传到Databricks Workspace的操作。
网友评论