💻 前言
导入数据这种脏活、累活,相信大家多多少少都有经历,常见的场景有:
- 同服务器从A表导数据到B表
- 批量导入新数据
- 批量新增或更新数据
- 跨服务器从A表导数据到B表
每种场景有自己的特点,我们一般会根据特点定制做导入数据优化,减少总体导入的耗时,或者避免数据库IO/CPU占用过高,而影响到其他正常业务。
FreeSql 有好几个实用功能,流式读取数据、查询并插入、批量对比更新、插入或修改(支持实体类或字典),用好这些功能可以很方便的实现各种导入数据场景。其实 FreeSql 对应的文档一直都有,只是内容介绍比较零散,这篇文章将针对数据导入详细介绍它们的使用方法,既然 FreeSql bug 少那就多优化一下文档吧!
本文讲解以上四种常见的数据导入实现,让使用者高效解决工作问题。如果你在使用其他更好的导入方案,欢迎加入讨论!
🌳 C#.NET ORM概念
.NET ORM Object Relational Mapping 是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。FreeSql .NET ORM 支持 .NetFramework4.0+、.NetCore、Xamarin、MAUI、Blazor、以及还有说不出来的运行平台,因为代码绿色无依赖,支持新平台非常简单。目前单元测试数量:8500+,Nuget下载数量:1M+。使用最宽松的开源协议 MIT https://github.com/dotnetcore/FreeSql ,可以商用,文档齐全,甚至拿去卖钱也可以。
FreeSql 主要优势在于易用性上,基本是开箱即用,在不同数据库之间切换兼容性比较好,整体的功能特性如下:
- 支持 CodeFirst 对比结构变化迁移、DbFirst 从数据库生成实体类;
- 支持 丰富的表达式函数,独特的自定义解析;
- 支持 批量添加、批量更新、BulkCopy、导航属性,贪婪加载、延时加载、级联保存、级联删除;
- 支持 读写分离、分表分库,租户设计,分布式事务;
- 支持 MySql/SqlServer/PostgreSQL/Oracle/Sqlite/Firebird/达梦/神通/人大金仓/翰高/Clickhouse/MsAccess Ado.net 实现包,以及 Odbc 的专门实现包;
8000+个单元测试作为基调,支持10多数数据库,我们提供了通用Odbc理论上支持所有数据库,目前已知有群友使用 FreeSql 操作华为高斯、mycat、tidb 等数据库。安装时只需要选择对应的数据库实现包:
dotnet add packages FreeSql.Provider.Sqlite
static IFreeSql fsql = new FreeSql.FreeSqlBuilder()
.UseConnectionString(FreeSql.DataType.Sqlite, @"Data Source=db1.db")
.UseAutoSyncStructure(true) //自动同步实体结构到数据库
.UseNoneCommandParameter(true) //SQL不使用参数化,以便调试
.UseMonitorCommand(cmd => Console.WriteLine(cmd.CommandText)) //打印 SQL
.Build();
FreeSql 提供多种 CRUD 使用习惯,请根据实际情况选择团队合适的一种:
- 要么 FreeSql,原始用法;
- 要么 FreeSql.Repository,仓储+工作单元习惯;
- 要么 FreeSql.DbContext,很像 EFCore 的使用习惯;
- 要么 FreeSql.BaseEntity,充血模式;
- 要么 直接像 dapper 那样使用 DbConnection 扩展方法;
⚡ 场景一:同服务器从A表导数据到B表
导入数据,正常需要根据源数据量的大小来评估,评估过程需要在实践中慢慢积累,以便选择对应的导入方案。同服务器导入数据的方案有:
1、insert into A(field1, field2) select name, code from B where ...
fsql.Select<B>()
.Where(b => b.Time > DateTime.Parse("2022-08-01"))
.InsertInto("A", b => new { b.Name, b.Code });
如果数据源是多个表组成,也可以:
fsql.Select<B, C>()
.InnerJoin((b, c) => b.Id == c.Id)
.Where((b, c) => b.Time > DateTime.Parse("2022-08-01"))
.InsertInto("A", (b, c) => new { b.Name, c.Code });
2、分段插入
FreeSql 提供流式分段返回数据,防止读取的数据源量过多时占用内存,假设数据表有100W万记录,我们可以设置每次只返回 1000 条。提示:ToChunk 只执行了一次 SQL 查询。
fsql.Select<B>()
.Where(b => b.Time > DateTime.Parse("2022-08-01"))
.ToChunk(b => new A { field1 = b.Name, field2 = b.Code }, 1000, cb => {
fsql.Insert(cb.Object).NoneParameter().ExecuteAffrows();
//如果数据库支持 BulkCopy 可以调用对应的 API 方法,如 SqlServer:
//fsql.Insert(cb.Object).ExecuteSqlBulkCopy();
});
3、分页插入
利用分页多次读取,分页可能造成新旧数据问题,尽量设置好分页排序并记录好偏移量,确保重复问题。(不推荐)
var pageNumber = 1;
while (true)
{
var list = fsql.Select<B>()
.Where(b => b.Time > DateTime.Parse("2022-08-01"))
.Page(pageNumber++, 1000)
.OrderBy(b => b.Time)
.ToList(b => new A { field1 = b.Name, field2 = b.Code }, 1000);
if (list.Count == 0) break;
fsql.Insert(cb.Object).NoneParameter().ExecuteAffrows();
//如果数据库支持 BulkCopy 可以调用对应的 API 方法,如 SqlServer:
//fsql.Insert(cb.Object).ExecuteSqlBulkCopy();
//停顿5秒后再插入,这个值可以根据需要自己调
Thread.CurrentThread.Join(TimeSpan.FromSeconds(5));
}
📯 场景二:批量导入新数据
从 Excel/CVS 文件批量导入新数据,第一步需要将文件内容转换为 DataTable/List<T> c# 对象,这一步网上有很多工具类,在此就不讲解了。
此场景适合导入的数据是全新的、不存在于目标数据库表,假设我们都已经将 Excel/CVS 内容转换成为了 List<T>
1、利用无参数化插入
批量插入利用无参数化,会比参数化效率更高,注意参数化与SQL注入没有必然联系。
fsql.Insert(list).NoneParameter().ExecuteAffrows();
2、利用 BulkCopy 插入
BulkCopy 是大批量数据插入的最优方案,只可惜不是每种数据库都支持,FreeSql 支持了 SqlServer/Oracle/MySql/PostgreSQL/达梦 等数据库的 BulkCopy API,如果是其他数据库建议使用无参数化插入。
fsql.Insert(list).ExecuteSqlBulkCopy(); //SqlServer
fsql.Insert(list).ExecuteOracleBulkCopy(); //Oracle
fsql.Insert(list).ExecuteMySqlBulkCopy(); //MySql
fsql.Insert(list).ExecutePgCopy(); //PostgreSQL
fsql.Insert(list).ExecuteDmBulkCopy(); //达梦
为什么不统一 API?
目前每种数据库驱动的 BulkCopy API 参数不一致,这些参数可以针对性的进行调优。
🚀 场景三:批量新增或更新数据
相比场景二,场景三会更麻烦,因为不是简单的追加数据,还要处理历史数据的更新,甚至对历史数据存在则忽略。正因为复杂才衍生出了更多的方案,每种方案都有优缺点,需要使用者根据自身实际情况选择最适合的一种方法。
同上,我们假设已经将 Excel/CVS 内容转换成为了 List<T>
1、内存循环 list 查询判断(不推荐)
foreach (var item in list)
{
if (fsql.Select<T>(item).Any() == false)
fsql.Insert(item).ExecuteAffrows();
else
fsql.Update<T>().SetSource(item).ExecuteAffrows();
}
这种方式实在不推荐作为批量操作,性能非常低。其实 FreeSql.Repository 提供了上述的封装:
var repo = fsql.GetRepository<T>();
foreach (var item in list)
repo.InsertOrUpdate(item);
2、利用数据库 MERGE INTO 特性
IFreeSql 定义了 InsertOrUpdate 方法实现添加或修改的功能,利用数据库特性:
Database | Features | Database | Features | |
---|---|---|---|---|
MySql | on duplicate key update | 达梦 | merge into | |
PostgreSQL | on conflict do update | 人大金仓 | on conflict do update | |
SqlServer | merge into | 神通 | merge into | |
Oracle | merge into | 南大通用 | merge into | |
Sqlite | replace into | MsAccess | 不支持 | |
Firebird | merge into |
fsql.InsertOrUpdate<T>()
.SetSource(list) //需要操作的数据
//.IfExistsDoNothing() //如果数据存在,啥事也不干(相当于只有不存在数据时才插入)
.ExecuteAffrows();
//或者..
var sql = fsql.Select<T2, T3>()
.ToSql((a, b) => new
{
id = a.id + 1,
name = "xxx"
}, FieldAliasOptions.AsProperty);
fsql.InsertOrUpdate<T>()
.SetSource(sql)
.ExecuteAffrows();
fsql.InsertOrUpdateDict 方法可针对非实体类操作(字典类型)
题外话,是否见过这种 SQL:
insert into T
select name, code
from dual
where not exists(
select 1 from T where code = dual.code
)
3、利用 BeginEdit 批量编辑
MERGE INTO 数据库特性,其实性能是很低的。800万行记录导入7000行大约7秒,各数据库性能差不多。
BeginEdit 是 FreeSql 特色功能之一,非常实用,可它却是少有被发掘到的功能。创意源自于医疗软件,比如操作员编辑清单,会新增,会删除,会修改,等操作完后再点保存。
其实我过往开发的项目也有过类似需求,每步操作都进行数据库保存,没什么问题吧?让我们看下最后统一保存应该怎么做。
//将 list 返回给 UI 端渲染
var list = fsql.Select<T>().Where(a => a.OrderId == 100).ToList();
//统一保存
//旧数据可通过查询,或者由 UI 端提供
List<T> oldlist = fsql.Select<T>().Where(a => a.OrderId == 100).ToList();
//新数据由 UI 端提供
List<T> newlist = ...;
var repo = fsql.GetRepository<T>();
repo.BeginEdit(oldlist); //开始进行编辑
repo.EndEdit(newlist); //对比新旧List
BeginEdit/EndEdit 只针对 oldlist 对比,而不是针对全表。在内存中对比计算 Insert/Update/Delete 比 MERGE INTO 性能快得多,利用该功能可以很方便的实现批量导入或更新,例如重复导入一天的数据。
应当注意当导入的数据量过大时,应该分批进行操作,而不是一次性对比全部,假设我们每批执行 1000条:
//查询旧数据
var oldlist = fsql.Select<T>().WhereDynamic(list1000).ToList();
//使用 IN 查询性能可能较慢,可以按时间范围查询,如下:
//var minTime = list1000.Select(a => a.Time).Min();
//var maxTime = list1000.Select(a => a.Time).Max();
//var oldlist = fsql.Select<T>().Where(a=> a.Time.Between(minTime, maxTime)).ToList();
//在内存二次过滤,还可以进一步优化将 list1000.Any 改成字典
//oldlist = oldlist.Where(a=> !list1000.Any(b => b.Id == a.Id)).ToList();
var repo = fsql.GetRepository<T>();
//被编辑的数据
repo.BeginEdit(oldlist);
//将 list1000 与 oldlist 进行对比,计算出 delete/insert/update 语句执行
repo.EndEdit(list1000);
EndEdit 最多执行 3条 SQL,从而大大提升了命令往返时间消耗。特别适合导入大批量数据,且大部分数据已经存在,或者数据未发生变更的场景。
4、MySql insert ignore into
如果只插入不存的的数据,并且使用 MySql 数据库,可以使用以下方式:
fsql.Insert<T>().MySqlIgnoreInto().AppendData(list).NoneParameter().ExecuteAffrows();
///INSERT IGNORE INTO `T`(...)
//VALUES(...),(...),(...)
🌌 场景四:跨服务器从A表导数据到B表
与场景一类似,跨服务器需要定义多个 IFreeSql 对象,假设 A表所在服务器访问对象是 fsql1,B表使用 fsql2
1、分段插入
fsql2.Select<B>()
.Where(b => b.Time > DateTime.Parse("2022-08-01"))
.ToChunk(b => new A { field1 = b.Name, field2 = b.Code }, 1000, cb => {
fsql1.Insert(cb.Object).NoneParameter().ExecuteAffrows();
//如果数据库支持 BulkCopy 可以调用对应的 API 方法,如 SqlServer:
//fsql1.Insert(cb.Object).ExecuteSqlBulkCopy();
//这里也可以使用 BeginEdit 进行批量编辑功能,解决数据重复问题
});
2、分页插入
利用分页多次读取,分页可能造成新旧数据问题,尽量设置好分页排序并记录好偏移量,确保重复问题。(不推荐)
var pageNumber = 1;
while (true)
{
var list = fsql2.Select<B>()
.Where(b => b.Time > DateTime.Parse("2022-08-01"))
.Page(pageNumber++, 1000)
.OrderBy(b => b.Time)
.ToList(b => new A { field1 = b.Name, field2 = b.Code }, 1000);
if (list.Count == 0) break;
fsql1.Insert(cb.Object).NoneParameter().ExecuteAffrows();
//如果数据库支持 BulkCopy 可以调用对应的 API 方法,如 SqlServer:
//fsql1.Insert(cb.Object).ExecuteSqlBulkCopy();
//这里也可以使用 BeginEdit 进行批量编辑功能,解决数据重复问题
//停顿5秒后再插入,这个值可以根据需要自己调
Thread.CurrentThread.Join(TimeSpan.FromSeconds(5));
}
⛳ 结束语
FreeSql 的功能强大,以及稳定性,我不想吹牛,也不喜欢吹牛,如果大家有什么好的想法可以一起讨论,毕竟我们只能遇到有限的场景,还有很多我不知道的场景需求不是吗?
希望这篇文章能帮助大家轻松理解并熟练掌握它,快速解决工作中遇到的数据导入问题,为企业的项目研发贡献力量。
开源地址:https://github.com/dotnetcore/FreeSql
作者是什么人?
作者是一个入行 18年的老批,他目前写的.net 开源项目有:
开源项目 | 描述 | 开源地址 | 开源协议 |
---|---|---|---|
FreeIM | 聊天系统架构 | https://github.com/2881099/FreeIM | MIT |
FreeRedis | Redis SDK | https://github.com/2881099/FreeRedis | MIT |
csredis | https://github.com/2881099/csredis | MIT | |
FightLandlord | 斗DI主网络版 | https://github.com/2881099/FightLandlord | 学习用途 |
FreeScheduler | 定时任务 | https://github.com/2881099/FreeScheduler | MIT |
IdleBus | 空闲容器 | https://github.com/2881099/IdleBus | MIT |
FreeSql | ORM | https://github.com/dotnetcore/FreeSql | MIT |
FreeSql.Cloud | 分布式tcc/saga | https://github.com/2881099/FreeSql.Cloud | MIT |
FreeSql.AdminLTE | 低代码后台生成 | https://github.com/2881099/FreeSql.AdminLTE | MIT |
FreeSql.DynamicProxy | 动态代理 | https://github.com/2881099/FreeSql.DynamicProxy | 学习用途 |
需要的请拿走,这些都是最近几年的开源作品,以前更早写的就不发了。
网友评论