美文网首页
go内存排序

go内存排序

作者: 百炼 | 来源:发表于2019-01-27 20:14 被阅读0次

    说明:

    Source And Sink

    package pipeline
    
    import (
        "encoding/binary"
        "fmt"
        "io"
        "math/rand"
        "sort"
        "time"
    )
    
    var startTime time.Time
    
    func Init() {
        startTime = time.Now()
    }
    
    // ArraySource returns a channel that yields each of the given
    // integers in order; the channel is closed after the last value.
    func ArraySource(a ...int) chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for _, n := range a {
                out <- n
            }
        }()
        return out
    }
    
    // InMemSort drains in completely into memory, sorts the values in
    // ascending order, and emits them on the returned channel, which is
    // closed when all values have been sent. Progress timings are
    // printed relative to startTime (see Init).
    func InMemSort(in <-chan int) <-chan int {
        out := make(chan int, 1024)
        go func() {
            // Read the whole input into memory; required before sorting.
            var a []int
            for v := range in {
                a = append(a, v)
            }
            // time.Since is the idiomatic form of time.Now().Sub(t).
            fmt.Println("Read done:", time.Since(startTime))

            sort.Ints(a)
            fmt.Println("InMemSort done:", time.Since(startTime))

            for _, v := range a {
                out <- v
            }
            close(out)
        }()
        return out
    }
    
    // Merge merges two ascending-sorted channels into a single
    // ascending output channel (ties are taken from in1 first). The
    // output is closed after both inputs are exhausted.
    func Merge(in1, in2 <-chan int) <-chan int {
        out := make(chan int, 1024)
        go func() {
            v1, ok1 := <-in1
            v2, ok2 := <-in2
            // Single loop handles both the two-way merge and draining
            // the survivor once one input closes.
            for ok1 || ok2 {
                if !ok2 || (ok1 && v1 <= v2) {
                    out <- v1
                    v1, ok1 = <-in1
                } else {
                    out <- v2
                    v2, ok2 = <-in2
                }
            }
            close(out)
            // time.Since is the idiomatic form of time.Now().Sub(t).
            fmt.Println("Merge done:", time.Since(startTime))
        }()
        return out
    }
    
    func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
        out := make(chan int, 1024)
        go func() {
            buffer := make([]byte, 8)
            bytesRead := 0
            for {
                n, err := reader.Read(buffer)
                bytesRead += n
                if n > 0 {
                    v := int(binary.BigEndian.Uint64(buffer))
                    out <- v
                }
                if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
                    break
                }
            }
            close(out)
        }()
        return out
    }
    
    func WriteSink(writer io.Writer, in <-chan int) {
        for v := range in {
            buffer := make([]byte, 8)
            binary.BigEndian.PutUint64(buffer, uint64(v))
            writer.Write(buffer)
        }
    }
    
    // RandomSource produces count pseudo-random integers on the
    // returned channel, then closes it.
    func RandomSource(count int) <-chan int {
        out := make(chan int, 1024)
        go func() {
            defer close(out)
            for remaining := count; remaining > 0; remaining-- {
                out <- rand.Int()
            }
        }()
        return out
    }
    
    // MergeN merges any number of ascending-sorted channels into one
    // sorted channel by recursively pairing halves with Merge.
    // With zero inputs it returns an already-closed channel; the
    // original recursed forever in that case (m == 0 re-invoked
    // MergeN with the same empty slice).
    func MergeN(inputs ...<-chan int) <-chan int {
        switch len(inputs) {
        case 0:
            out := make(chan int)
            close(out)
            return out
        case 1:
            return inputs[0]
        }
        m := len(inputs) / 2
        // Merge inputs[0...m) with inputs[m...end).
        return Merge(
            MergeN(inputs[:m]...),
            MergeN(inputs[m:]...))
    }
    

    testfile文件生成

    生成测试文件:small.in(小文件,64 个数)或 large.in(大文件,100 万个数),由常量 FILE_NAME 和 N 选择。

    package main
    
    import (
        "bufio"
        "fmt"
        "os"
        "pipeline"
    )
    
    //生成小的测试文件
    //const FILE_NAME = "small.in"
    //const N = 64
    
    //生成大的测试文件
    const FILE_NAME = "large.in"
    const N = 1000000
    
    // main generates FILE_NAME with N random values, then reads it back
    // and prints the first values as a sanity check.
    func main() {
        file, err := os.Create(FILE_NAME)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        writer := bufio.NewWriter(file)
        pipeline.WriteSink(writer, pipeline.RandomSource(N))
        writer.Flush()

        // Reopen for reading; the earlier defer still closes the
        // original handle (defer captured it at declaration time).
        file, err = os.Open(FILE_NAME)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        count := 0
        for v := range pipeline.ReaderSource(file, -1) {
            count++
            if count < 100 {
                fmt.Println(v)
            }
        }
    }
    
    // main0 exercises the in-memory sort and two-way merge on small
    // hard-coded arrays.
    func main0() {
        merged := pipeline.Merge(
            pipeline.InMemSort(pipeline.ArraySource(8, 2, 3, 0, 1)),
            pipeline.InMemSort(pipeline.ArraySource(9, 7)),
        )
        for v := range merged {
            fmt.Println(v)
        }
    }
    

    利用网络排序

    package pipeline
    
    import (
        "bufio"
        "net"
    )
    
    // NetworkSink listens on addr and, once a single client connects,
    // streams every value from in to that connection via WriteSink.
    // Accepting and writing happen in a background goroutine; the
    // listener and connection are closed when in is exhausted.
    func NetworkSink(addr string, in <-chan int) {
        listener, err := net.Listen("tcp", addr)
        if err != nil {
            panic(err)
        }

        go func() {
            defer listener.Close()

            conn, err := listener.Accept()
            if err != nil {
                panic(err)
            }
            defer conn.Close()

            w := bufio.NewWriter(conn)
            defer w.Flush()
            WriteSink(w, in)
        }()
    }
    
    // NetworkSource dials addr and exposes the decoded int stream from
    // that connection as a channel, closed when the remote side stops
    // sending.
    func NetworkSource(addr string) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)

            conn, err := net.Dial("tcp", addr)
            if err != nil {
                panic(err)
            }
            defer conn.Close()

            for v := range ReaderSource(conn, -1) {
                out <- v
            }
        }()
        return out
    }
    

    测试主方法

    package main
    
    import (
        "bufio"
        "fmt"
        "os"
        "pipeline"
        "strconv"
    )
    
    // main2 sorts large.in through the network pipeline, writes the
    // result to disk, and prints a sample of it.
    func main2() {
        sorted := createNetworkPipeline("large.in", 8000000, 4)
        writeToFile(sorted, "large-networkpipeline.out")
        printFile("large-networkpipeline.out")
    }
    
    // main1 sorts large.in with the in-process pipeline, writes the
    // result to disk, and prints a sample of it.
    func main1() {
        sorted := createPipeline("large.in", 8000000, 4)
        writeToFile(sorted, "large-pipeline.out")
        printFile("large-pipeline.out")
    }
    
    // printFile prints at most the first 100 values stored in filename
    // as a quick visual check of the sort result.
    func printFile(filename string) {
        file, err := os.Open(filename)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        printed := 0
        // Keep draining the channel after 100 values so the producing
        // goroutine can run to completion.
        for v := range pipeline.ReaderSource(file, -1) {
            if printed < 100 {
                fmt.Println(v)
                printed++
            }
        }
    }
    
    // writeToFile drains p and stores every value in filename using the
    // binary format understood by pipeline.ReaderSource.
    func writeToFile(p <-chan int, filename string) {
        file, err := os.Create(filename)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        w := bufio.NewWriter(file)
        defer w.Flush()

        pipeline.WriteSink(w, p)
    }
    
    func createPipeline(filename string, filesize, chunkCount int) <-chan int {
        chunkSize := filesize / chunkCount
        pipeline.Init()
        var sortResults []<-chan int
        for i := 0; i < chunkCount; i++ {
            file, err := os.Open(filename)
            if err != nil {
                panic(err)
            }
            file.Seek(int64(i*chunkSize), 0)
    
            source := pipeline.ReaderSource(
                bufio.NewReader(file), chunkSize)
            sortResults = append(sortResults, pipeline.InMemSort(source))
        }
        return pipeline.MergeN(sortResults...)
    }
    
    func createNetworkPipeline(filename string, filesize, chunkCount int) <-chan int {
        chunkSize := filesize / chunkCount
        pipeline.Init()
        var sortAddr []string
        for i := 0; i < chunkCount; i++ {
            file, err := os.Open(filename)
            if err != nil {
                panic(err)
            }
            file.Seek(int64(i*chunkSize), 0)
    
            source := pipeline.ReaderSource(
                bufio.NewReader(file), chunkSize)
            addr := ":" + strconv.Itoa(7000+i)
            pipeline.NetworkSink(addr, pipeline.InMemSort(source))
            sortAddr = append(sortAddr, addr)
        }
    
        var sortResults []<-chan int
        for _, addr := range sortAddr {
            sortResults = append(sortResults, pipeline.NetworkSource(addr))
        }
        return pipeline.MergeN(sortResults...)
    }
    

    相关文章

      网友评论

          本文标题:go内存排序

          本文链接:https://www.haomeiwen.com/subject/jkynjqtx.html