项目前期使用mysql数据库,大约每天200w数据量,十天1500w数据量之后,读取写入都会很慢,而且经常锁表,后来采用vertica数据库,下面分享vertica数据库使用方法,以及大批量数据快速写入数据库的方法:
1,数据库IDE的配置
windows系统下需先安装jdk,然后才能使用IDE,可以使用dbvis_windows-x64_9_1_9,一个数据库统一客户端工具,网上搜索下载。
可能会出现需要加载jar包才能使用的情况,选择软件目录\jdbc\vertica.jar包即可。
2,c#里如何使用
下载类库:https://my.vertica.com/download/vertica/client-drivers/
项目中引用:
代码:
// Example: delete the rows of one day and one city from a Vertica table inside a transaction.
// strTableName, time and city are supplied by the surrounding code.
// The connection string is read from the "VerticalTest" appSettings entry.
using (VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]))
{
    conn.Open();
    VerticaTransaction txn = conn.BeginTransaction();
    try
    {
        using (VerticaCommand cmd = new VerticaCommand())
        {
            cmd.Connection = conn;
            cmd.Transaction = txn;
            // The table name cannot be bound as a parameter, so strTableName must come from trusted code.
            // The date is formatted with the invariant culture so it is always Gregorian yyyyMMdd digits.
            // city is bound as a parameter instead of being concatenated into the SQL text.
            cmd.CommandText = "Delete from " + strTableName
                + " where sampledate = " + time.ToString("yyyyMMdd", System.Globalization.CultureInfo.InvariantCulture)
                + " and city = @city";
            cmd.Parameters.Add(new VerticaParameter("city", VerticaType.VarChar));
            cmd.Parameters["city"].Value = city;
            // NOTE(review): the timeout literal is missing from the published listing; 0 is assumed here.
            // Confirm the intended value (seconds) and how the Vertica driver treats 0.
            cmd.CommandTimeout = 0;
            cmd.ExecuteNonQuery();
        }
        // Without an explicit commit the delete is never made permanent.
        txn.Commit();
    }
    catch
    {
        try
        {
            txn.Rollback();
        }
        catch (Exception)
        {
            // Best effort only: keep the original failure as the exception that propagates.
        }
        throw;
    }
}
3,大量数据快速批量插入vertica数据库方法
sqlserver中常见插入方法:
insert into table1(a,b) values ('Cust1', 'Smith Company'),('Cust2', 'Perform Company')
vertica里不支持这种写法,但是支持如下:
insert into table1(a,b) select a,b from table2 union select 'Cust1', 'Smith Company'
这种写法速度并不快,更快速的方法是使用copy,copy可以从txt文件,也可以是内存流:
1)从txt: copy 表名 from '/indexdata/tmp/conv_ebc_esf_housedimension.txt' DELIMITER E'\t' ESCAPE AS '\' ENCLOSED BY '"' DIRECT EXCEPTIONS '/indexdata/tmp/housedimension.log';
第一个txt,是数据来源,可以把数据写入txt,列之间用tab分隔,行之间是回车,此种写法未经过代码验证。
2) 从内存流 :
思路就是:取出大量数据,按约定的分隔符拼接后写入MemoryStream内存流,再通过copy语句从stdin加载进数据库。copy语句使用语法:
copy {0}{1} from stdin record terminator E'{2}' delimiter E'{3}' enforcelength no commit
0:表名,1:列名列表,2:行(记录)之间的分隔符'\n',3:列之间的分隔符'\t'
stdin 就是指输入缓冲区
具体代码:
/// <summary>
/// Bulk-loads the rows of a <see cref="DataTable"/> into a Vertica table through a
/// "copy ... from stdin" statement fed from an in-memory stream.
/// Rows already stored for the given date and city are deleted first, in the same
/// transaction, so the delete and the load are committed or rolled back together.
/// </summary>
/// <param name="dt">Source data; its column names form the target column list. Nothing is done when it is null or has no rows.</param>
/// <param name="strTableName">Target table name. It is concatenated into the SQL text, so it must come from trusted code.</param>
/// <param name="time">Date whose existing rows are deleted (compared with sampledate as yyyyMMdd).</param>
/// <param name="city">City whose existing rows are deleted.</param>
/// <exception cref="ArgumentException">The table has rows but no columns.</exception>
/// <exception cref="InvalidOperationException">The copy rejected at least one row; the transaction has been rolled back.</exception>
public static void BulkCopy(DataTable dt, string strTableName, DateTime time,string city)
{
    if (dt == null || dt.Rows.Count == 0)
    {
        return;
    }
    if (dt.Columns.Count == 0)
    {
        throw new ArgumentException("The length of column cannot be zero.");
    }

    // Collect the column names from the DataTable; they become the COPY column list.
    List<string> lstField = new List<string>();
    for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++)
    {
        lstField.Add(dt.Columns[colIndex].ColumnName);
    }
    string strFiledList = string.Format("({0})", string.Join(",", lstField.ToArray()));

    // Build the COPY statement. "no commit" keeps the load inside the explicit transaction below.
    const char RowSplit = '\n';
    const char ColSplit = '\t';
    string strCopyStatement = string.Format("copy {0}{1} from stdin record terminator E'{2}' delimiter E'{3}' enforcelength no commit",
        strTableName, strFiledList, RowSplit, ColSplit);

    // Serialize the rows using the same separators the COPY statement declares.
    // NOTE(review): GetDataString is defined elsewhere; presumably it handles values that
    // contain a tab or newline — confirm, otherwise such values will break the row layout.
    StringBuilder sbText = new StringBuilder();
    foreach (DataRow dr in dt.Rows)
    {
        for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++)
        {
            if (colIndex > 0)
            {
                sbText.Append(ColSplit);
            }
            sbText.Append(GetDataString(dr, colIndex));
        }
        sbText.Append(RowSplit);
    }

    // Wrap the UTF-8 payload in a stream positioned at its start.
    byte[] buff = Encoding.UTF8.GetBytes(sbText.ToString());
    using (MemoryStream ms = new MemoryStream(buff))
    using (VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]))
    {
        conn.Open();
        VerticaTransaction txn = conn.BeginTransaction();
        try
        {
            VerticaCopyStream vcs = new VerticaCopyStream(conn, strCopyStatement);

            // Remove the rows that this load is about to replace.
            using (VerticaCommand cmd = new VerticaCommand())
            {
                cmd.Connection = conn;
                cmd.Transaction = txn;
                // The date is formatted with the invariant culture so it is always Gregorian yyyyMMdd digits.
                // city is bound as a parameter instead of being concatenated into the SQL text.
                cmd.CommandText = "Delete from " + strTableName
                    + " where sampledate = " + time.ToString("yyyyMMdd", System.Globalization.CultureInfo.InvariantCulture)
                    + " and city = @city";
                cmd.Parameters.Add(new VerticaParameter("city", VerticaType.VarChar));
                cmd.Parameters["city"].Value = city;
                // NOTE(review): the timeout literal is missing from the published listing; 0 is assumed here.
                // Confirm the intended value (seconds) and how the Vertica driver treats 0.
                cmd.CommandTimeout = 0;
                cmd.ExecuteNonQuery();
            }

            // Stream the payload into the COPY statement.
            vcs.Start();
            vcs.AddStream(ms, false);
            vcs.Execute();
            vcs.Finish();

            IList<long> lstRejected = vcs.Rejects;
            if (lstRejected.Count > 0)
            {
                // Any rejected row fails the whole load; the catch block below rolls back.
                throw new InvalidOperationException("Bulk copy failure.");
            }

            txn.Commit();
        }
        catch
        {
            try
            {
                txn.Rollback();
            }
            catch (Exception)
            {
                // Best effort only: keep the original failure as the exception that propagates.
            }
            throw;
        }
    }
}