Golang批量更新数据操作

时间:2025-02-23 08:34:12

基础知识

在实际开发中,很多数据在入库之前,我们并不知道数据库中是否已经存在该数据,因此通常会先查询一遍:若存在,则执行更新语句,否则执行插入语句。

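对于"批量插入数据、若已存在则更新"(即 upsert)的需求,各数据库的语法并不相同(这类语法并不属于 SQL 标准,标准中对应的是 MERGE 语句):

MySQL 使用 insert ... on duplicate key update ...,PostgreSQL 和 SQLite 使用 insert ... on conflict (...) do update set ...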

针对批量插入或更新的业务,我们可以通过拼接 SQL 语句,让一条语句写入多行数据,从而减少与数据库的交互次数,提高数据入库的效率。

适用数据库类型

  • MySQL

  • PostgreSQL

  • SQLite

适用的 ORM 框架

  • GORM

代码分析

  • 首先定义一个结构体

// BatchProvider describes a batched upsert into a single table.
//
// Every record handed to Update must hold one value per entry of Fields, in
// the same order. ConflictFields is the conflict target of the
// PostgreSQL/SQLite "on conflict(...)" clause, and UpdateFields lists the
// columns overwritten with the incoming values when a row already exists.
// BatchAmount caps how many records are written by a single statement.
type BatchProvider struct {
	TableName      string   `json:"table_name"`                // target table name
	Fields         []string `json:"fields"`                    // column names, in record-value order
	ConflictFields []string `json:"conflict_fields,omitempty"` // conflict-target column names
	UpdateFields   []string `json:"update_fields"`             // columns updated when the row exists
	BatchAmount    int      `json:"batch_amount"`              // max records per statement
}
  • 然后定义一个Update更新方法并实现

// records 批量数据
func (provider *BatchProvider) Update(engine *, records [][]interface{}) error {
  var (
    index = 0
    end   int
    err   error
  )
  for index < len(records) {
    end = index + 
    if end > len(records) {
      end = len(records)
    }
    if err = (engine, records[index:end]); err != nil {
      return err
    }
    index = end
  }
  return err
}
​
func (provider *BatchProvider) load(engine *, records [][]interface{}) error {
  // 定义变量
  var (
    sql  string
    args []interface{}
    err  error
  )
  // 构造sql
  sql, err = (records, (*engine))
  if err != nil {
    return err
  }
  // 添加值列表
  for _, record := range records {
    args = append(args, record...)
  }
  return (sql, args...).Error
}
​
// DialectorType identifies which SQL dialect a batch upsert statement is
// generated for. The three supported dialects are declared below.
type DialectorType string

const (
	// DIALECTOR_PGSQL selects the PostgreSQL "on conflict ... do" syntax.
	DIALECTOR_PGSQL DialectorType = "postgres"
	// DIALECTOR_MYSQL selects the MySQL "on duplicate key update" syntax.
	DIALECTOR_MYSQL DialectorType = "mysql"
	// DIALECTOR_SQLITE selects SQLite, whose upsert syntax matches PostgreSQL's.
	DIALECTOR_SQLITE DialectorType = "sqlite"
)
​
// 通过执行Dialector的名称,判断执行引擎的类型
func (provider *BatchProvider) engineJudge(engine ) DialectorType {
  switch () {
  case (&{}).Name():
    return DIALECTOR_PGSQL
  case (&{}).Name():
    return DIALECTOR_MYSQL
  case (&{}).Name():
    return DIALECTOR_SQLITE
  default:
    return ""
  }
}
  • 最关键的代码就在constructSQL方法中

// constructSQL builds the batch upsert statement for records in the syntax of
// dialectorType. PostgreSQL and SQLite share the "on conflict ... do" form;
// MySQL uses "on duplicate key update". An unknown or empty dialect yields an
// error that names the rejected value.
func (provider *BatchProvider) constructSQL(records [][]interface{}, dialectorType DialectorType) (string, error) {
	switch dialectorType {
	case DIALECTOR_PGSQL:
		return provider.constructPGSQL(records), nil
	case DIALECTOR_MYSQL:
		return provider.constructMYSQL(records), nil
	case DIALECTOR_SQLITE:
		return provider.constructSQLite(records), nil
	default:
		return "", fmt.Errorf("dialector type %q is invalid", dialectorType)
	}
}
  • 不同数据库的批量 upsert SQL 语法略有不同,因此需要分别编写

func (provider *BatchProvider) constructPGSQL(records [][]interface{}) string {
  var (
    valueNames        string
    valuePlaceHolder  string
    valuePlaceHolders string
    sql               string
  )
  valueNames = (, ", ")
  valuePlaceHolder = ("?,", len())
  valuePlaceHolder = "(" + valuePlaceHolder[:len(valuePlaceHolder)-1] + "),"
  valuePlaceHolders = (valuePlaceHolder, len(records))
  valuePlaceHolders = valuePlaceHolders[:len(valuePlaceHolders)-1]
  sql = "insert into " +  + " (" + valueNames + ") values" + valuePlaceHolders
  if len() > 0 {
    var onDups []string
    sql += " on conflict(" + (, ", ") + ") do "
    if len() > 0 {
      for _, field := range  {
        onDups = append(onDups, field+"=excluded."+field)
      }
      sql += "update set " + (onDups, ", ")
    } else {
      sql += "nothing"
    }
  }
  return sql
}
​
func (provider *BatchProvider) constructMYSQL(records [][]interface{}) string {
  var (
    valueNames        string
    valuePlaceHolder  string
    valuePlaceHolders string
    sql               string
  )
  valueNames = (, ", ")
  valuePlaceHolder = ("?,", len())
  valuePlaceHolder = "(" + valuePlaceHolder[:len(valuePlaceHolder)-1] + "),"
  valuePlaceHolders = (valuePlaceHolder, len(records))
  valuePlaceHolders = valuePlaceHolders[:len(valuePlaceHolders)-1]
  sql = "insert into " +  + " (" + valueNames + ") values" + valuePlaceHolders
  var onDups []string
  sql += " on duplicate key "
  if len() > 0 {
    for _, field := range  {
      onDups = append(onDups, field+"=values("+field+")")
    }
    sql += "update " + (onDups, ", ")
  } else {
    sql += "nothing"
  }
  return sql
}
​
// constructSQLite builds the batch upsert statement for SQLite. SQLite's
// UPSERT clause uses the same "on conflict(...) do update set col=excluded.col"
// syntax as PostgreSQL, so the PostgreSQL builder is reused unchanged.
// NOTE: UPSERT requires SQLite 3.24.0 or newer.
func (provider *BatchProvider) constructSQLite(records [][]interface{}) string {
	return provider.constructPGSQL(records)
}

代码测试

  • 定义一个结构体,对应数据库中的一张表

// User is the demo model mapped to the "user" table.
//
// The id column carries GORM's primaryKey tag. The batch upsert's conflict
// target ("on conflict(id)") needs a PRIMARY KEY or UNIQUE constraint on it,
// and "pk" is not a tag GORM recognizes.
//
// NOTE(review): "user" is a reserved word in PostgreSQL. The hand-built
// upsert does not quote the table name, so this model would fail there as-is.
type User struct {
	Id              string `gorm:"column:id;primaryKey" json:"id"`
	Name            string `gorm:"column:name" json:"name"`
	UpdateTimestamp int64  `gorm:"column:update_timestamp" json:"update_timestamp"`
}

// TableName overrides GORM's default table naming so the model maps to "user".
func (user *User) TableName() string {
	return "user"
}
  • 创建GORM的实例,进行自动建表,并插入(更新)数据

// NewSQLiteEngine opens the demo SQLite database through GORM. It first
// creates the directory that will hold the database file.
//
// NOTE(review): the data source "db/?_timeout=5000" has no file name before
// the "?"; part of the path looks lost. Confirm the intended file,
// e.g. "db/<name>.db?_timeout=5000".
func NewSQLiteEngine() (*gorm.DB, error) {
	var dataSource = "db/?_timeout=5000"
	// filepath.Dir drops the last element (file name plus DSN query), leaving
	// the directory that must exist before SQLite can create the file.
	dataSourceDir := filepath.Dir(dataSource)
	// MkdirAll succeeds without doing anything when the directory already
	// exists, so no Stat pre-check is needed.
	if err := os.MkdirAll(dataSourceDir, os.ModePerm); err != nil {
		return nil, fmt.Errorf("creating sqlite directory %q: %w", dataSourceDir, err)
	}
	return gorm.Open(sqlite.Open(dataSource), &gorm.Config{})
}
​
// MySQL
func NewMySQLEngine() (*, error) {
  var mysqlUrl = "root:123456@tcp(127.0.0.1:23306)/batch-update?charset=utf8mb4&parseTime=True&loc=Local"
  return ((mysqlUrl), &{})
}
​
func main() {
  var p = &BatchProvider{
    TableName:      (&User{}).TableName(),
    Fields:         []string{"id", "name", "update_timestamp"},
    ConflictFields: []string{"id"},
    UpdateFields:   []string{"name", "update_timestamp"},
    BatchAmount:    100,
  }
  var records = [][]interface{}{
    {"1", "name_1", ().Unix()},
    {"2", "name_2", ().Unix()},
    {"3", "name_3", ().Unix()},
  }
  // SQLite测试
  engine, err := NewSQLiteEngine()
  if err != nil {
    panic(err)
  }
  engine = ()
  if err = (&User{}); err != nil {
    panic(err)
  }
  if err = (engine, records); err != nil {
    panic(err)
  }
  // MySQL测试
  engine, err = NewMySQLEngine()
  if err != nil {
    panic(err)
  }
  engine = ()
  if err = (&User{}); err != nil {
    panic(err)
  }
  if err = (engine, records); err != nil {
    panic(err)
  }
}