基础知识
在现实的开发场景中,有许许多多的数据在入库之前,我们并不知道到数据库是否已经存在该数据,其实都会先查询一遍,若存在,则执行更新语句,否则执行插入语句。
但是在SQL标准语句中,对于批量插入数据若存在则更新的语句实现方式有
insert ... on duplicate key update ...
针对批量插入或者更新的业务,我们可以实现拼接SQL语句来提高数据入库的效率。
适用数据库类型
-
MySQL
-
PostgresSQL
-
SQLite
数据库适用框架
-
GORM
代码分析
-
首先定义个结构体
type BatchProvider struct {
TableName string `json:"table_name"` // 表名称
Fields []string `json:"fields"` // 字段名称列表
ConflictFields []string `json:"conflict_fields, omitempty"` // 冲突字段名称列表
UpdateFields []string `json:"update_fields"` // 更新字段名称列表
BatchAmount int `json:"batch_amount"` // 每一次批量操作数
}
-
然后定义一个Update更新方法并实现
// records 批量数据
func (provider *BatchProvider) Update(engine *, records [][]interface{}) error {
var (
index = 0
end int
err error
)
for index < len(records) {
end = index +
if end > len(records) {
end = len(records)
}
if err = (engine, records[index:end]); err != nil {
return err
}
index = end
}
return err
}
func (provider *BatchProvider) load(engine *, records [][]interface{}) error {
// 定义变量
var (
sql string
args []interface{}
err error
)
// 构造sql
sql, err = (records, (*engine))
if err != nil {
return err
}
// 添加值列表
for _, record := range records {
args = append(args, record...)
}
return (sql, args...).Error
}
// 声明定义三种不同的执行引擎类型
type DialectorType string
const (
DIALECTOR_MYSQL DialectorType = "mysql"
DIALECTOR_PGSQL DialectorType = "postgres"
DIALECTOR_SQLITE DialectorType = "sqlite"
)
// 通过执行Dialector的名称,判断执行引擎的类型
func (provider *BatchProvider) engineJudge(engine ) DialectorType {
switch () {
case (&{}).Name():
return DIALECTOR_PGSQL
case (&{}).Name():
return DIALECTOR_MYSQL
case (&{}).Name():
return DIALECTOR_SQLITE
default:
return ""
}
}
-
最关键的代码就在constructSQL方法中
func (provider *BatchProvider) constructSQL(records [][]interface{}, dialectorType DialectorType) (string, error) {
switch dialectorType {
case DIALECTOR_PGSQL:
return (records), nil
case DIALECTOR_MYSQL:
return (records), nil
case DIALECTOR_SQLITE:
return (records), nil
default:
return "", ("dialector type is invalid")
}
}
-
对于不同的数据库,批量操作的SQL语句是有略微不同的,因此需要分开进行编写
func (provider *BatchProvider) constructPGSQL(records [][]interface{}) string {
var (
valueNames string
valuePlaceHolder string
valuePlaceHolders string
sql string
)
valueNames = (, ", ")
valuePlaceHolder = ("?,", len())
valuePlaceHolder = "(" + valuePlaceHolder[:len(valuePlaceHolder)-1] + "),"
valuePlaceHolders = (valuePlaceHolder, len(records))
valuePlaceHolders = valuePlaceHolders[:len(valuePlaceHolders)-1]
sql = "insert into " + + " (" + valueNames + ") values" + valuePlaceHolders
if len() > 0 {
var onDups []string
sql += " on conflict(" + (, ", ") + ") do "
if len() > 0 {
for _, field := range {
onDups = append(onDups, field+"=excluded."+field)
}
sql += "update set " + (onDups, ", ")
} else {
sql += "nothing"
}
}
return sql
}
func (provider *BatchProvider) constructMYSQL(records [][]interface{}) string {
var (
valueNames string
valuePlaceHolder string
valuePlaceHolders string
sql string
)
valueNames = (, ", ")
valuePlaceHolder = ("?,", len())
valuePlaceHolder = "(" + valuePlaceHolder[:len(valuePlaceHolder)-1] + "),"
valuePlaceHolders = (valuePlaceHolder, len(records))
valuePlaceHolders = valuePlaceHolders[:len(valuePlaceHolders)-1]
sql = "insert into " + + " (" + valueNames + ") values" + valuePlaceHolders
var onDups []string
sql += " on duplicate key "
if len() > 0 {
for _, field := range {
onDups = append(onDups, field+"=values("+field+")")
}
sql += "update " + (onDups, ", ")
} else {
sql += "nothing"
}
return sql
}
// 同PostgresSQL一样
func (provider *BatchProvider) constructSQLite(records [][]interface{}) string {
return (records)
}
代码测试
-
定义一个结构体,对应到数据库中的一张表
type User struct {
Id string `gorm:"column:id;pk" json:"id"`
Name string `gorm:"column:name" json:"name"`
UpdateTimestamp int64 `gorm:"column:update_timestamp" json:"update_timestamp"`
}
func (user *User) TableName() string {
return "user"
}
-
创建GORM的实例,进行自动建表,并插入(更新)数据
// SQLite
func NewSQLiteEngine() (*, error) {
var dataSource = "db/?_timeout=5000"
// 获取配置文件,创建相应的目录
dataSourceDir := (dataSource)
_, fileErr := (dataSourceDir)
if fileErr != nil || !(fileErr) {
_ = (dataSourceDir, )
}
return ((dataSource), &{})
}
// MySQL
func NewMySQLEngine() (*, error) {
var mysqlUrl = "root:123456@tcp(127.0.0.1:23306)/batch-update?charset=utf8mb4&parseTime=True&loc=Local"
return ((mysqlUrl), &{})
}
func main() {
var p = &BatchProvider{
TableName: (&User{}).TableName(),
Fields: []string{"id", "name", "update_timestamp"},
ConflictFields: []string{"id"},
UpdateFields: []string{"name", "update_timestamp"},
BatchAmount: 100,
}
var records = [][]interface{}{
{"1", "name_1", ().Unix()},
{"2", "name_2", ().Unix()},
{"3", "name_3", ().Unix()},
}
// SQLite测试
engine, err := NewSQLiteEngine()
if err != nil {
panic(err)
}
engine = ()
if err = (&User{}); err != nil {
panic(err)
}
if err = (engine, records); err != nil {
panic(err)
}
// MySQL测试
engine, err = NewMySQLEngine()
if err != nil {
panic(err)
}
engine = ()
if err = (&User{}); err != nil {
panic(err)
}
if err = (engine, records); err != nil {
panic(err)
}
}