美文网首页
go mysql:多协程实现mysql事务的并发操作

go mysql:多协程实现mysql事务的并发操作

作者: 彳亍口巴 | 来源:发表于2022-01-26 15:16 被阅读0次

    背景

    在项目开发过程中,往往会涉及到同时插入或修改多条数据,并且操作是需要保证事务原子性的,要么全部成功,要么全部失败,此时最好的办法是一次请求完成全部的数据操作,即将所有的数据拼接成一条SQL语句,但如果我们需要对数据修改前后进行保存作为记录日志的话,将数据拼接成一条SQL语句就行不通了,此时可以通过开启多协程去并发执行修改操作

    datas := buildData()
        //db.SetConnMaxLifetime(time.Second * 30)
        tx, err := db.Begin()
        if err != nil {
            fmt.Println("tx err:%+v", err)
            return
        }
        var ids []string
        var wg sync.WaitGroup
        var txErr error
        for i := range datas {
            ids = append(ids, datas[i].ID)
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                //r, getErr := tx.Query("select * from t_crud")
                //if getErr != nil {
                //  fmt.Println("geterr:", getErr)
                //  return
                //}
                //fmt.Println(r.Columns())
                //r.Close()
                sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[i].ID, datas[i].Name, datas[i].Addr)
                fmt.Println(sql)
                res, err2 := tx.Exec(sql)
                if err2 != nil {
                    fmt.Println("exec err:%+v", err2)
                    txErr = err2
                    return
                }
                fmt.Println(res.RowsAffected())
    
                //rows, getErr2 := tx.QueryContext(context.Background(), "select * from t_crud where id=?", "id_99")
                //if getErr2 != nil {
                //  fmt.Println("geterr2:", getErr2)
                //  return
                //}
                //
                //fmt.Println(rows.Columns())
                //rows.Close()
            }(i)
        }
        wg.Wait()
        if txErr != nil {
            fmt.Println("has err roolback:", txErr)
            tx.Rollback()
            return
        }
         tx.Commit()
    

    对数据遍历,每条数据开启一个协程,最后再汇总错误,当错误不为nil时回滚,否则提交,这样的缺点是有多少条数据就开启多少个协程,协程不好管理,可以通过channel对每次请求的协程数进行控制。

    func buildData3() {
        tx, err := db.Begin()
        if err != nil {
            fmt.Println("tx err:%+v", err)
            return
        }
        var txErr error
        datas := buildData()
        execFun := func(index int) {
            sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[index].ID, datas[index].Name, datas[index].Addr)
            fmt.Println(sql)
            res, err2 := tx.Exec(sql)
            if err2 != nil {
                fmt.Println("exec err:%+v", err2)
                txErr = err2
                return
            }
            fmt.Println(res.RowsAffected())
        }
        // 同时最多有10个协程在跑
        DoBatch2(len(datas), 10, execFun)
        if txErr != nil {
            tx.Rollback()
            panic(txErr)
            return
        }
        tx.Commit()
    }
    
    // DoBatch 开启指定协程数批量执行
    func DoBatch(max int, goroutineNum int, execFun func(index int)) {
        var (
            wg          sync.WaitGroup
            workLimiter = make(chan struct{}, goroutineNum)
        )
        for i := 0; i < max; i++ {
            wg.Add(1)
            select {
            case workLimiter <- struct{}{}:
                go func(i int) {
                    defer func() {
                        <-workLimiter
                        wg.Done()
                    }()
                    execFun(i)
                }(i)
            }
    
        }
        wg.Wait()
    }
    

    通过channel对当前的协程数量进行控制,同时只允许goroutineNum个协程在跑,否则其他任务应该阻塞

    driver:bad connection

    值得一提的是,同一事务开启多协程的同时如果有并发读,那可能会出现driver:bad connection错误,原因是同一事务同一时间只能有一个可以进行读操作,读完之后需要将查询得到的Rows关闭

      r, getErr := tx.Query("select * from t_crud")
    if getErr != nil {
    fmt.Println("geterr:", getErr)
        return
    }
    fmt.Println(r.Columns())
    r.Close()
    

    相关文章

      网友评论

          本文标题:go mysql:多协程实现mysql事务的并发操作

          本文链接:https://www.haomeiwen.com/subject/izqnhrtx.html