批量插入
sql: insert into tableName(column ...) values(...)
当我们拿到数组之后,变量数组,按照批量插入的SQL语句构造SQL
type StorageSubField struct {
FieldKey string `tag:"1" required:"false" json:"FieldKey"`
FieldValue string `tag:"2" required:"false" json:"FieldValue"`
//FieldValueType int32 `tag:"3" required:"false" json:"FieldValueType"`
}
func insert() {
datas := [][]StorageSubField{
{
{
FieldKey: "id",
FieldValue: "5",
},
{
FieldKey: "name",
FieldValue: "zzr",
},
{
FieldKey: "addr",
FieldValue: "shenzhen",
},
},
{{
FieldKey: "id",
FieldValue: "6",
},
{
FieldKey: "name",
FieldValue: "zzr2",
},
{
FieldKey: "addr",
FieldValue: "shenzhen2",
}},
}
var endSql string
var endArgs []interface{}
for i := range datas {
if i == 0 {
sql, args, _ := buildInsertSQL("t_crud", datas[i])
endSql += sql
endArgs = append(endArgs, args...)
continue
}
sql, args := buildInsertSQLWithCount("t_crud", datas[i])
endSql += sql
endArgs = append(endArgs, args...)
}
fmt.Println("sql==", endSql)
fmt.Println("args:", endArgs)
dbInsert(endSql, endArgs)
}
func dbInsert(sql string, args []interface{}) {
res, err := db.Exec(sql, args...)
if err != nil {
fmt.Println("有错误")
fmt.Println("err:", err)
return
}
fmt.Println(res.RowsAffected())
}
// buildInsertSQL 构造insert语句
func buildInsertSQL(tableName string, subFields []StorageSubField) (
sql string, args []interface{}, err error) {
subFieldKeySQL := ""
subFieldValueSQL := ""
for i, subField := range subFields {
if i != 0 {
subFieldKeySQL += ", "
subFieldValueSQL += ", "
}
subFieldKeySQL += "`" + subField.FieldKey + "`"
subFieldValueSQL += "?"
args = append(args, subField.FieldValue)
}
sql = fmt.Sprintf("insert into %s(%s) values(%s)", tableName, subFieldKeySQL, subFieldValueSQL)
return
}
// buildInsertSQL 构造insert语句
func buildInsertSQLWithCount(tableName string, subFields []StorageSubField) (
string, []interface{}) {
var args []interface{}
//subFieldKeySQL := ""
subFieldValueSQL := ", ("
for i, subField := range subFields {
if i != 0 {
//subFieldKeySQL += ", "
subFieldValueSQL += ", "
}
//subFieldKeySQL += "`" + subField.FieldKey + "`"
subFieldValueSQL += "?"
args = append(args, subField.FieldValue)
}
//sql = fmt.Sprintf("insert into %s(%s) values(%s)", tableName, subFieldKeySQL, subFieldValueSQL)
return subFieldValueSQL + " ) ", args
}
批量更新
使用redis的时候,我们可以使用hmset命令批量更新,使用mongo的时候,我们可以使用bulkWrite进行批量更新,但是使用mysql的时候,我们只能通过构造SQL进行批量更新
sql: update t_crud set
name=case id when 'id_0' then 'zzr0' when 'id_1' then 'zzr1' end,
addr=case id when 'id_0' then 'zhanjiang0' when 'id_1' then 'zhanjiang1' end
where id in ('id_0','id_1');
func BuildSubField() {
reqs := []InsertOrUpdateReq{
{
SubFileds: []StorageSubField{
{FieldKey: "name", FieldValue: "zzr0"},
{FieldKey: "addr", FieldValue: "zhanjiang0"},
},
Subfield: StorageSubField{
FieldKey: "id",
FieldValue: "id_0",
},
},
{
SubFileds: []StorageSubField{
{FieldKey: "name", FieldValue: "zzr1"},
{FieldKey: "addr", FieldValue: "zhanjiang1"},
},
Subfield: StorageSubField{
FieldKey: "id",
FieldValue: "id_1",
},
},
}
sqlArr := make([]string, len(reqs[0].SubFileds)+2, len(reqs[0].SubFileds)+2)
sqlArr[0] = "update t_crud set "
allLast := false
for i := range reqs {
thisLast := false
subFields := reqs[i]
if i == len(reqs)-1 {
allLast = true
}
k := 1
for j := range subFields.SubFileds {
if j == len(subFields.SubFileds)-1 {
thisLast = true
}
sqlArr[k] = buildCase(sqlArr[k], subFields.SubFileds[j], subFields.Subfield, thisLast, allLast)
k++
}
sqlArr[len(sqlArr)-1] = buildWhere(sqlArr[len(sqlArr)-1], subFields.Subfield, allLast)
}
sqlArr[len(sqlArr)-2] = strings.Trim(sqlArr[len(sqlArr)-2], ",")
fmt.Println("arr:", sqlArr)
s := strings.Join(sqlArr, "\n")
fmt.Println("res:", s)
}
//func batchUpdate(subfields []StorageSubField, subfield StorageSubField) {
// sql := "update t_crud set "
// for i := range subfields {
//
// }
//}
// You are using safe update mode and you tried to update a table without a WHERE that uses a KEY column. To disable safe mode, toggle the option in Preferences -> SQL Editor and reconnect. 0.047 sec
func buildCase(caseSql string, subfields StorageSubField, field StorageSubField, thisLast bool, allLast bool) string {
// 是否第一次
if strings.HasPrefix(caseSql, subfields.FieldKey) {
caseSql = fmt.Sprintf("%s when '%s' then '%s'", caseSql, field.FieldValue, subfields.FieldValue)
} else {
caseSql = fmt.Sprintf("%s=case %s when '%s' then '%s'", subfields.FieldKey, field.FieldKey, field.FieldValue, subfields.FieldValue)
}
if allLast {
//if thisLast {
// caseSql += "end"
// return caseSql
//}
caseSql += " end,"
}
return caseSql
}
// buildWhere 构造where语句 如:where id in (1,2,3)
func buildWhere(whereSql string, subfield StorageSubField, isLast bool) string {
if strings.HasPrefix(whereSql, "where") {
if isLast {
whereSql = fmt.Sprintf("%s'%s')", whereSql, subfield.FieldValue)
} else {
whereSql = fmt.Sprintf("%s'%s',", whereSql, subfield.FieldValue)
}
} else {
whereSql = fmt.Sprintf("where %s in ('%s',", subfield.FieldKey, subfield.FieldValue)
}
return whereSql
}
go开多协程实现批量操作
开始事务,遍历数组,每条数据都开启一个协程去处理,并记录错误,当有错误的时候回滚,否则提交事务,遍历的同时也可以用一个新数组记录比如id,然后通过数组的顺序去数据库中查找数据并排序。
// batchInsert 启动多协程实现mysql多条数据插入
func batchInsert() {
//datas := []Data{
// {
// ID: "11",
// Name: "zzr5",
// Addr: "shenzhen5",
// },
// {
// ID: "12",
// Name: "zzr6",
// Addr: "shenzhen6",
// },
// {
// ID: "13",
// Name: "zzr5",
// Addr: "shenzhen5",
// },
// {
// ID: "14",
// Name: "zzr6",
// Addr: "shenzhen6",
// },
//}
datas := buildData()
//db.SetConnMaxLifetime(time.Second * 30)
tx, err := db.Begin()
if err != nil {
fmt.Println("tx err:%+v", err)
return
}
var ids []string
var wg sync.WaitGroup
var txErr error
for i := range datas {
ids = append(ids, datas[i].ID)
wg.Add(1)
go func(i int) {
defer wg.Done()
//r, getErr := tx.Query("select * from t_crud")
//if getErr != nil {
// fmt.Println("geterr:", getErr)
// return
//}
//fmt.Println(r.Columns())
//r.Close()
sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[i].ID, datas[i].Name, datas[i].Addr)
fmt.Println(sql)
res, err2 := tx.Exec(sql)
if err2 != nil {
fmt.Println("exec err:%+v", err2)
txErr = err2
return
}
fmt.Println(res.RowsAffected())
//rows, getErr2 := tx.QueryContext(context.Background(), "select * from t_crud where id=?", "id_99")
//if getErr2 != nil {
// fmt.Println("geterr2:", getErr2)
// return
//}
//
//fmt.Println(rows.Columns())
//rows.Close()
}(i)
}
wg.Wait()
if txErr != nil {
fmt.Println("has err roolback:", txErr)
tx.Rollback()
return
}
err3 := tx.Commit()
if err3 != nil {
fmt.Println("failed:", err3)
} else {
fmt.Println("success")
}
//sqlStr := "SELECT * FROM t_crud WHERE id IN (1,2,3,4) ORDER BY FIND_IN_SET(id, '2,3,1,4');"
sqlStr := fmt.Sprintf("SELECT * FROM t_crud WHERE id IN %s ORDER BY FIND_IN_SET(id, %s)", sql1(ids), sql2(ids))
fmt.Println(sqlStr)
rows, getErr2 := db.Query(sqlStr)
if getErr2 != nil {
fmt.Println("geterr2:", getErr2)
return
}
fmt.Println(rows.Columns())
}
func sql1(ids []string) string {
sql := "("
for i := range ids {
if i == len(ids)-1 {
sql = fmt.Sprintf("%s '%s')", sql, ids[i])
} else {
sql = fmt.Sprintf("%s '%s',", sql, ids[i])
}
}
fmt.Println(sql)
sql2(ids)
return sql
}
func sql2(ids []string) string {
sql := "'"
for i := range ids {
if i == len(ids)-1 {
sql = fmt.Sprintf("%s %s'", sql, ids[i])
} else {
sql = fmt.Sprintf("%s %s,", sql, ids[i])
}
}
fmt.Println(sql)
return sql
}
网友评论