大概所有的程序员应该都接触过批量插入的场景,我也相信任何的程序员都能写出可正常运行的批量插入的代码。但怎样实现一个高效、快速插入的批量插入功能呢?
由于每个人的工作履历,工作年限的不同,在实现这样的一个需求时,可能技术选型各有不同,有直接生成insert语句的,有用EF的或者其他的orm框架的。其实不管是手写insert还是使用EF,最终交给数据库执行的还是insert语句。下面是EF批量插入的示例代码:
var list = new List<Student>();
for (int i = 0; i < 100; i++)
{
list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });
}
await _context.Students.AddRangeAsync(list);
await _context.SaveChangesAsync();
生成的脚本截图如下:
这种实现方式在数据量100以内时,耗时还算可以。但如果要批量导入的数据达到万级的时候,那耗时简直是灾难。我测试的数据如下(测试数据库为mysql,具体配置不详):
数据量 | 耗时(s) |
---|---|
10 | 0.028 |
1w | 3.929 |
10w | 31.280 |
10w的数据已经耗时超过了30s,我没有勇气测试100w数据的耗时,有兴趣的可以自行测试下。
下面就应该进入正题了,对于较大数据量(1000以上)场景下的批量插入,各个数据库应该都提供了相关的解决方案,由于工作所限,目前笔者仅接触过mysql和mssql。
mysql的实现方案是LOAD DATA命令,此命令接收一个csv文件,然后将文件上传到数据库服务器后,解析数据后插入。好在MySqlConnector提供了相关的封装,不用咱们去熟悉那么复杂的命令参数。
mssql实现的方案是使用SqlBulkCopy类,不过此类仅接收DataTable类型的数据,所以,在批量插入的时候,需要将数据源转换成DataTable。
综上所示,不管是mysql,还是mssql,均需要将数据源转换成指定的格式才可以使用批量导入的功能,所以这一块的主要核心就是转换数据源格式。mysql需要转换成csv,mssql需要转换成DataTable。下面就来一起看看具体的转换的方法。
以下代码是转换csv和DataTable相关方法:
namespace FL.DbBulk
{
public static class Extension
{
/// <summary>
/// 获取实体影射的表名
/// </summary>
/// <param name="type"></param>
/// <returns></returns>
public static string GetMappingName(this System.Type type)
{
var key = $"batch{type.FullName}";
var tableName = CacheService.Get(key);
if (string.IsNullOrEmpty(tableName))
{
var tableAttr = type.GetCustomAttribute<TableAttribute>();
if (tableAttr != null)
{
tableName = tableAttr.Name;
}
else
{
tableName = type.Name;
}
CacheService.Add(key, tableName);
}
return tableName;
}
public static List<EntityInfo> GetMappingProperties(this System.Type type)
{
var key = $"ICH.King.DbBulk{type.Name}";
var list = CacheService.Get<List<EntityInfo>>(key);
if (list == null)
{
list = new List<EntityInfo>();
foreach (var propertyInfo in type.GetProperties())
{
if (!propertyInfo.PropertyType.IsValueType &&
propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;
var temp = new EntityInfo();
temp.PropertyInfo = propertyInfo;
temp.FieldName = propertyInfo.Name;
var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();
if (attr != null)
{
temp.FieldName = attr.Name;
}
temp.GetMethod = propertyInfo.CreateGetter();
list.Add(temp);
}
CacheService.Add(key, list);
}
return list;
}
/// <summary>
/// 创建cvs字符串
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entities"></param>
/// <param name="primaryKey"></param>
/// <returns></returns>
public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = "")
{
var sb = new StringBuilder();
var properties = typeof(T).GetMappingProperties().ToArray();
foreach (var entity in entities)
{
for (int i = 0; i < properties.Length; i++)
{
var ele = properties[i];
if (i != 0) sb.Append(",");
var value = ele.Get(entity);
if (ele.PropertyInfo.PropertyType.Name == "Nullable`1")
{
if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime))
{
if (value == null)
{
sb.Append("NULL");
}
else
{
sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
}
continue;
}
}
if (ele.PropertyInfo.PropertyType == typeof(DateTime))
{
sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
continue;
}
//如果是主键&&string类型,且值不为空
if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string))
{
sb.Append(Guid.NewGuid().ToString());
continue;
}
if (value == null)
{
continue;
}
if (ele.PropertyInfo.PropertyType == typeof(string))
{
var vStr = value.ToString();
if (vStr.Contains("\""))
{
vStr = vStr.Replace("\"", "\"\"");
}
if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n"))
{
vStr = $"\"{vStr}\"";
}
sb.Append(vStr);
}
else sb.Append(value);
}
sb.Append(IsWin() ? "\r\n" : "\n");
//sb.AppendLine();
}
return sb.ToString();
}
public static bool IsWin()
{
return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
}
public static string CreateCsv(this DataTable table)
{
StringBuilder sb = new StringBuilder();
DataColumn colum;
foreach (DataRow row in table.Rows)
{
for (int i = 0; i < table.Columns.Count; i++)
{
colum = table.Columns[i];
if (i != 0) sb.Append(",");
if (colum.DataType == typeof(string))
{
var vStr = row[colum].ToString();
if (vStr.Contains("\""))
{
vStr = vStr.Replace("\"", "\"\"");
}
if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n"))
{
vStr = $"\"{vStr}\"";
}
sb.Append(vStr);
}
else sb.Append(row[colum]);
}
sb.Append(IsWin() ? "\r\n" : "\n");
}
return sb.ToString();
}
public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = "")
{
var type = typeof(T);
//获取实体映射的表名
var mappingName = type.GetMappingName();
var dt = new DataTable(mappingName);
//获取实体映射的属性列表
var columns = type.GetMappingProperties();
dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());
foreach (var data in list)
{
var row = dt.NewRow();
foreach (var entityInfo in columns)
{
var value = entityInfo.Get(data);
if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string))
{
row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();
}
else
{
row[entityInfo.FieldName] = value;
}
}
dt.Rows.Add(row);
}
return dt;
}
}
}
转换成DataTable方法相对简单,但这里我做了个优化下,当判断主键是string类型,且值为空时,会自动生成一个GUID,并给其赋值,这样做的目的是为了和EF原生的插入功能兼容。
生成Csv的相对比较麻烦,因为Csv是用逗号以及其他符号来区分每一行、每一列数据,但经常会存在要插入的数据包含了csv的特殊符号,这样情况下就需要做转义。另外,还有一个需要考虑的问题,linux和windows默认的换行符是有区别的,windows的换行符为\r\n,而linux默认的是\n,所以在生成csv时,需要根据不同的系统进行处理。
下面来看下具体怎么调用相关的插入方法,首先看下mysql的,主要代码如下所示:
private async Task InsertCsvAsync(string csv, string tableName, List<string> columns)
{
var fileName = Path.GetTempFileName();
await File.WriteAllTextAsync(fileName, csv);
var conn = _context.Database.GetDbConnection() as MySqlConnection;
var loader = new MySqlBulkLoader(conn)
{
FileName = fileName,
Local = true,
LineTerminator = Extension.IsWin() ? "\r\n" : "\n",
FieldTerminator = ",",
TableName = tableName,
FieldQuotationCharacter = '"',
EscapeCharacter = '"',
CharacterSet = "UTF8"
};
loader.Columns.AddRange(columns);
await loader.LoadAsync();
}
在上述的代码中,首先创建一个临时文件,然后将其他数据源转换的csv内容写入到文件中,获取数据库连接,再然后创建MySqlBulkLoader类的实例,将相关参数进行复制后,还需要配置字段列表,最后执行LoadAsync命令。
下面是mssql的批量插入的核心代码:
public async Task InsertAsync(DataTable table)
{
if (table == null)
{
throw new ArgumentNullException();
}
if (string.IsNullOrEmpty(table.TableName))
{
throw new ArgumentNullException("DataTable的TableName属性不能为空");
}
var conn = (SqlConnection)_context.Database.GetDbConnection();
await conn.OpenAsync();
using (var bulk = new SqlBulkCopy(conn))
{
bulk.DestinationTableName = table.TableName;
foreach (DataColumn column in table.Columns)
{
bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
await bulk.WriteToServerAsync(table);
}
}
以上方法相对简单,在此不做更多解释。
至此,mysql和mssql批量的导入的方案已经介绍完毕,但可能就会有人说了,这跟EF好像也没什么关系呀。
其实如果你有仔细看的话,或许能发现,我在代码中使用了一个名为_context字段,此字段其实就是EF的DbContext的实例。但文章内容到此时也没有完全的和EF结合,下面就来介绍下如何更优雅的将此功能集成到EF中。
在.net core中,接入EF的时候其实已经指定了使用的数据库类型,实例代码如下:
services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))
既然以及指定了数据库类型,那么在调用批量插入的时候,应该就不需要让调用者判断是使用mysql的方法,还是mssql的方法。具体怎么设计呢?且耐心往下看。
首先分别定义接口ISqlBulk,IMysqlBulk,ISqlServerBulk代码如下:
namespace FL.DbBulk
{
public interface ISqlBulk
{
/// <summary>
/// 批量导入数据
/// </summary>
/// <param name="table">数据源</param>
void Insert(DataTable table);
/// <summary>
/// 批量导入数据
/// </summary>
/// <param name="table">数据源</param>
Task InsertAsync(DataTable table);
void Insert<T>(IEnumerable<T> enumerable) where T : class;
Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;
}
}
IMysqlBulk,ISqlServerBulk接口继承ISqlBulk,代码如下:
namespace FL.DbBulk
{
public interface IMysqlBulk : ISqlBulk
{
Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;
}
}
namespace FL.DbBulk
{
public interface ISqlServerBulk:ISqlBulk
{
}
}
然后创建ISqlBulk实现类:
namespace FL.DbBulk
{
public class SqlBulk : ISqlBulk
{
private ISqlBulk _bulk;
public SqlBulk(DbContext context, IServiceProvider provider)
{
if (context.Database.IsMySql())
{
_bulk = provider.GetService<IMysqlBulk>();
}
else if (context.Database.IsSqlServer())
{
_bulk = provider.GetService<ISqlServerBulk>();
}
}
public void Insert(DataTable table)
{
_bulk.Insert(table);
}
public async Task InsertAsync(DataTable table)
{
await _bulk.InsertAsync(table);
}
public void Insert<T>(IEnumerable<T> enumerable) where T : class
{
_bulk.Insert(enumerable);
}
public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class
{
await _bulk.InsertAsync(enumerable);
}
}
}
在SqlBulk的构造函数中,通过context.Database的扩展方法判断数据库的类型,然后再获取相应的接口的实例。再然后就是实现IMysqlBulk和ISqlServerBulk的实现类。上文已经把核心代码贴出,再此为了篇幅,就不贴完整代码了。
再然后,就是提供一个注入services的方法,代码如下:
namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtension
{
public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where T:DbContext
{
services.TryAddScoped<IMysqlBulk, MysqlBulk>();
services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();
services.TryAddScoped<ISqlBulk, SqlBulk>();
services.AddScoped<DbContext, T>();
return services;
}
}
}
有了以上代码,我们就可以通过在Startup中很方便的启用批量插入的功能了。
最后,贴出两种插入方式对比的测试数据:
数据量 | EF默认耗时(s) | ISqlBulk耗时(s) |
---|---|---|
10 | 0.028 | 0.030 |
1w | 3.929 | 1.581 |
10w | 31.280 | 15.408 |
以上测试数据均是使用同一个mysql数据库,不同配置以及网络环境下,测试的数据会有差异,有兴趣的可以自己试试。
至此,本人内容已完毕。
最后,贴出git地址,如果思路或代码可以帮到你,欢迎点赞,点star
https://github.com/fuluteam/FL.DbBulk.git
福尔斯
EF批量插入太慢?那是你的姿势不对的更多相关文章
-
EF批量插入数据耗时对比
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.T ...
-
一次EF批量插入多表数据的性能优化经历
距离上次的博客已经有15个多月了,感慨有些事情还是需要坚持,一旦停下来很有可能就会停很久或者从此再也不会坚持.但我个人一直还坚持认为属于技术*份子,且喜欢精益求精的那种.最近遇到两个和数据迁移相关的 ...
-
EF批量插入数据(Z.EntityFramework.Extensions)
EF用原生的插入数据方法DbSet.ADD()和 DbSet.AddRange()都很慢.所以要做大型的批量插入只能另选它法. 1.Nugget 2.代码 using EF6._0Test.EF; u ...
-
EF批量插入(转)
原作者地址http://blog.csdn.net/zlts000/article/details/46385773 之前做项目的时候,做出来的系统的性能不太好,在框架中使用了EntityFramew ...
-
EF批量插入 扩展
https://efbulkinsert.codeplex.com/ https://github.com/loresoft/EntityFramework.Extended
-
EF - 批量插入
比较一下下面两种方式的区别 1,每Add一次 就savechange() static void Main(string[] args) { //List<User> users= Fin ...
-
EF 批量插入,sqlhelper 批量插入
需添加一个using System.Linq; 引用 public void BulkInsert<T>(string connection, string tableName, ILis ...
-
Entity Framework与ADO.NET批量插入数据性能测试
Entity Framework是.NET平台下的一种简单易用的ORM框架,它既便于Domain Model和持久层的OO设计,也提高了代码的可维护性.但在使用中发现,有几类业务场景是EF不太擅长的, ...
-
使用EF扩展EntityFramework.BulkInsert实现批量插入
EntityFramework 最被人诟病的地方就是它的性能,处理大量数据时的效率.此种条件下,通常会转回使用 ADO.NET 来完成任务.而EntityFramework.BulkInsert则是利 ...
随机推荐
-
基于XMPP的IOS聊天客户端程序
简介:XMPP协议是一种基于Socket长连接.以XML格式进行基本信息交换.C/S S/S多种架构的聊天协议 XMPPServer 基于XMPP协议的服务端(例如eJabber.OpenFire) ...
-
HDU5845 Best Division
递归写法,好久不写很容易就gg了... dp[i]=max(dp[j])+1,并且s[i]XORs[j]<=x 01字典树优化一下转移. #include <bits/stdc++.h& ...
-
html中offsetTop、clientTop、scrollTop、offsetTop
HTML精确定位:scrollLeft,scrollWidth,clientWidth,offsetWidth scrollHeight: 获取对象的滚动高度. scrollLeft:设置或获取位于对 ...
-
Codeforces 719 E. Sasha and Array (线段树+矩阵运算)
题目链接:http://codeforces.com/contest/719/problem/E 题意:操作1将[l, r] + x; 操作2求f[l] + ... + f[r]; 题解:注意矩阵可以 ...
-
win7 设置自动关机
1.C:\Windows\System32\shutdown.exe 2. -s:表示关机: -r:表示重启: -t:表示时间,以秒为单位: -a:表示取消shutdown计划,即表示取消关机或重启命 ...
-
【转】引入android项目在eclipse ADT中显示中文乱码问题
(1)修改工作空间的编码方式:Window->Preferences->General->Workspace->Text file Encoding在Others里选择需要的编 ...
-
初识Selenium(四)
用Selenium实现页面自动化测试 引言 要不要做页面测试自动化的争议由来已久,不做或少做的主要原因是其成本太高,其中一个成本就是自动化脚本的编写和维护,那么有没有办法降低这种成本呢?童战同学在其博 ...
-
Java第二章 变量
1.什么是变量? 存储数据的基本单位. 2.数据类型分为: 基本类型和引用数据 3.基本数据类型和引用数据类型的区别: 基础数据:不同的变量会分配不同的存储空间,改变一个变量不会影响另一个变量 引用数 ...
-
简洁明了的插值音频重采样算法例子 (附完整C代码)
近一段时间在图像算法以及音频算法之间来回游走. 经常有一些需求,需要将音频进行采样转码处理. 现有的知名开源库,诸如: webrtc , sox等, 代码阅读起来实在闹心. 而音频重采样其实也就是插值 ...
-
如果去掉UITableView上的section的headerView和footerView的悬浮效果
项目需要cell的间距,又不需要悬浮效果,百度之后找到这个方法,记录一下,备忘. 用UIScrollView的代理方法实现 - (void)scrollViewDidScroll:(UIScrollV ...