美文网首页软件开发经验golang学习
golang流操作(一)——GetStream

golang流操作(一)——GetStream

作者: StormZhu | 来源:发表于2018-12-26 23:02 被阅读53次

    问题

    最近看了《分布式对象存储--原理架构及Go语言实现》这本书,整体思路很清晰,但是由于对于golang中的数据流操作(tcp数据流,文件流等)不是很熟悉,导致对代码的理解有点问题。所以特地整理一下。

    场景

    假设要做一个对象储存,架构如下所示:

    对象存储架构.png

    其中接口服务器是直接和客户端交互的,而数据服务器是用来存储存储数据的。

    现在将问题简化,只有一个接口服务器,一个数据服务器。当客户端想从服务器获取数据时,流程如下所示:

    客户端发起请求流程.png

    客户端需要先找接口服务器要数据,接口服务器找数据服务器要(当然需要先定位数据在哪个数据服务器上),数据服务器回数据给接口服务器,接口服务器再回复给客户端,经过这样的流程,客户端才能收到数据。

    接口设计

    客户端获取数据时向接口服务器发送GET请求,请求的url/objects/<object_name>

    同样接口服务器向数据服务器转发GET请求,请求的url/objects/<object_name>

    数据服务器读取本地指定文件夹的<object_name>文件,将内容回复给接口服务器,接口服务器再回复给客户端。

    数据服务器实现

    dataServer代码

    数据服务器的功能很明显,所以先完成数据服务器的代码。

    golang实现API十分简单:

    // dataServer.go
    package main
    
    import (
        "net/http"
        "io"
        "os"
        "log"
        "strings"
    )
    
    const objectDir = "D:/objects/"
    
    func Handler(w http.ResponseWriter, r *http.Request) {
        m := r.Method
        if m == http.MethodGet {
            get(w, r)
            return
        }
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
    
    func get(w http.ResponseWriter, r *http.Request) {
        // 收到 接口服务器的 请求
        // 提取 要获取的文件名
        name := strings.Split(r.URL.EscapedPath(), "/")[2]
        f, e := os.Open(objectDir + name)
        if e != nil {
            log.Println(e)
            w.WriteHeader(http.StatusNotFound)
            return
        }
        defer f.Close()
        // 真正读取,并发送
        io.Copy(w, f)
    }
    
    func main() {
        http.HandleFunc("/objects/", Handler)
        http.ListenAndServe(":8889", nil)
    }
    
    

    dataServer监听在本地的8889端口。代码中值得解释的就是get()函数,这是执行GET请求的时要执行的主要函数,主要工作就是读取文件,将文件内容写入http.ResponseWriter中,就能返回数据。

    这里要注意的是f, e := os.Open(objectDir + name)中的f是个*File类型,定义如下:

    // File represents an open file descriptor.
    type File struct {
        *file // os specific
    }
    
    // read reads up to len(b) bytes from the File.
    // It returns the number of bytes read and an error, if any.
    func (f *File) read(b []byte) (n int, err error) {
        n, err = f.pfd.Read(b)
        runtime.KeepAlive(f)
        return n, err
    }
    

    可以看到它实现了io.Reader接口,而io.Copy()函数的定义为:

    // Copy copies from src to dst until either EOF is reached
    // on src or an error occurs. It returns the number of bytes
    // copied and the first error encountered while copying, if any.
    //
    // A successful Copy returns err == nil, not err == EOF.
    // Because Copy is defined to read from src until EOF, it does
    // not treat an EOF from Read as an error to be reported.
    //
    // If src implements the WriterTo interface,
    // the copy is implemented by calling src.WriteTo(dst).
    // Otherwise, if dst implements the ReaderFrom interface,
    // the copy is implemented by calling dst.ReadFrom(src).
    
    func Copy(dst Writer, src Reader) (written int64, err error) {
        return copyBuffer(dst, src, nil)
    }
    

    copyBuffer()函数调用了src这个io.Reader接口的Read()函数读取数据,然后将数据拷贝到dst这个io.Writer中。

    所以文件的真正读取,是发生在io.Copy(w, f)这句代码处的,这句代码会阻塞到所有的数据读取完毕。注意:文件流读取完成后需要调用Close函数关闭。

    测试

    由于数据服务器的地址是localhost:8889,所以在浏览器输入localhost:8889/objects/test.txt可以看到浏览器确实接收到了数据。

    测试结果.png

    接口服务器实现

    发起GET请求

    golang中发起简单的GET请求使用http.Get()函数即可。

    例如,向baidu.com发起GET请求。

    package main
    import (
        "net/http"
        "io/ioutil"
        "fmt"
    )
    
    func main() {
        resp, err := http.Get("http://www.baidu.com")
        if err != nil {
            fmt.Println(err.Error())
            return
        }
        defer resp.Body.Close()
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            fmt.Println(err.Error())
            return
        }
        fmt.Println(string(body))
    }
    
    

    http.Get()含义的定义如下:

    // Get is a wrapper around DefaultClient.Get.
    //
    // To make a request with custom headers, use NewRequest and
    // DefaultClient.Do.
    func Get(url string) (resp *Response, err error) {
        return DefaultClient.Get(url)
    }
    
    type Response struct {
        Status     string // e.g. "200 OK"
        StatusCode int    // e.g. 200
        Proto      string // e.g. "HTTP/1.0"
        ProtoMajor int    // e.g. 1
        ProtoMinor int    // e.g. 0
        Header Header
        Body io.ReadCloser
        ContentLength int64
        TransferEncoding []string
        Close bool
        Uncompressed bool
        Trailer Header
        Request *Request
        TLS *tls.ConnectionState
    }
    

    可以看到第一个返回值是resp *Response类型,其中有个Body成员是个io.ReadCloser接口,这个接口是io.Reader接口和io.Closer接口的组合,数据都存在这个Body中,调用ioutil.ReadAll(resp.Body)就是执行resp.Body实现的Read函数,真正读取数据。读取出来的[]byte类型,所以打印的时候转成string。需要注意:和文件流一样,这个Body也需要关闭。

    apiServer代码

    所以apiServer在处理GET请求 的时候,只需要先向dataServer发送GET请求,将接收的内容返回就行了。

    关键点在于apiServer需要完全读取dataServer返回的内容存在内存里,然后将存好的数据发给客户端吗?

    答案是否定的,因为这是个对象存储,可能存储的内容非常大,大到接口服务器的内存都放不下,apiServer先将内容读到内存是不现实的。

    解决方法就是用流的方式,假设 dataServer->apiServer->客户端之间维护了一个数据流,可以dataServer读了一部分数据,就向apiServer转发,apiServer同时客户端转发,这样数据就可以源源不断的向水流一样流向客户端apiServer使用的内存量就可以减小。

    apiServer的结构和dataServer一样,只是get函数的流程不一样。

    // apiServer.go
    package main
    
    import (
        "net/http"
        "io"
        "strings"
    )
    const dataServerAddr = "http://localhost:8889/objects/"
    
    func Handler(w http.ResponseWriter, r *http.Request) {
        m := r.Method
        if m == http.MethodGet {
            get(w, r)
            return
        }
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
    
    func get(w http.ResponseWriter, r *http.Request) {
        // 解析 客户端的请求的文件名
        name := strings.Split(r.URL.EscapedPath(), "/")[2]
        // 向 数据服务器 请求该文件
        resp, err := http.Get(dataServerAddr + name)
        if err != nil {
            w.WriteHeader(http.StatusNotFound)
            return
        }
        defer resp.Body.Close()
        // 将 数据服务器 的回复 返回给 客户端
        io.Copy(w, resp.Body)
    }
    
    func main() {
        http.HandleFunc("/objects/", Handler)
        http.ListenAndServe(":8888", nil)
    }
    

    apiServer监听在本地的8888端口。在收到客户端的请求后,解析出请求的文件名,然后向dataServer发起GET请求。http.Get()执行完毕,apiServerdataServer就建立了连接,但是还没有真正读取数据。只有在最后执行io.Copy(w, resp.Body)的时候才读取resp.Body的数据,再写入到w中,客户端就可以读取。

    流传输

    这就是数据流的含义了。

    getStream流封装

    可以经上面get函数发请求的部分封装起来。

    新增一个object包,添加get.go文件

    // object/get.go
    package objects
    
    import (
        "io"
        "net/http"
        "fmt"
    )
    
    type GetStream struct {
        reader io.ReadCloser
    }
    
    func (r *GetStream) Read(p []byte) (n int, err error) {
        return r.reader.Read(p)
    }
    
    func (r *GetStream) Close() (err error) {
        return r.reader.Close()
    }
    
    func NewGetStream(url string) (*GetStream, error) {
        r, e := http.Get(url)
        if e != nil {
            return nil, e
        }
        if r.StatusCode != http.StatusOK {
            return nil, fmt.Errorf("dataServer return http code %d", r.StatusCode)
        }
        return &GetStream{r.Body}, nil
    }
    

    首先封装了一个GetStream类型,它内部只有一个成员reader, 由于读取完需要关闭,所以是io.ReadCloser接口类型。GetStreamReadClose函数都是借助成员reader帮助完成。

    NewGetStream()函数就是帮助构造一个GetStream类型。也就是发其一个GET请求,使用resp.Body初始化GetStream类型。这样就得到了一个GetStream类型。

    apiServerget()函数也随之改变:

    // apiServer.go 中 get函数
    func get(w http.ResponseWriter, r *http.Request) {
        // 解析 接口服务器 接收 客户端的数据
        name := strings.Split(r.URL.EscapedPath(), "/")[2]
        // 构造一个 GetStream 类型
        stream, e := objects.NewGetStream(dataServerAddr + name)
        if e != nil {
            log.Println(e)
            w.WriteHeader(http.StatusNotFound)
            return
        }
        defer stream.Close()  // 不要忘记关闭这个流
        io.Copy(w, stream) // stream实现了io.reader接口
    }
    

    先使用objects.NewGetStream得到stream,从这个流中读数据,就是从apiServerdataServer之间建立的连接中读取数据。最后将这个stream中的数据写入到w中。同样,不要忘记关闭这个流(书上带忘记了)。

    虽然这个代码太简单了,以至于没有感受到封装的好处,但是后面封装PutSteam的时候,就可以感受到封装代码更加简单,更容易理解。

    测试

    启动apiServerdataServer,向apiServer发起请求,地址是localhost:8888:浏览器输入localhost:8888/objects/test.txt可以看到浏览器确实接收到了数据:

    测试结果.png

    总结

    • 实现了io.Readerio.Writer接口结构的实际上都是,只有在调用ReadWrite的时候才会真正的读写数据。
    • 几个流之间可以串连起来。读取第一个流,就会读取后面几个流
    • apiServer不需要完全读取dataServer的数据才回复客户端,完全可以将三者通过数据流进行连接。

    下一步工作

    下一步需要完成客户端上传文件的操作,这是个PUT请求,会学习到使用io.Pipe()函数将两个协程之间通过数据流(一个读,一个写)连接起来。(可能要等写完小论文了。。。)

    参考

    《分布式对象存储--原理架构及Go语言实现》

    相关文章

      网友评论

        本文标题:golang流操作(一)——GetStream

        本文链接:https://www.haomeiwen.com/subject/uaqllqtx.html