美文网首页技术方案mysql
mysql批量插入和批量修改

mysql批量插入和批量修改

作者: 彳亍口巴 | 来源:发表于2022-01-22 17:11 被阅读0次

批量插入

sql: insert into tableName(column ...) values(...)

当我们拿到数组之后,变量数组,按照批量插入的SQL语句构造SQL

type StorageSubField struct {
    FieldKey   string `tag:"1" required:"false" json:"FieldKey"`
    FieldValue string `tag:"2" required:"false" json:"FieldValue"`
    //FieldValueType int32  `tag:"3" required:"false" json:"FieldValueType"`
}

func insert() {
    datas := [][]StorageSubField{
        {
            {
                FieldKey:   "id",
                FieldValue: "5",
            },
            {
                FieldKey:   "name",
                FieldValue: "zzr",
            },
            {
                FieldKey:   "addr",
                FieldValue: "shenzhen",
            },
        },
        {{
            FieldKey:   "id",
            FieldValue: "6",
        },
            {
                FieldKey:   "name",
                FieldValue: "zzr2",
            },
            {
                FieldKey:   "addr",
                FieldValue: "shenzhen2",
            }},
    }
    var endSql string

    var endArgs []interface{}
    for i := range datas {
        if i == 0 {
            sql, args, _ := buildInsertSQL("t_crud", datas[i])
            endSql += sql
            endArgs = append(endArgs, args...)
            continue
        }
        sql, args := buildInsertSQLWithCount("t_crud", datas[i])
        endSql += sql
        endArgs = append(endArgs, args...)
    }
    fmt.Println("sql==", endSql)
    fmt.Println("args:", endArgs)

    dbInsert(endSql, endArgs)
    
}

func dbInsert(sql string, args []interface{}) {
    res, err := db.Exec(sql, args...)
    if err != nil {
        fmt.Println("有错误")
        fmt.Println("err:", err)
        return
    }
    fmt.Println(res.RowsAffected())
}

// buildInsertSQL 构造insert语句
func buildInsertSQL(tableName string, subFields []StorageSubField) (
    sql string, args []interface{}, err error) {
    subFieldKeySQL := ""
    subFieldValueSQL := ""
    for i, subField := range subFields {
        if i != 0 {
            subFieldKeySQL += ", "
            subFieldValueSQL += ", "
        }
        subFieldKeySQL += "`" + subField.FieldKey + "`"
        subFieldValueSQL += "?"
        
        args = append(args, subField.FieldValue)
    }
    sql = fmt.Sprintf("insert into %s(%s) values(%s)", tableName, subFieldKeySQL, subFieldValueSQL)
    return
}

// buildInsertSQL 构造insert语句
func buildInsertSQLWithCount(tableName string, subFields []StorageSubField) (
    string, []interface{}) {
    var args []interface{}
    //subFieldKeySQL := ""
    subFieldValueSQL := ", ("
    for i, subField := range subFields {
        if i != 0 {
            //subFieldKeySQL += ", "
            subFieldValueSQL += ", "
        }
        //subFieldKeySQL += "`" + subField.FieldKey + "`"
        subFieldValueSQL += "?"
        
        args = append(args, subField.FieldValue)
    }
    //sql = fmt.Sprintf("insert into %s(%s) values(%s)", tableName, subFieldKeySQL, subFieldValueSQL)
    return subFieldValueSQL + " ) ", args
}

批量更新

使用redis的时候,我们可以使用hmset命令批量更新,使用mongo的时候,我们可以使用bulkWrite进行批量更新,但是使用mysql的时候,我们只能通过构造SQL进行批量更新

sql: update t_crud set
name=case id when 'id_0' then 'zzr0' when 'id_1' then 'zzr1' end,
addr=case id when 'id_0' then 'zhanjiang0' when 'id_1' then 'zhanjiang1' end
where id in ('id_0','id_1');
func BuildSubField() {
    reqs := []InsertOrUpdateReq{
        {
            SubFileds: []StorageSubField{
                {FieldKey: "name", FieldValue: "zzr0"},
                {FieldKey: "addr", FieldValue: "zhanjiang0"},
            },
            Subfield: StorageSubField{
                FieldKey:   "id",
                FieldValue: "id_0",
            },
        },
        {
            SubFileds: []StorageSubField{
                {FieldKey: "name", FieldValue: "zzr1"},
                {FieldKey: "addr", FieldValue: "zhanjiang1"},
            },
            Subfield: StorageSubField{
                FieldKey:   "id",
                FieldValue: "id_1",
            },
        },
    }
    sqlArr := make([]string, len(reqs[0].SubFileds)+2, len(reqs[0].SubFileds)+2)
    sqlArr[0] = "update t_crud set "
    allLast := false

    for i := range reqs {
        thisLast := false
        subFields := reqs[i]
        if i == len(reqs)-1 {
            allLast = true
        }
        k := 1
        for j := range subFields.SubFileds {
            if j == len(subFields.SubFileds)-1 {
                thisLast = true
            }
            sqlArr[k] = buildCase(sqlArr[k], subFields.SubFileds[j], subFields.Subfield, thisLast, allLast)
            k++
        }

        sqlArr[len(sqlArr)-1] = buildWhere(sqlArr[len(sqlArr)-1], subFields.Subfield, allLast)
    }
    sqlArr[len(sqlArr)-2] = strings.Trim(sqlArr[len(sqlArr)-2], ",")
    fmt.Println("arr:", sqlArr)
    s := strings.Join(sqlArr, "\n")
    fmt.Println("res:", s)
}
//func batchUpdate(subfields []StorageSubField, subfield StorageSubField) {
//  sql := "update t_crud set "
//  for i := range subfields {
//
//  }
//}
// You are using safe update mode and you tried to update a table without a WHERE that uses a KEY column.  To disable safe mode, toggle the option in Preferences -> SQL Editor and reconnect.  0.047 sec
func buildCase(caseSql string, subfields StorageSubField, field StorageSubField, thisLast bool, allLast bool) string {
    // 是否第一次
    if strings.HasPrefix(caseSql, subfields.FieldKey) {
        caseSql = fmt.Sprintf("%s when '%s' then '%s'", caseSql, field.FieldValue, subfields.FieldValue)
    } else {
        caseSql = fmt.Sprintf("%s=case %s when '%s' then '%s'", subfields.FieldKey, field.FieldKey, field.FieldValue, subfields.FieldValue)
    }
    if allLast {
        //if thisLast {
        //  caseSql += "end"
        //  return caseSql
        //}
        caseSql += " end,"
    }
    return caseSql
}

