Parallel doesn't work with Entity Framework

Time: 2022-08-06 13:54:10

I have a list of IDs, and I need to run several stored procedures on each ID.


With a standard foreach loop it works fine, but with many records it is quite slow.

I tried to convert the code to run in parallel with EF, but I get an exception: "The underlying provider failed on Open".

I am using this code, inside the Parallel.ForEach:


using (XmlEntities osContext = new XmlEntities())
{
    //The code
}

But it still throws the exception.


Any idea how I can use Parallel with EF? Do I need to create a new context for every procedure I am running? I have around 10 procedures, so I think it's very bad to create 10 contexts, one for each.

4 solutions

#1


32  

The underlying database connections that Entity Framework uses are not thread-safe. You will need to create a new context for each operation that you perform on another thread.
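As a rough illustration of the "one context per operation" rule, here is a minimal sketch. FakeContext and its DoSomething method are hypothetical stand-ins (a real context would derive from DbContext and call an actual stored procedure); only the ownership pattern matters here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an EF context; a real one would derive from DbContext.
public sealed class FakeContext : IDisposable
{
    private static int _created;
    public static int Created => Volatile.Read(ref _created);

    public FakeContext() => Interlocked.Increment(ref _created);

    // Hypothetical wrapper around a stored procedure call.
    public int DoSomething(int id) => id * 2;

    public void Dispose() { }
}

public static class PerOperationContextDemo
{
    public static ConcurrentDictionary<int, int> Run(int[] ids)
    {
        var results = new ConcurrentDictionary<int, int>();
        Parallel.ForEach(ids, id =>
        {
            // Each iteration creates and disposes its own context,
            // so no context is ever shared between threads.
            using (var ctx = new FakeContext())
            {
                results[id] = ctx.DoSomething(id);
            }
        });
        return results;
    }

    public static void Main()
    {
        var results = Run(Enumerable.Range(1, 100).ToArray());
        Console.WriteLine($"{results.Count} ids processed, {FakeContext.Created} contexts created");
    }
}
```

Note that this creates one context per item, which is exactly the cost discussed next.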

Your concern about how to parallelize the operation is a valid one; that many contexts are going to be expensive to open and close.


Instead, you might want to invert how you're thinking about parallelizing the code. It seems you're looping over a number of items and then calling the stored procedures serially for each item.

If you can, create a new Task<TResult> (or Task, if you don't need a result) for each procedure and then in that Task<TResult>, open a single context, loop through all of the items, and then execute the stored procedure. This way, you only have a number of contexts equal to the number of stored procedures that you are running in parallel.


Let's assume you have a MyDbContext with two stored procedures, DoSomething1 and DoSomething2, both of which take an instance of a class, MyItem.


Implementing the above would look something like:


// You'd probably want to materialize this into an IList<T> to avoid
// warnings about multiple iterations of an IEnumerable<T>.
// You definitely *don't* want this to be an IQueryable<T>
// returned from a context.
IEnumerable<MyItem> items = ...;

// The first stored procedure is called here.
Task t1 = Task.Run(() => { 
    // Create the context.
    using (var ctx = new MyDbContext())
    // Cycle through each item.
    foreach (MyItem item in items)
    {
        // Call the first stored procedure.
        // You'd of course, have to do something with item here.
        ctx.DoSomething1(item);
    }
});

// The second stored procedure is called here.
Task t2 = Task.Run(() => { 
    // Create the context.
    using (var ctx = new MyDbContext())
    // Cycle through each item.
    foreach (MyItem item in items)
    {
        // Call the second stored procedure.
        // You'd of course, have to do something with item here.
        ctx.DoSomething2(item);
    }
});

// Do something when both of the tasks are done.
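For the "do something when both of the tasks are done" step, one option is Task.WhenAll. The following is only a sketch under assumptions: StubContext and its two methods are hypothetical stand-ins for MyDbContext and the stored procedure wrappers, and the items are plain integers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for MyDbContext; the methods mimic stored procedure wrappers.
public sealed class StubContext : IDisposable
{
    public int DoSomething1(int item) => item + 1;
    public int DoSomething2(int item) => item * 10;
    public void Dispose() { }
}

public static class TaskPerProcedureDemo
{
    // Runs one procedure over every item, using a single context owned by this task.
    private static Task<int> RunProcedure(
        IReadOnlyList<int> items, Func<StubContext, int, int> procedure)
    {
        return Task.Run(() =>
        {
            using (var ctx = new StubContext())
            {
                int total = 0;
                foreach (int item in items)
                {
                    total += procedure(ctx, item);
                }
                return total;
            }
        });
    }

    public static async Task<int[]> RunAllAsync(IReadOnlyList<int> items)
    {
        Task<int> t1 = RunProcedure(items, (ctx, item) => ctx.DoSomething1(item));
        Task<int> t2 = RunProcedure(items, (ctx, item) => ctx.DoSomething2(item));

        // Completes once both tasks have finished; awaiting it rethrows a failure, if any.
        return await Task.WhenAll(t1, t2);
    }

    public static void Main()
    {
        int[] totals = RunAllAsync(new List<int> { 1, 2, 3 }).GetAwaiter().GetResult();
        Console.WriteLine($"procedure 1 total: {totals[0]}, procedure 2 total: {totals[1]}");
    }
}
```

Only two contexts exist at any time here, one per procedure, regardless of how many items there are.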

If you can't execute the stored procedures in parallel (because each one depends on being run in a certain order), you can still parallelize your operations; it's just a little more complex.

You would look at creating custom partitions across your items (using the static Create method on the Partitioner class). This gives you the means to get IEnumerator<T> implementations (note that this is not IEnumerable<T>, so you can't foreach over it).

For each IEnumerator<T> instance you get back, you'd create a new Task<TResult> (if you need a result), and in the Task<TResult> body, you would create the context and then cycle through the items returned by the IEnumerator<T>, calling the stored procedures in order.


That would look like this:


// Get the partitioner.
OrderablePartitioner<MyItem> partitioner = Partitioner.Create(items);

// Get the partitions.
// You'll have to set the parameter for the number of partitions here.
// See the link for creating custom partitions for more
// creation strategies.
IList<IEnumerator<MyItem>> partitions = partitioner.GetPartitions(
    Environment.ProcessorCount);

// Create a task for each partition.
Task[] tasks = partitions.Select(p => Task.Run(() => { 
        // Create the context.
        using (var ctx = new MyDbContext())
        // IEnumerator<T> implements IDisposable, so dispose
        // the partition when the loop is done.
        using (p)
        // While there are items in p.
        while (p.MoveNext())
        {
            // Get the current item.
            MyItem current = p.Current;

            // Call the stored procedures.  Process the item
            ctx.DoSomething1(current);
            ctx.DoSomething2(current);
        }
    })).
    // ToArray is needed (or something to materialize the list) to
    // avoid deferred execution.
    ToArray();
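A self-contained version of the partitioning idea might look like the sketch below. PartitionContext is a hypothetical stand-in for MyDbContext with no-op procedure wrappers, and the items are plain integers; the point is that each partition gets one context and each item is processed once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for MyDbContext with two no-op stored procedure wrappers.
public sealed class PartitionContext : IDisposable
{
    public void DoSomething1(int item) { }
    public void DoSomething2(int item) { }
    public void Dispose() { }
}

public static class PartitionDemo
{
    public static int[] Run(IList<int> items, int partitionCount)
    {
        var processed = new ConcurrentBag<int>();

        IList<IEnumerator<int>> partitions =
            Partitioner.Create(items).GetPartitions(partitionCount);

        Task[] tasks = partitions.Select(p => Task.Run(() =>
        {
            // One context per partition, used only by this task.
            using (var ctx = new PartitionContext())
            using (p)
            {
                while (p.MoveNext())
                {
                    int current = p.Current;

                    // The procedures run in order for each item.
                    ctx.DoSomething1(current);
                    ctx.DoSomething2(current);
                    processed.Add(current);
                }
            }
        })).ToArray();

        // Block until every partition has been drained.
        Task.WaitAll(tasks);
        return processed.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        int[] done = Run(Enumerable.Range(1, 50).ToList(), Environment.ProcessorCount);
        Console.WriteLine($"{done.Length} items processed");
    }
}
```

The number of contexts equals the number of partitions, so the partition count is the knob for trading parallelism against open connections.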

#2


5  

EF is not thread safe, so you cannot use Parallel.


Take a look at "Entity Framework and Multi threading" and this article.

#3


2  

This is what I use, and it works great. It also collects exceptions instead of stopping at the first one, and it has a debug mode (which runs the items sequentially) that makes it far easier to track things down.

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false)
{
    var exceptions = new ConcurrentQueue<Exception>();
    if (debugMode)
    {
        foreach (var item in items)
        {
            try
            {
                action(item);
            }
            // Store the exception and continue with the loop.                     
            catch (Exception e)
            {
                exceptions.Enqueue(e);
            }
        }
    }
    else
    {
        var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() =>
        {
            while (partition.MoveNext())
            {
                try
                {
                    action(partition.Current);
                }
                // Store the exception and continue with the loop.                     
                catch (Exception e)
                {
                    exceptions.Enqueue(e);
                }
            }
        }));
        Task.WaitAll(partitions.ToArray());
    }
    return exceptions;
}

You use it as follows, where db is the original DbContext and db.CreateInstance() creates a new instance using the same connection string.

        var batch = db.Set<SomeListToIterate>().ToList();
        var exceptions = batch.Parallel((item) =>
        {
            using (var batchDb = db.CreateInstance())
            {
                var batchTime = batchDb.GetDBTime();
                var someData = batchDb.Set<Permission>().Where(x => x.ID == item.ID).ToList();
                //do stuff to someData
                item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called
                batchDb.SaveChanges();        
            }                
        });
        if (exceptions.Count > 0)
        {
            logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions));
            throw new AggregateException(exceptions); //optionally throw an exception
        }
        db.SaveChanges(); //save the item modifications

#4


0  

It's a bit difficult to troubleshoot this one without knowing what the inner exception is, if any. This could very simply be a problem with the way the connection string or provider configuration is set up.
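To get at the inner exception from a parallel loop, something like the following sketch could help. Parallel.ForEach wraps failures in an AggregateException, so the idea is to flatten it and walk each InnerException chain. The exception types and messages thrown here are simulated for illustration and are not what EF itself throws.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class InnerExceptionDemo
{
    // Turns an exception and its InnerException chain into readable lines.
    public static List<string> Describe(Exception ex)
    {
        var lines = new List<string>();
        for (Exception current = ex; current != null; current = current.InnerException)
        {
            lines.Add($"{current.GetType().Name}: {current.Message}");
        }
        return lines;
    }

    public static List<string> RunAndCollect()
    {
        var report = new List<string>();
        try
        {
            Parallel.ForEach(new[] { 1 }, id =>
            {
                // Simulated failure; a real run would fail inside the EF call instead.
                throw new InvalidOperationException(
                    "The underlying provider failed on Open.",
                    new TimeoutException("simulated root cause"));
            });
        }
        catch (AggregateException ae)
        {
            // Flatten unwraps nested AggregateExceptions into one list.
            foreach (Exception inner in ae.Flatten().InnerExceptions)
            {
                report.AddRange(Describe(inner));
            }
        }
        return report;
    }

    public static void Main()
    {
        foreach (string line in RunAndCollect())
        {
            Console.WriteLine(line);
        }
    }
}
```

The last line of the report is usually the most useful one, since it is the innermost cause.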

In general, you have to be careful with parallel code and EF. What you're doing should work, however. One question comes to mind: is any work being done on another instance of that context before the parallel call? According to your post, you're creating a separate context in each thread, which is good. Part of me wonders, however, whether there is some constructor contention going on between the multiple contexts. If you aren't using that context anywhere before the parallel call, I would suggest running even a simple query against the context to open it and make sure all of the EF bits are initialized before running the parallel method. I'll admit I have not tried exactly what you did here, but I've done something close and it worked.
