美文网首页
C++11 Snowflake简单查询器封装[ODBC]

C++11 Snowflake简单查询器封装[ODBC]

作者: FredricZhu | 来源:发表于2021-08-14 14:52 被阅读0次

    把昨天那个 Snowflake ODBC 示例简单封装了一下,作为一个通用的查询器,可以执行简单的 SQL 查询。
    因为公司的生产环境对测试是只读的,所以没做 insert 和 update 的封装。
    如果有需求的话,可以自己加。
    程序的结构和昨天一样,但是现在可以支持任意字段列表的 select 查询了(不支持 select *,因为结果的列名是从查询字符串里解析出来的)。


    image.png

    CMakeLists.txt

    
    # Build script for the ref_demo2_test project: every source file whose name
    # contains "test.cpp" becomes its own executable.
    cmake_minimum_required(VERSION 2.6)
    project(ref_demo2_test)
    
    # Compiler flags are passed through add_definitions: C++14 and debug symbols.
    add_definitions(-std=c++14)
    add_definitions(-g)
    
    
    
    # Boost is mandatory; the listed components end up in ${Boost_LIBRARIES}.
    find_package(Boost REQUIRED COMPONENTS
        system
        filesystem
        serialization
        program_options
        thread
        )
    
    # Header search path: Boost, /usr/local, iODBC, the Snowflake ODBC driver, and the
    # directory two levels above this one (presumably the root that makes
    # "sf_db/sf_db.h" resolvable - confirm against the repository layout).
    include_directories(${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../)
    
    # Library search path for the same three installations.
    LINK_DIRECTORIES(/usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
    
    # Collect the sources of this directory plus ../impl.
    # NOTE(review): ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp is listed twice in this glob.
    file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.h
        ${CMAKE_CURRENT_SOURCE_DIR}/../impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
    foreach( sourcefile ${APP_SOURCES} )
            # Path of the source relative to this directory, e.g. "sf_db_test.cpp".
            file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
        
            # TEMP receives the position of "test.cpp" inside the name, or -1 when absent.
            string(FIND "${filename}"  "test.cpp" "TEMP")
        if( NOT "${TEMP}" STREQUAL "-1" )
            # The executable is named after the test file without its .cpp suffix.
            string(REPLACE ".cpp" "" file ${filename})
            # Every globbed source is compiled into each test executable.
            add_executable(${file}  ${APP_SOURCES})
            target_link_libraries(${file} ${Boost_LIBRARIES})
            # OpenSSL, static GoogleTest/GoogleMock, the iODBC libraries and pthread.
            target_link_libraries(${file}  ssl crypto libgtest.a libgtest_main.a libgmock.a iodbc iodbcinst pthread)
        endif()
    endforeach( sourcefile ${APP_SOURCES})
    

    sf_db.h

    #ifndef _FREDRIC_SF_DB_H_
    #define _FREDRIC_SF_DB_H_
    
    #include "sf_odbc.h"
    #include "sql.h"
    #include "sqlext.h"
    
    #include <vector>
    #include <string>
    #include <iostream>
    #include <map>
    
    #include <cassert>
    
    /// Credentials SFConnection uses when it opens the ODBC connection.
    /// Both members default to the empty string.
    struct DBAuthorization {
        /// Name of the ODBC data source to connect to.
        std::string dataSource;
        /// Authentication string (password) supplied when connecting.
        std::string password;
    };
    
    /**
     * iodbc
     * /usr/local/iODBC/include
     * /usr/local/iODBC/lib
     * */
    
    /**
     * A read-only query helper on top of one ODBC connection.
     *
     * The constructor allocates an environment handle and a connection handle and
     * connects to the data source named in the given DBAuthorization; the destructor
     * disconnects and frees those handles again.
     */
    struct SFConnection {
    
        /// Query result: one map per fetched row, keyed by the (lower-cased) field name
        /// parsed from the query text, holding the column value as text.
        using DB_RETURN_TYPE = std::vector<std::map<std::string, std::string>>;
    
        SFConnection(DBAuthorization auth);
        ~SFConnection();
    
        // The destructor releases the raw ODBC handles held below, so a copied object
        // would disconnect and free the same handles a second time. Copying is
        // therefore disabled.
        SFConnection(const SFConnection&) = delete;
        SFConnection& operator=(const SFConnection&) = delete;
    
        /** 
         * Executes a query and collects the result set.
         * @param query_string the raw query text. `select *` is not supported, because
         *        the column names are parsed out of the query text itself.
         * @return the rows of the result set
         * 
         * example:
         *  select product_key, device_code from dim_product_v1 limit 2 returns
         *  std::vector { std::map
         *      {"product_key": "111", "device_code": "ios-all"},
         *      {"product_key": "111", "device_code": "google-play"}
         *  }
         */
        DB_RETURN_TYPE exec_query(std::string query_string);
    
        /**
         * Parses the list of selected fields out of a query string.
         * @param query_string the raw query text. `select *` is not supported.
         * @return the field names found between `select` and `from`
         * 
         * example:
         *  select product_key, device_code from dim_product_v1 yields
         *  std::vector {"product_key", "device_code"}
         */
        std::vector<std::string> parse_fields_from_query_string(std::string query_string);
    
        private:
            
            DBAuthorization m_auth{};
            // Handles start out null so they never hold an indeterminate value.
            SQLHENV henv{SQL_NULL_HENV};
            SQLHDBC hdbc{SQL_NULL_HDBC};
            // Status of the corresponding ODBC calls; -1 until the call has been made.
            // The destructor consults them to decide what has to be released.
            SQLRETURN henv_ret{-1};
            SQLRETURN hdbc_conn_ret{-1};
            SQLRETURN hdbc_ret{-1};
    };
    
    
    #endif
    

    sf_db.cpp

    #include "sf_db/sf_db.h"
    
    #include <algorithm>
    #include <array>
    #include <cctype>
    #include <cstddef>
    #include <sstream>
    
    #include <boost/algorithm/string.hpp>
    
    void checkError(SQLRETURN retcode, std::string msg) {
        if ((retcode != SQL_SUCCESS) && (retcode = !SQL_SUCCESS_WITH_INFO)) {
            std::cerr << msg << std::endl;
        }
        assert(retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO);
    }
    
    /**
     * Tells whether an ODBC status code denotes success.
     *
     * @param retcode status returned by the ODBC function
     * @return true for SQL_SUCCESS and SQL_SUCCESS_WITH_INFO, false for anything else
     */
    bool checkErrorCode(SQLRETURN retcode) {
        // Written as plain comparisons: the earlier `retcode = !SQL_SUCCESS_WITH_INFO`
        // was an assignment and made this function return true for every status.
        return retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO;
    }
    
    /**
     * Stores the credentials and opens the connection.
     *
     * The steps run in a fixed order: environment handle, ODBC version attribute,
     * connection handle, login timeout, connect. The status of each checked step is
     * kept in a member and handed to checkError together with a message.
     *
     * @param auth data source name and password used for SQLConnect
     */
    SFConnection::SFConnection(DBAuthorization auth) : m_auth(auth) {
        // Allocate environment handle
        henv_ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
        checkError(henv_ret, "SQLAllocHandle failed");
    
        // Set the ODBC version environment attribute; the status replaces the one
        // stored for the allocation above.
        henv_ret = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION,
                                 reinterpret_cast<void *>(SQL_OV_ODBC3), 0);
        checkError(henv_ret, "SQLSetEnvAttr failed");
    
        // Allocate connection handle
        hdbc_ret = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
        checkError(hdbc_ret, "SQLAllocHandle failed");
    
        // Set login timeout to 5 seconds (the status of this call is not inspected)
        SQLSetConnectAttr(hdbc, SQL_LOGIN_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
    
        // Connect to data source: no user name is passed, only the data source name
        // and the authentication string, both as NUL-terminated strings.
        SQLCHAR *const server_name =
            reinterpret_cast<SQLCHAR *>(const_cast<char *>(m_auth.dataSource.data()));
        SQLCHAR *const auth_string =
            reinterpret_cast<SQLCHAR *>(const_cast<char *>(m_auth.password.data()));
        SQLCHAR *const no_user_name = nullptr;
    
        hdbc_conn_ret = SQLConnect(hdbc, server_name, SQL_NTS, no_user_name, 0,
                                   auth_string, SQL_NTS);
        checkError(hdbc_conn_ret, "SQLConnect connect to product_odbc failed");
    
        std::cout << "connect to product_odbc success!" << std::endl;
    }
    
    /**
     * Extracts the selected field names from a query string.
     *
     * The query is lower-cased, then the text between the keyword `select` and the
     * first following keyword `from` is split on ',' and each piece is trimmed.
     * Keywords are only recognised as whole words, so an identifier such as
     * `from_date` does not end the field list.
     *
     * Limitations: the split is purely textual. Expressions containing commas,
     * aliases (`a as b`), `distinct`, sub-selects and `select *` are not understood.
     *
     * @param query_string raw query text
     * @return the lower-cased, trimmed field names in query order; empty when the
     *         query contains no `select` keyword. When there is no `from` keyword the
     *         field list is taken to run to the end of the string.
     */
    std::vector<std::string> SFConnection::parse_fields_from_query_string(
        std::string query_string) {
    
        std::vector<std::string> result {};
        std::transform(query_string.begin(), query_string.end(),
                       query_string.begin(),
                       [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    
        const auto is_identifier_char = [](char ch) -> bool {
            return std::isalnum(static_cast<unsigned char>(ch)) != 0 || ch == '_';
        };
    
        // First occurrence of `keyword` at or after `from_pos` that is not embedded
        // in a longer identifier; npos when there is none.
        const auto find_keyword = [&](const std::string& keyword,
                                      std::size_t from_pos) -> std::size_t {
            for (std::size_t pos = query_string.find(keyword, from_pos);
                 pos != std::string::npos;
                 pos = query_string.find(keyword, pos + 1)) {
                const std::size_t after = pos + keyword.size();
                const bool left_ok =
                    pos == 0 || !is_identifier_char(query_string[pos - 1]);
                const bool right_ok = after >= query_string.size() ||
                                      !is_identifier_char(query_string[after]);
                if (left_ok && right_ok) {
                    return pos;
                }
            }
            return std::string::npos;
        };
    
        const std::string select_keyword{"select"};
        const std::size_t select_pos = find_keyword(select_keyword, 0);
        if (select_pos == std::string::npos) {
            // Nothing to parse; previously npos + 6 wrapped around to offset 5.
            return result;
        }
        const std::size_t start = select_pos + select_keyword.size();
    
        std::size_t end = find_keyword("from", start);
        if (end == std::string::npos) {
            end = query_string.size();
        }
    
        std::stringstream ss(query_string.substr(start, end - start));
        std::string field{};
        while(std::getline(ss, field, ',')) {
            boost::trim(field);
            result.emplace_back(std::move(field));
        }
        // Returned by name so the compiler can elide the copy.
        return result;
    }
    
    /**
     * Runs a query and returns every fetched row as a field-name -> text map.
     *
     * The field names come from parse_fields_from_query_string; column i+1 of the
     * result set is bound as character data to the i-th parsed name. Each row is
     * also echoed to std::cout, preceded by its 1-based row number.
     *
     * Value handling:
     *  - a NULL column (length indicator SQL_NULL_DATA) is returned as an empty string;
     *  - a value that does not fit the per-column buffer is returned truncated to
     *    what the buffer holds instead of reading past its end.
     *
     * @param query_string raw query text, executed as-is
     * @return the rows fetched so far; empty when the statement could not be
     *         allocated or executed. Failures are reported on std::cerr / std::cout.
     */
    SFConnection::DB_RETURN_TYPE SFConnection::exec_query(
        std::string query_string) {
        // Size in bytes of the text buffer bound to each column.
        constexpr std::size_t kFieldBufferSize = 2000;
    
        // Releases the statement handle on every way out of this function,
        // including an exception thrown while building the result.
        struct StatementGuard {
            SQLHSTMT handle;
            ~StatementGuard() {
                std::cout << "Free statement" << std::endl;
                SQLFreeHandle(SQL_HANDLE_STMT, handle);
            }
        };
    
        DB_RETURN_TYPE results;
    
        const std::vector<std::string> field_names =
            parse_fields_from_query_string(query_string);
    
        // One zero-filled buffer and one length indicator per expected column.
        std::vector<std::array<SQLCHAR, kFieldBufferSize>> field_values(field_names.size());
        std::vector<SQLLEN> field_lens(field_names.size(), 0);
    
        // Allocate statement handle
        SQLHSTMT hstmt = SQL_NULL_HSTMT;
        const auto hstmt_ret = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
        if (!checkErrorCode(hstmt_ret)) {
            std::cerr << "SQLAllocHandle statement failed!" << std::endl;
            return results;
        }
        const StatementGuard statement_guard{hstmt};
    
        auto retcode = SQLExecDirect(
            hstmt,
            reinterpret_cast<SQLCHAR *>(const_cast<char *>(query_string.c_str())),
            SQL_NTS);
        if (!checkErrorCode(retcode)) {
            std::cerr << "SQLExecDirect failed!" << std::endl;
            return results;
        }
    
        for (std::size_t i = 0; i < field_names.size(); ++i) {
            retcode = SQLBindCol(hstmt, static_cast<SQLUSMALLINT>(i + 1), SQL_C_CHAR,
                                 field_values[i].data(),
                                 static_cast<SQLLEN>(kFieldBufferSize), &field_lens[i]);
            if (!checkErrorCode(retcode)) {
                // Keep going: an unbound column simply stays empty in every row.
                std::cerr << "SQLBindCol failed for column " << (i + 1) << std::endl;
            }
        }
    
        // Fetch and print each row of data until
        // SQL_NO_DATA returned.
        for (int row_number = 1;; ++row_number) {
            retcode = SQLFetch(hstmt);
            if (retcode == SQL_NO_DATA) {
                break;
            }
            if (retcode != SQL_SUCCESS && retcode != SQL_SUCCESS_WITH_INFO) {
                std::cout << "SQLFetch Error, error code: " << retcode
                          << std::endl;
                break;
            }
    
            std::cout << row_number << " ";
            std::map<std::string, std::string> tmp_fields{};
    
            for (std::size_t j = 0; j < field_names.size(); ++j) {
                const auto& buffer = field_values[j];
                const SQLLEN reported = field_lens[j];
    
                std::size_t length = 0;
                if (reported == SQL_NULL_DATA) {
                    // NULL column: the buffer still holds the previous row's bytes.
                    length = 0;
                } else if (reported < 0 ||
                           static_cast<std::size_t>(reported) >= kFieldBufferSize) {
                    // Unknown total or truncated value: take what is actually in
                    // the buffer, up to its terminating NUL.
                    length = static_cast<std::size_t>(
                        std::find(buffer.begin(), buffer.end(), SQLCHAR{0}) -
                        buffer.begin());
                } else {
                    length = static_cast<std::size_t>(reported);
                }
    
                std::string temp_field(reinterpret_cast<const char *>(buffer.data()),
                                       length);
                std::cout << temp_field << " ";
                tmp_fields[field_names[j]] = std::move(temp_field);
            }
            std::cout << std::endl;
    
            results.emplace_back(std::move(tmp_fields));
        }
    
        // Returned by name so the compiler can elide the copy; the guard frees the
        // statement handle afterwards.
        return results;
    }
    
    /**
     * Releases what the constructor acquired, newest first: the connection is
     * closed, then the connection handle and finally the environment handle are
     * freed. Each step runs only when checkErrorCode accepts the status stored for
     * the matching constructor step, and each announces itself on std::cout.
     */
    SFConnection::~SFConnection() {
        const bool connection_opened = checkErrorCode(hdbc_conn_ret);
        if (connection_opened) {
            SQLDisconnect(hdbc);
            std::cout << "Free connection" << std::endl;
        }
    
        const bool dbc_allocated = checkErrorCode(hdbc_ret);
        if (dbc_allocated) {
            SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
            std::cout << "Free dbc" << std::endl;
        }
    
        const bool env_ready = checkErrorCode(henv_ret);
        if (env_ready) {
            SQLFreeHandle(SQL_HANDLE_ENV, henv);
            std::cout << "Free env" << std::endl;
        }
    }
    

    sf_db_test.cpp

    #include "sf_db/sf_db.h"
    
    #include <array>
    #include <gtest/gtest.h>
    
    // Runs the same 10-row query twice on one connection and expects 10 rows each
    // time, i.e. the connection stays usable after a statement has been freed.
    // Connects to the data source "product_odbc" when it runs.
    GTEST_TEST(SFDBTests, TestExecQuery) {
        const DBAuthorization credentials {"product_odbc", "{YOUR_PASSWORD}"};
        SFConnection connection {credentials};
        const std::string sql = "select  product_key,change_time,EVENT_TYPE_NAME, change_column, old_value, new_value,meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where  event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 10;";
    
        const auto first_run = connection.exec_query(sql);
        ASSERT_EQ(10, first_run.size());
    
        const auto second_run = connection.exec_query(sql);
        ASSERT_EQ(10, second_run.size());
    }
    
    
    // Fetches two rows and prints every selected column of each row under a header
    // line. Connects to the data source "product_odbc" when it runs.
    GTEST_TEST(SFDBTests, TestExecQueryAndPrintValue) {
        const DBAuthorization credentials {"product_odbc", "{YOUR_PASSWORD}"};
        SFConnection connection {credentials};
        const std::string sql = "select  product_key,change_time,event_type_name, change_column, old_value, new_value,meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where  event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 2;";
    
        auto rows = connection.exec_query(sql);
        ASSERT_EQ(2, rows.size());
        std::cout << "product_key,  change_time,  event_type_name,  change_column,  old_value,  new_value,  meta" 
            << std::endl;
    
        // Columns in the order they are printed; each value is preceded by one space.
        const char* const columns[] = {"product_key", "change_time", "event_type_name",
                                       "change_column", "old_value", "new_value", "meta"};
        for (auto& row : rows) {
            for (const char* column : columns) {
                std::cout << " " << row[column];
            }
            std::cout << std::endl;
        }
    }
    
    
    // Checks that the field list of a two-column select is split into its two names.
    // The parser is a member function, so a connection to "product_odbc" is opened
    // even though no query is executed.
    GTEST_TEST(SFDBTests, TestParseFields) {
        const DBAuthorization credentials {"product_odbc", "{YOUR_PASSWORD}"};
        SFConnection connection {credentials};
        const std::string sql = "select product_key, meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where  event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 10;";
    
        const auto fields = connection.parse_fields_from_query_string(sql);
        ASSERT_EQ(2, fields.size());
        ASSERT_EQ("product_key", fields[0]);
        ASSERT_EQ("meta", fields[1]);
    }
    

    程序输出如下:


    image.png

    相关文章

      网友评论

          本文标题:C++11 Snowflake简单查询器封装[ODBC]

          本文链接:https://www.haomeiwen.com/subject/niqebltx.html