// buildWhere 构造where语句 如:where id in (1,2,3)
func buildWhere(whereSql string, subfield StorageSubField, isLast bool) string {
    if strings.HasPrefix(whereSql, "where") {
        if isLast {
            whereSql = fmt.Sprintf("%s'%s')", whereSql, subfield.FieldValue)
        } else {
            whereSql = fmt.Sprintf("%s'%s',", whereSql, subfield.FieldValue)
        }
    } else {
        whereSql = fmt.Sprintf("where %s in ('%s',", subfield.FieldKey, subfield.FieldValue)
    }

    return whereSql
}



go开多协程实现批量操作

开始事务,遍历数组,每条数据都开启一个协程去处理,并记录错误,当有错误的时候回滚,否则提交事务,遍历的同时也可以用一个新数组记录比如id,然后通过数组的顺序去数据库中查找数据并排序。

// batchInsert 启动多协程实现mysql多条数据插入
func batchInsert() {
    //datas := []Data{
    //  {
    //      ID:   "11",
    //      Name: "zzr5",
    //      Addr: "shenzhen5",
    //  },
    //  {
    //      ID:   "12",
    //      Name: "zzr6",
    //      Addr: "shenzhen6",
    //  },
    //  {
    //      ID:   "13",
    //      Name: "zzr5",
    //      Addr: "shenzhen5",
    //  },
    //  {
    //      ID:   "14",
    //      Name: "zzr6",
    //      Addr: "shenzhen6",
    //  },
    //}
    datas := buildData()
    //db.SetConnMaxLifetime(time.Second * 30)
    tx, err := db.Begin()
    if err != nil {
        fmt.Println("tx err:%+v", err)
        return
    }
    var ids []string
    var wg sync.WaitGroup
    var txErr error
    for i := range datas {
        ids = append(ids, datas[i].ID)
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            //r, getErr := tx.Query("select * from t_crud")
            //if getErr != nil {
            //  fmt.Println("geterr:", getErr)
            //  return
            //}
            //fmt.Println(r.Columns())
            //r.Close()
            sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[i].ID, datas[i].Name, datas[i].Addr)
            fmt.Println(sql)
            res, err2 := tx.Exec(sql)
            if err2 != nil {
                fmt.Println("exec err:%+v", err2)
                txErr = err2
                return
            }
            fmt.Println(res.RowsAffected())

            //rows, getErr2 := tx.QueryContext(context.Background(), "select * from t_crud where id=?", "id_99")
            //if getErr2 != nil {
            //  fmt.Println("geterr2:", getErr2)
            //  return
            //}
            //
            //fmt.Println(rows.Columns())
            //rows.Close()
        }(i)
    }
    wg.Wait()
    if txErr != nil {
        fmt.Println("has err roolback:", txErr)
        tx.Rollback()
        return
    }
    err3 := tx.Commit()
    if err3 != nil {
        fmt.Println("failed:", err3)
    } else {
        fmt.Println("success")
    }
    //sqlStr := "SELECT * FROM t_crud WHERE id IN (1,2,3,4) ORDER BY FIND_IN_SET(id, '2,3,1,4');"
    sqlStr := fmt.Sprintf("SELECT * FROM t_crud WHERE id IN %s ORDER BY FIND_IN_SET(id, %s)", sql1(ids), sql2(ids))
    fmt.Println(sqlStr)
    rows, getErr2 := db.Query(sqlStr)
    if getErr2 != nil {
        fmt.Println("geterr2:", getErr2)
        return
    }
    fmt.Println(rows.Columns())
}

func sql1(ids []string) string {
    sql := "("
    for i := range ids {
        if i == len(ids)-1 {
            sql = fmt.Sprintf("%s '%s')", sql, ids[i])
        } else {
            sql = fmt.Sprintf("%s '%s',", sql, ids[i])
        }

    }

    fmt.Println(sql)
    sql2(ids)
    return sql
}

func sql2(ids []string) string {
    sql := "'"
    for i := range ids {
        if i == len(ids)-1 {
            sql = fmt.Sprintf("%s %s'", sql, ids[i])
        } else {
            sql = fmt.Sprintf("%s %s,", sql, ids[i])
        }

    }
    fmt.Println(sql)
    return sql
}

相关文章

网友评论

    本文标题:mysql批量插入和批量修改

    本文链接:https://www.haomeiwen.com/subject/svnuhrtx.html