美文网首页
Go & 管道

Go & 管道

作者: 杏壳 | 来源:发表于2019-06-29 22:07 被阅读0次

    管道

    管道是不同进程间通信的一种手段,是UNIX系统IPC的最古老形式,管道有如下特点:

    • 历史上,他们是半双工的(虽然有的系统提供了全双工)
    • 只能在具有公共祖先的两个进程间使用

    虽然有上面的局限,半双工管道仍是最常用的IPC方式,shell执行命令时,会为每一条命令单独创建一个进程,然后用管道将前一条命令进程的标准输出与后一条命令的标准输入相连接。

    这里我们先使用C语言测试一下管道:

    
    # include "stdio.h"
    # include "stdlib.h"
    # include "unistd.h"
    
    int main(void) {
        int a[2];
        char buf[10];
        if (pipe(a) == -1) {
            perror("pipe");
            exit(-1);
        }
        write(a[1], "CODE", 10);
        printf("\n");
        read(a[0], buf, 10);
        printf("%s", buf);
    }
    

    这里使用了pipe系统调用

    /* Create a one-way communication channel (pipe).
       If successful, two file descriptors are stored in PIPEDES;
       bytes written on PIPEDES[1] can be read from PIPEDES[0].
       Returns 0 if successful, -1 if not.  */
    extern int pipe (int __pipedes[2]) __THROW __wur;
    
    
    /* Write N bytes of BUF to FD.  Return the number written, or -1.
    
       This function is a cancellation point and therefore not marked with
       __THROW.  */
    extern ssize_t write (int __fd, const void *__buf, size_t __n) __wur;
    

    FIFO

    FIFO有时称为命名管道。未命名的管道只能在两个相关进程之间调用,而且这两个相关进程还有一个共同的创建了他们的祖先进程。但是,通过FIFO,不相关的进程也能交换数据。
    创建FIFO类似于创建文件,FIFO的路径名存在于文件系统中。
    对应的系统调用为:

    /* Create a new FIFO named PATH, with permission bits MODE.  */
    extern int mkfifo (const char *__path, __mode_t __mode)
         __THROW __nonnull ((1));
    

    命名管道允许任何进程通过它来交换数据,下面在命令行测试下FIFO的使用:

    mkfifo -m 666 myfifo
    ls -ll
    touch test.txt
    tee test.txt < myfifo
    cat ./main.go > myfifo
    cat ./test.txt
    

    首先创建了一个命名管道叫myfifo,执行完之后就会在当前目录下生成一个文件叫myfifo,ls -ll查看其类型为p,也就是管道的意思,然后使用这个管道实现了简单的交换数据。

    使用os/exec 调用外部命令

    接下来我们使用go的os/exec测试管道,这个包提供了调用外部命令的能力

    func main() {
        cmd := exec.Command("echo", "-n", "hello world")
        // ReadCloser
        out, err := cmd.StdoutPipe()
        if err != nil {
            log.Printf("out pipe fail:%v", err)
        }
        err = cmd.Start()
        if err != nil {
            log.Printf("start fail:%v\n", err)
        }
        // 1. 直接用字节切片接收
        //temp := make([]byte, 20)
        //out.Read(temp)
        //log.Println(string(temp))
        // 2. 使用bytes.Buffer
        //buf := bytes.Buffer{}
        //for {
        //  temp := make([]byte, 3)
        //  _, err := out.Read(temp)
        //  if err == io.EOF {
        //      break
        //  }
        //  buf.Write(temp)
        //}
        //log.Println(buf.String())
        // 3. 使用bufio.Reader,默认4096字节的缓冲区
        reader := bufio.NewReader(out)
        //output, _, err := reader.ReadLine()
        str, err := reader.ReadString('\n')
        if err != nil {
            log.Printf("reader read fail:%v", err)
        }
        log.Println(str)
        //log.Println(string(output))
        err = cmd.Wait()
        if err != nil {
            log.Printf("wait failed:%v", err)
        }
    }
    

    简单介绍下Go实现执行外部命令的细节,exec.Command函数会调用exec的LookPath函数,会去OS的PATH路径下查找,是否有对应的可执行程序文件,比如这里是echo,会依次访问path的路径,看有没有echo这个文件,在我的Mac上,最终找到的路径为/bin/echo。在exec.Start函数内部会调用exec.StartProcess来创建一个新的进程,在运行时执行fork系统调用

    pid, h, e := syscall.StartProcess(name, argv, sysattr)
    

    创建进程后,可使用pstree命令查看进程的关系,可以看到echo进程相关的祖先进程。

    $ pstree -g 2 -s echo 
    ─┬◆ 00001 root /sbin/launchd
     └─┬◆ 03320 hongyi /Applications/GoLand.app/Contents/MacOS/goland
       └─┬─ 18046 hongyi /Applications/GoLand.app/Contents/plugins/go/lib/dlv/mac/dlv --listen=localhost:49511 --h
         └─┬◆ 18047 hongyi /Library/Developer/CommandLineTools/Library/PrivateFrameworks/LLDB.framework/Versions/A
           └─┬─ 18048 hongyi /private/var/folders/v4/9s11xdk514n5x6ntjptb93680000gn/T/___go_build_main_go__1_
             └─── 18052 hongyi (echo)
    

    上面的go程序使用了cmd.StdoutPipe()方法,在StdoutPipe内部使用了下文要说的os.Pipe()函数,而在这个函数内部调用了syscall.Pipe(),这个函数最终调用了pipe系统调用。

    用管道连接命令

    我们通过管道来连接两个命令,这里会新起两个进程,一个ls,一个grep。

    ls ll | grep main
    
      func main() {
        cmd1 := exec.Command("ps", "-axv")
        cmd2 := exec.Command("grep", "Code.app")
        pipeTemp := bytes.Buffer{}
        // 模拟管道
        cmd1.Stdout = &pipeTemp
        if err := cmd1.Start(); err != nil {
            log.Fatalf("cmd1 start fail:%v", err)
        }
        if err := cmd1.Wait(); err != nil {
            log.Fatalf("cmd1 wait fail:%v", err)
        }
    
        cmd2.Stdin = &pipeTemp
        var outputBuf2 bytes.Buffer
        cmd2.Stdout = &outputBuf2
        if err := cmd2.Run(); err != nil {
            fmt.Printf("cmd2 run failed: %s\n", err)
            return
        }
        fmt.Println(outputBuf2.String())
    }
    

    cmd.Start函数如下:

        // 省略......
        type F func(*Cmd) (*os.File, error)
        for _, setupFd := range []F{(*Cmd).stdin, (*Cmd).stdout, (*Cmd).stderr} {
            fd, err := setupFd(c)
            if err != nil {
                c.closeDescriptors(c.closeAfterStart)
                c.closeDescriptors(c.closeAfterWait)
                return err
            }
            c.childFiles = append(c.childFiles, fd)
        }
        c.childFiles = append(c.childFiles, c.ExtraFiles...)
    
        var err error
        //  起一个新进程
        c.Process, err = os.StartProcess(c.Path, c.argv(), &os.ProcAttr{
            Dir:   c.Dir,
            Files: c.childFiles,
            Env:   dedupEnv(c.envv()),
            Sys:   c.SysProcAttr,
        })
        if err != nil {
            c.closeDescriptors(c.closeAfterStart)
            c.closeDescriptors(c.closeAfterWait)
            return err
        }
    
        c.closeDescriptors(c.closeAfterStart)
    
        c.errch = make(chan error, len(c.goroutine))
        for _, fn := range c.goroutine {
            // 起goroutine执行
            go func(fn func() error) {
                c.errch <- fn()
            }(fn)
        }
    

    对于cmd1,没有设置其标准输入,其标准输入为null device,设置了其标准输出为bytes.Buffer类型的pipeTemp,调用Start函数后,在方法writerDescriptor内部会创建一个管道,以并发的形式将管道的读端拷贝到Stdout,在Start函数中会执行这个c.goroutine,writerDescriptor返回管道的写端,加入到c. childFiles作为新进程的标准输出,最终childFiles包含的标准输入,输出,错误将传给新起的进程(ps进程),因此新进程向标准输出写,即是向管道里写入,而管道的读端又被copy到了pipeTemp,因此这个新进程的结果将被写入到pipeTemp中。

    对于cmd2,设置了标准输入为pipeTemp,也创建了一个管道,将读端返回,将cmd2的标准输入也就是pipeTemp,在goroutine中拷贝到管道写端。当goroutine执行时,pipeTemp的内容将被写入管道,而返回的管道的读端将会作为新进程的标准输入,也就是将pipeTemp的内容传给了新进程作为标准输入。


    func (c *Cmd) stdout() (f *os.File, err error) {
        return c.writerDescriptor(c.Stdout)
    }
    func (c *Cmd) writerDescriptor(w io.Writer) (f *os.File, err error) {
        //省略.....
        pr, pw, err := os.Pipe()
        if err != nil {
            return
        }
    
        c.closeAfterStart = append(c.closeAfterStart, pw)
        c.closeAfterWait = append(c.closeAfterWait, pr)
        c.goroutine = append(c.goroutine, func() error {
          // 将读端拷贝到stdout
            _, err := io.Copy(w, pr)
            pr.Close() // in case io.Copy stopped due to write error
            return err
        })
        return pw, nil
    }
    
    func (c *Cmd) stdin() (f *os.File, err error) {
        // 省略....
        pr, pw, err := os.Pipe()
        if err != nil {
            return
        }
    
        c.closeAfterStart = append(c.closeAfterStart, pr)
        c.closeAfterWait = append(c.closeAfterWait, pw)
        c.goroutine = append(c.goroutine, func() error {
            _, err := io.Copy(pw, c.Stdin)
            if skip := skipStdinCopyError; skip != nil && skip(err) {
                err = nil
            }
            if err1 := pw.Close(); err == nil {
                err = err1
            }
            return err
        })
        return pr, nil
    }
    

    os.Pipe

    下面直接使用Go的os.Pipe()API,os.Pipe()函数调用了syscall包的Pipe()函数。

    func basedFilePipe() {
        reader, writer, err := os.Pipe()
        if err != nil {
            log.Printf("get pipe failed:%v", err)
        }
        go func() {
            out := make([]byte, 26)
            n, err := reader.Read(out)
            if err != nil {
                log.Printf("read failed:%v", err)
            }
            log.Printf("read %d bytes", n)
            log.Println(string(out))
        }()
    
        temp := make([]byte, 26)
        for i := 65; i <= 90; i++ {
            temp[i-65] = byte(i)
        }
        n, err := writer.Write(temp)
        if err != nil {
            log.Printf("write failed:%v", err)
        }
        log.Printf("write %d bytes", n)
        time.Sleep(1 * time.Second)
    }
    

    上面介绍的管道是没有提供原子操作支持的,Go的io包提供了Pipe函数,这个函数有如下特点:

    • 基于内存的同步管道
    • 如果没有读端消费,写端一直阻塞(同步channel)
    • 没有内部缓存,直接从写端拷贝到读端
    • 并发读写安全(sync包)

    下面是一个内存管道的示例

    func ioPipe()  {
       reader, writer := io.Pipe()
       var wg sync.WaitGroup
       const goroutineCount = 2
       wg.Add(goroutineCount)
       //reader
       for i := 0; i<goroutineCount; i++ {
           go func() {
               defer wg.Done()
               output := make([]byte, 100)
               n, err := reader.Read(output)
               if err != nil {
                   fmt.Printf("Error: Couldn't read data from the named pipe: %s\n", err)
               }
               fmt.Printf("Read %d byte(s). [in-memory pipe]\n", n)
               log.Println(string(output))
           }()
       }
       // writer1
       go func() {
           input := make([]byte, 26)
           for i := 65; i <= 90; i++ {
               input[i-65] = byte(i)
           }
           n, err := writer.Write(input)
           if err != nil {
               fmt.Printf("Error: Couldn't write data to the named pipe: %s\n", err)
           }
           fmt.Printf("Written %d byte(s). [in-memory pipe]\n", n)
       }()
       // writer2
       go func() {
           input := make([]byte, 26)
           for i := 97; i <= 122; i++ {
               input[i-97] = byte(i)
           }
           n, err := writer.Write(input)
           if err != nil {
               fmt.Printf("Error: Couldn't write data to the named pipe: %s\n", err)
           }
           fmt.Printf("Written %d byte(s). [in-memory pipe]\n", n)
       }()
       wg.Wait()
    }
    

    os.Pipe() vs io.Pipe()

    • os.Pipe()基于syscall.Pipe()实现,比较底层,实现了基于操作系统级别的管道,而io.Pipe()基于channel和互斥锁实现数据通信
    • os.Pipe()不保证原子操作,io.Pipe是并发安全的
    • io.Pipe()因为使用了无缓冲channel, 读写是阻塞的

    syscall

    该包包含了一些操作系统底层的接口,实现细节依赖于底层的系统,默认下,godoc只会展示当前系统的syscall文档。如果你想展示其他系统的syscall文档,需设置$GOOS和$GOARCH到目标系统。syscall主要的使用是提供当前系统的接口给其他代码包如os,time,net。Go推荐使用那些包而不是这个syscall包。

    func Pipe(p []int) (err error)
    func Pipe2(p []int, flags int) (err error)
    func Mkfifo(path string, mode uint32) (err error)
    

    下面是pipe_bsd.go对os.Pipe()的实现

    func Pipe() (r *File, w *File, err error) {
        var p [2]int
        // See ../syscall/exec.go for description of lock.
        syscall.ForkLock.RLock()
        e := syscall.Pipe(p[0:])
        if e != nil {
            syscall.ForkLock.RUnlock()
            return nil, nil, NewSyscallError("pipe", e)
        }
        syscall.CloseOnExec(p[0])
        syscall.CloseOnExec(p[1])
        syscall.ForkLock.RUnlock()
        return newFile(uintptr(p[0]), "|0", kindPipe), newFile(uintptr(p[1]), "|1", kindPipe), nil
    }
    

    相关文章

      网友评论

          本文标题:Go & 管道

          本文链接:https://www.haomeiwen.com/subject/ckjdqctx.html