美文网首页我爱编程
Go通过thrift连接HBase

Go通过thrift连接HBase

作者: 一条湫刀鱼 | 来源:发表于2017-08-09 09:50 被阅读499次

    apche thrift2安装手册

    • Problem
    # thrifttest
    ./ThriftTest.go:6503: cannot use thriftTestProcessorTestVoid literal (type *thriftTestProcessorTestVoid) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestVoid does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6504: cannot use thriftTestProcessorTestString literal (type *thriftTestProcessorTestString) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestString does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6505: cannot use thriftTestProcessorTestBool literal (type *thriftTestProcessorTestBool) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestBool does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6506: cannot use thriftTestProcessorTestByte literal (type *thriftTestProcessorTestByte) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestByte does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6507: cannot use thriftTestProcessorTestI32 literal (type *thriftTestProcessorTestI32) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestI32 does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6508: cannot use thriftTestProcessorTestI64 literal (type *thriftTestProcessorTestI64) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestI64 does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6509: cannot use thriftTestProcessorTestDouble literal (type *thriftTestProcessorTestDouble) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestDouble does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6510: cannot use thriftTestProcessorTestBinary literal (type *thriftTestProcessorTestBinary) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestBinary does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6511: cannot use thriftTestProcessorTestStruct literal (type *thriftTestProcessorTestStruct) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestStruct does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6512: cannot use thriftTestProcessorTestNest literal (type *thriftTestProcessorTestNest) as type thrift.TProcessorFunction in assignment:
            *thriftTestProcessorTestNest does not implement thrift.TProcessorFunction (wrong type for Process method)
                    have Process(int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
                    want Process(context.Context, int32, thrift.TProtocol, thrift.TProtocol) (bool, thrift.TException)
    ./ThriftTest.go:6512: too many errors
    
    • Solution
    ➜  thrift.git git:(master) ✗ pwd
    /opt/gowork/src/git.apache.org/thrift.git
    ➜  thrift.git git:(master) ✗ git branch -a
      master
      remotes/origin/0.1.x
    * remotes/origin/0.10.0
      remotes/origin/0.2.x
      remotes/origin/0.3.x
      remotes/origin/0.4.x
      remotes/origin/0.5.x
      remotes/origin/0.6.x
      remotes/origin/0.7.x
      remotes/origin/0.8.x
      remotes/origin/0.9.1
      remotes/origin/0.9.2
      remotes/origin/0.9.3
      remotes/origin/0.9.x
      remotes/origin/HEAD -> origin/master
      remotes/origin/master
      remotes/origin/py-compiler
    
    

    切换到分支remotes/origin/0.10.0


    • 重启HBase

    连接到HBase

    • hbase.thrift
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    // NOTE: The "required" and "optional" keywords for the service methods are purely for documentation
    
    namespace java org.apache.hadoop.hbase.thrift2.generated
    namespace cpp apache.hadoop.hbase.thrift2
    namespace rb Apache.Hadoop.Hbase.Thrift2
    namespace py hbase
    namespace perl Hbase
    
    struct TTimeRange {
      1: required i64 minStamp,
      2: required i64 maxStamp
    }
    
    /**
     * Addresses a single cell or multiple cells
     * in a HBase table by column family and optionally
     * a column qualifier and timestamp
     */
    struct TColumn {
      1: required binary family,
      2: optional binary qualifier,
      3: optional i64 timestamp
    }
    
    /**
     * Represents a single cell and its value.
     */
    struct TColumnValue {
      1: required binary family,
      2: required binary qualifier,
      3: required binary value,
      4: optional i64 timestamp,
      5: optional binary tags
    }
    
    /**
     * Represents a single cell and the amount to increment it by
     */
    struct TColumnIncrement {
      1: required binary family,
      2: required binary qualifier,
      3: optional i64 amount = 1
    }
    
    /**
     * if no Result is found, row and columnValues will not be set.
     */
    struct TResult {
      1: optional binary row,
      2: required list<TColumnValue> columnValues
    }
    
    /**
     * Specify type of delete:
     *  - DELETE_COLUMN means exactly one version will be removed,
     *  - DELETE_COLUMNS means previous versions will also be removed.
     */
    enum TDeleteType {
      DELETE_COLUMN = 0,
      DELETE_COLUMNS = 1,
      DELETE_FAMILY = 2,
      DELETE_FAMILY_VERSION = 3
    }
    
    /**
     * Specify Durability:
     *  - SKIP_WAL means do not write the Mutation to the WAL.
     *  - ASYNC_WAL means write the Mutation to the WAL asynchronously,
     *  - SYNC_WAL means write the Mutation to the WAL synchronously,
     *  - FSYNC_WAL means Write the Mutation to the WAL synchronously and force the entries to disk.
     */
    
    enum TDurability {
      SKIP_WAL = 1,
      ASYNC_WAL = 2,
      SYNC_WAL = 3,
      FSYNC_WAL = 4
    }
    struct TAuthorization {
     1: optional list<string> labels
    }
    
    struct TCellVisibility {
     1: optional string expression
    }
    /**
     * Used to perform Get operations on a single row.
     *
     * The scope can be further narrowed down by specifying a list of
     * columns or column families.
     *
     * To get everything for a row, instantiate a Get object with just the row to get.
     * To further define the scope of what to get you can add a timestamp or time range
     * with an optional maximum number of versions to return.
     *
     * If you specify a time range and a timestamp the range is ignored.
     * Timestamps on TColumns are ignored.
     */
    struct TGet {
      1: required binary row,
      2: optional list<TColumn> columns,
    
      3: optional i64 timestamp,
      4: optional TTimeRange timeRange,
    
      5: optional i32 maxVersions,
      6: optional binary filterString,
      7: optional map<binary, binary> attributes
      8: optional TAuthorization authorizations
    }
    
    /**
     * Used to perform Put operations for a single row.
     *
     * Add column values to this object and they'll be added.
     * You can provide a default timestamp if the column values
     * don't have one. If you don't provide a default timestamp
     * the current time is inserted.
     *
     * You can specify how this Put should be written to the write-ahead Log (WAL)
     * by changing the durability. If you don't provide durability, it defaults to
     * column family's default setting for durability.
     */
    struct TPut {
      1: required binary row,
      2: required list<TColumnValue> columnValues
      3: optional i64 timestamp,
      5: optional map<binary, binary> attributes,
      6: optional TDurability durability,
      7: optional TCellVisibility cellVisibility
    }
    
    /**
     * Used to perform Delete operations on a single row.
     *
     * The scope can be further narrowed down by specifying a list of
     * columns or column families as TColumns.
     *
     * Specifying only a family in a TColumn will delete the whole family.
     * If a timestamp is specified all versions with a timestamp less than
     * or equal to this will be deleted. If no timestamp is specified the
     * current time will be used.
     *
     * Specifying a family and a column qualifier in a TColumn will delete only
     * this qualifier. If a timestamp is specified only versions equal
     * to this timestamp will be deleted. If no timestamp is specified the
     * most recent version will be deleted.  To delete all previous versions,
     * specify the DELETE_COLUMNS TDeleteType.
     *
     * The top level timestamp is only used if a complete row should be deleted
     * (i.e. no columns are passed) and if it is specified it works the same way
     * as if you had added a TColumn for every column family and this timestamp
     * (i.e. all versions older than or equal in all column families will be deleted)
     *
     * You can specify how this Delete should be written to the write-ahead Log (WAL)
     * by changing the durability. If you don't provide durability, it defaults to
     * column family's default setting for durability.
     */
    struct TDelete {
      1: required binary row,
      2: optional list<TColumn> columns,
      3: optional i64 timestamp,
      4: optional TDeleteType deleteType = 1,
      6: optional map<binary, binary> attributes,
      7: optional TDurability durability
    
    }
    
    /**
     * Used to perform Increment operations for a single row.
     *
     * You can specify how this Increment should be written to the write-ahead Log (WAL)
     * by changing the durability. If you don't provide durability, it defaults to
     * column family's default setting for durability.
     */
    struct TIncrement {
      1: required binary row,
      2: required list<TColumnIncrement> columns,
      4: optional map<binary, binary> attributes,
      5: optional TDurability durability
      6: optional TCellVisibility cellVisibility
    }
    
    /* 
     * Used to perform append operation 
     */
    struct TAppend {
      1: required binary row,
      2: required list<TColumnValue> columns,
      3: optional map<binary, binary> attributes,
      4: optional TDurability durability
      5: optional TCellVisibility cellVisibility
    }
    
    enum TReadType {
      DEFAULT = 1,
      STREAM = 2,
      PREAD = 3
    }
    
    /**
     * Any timestamps in the columns are ignored but the colFamTimeRangeMap included, use timeRange to select by timestamp.
     * Max versions defaults to 1.
     */
    struct TScan {
      1: optional binary startRow,
      2: optional binary stopRow,
      3: optional list<TColumn> columns
      4: optional i32 caching,
      5: optional i32 maxVersions=1,
      6: optional TTimeRange timeRange,
      7: optional binary filterString,
      8: optional i32 batchSize,
      9: optional map<binary, binary> attributes
      10: optional TAuthorization authorizations
      11: optional bool reversed
      12: optional bool cacheBlocks
      13: optional map<binary,TTimeRange> colFamTimeRangeMap
      14: optional TReadType readType
      15: optional i32 limit
    }
    
    /**
     * Atomic mutation for the specified row. It can be either Put or Delete.
     */
    union TMutation {
      1: TPut put,
      2: TDelete deleteSingle,
    }
    
    /**
     * A TRowMutations object is used to apply a number of Mutations to a single row.
     */
    struct TRowMutations {
      1: required binary row
      2: required list<TMutation> mutations
    }
    
    struct THRegionInfo {
      1: required i64 regionId
      2: required binary tableName
      3: optional binary startKey
      4: optional binary endKey
      5: optional bool offline
      6: optional bool split
      7: optional i32 replicaId
    }
    
    struct TServerName {
      1: required string hostName
      2: optional i32 port
      3: optional i64 startCode
    }
    
    struct THRegionLocation {
      1: required TServerName serverName
      2: required THRegionInfo regionInfo
    }
    
    /**
     * Thrift wrapper around
     * org.apache.hadoop.hbase.filter.CompareFilter$CompareOp.
     */
    enum TCompareOp {
      LESS = 0,
      LESS_OR_EQUAL = 1,
      EQUAL = 2,
      NOT_EQUAL = 3,
      GREATER_OR_EQUAL = 4,
      GREATER = 5,
      NO_OP = 6
    }
    
    
    //
    // Exceptions
    //
    
    /**
     * A TIOError exception signals that an error occurred communicating
     * to the HBase master or a HBase region server. Also used to return
     * more general HBase error conditions.
     */
    exception TIOError {
      1: optional string message
    }
    
    /**
     * A TIllegalArgument exception indicates an illegal or invalid
     * argument was passed into a procedure.
     */
    exception TIllegalArgument {
      1: optional string message
    }
    
    service THBaseService {
    
      /**
       * Test for the existence of columns in the table, as specified in the TGet.
       *
       * @return true if the specified TGet matches one or more keys, false if not
       */
      bool exists(
        /** the table to check on */
        1: required binary table,
    
        /** the TGet to check for */
        2: required TGet tget
      ) throws (1:TIOError io)
    
    
      /**
      * Test for the existence of columns in the table, as specified by the TGets.
      *
      * This will return an array of booleans. Each value will be true if the related Get matches
      * one or more keys, false if not.
      */
      list<bool> existsAll(
        /** the table to check on */
        1: required binary table,
    
        /** a list of TGets to check for */
        2: required list<TGet> tgets
      ) throws (1:TIOError io)
    
      /**
       * Method for getting data from a row.
       *
       * If the row cannot be found an empty Result is returned.
       * This can be checked by the empty field of the TResult
       *
       * @return the result
       */
      TResult get(
        /** the table to get from */
        1: required binary table,
    
        /** the TGet to fetch */
        2: required TGet tget
      ) throws (1: TIOError io)
    
      /**
       * Method for getting multiple rows.
       *
       * If a row cannot be found there will be a null
       * value in the result list for that TGet at the
       * same position.
       *
       * So the Results are in the same order as the TGets.
       */
      list<TResult> getMultiple(
        /** the table to get from */
        1: required binary table,
    
        /** a list of TGets to fetch, the Result list
            will have the Results at corresponding positions
            or null if there was an error */
        2: required list<TGet> tgets
      ) throws (1: TIOError io)
    
      /**
       * Commit a TPut to a table.
       */
      void put(
        /** the table to put data in */
        1: required binary table,
    
        /** the TPut to put */
        2: required TPut tput
      ) throws (1: TIOError io)
    
      /**
       * Atomically checks if a row/family/qualifier value matches the expected
       * value. If it does, it adds the TPut.
       *
       * @return true if the new put was executed, false otherwise
       */
      bool checkAndPut(
        /** to check in and put to */
        1: required binary table,
    
        /** row to check */
        2: required binary row,
    
        /** column family to check */
        3: required binary family,
    
        /** column qualifier to check */
        4: required binary qualifier,
    
        /** the expected value, if not provided the
            check is for the non-existence of the
            column in question */
        5: binary value,
    
        /** the TPut to put if the check succeeds */
        6: required TPut tput
      ) throws (1: TIOError io)
    
      /**
       * Commit a List of Puts to the table.
       */
      void putMultiple(
        /** the table to put data in */
        1: required binary table,
    
        /** a list of TPuts to commit */
        2: required list<TPut> tputs
      ) throws (1: TIOError io)
    
      /**
       * Deletes as specified by the TDelete.
       *
       * Note: "delete" is a reserved keyword and cannot be used in Thrift
       * thus the inconsistent naming scheme from the other functions.
       */
      void deleteSingle(
        /** the table to delete from */
        1: required binary table,
    
        /** the TDelete to delete */
        2: required TDelete tdelete
      ) throws (1: TIOError io)
    
      /**
       * Bulk commit a List of TDeletes to the table.
       *
       * Throws a TIOError if any of the deletes fail.
       *
       * Always returns an empty list for backwards compatibility.
       */
      list<TDelete> deleteMultiple(
        /** the table to delete from */
        1: required binary table,
    
        /** list of TDeletes to delete */
        2: required list<TDelete> tdeletes
      ) throws (1: TIOError io)
    
      /**
       * Atomically checks if a row/family/qualifier value matches the expected
       * value. If it does, it adds the delete.
       *
       * @return true if the new delete was executed, false otherwise
       */
      bool checkAndDelete(
        /** to check in and delete from */
        1: required binary table,
    
        /** row to check */
        2: required binary row,
    
        /** column family to check */
        3: required binary family,
    
        /** column qualifier to check */
        4: required binary qualifier,
    
        /** the expected value, if not provided the
            check is for the non-existence of the
            column in question */
        5: binary value,
    
        /** the TDelete to execute if the check succeeds */
        6: required TDelete tdelete
      ) throws (1: TIOError io)
    
      TResult increment(
        /** the table to increment the value on */
        1: required binary table,
    
        /** the TIncrement to increment */
        2: required TIncrement tincrement
      ) throws (1: TIOError io)
    
      TResult append(
        /** the table to append the value on */
        1: required binary table,
    
        /** the TAppend to append */
        2: required TAppend tappend
      ) throws (1: TIOError io)
    
      /**
       * Get a Scanner for the provided TScan object.
       *
       * @return Scanner Id to be used with other scanner procedures
       */
      i32 openScanner(
        /** the table to get the Scanner for */
        1: required binary table,
    
        /** the scan object to get a Scanner for */
        2: required TScan tscan,
      ) throws (1: TIOError io)
    
      /**
       * Grabs multiple rows from a Scanner.
       *
       * @return Between zero and numRows TResults
       */
      list<TResult> getScannerRows(
        /** the Id of the Scanner to return rows from. This is an Id returned from the openScanner function. */
        1: required i32 scannerId,
    
        /** number of rows to return */
        2: i32 numRows = 1
      ) throws (
        1: TIOError io,
    
        /** if the scannerId is invalid */
        2: TIllegalArgument ia
      )
    
      /**
       * Closes the scanner. Should be called to free server side resources timely.
       * Typically close once the scanner is not needed anymore, i.e. after looping
       * over it to get all the required rows.
       */
      void closeScanner(
        /** the Id of the Scanner to close **/
        1: required i32 scannerId
      ) throws (
        1: TIOError io,
    
        /** if the scannerId is invalid */
        2: TIllegalArgument ia
      )
    
      /**
       * mutateRow performs multiple mutations atomically on a single row.
      */
      void mutateRow(
      /** table to apply the mutations */
        1: required binary table,
    
        /** mutations to apply */
        2: required TRowMutations trowMutations
      ) throws (1: TIOError io)
    
      /**
       * Get results for the provided TScan object.
       * This helper function opens a scanner, get the results and close the scanner.
       *
       * @return between zero and numRows TResults
       */
      list<TResult> getScannerResults(
        /** the table to get the Scanner for */
        1: required binary table,
    
        /** the scan object to get a Scanner for */
        2: required TScan tscan,
    
        /** number of rows to return */
        3: i32 numRows = 1
      ) throws (
        1: TIOError io
      )
    
      /**
       * Given a table and a row get the location of the region that
       * would contain the given row key.
       *
       * reload = true means the cache will be cleared and the location
       * will be fetched from meta.
       */
      THRegionLocation getRegionLocation(
        1: required binary table,
        2: required binary row,
        3: bool reload,
      ) throws (
        1: TIOError io
      )
    
      /**
       * Get all of the region locations for a given table.
       **/
      list<THRegionLocation> getAllRegionLocations(
        1: required binary table,
      ) throws (
        1: TIOError io
      )
    
      /**
       * Atomically checks if a row/family/qualifier value matches the expected
       * value. If it does, it mutates the row.
       *
       * @return true if the row was mutated, false otherwise
       */
      bool checkAndMutate(
        /** to check in and delete from */
        1: required binary table,
    
        /** row to check */
        2: required binary row,
    
        /** column family to check */
        3: required binary family,
    
        /** column qualifier to check */
        4: required binary qualifier,
    
        /** comparison to make on the value */
        5: required TCompareOp compareOp,
    
        /** the expected value to be compared against, if not provided the
            check is for the non-existence of the column in question */
        6: binary value,
    
        /** row mutations to execute if the value matches */
        7: required TRowMutations rowMutations
      ) throws (1: TIOError io)
    }
    
    

    编译生成协议文件夹gen-go

    GoUnusedProtection__.go hbase-consts.go hbase.go t_h_base_service-remote
    cp$GOPATH:src

    • main.go
    package main
    
    import (
        //"encoding/binary"
        "fmt"
        "git.apache.org/thrift.git/lib/go/thrift"
        "hbase"
        //"net"
        //"os"
        //"reflect"
        //"strconv"
        //"time"
    )
    
    const (
        HOST       = "hostname"
        PORT       = "9090"
        TESTRECORD = 10
    )
    
    func main() {
    
        table := "bigdata:applydata_fl"
        rowkey := "4851bfe3-521f-a264-9581-9de139c91461|GS2017072415008551000859747"
        //family := "r"
    
        protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
        transport, err := thrift.NewTSocket(HOST + ":" + PORT)
    
        if err != nil {
            panic(err)
        }
    
        client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory)
        if err := transport.Open(); err != nil {
            panic(err)
        }
    
        fmt.Println(client)
        defer transport.Close()
    
        logformattitle := "调用Exists方法"
        fmt.Printf(logformattitle)
    
        isexists, err := client.Exists([]byte(table), &hbase.TGet{Row: []byte(rowkey)})
    
        fmt.Printf("rowkey{%s} in table{%s} Exists:%t\n", rowkey, table, isexists)
    
        /***************************/
        logformattitle = "调用Get方法获取新增加的数据"
        fmt.Printf(logformattitle)
    
        result, err := client.Get([]byte(table), &hbase.TGet{Row: []byte(rowkey)})
    
        if err != nil {
            fmt.Printf("Get err:%s\n", err)
        } else {
            fmt.Println("Rowkey:" + string(result.Row))
            for _, cv := range result.ColumnValues {
                fmt.Printf("Get err:%+v\n", cv)
                //printscruct(cv)
            }
        }
    
    }
    

    相关文章

      网友评论

        本文标题:Go通过thrift连接HBase

        本文链接:https://www.haomeiwen.com/subject/dujxrxtx.html