Multiple threads filling their results into a single DataTable (C#)

Date: 2021-07-09 00:40:05

I'm just beginning to learn the concept of threading, and I'm kind of stuck on this one problem; it's driving me crazy...

What I actually need to accomplish -

I have some 300 text files in a local directory that need to be parsed for specific values... After I find these "values" in each text file, I need to store them in a database. So I followed the plain approach of accessing each text file in the directory - parsing it and adding the resulting values as a row to a local DataTable - and once I'm done parsing all the files and have stored 300 rows in the DataTable, I do a SqlBulkCopy of the DataTable to my database. This approach works fine, except that it takes about 10 minutes to run my code!
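
For reference, a minimal sketch of that sequential approach (the parsing logic, table schema, and connection string are placeholders) looks roughly like this:

using System.Data;
using System.Data.SqlClient;
using System.IO;

class SequentialVersion
{
    static void Run(string directory, string connectionString)
    {
        DataTable table = new DataTable("ParsedValues");
        table.Columns.Add("FileName", typeof(string));
        table.Columns.Add("Value", typeof(string));

        // parse every file and add one row per file to the local DataTable
        foreach (string path in Directory.GetFiles(directory, "*.txt"))
        {
            string value = ParseFile(path);         // placeholder for the actual parsing
            table.Rows.Add(Path.GetFileName(path), value);
        }

        // push all 300 rows to the database in one go
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connectionString))
        {
            bulkCopy.DestinationTableName = "dbo.ParsedValues";
            bulkCopy.WriteToServer(table);
        }
    }

    static string ParseFile(string path) { /* extract the value you need */ return ""; }
}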

What I'm attempting to do now -

Create a new thread for each file and keep the thread count below 4 at any given time... then each thread would parse through the file and return a row to add to the local DataTable.

Where I'm stuck - I don't understand how to update this single DataTable that gets rows from multiple threads...

Quite an explanation, isn't it... I hope someone here can suggest a good approach for this...

Thanks, Nidhi

5 solutions

#1


This will be much easier if you just let each of your four threads write to the database themselves. In this scenario you don't have to worry about threading (except for deciding which files each thread works on), as each worker thread can maintain its own DataTable and consume 25% of the files.
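
A rough sketch of that per-thread approach might look like the following (the parsing logic, table schema, and connection string are placeholders): each thread takes every fourth file, fills its own private DataTable, and bulk copies it to the database on its own, so no locking is needed anywhere.

using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;

class PerThreadTables
{
    const int ThreadCount = 4;

    static void Run(string[] files, string connectionString)
    {
        Thread[] workers = new Thread[ThreadCount];
        for (int i = 0; i < ThreadCount; i++)
        {
            int slice = i;                              // capture the loop index for the closure
            workers[i] = new Thread(() =>
            {
                // each worker gets every 4th file and its own private DataTable
                DataTable table = new DataTable("ParsedValues");
                table.Columns.Add("FileName", typeof(string));
                table.Columns.Add("Value", typeof(string));

                for (int f = slice; f < files.Length; f += ThreadCount)
                {
                    string value = ParseFile(files[f]); // placeholder for the actual parsing
                    table.Rows.Add(Path.GetFileName(files[f]), value);
                }

                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connectionString))
                {
                    bulkCopy.DestinationTableName = "dbo.ParsedValues";
                    bulkCopy.WriteToServer(table);      // each thread writes its own rows
                }
            });
            workers[i].Start();
        }

        foreach (Thread worker in workers) worker.Join();
    }

    static string ParseFile(string path) { /* extract the value you need */ return ""; }
}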

Alternatively, you can have a single DataTable that all the threads use - just make sure to wrap access to it with a lock, like so:

lock(YourTable.Rows.SyncRoot){
  // add rows to table
}

Of course this is all moot if the bottleneck is the disk, as @David B notes.

#2


As was somewhat pointed out, you need to examine exactly where your bottleneck is and why you're using threading.

By moving to multiple threads, you do have a potential for increased performance. However, if you're updating the same DataTable with each thread, you're limited by the DataTable. Only one thread can write to the DataTable at one time (which you control with a lock), so you're still fundamentally processing in sequence.

On the other hand, most databases are designed for multiple connections, running on multiple threads, and have been highly tuned for that purpose. If you still want to use multiple threads: let each thread have its own connection to the database, and do its own processing.

Now, depending on the kind of processing going on, your bottleneck may be in opening and processing the file, and not in the database update.

One way to split things up:

  1. Put all the file names to be processed into a filename Queue.

  2. Create a thread (or threads) to pull an item off the filename Queue, open, parse, and process the file, and push the results into a result Queue.

  3. Have another thread take the results from the result Queue and insert them into the database.

These can run simultaneously... the database won't be updated until there's something to update, and will just wait in the meantime.

This approach lets you really know who is waiting on whom. If the read/process file part is slow, create more threads to do that. If the insert into database part is slow, create more threads to do that. The queues just need to be synchronized.

So, pseudocode:

using System;
using System.Collections.Generic;
using System.Threading;

class Program
{
    static Queue<string> _filesToProcess = new Queue<string>();
    static Queue<string> _results = new Queue<string>();
    static Thread _fileProcessingThread = new Thread( ProcessFiles );
    static Thread _databaseUpdatingThread = new Thread( UpdateDatabase );
    static volatile bool _finished = false;   // written by the producer, read by the consumer

    static void Main()
    {
        foreach( string fileName in GetFileNamesToProcess() )
        {
            _filesToProcess.Enqueue( fileName );
        }

        _fileProcessingThread.Start();
        _databaseUpdatingThread.Start();

        // if we want to wait until they're both finished
        _fileProcessingThread.Join();
        _databaseUpdatingThread.Join();

        Console.WriteLine( "Done" );
    }

    static void ProcessFiles()
    {
        while( true )
        {
            string fileToProcess = null;

            // check and dequeue under one lock, so another processing thread
            // can't empty the queue between the check and the Dequeue
            lock( _filesToProcess )
            {
                if( _filesToProcess.Count > 0 )
                    fileToProcess = _filesToProcess.Dequeue();
            }

            if( fileToProcess == null )
                break;   // nothing left to process

            string resultAsString = ProcessFileAndGetResult( fileToProcess );

            lock( _results ){ _results.Enqueue( resultAsString ); }
        }

        _finished = true;
    }

    static void UpdateDatabase()
    {
        while( true )
        {
            string resultAsString = null;

            lock( _results )
            {
                if( _results.Count > 0 )
                    resultAsString = _results.Dequeue();
            }

            if( resultAsString != null )
            {
                InsertIntoDatabase( resultAsString ); // implement this however
            }
            else if( _finished )
            {
                break;   // the producer is done and the result queue is drained
            }
            else
            {
                Thread.Sleep( 1 ); // nothing to do yet; avoid spinning at 100% CPU
            }
        }
    }

    // placeholders for the parts specific to your application
    static IEnumerable<string> GetFileNamesToProcess() { return new string[0]; }
    static string ProcessFileAndGetResult( string fileToProcess ) { return fileToProcess; }
    static void InsertIntoDatabase( string result ) { }
}

I'm pretty sure there are ways to make that "better", but it should do the trick: you can read and process data while also adding completed data to the database, and take advantage of threading.

If you want another Thread to process files, or to update the database, just create a new Thread( MethodName ), and call Start().

It's not the simplest example, but I think it's thorough. You're synchronizing two queues, and you need to make sure each is locked before accessing it. You're keeping track of when each thread should finish, and you have data being marshaled between threads, but never processed more than once, using Queues.

Hope that helps.

#3


What made you think that more threads would improve things? They probably won't.

I suggest you first get the program to work, then worry about getting it to work faster. Do it with only one thread.

#4


SqlBulkCopy is a big hammer for only 300 rows.

Check out Smart Thread Pool. This is an instance thread pool that you can very easily limit to 4 threads. Since you only have 300 rows, consider posting them directly to SQL from each thread rather than aggregating them in your code.
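
For illustration, here is a rough sketch of that idea using plain threads and a Semaphore to cap concurrency at four, as a stand-in for Smart Thread Pool; the INSERT statement, table and column names, parsing helper, and connection string are all placeholders:

using System.Collections.Generic;
using System.Data.SqlClient;
using System.IO;
using System.Threading;

class DirectInsertWorkers
{
    static void ProcessAllFiles(IEnumerable<string> fileNames, string connectionString)
    {
        Semaphore throttle = new Semaphore(4, 4);   // at most 4 files in flight at once
        List<Thread> workers = new List<Thread>();

        foreach (string fileName in fileNames)
        {
            throttle.WaitOne();                     // wait for a free slot before starting another worker
            string file = fileName;                 // capture a copy for the closure
            Thread worker = new Thread(() =>
            {
                try
                {
                    string value = ParseFile(file); // placeholder for the actual parsing
                    using (SqlConnection connection = new SqlConnection(connectionString))
                    using (SqlCommand command = new SqlCommand(
                        "INSERT INTO dbo.ParsedValues (FileName, Value) VALUES (@file, @value)", connection))
                    {
                        command.Parameters.AddWithValue("@file", Path.GetFileName(file));
                        command.Parameters.AddWithValue("@value", value);
                        connection.Open();
                        command.ExecuteNonQuery();  // each worker posts its own row directly
                    }
                }
                finally
                {
                    throttle.Release();             // free the slot even if parsing or the insert fails
                }
            });
            workers.Add(worker);
            worker.Start();
        }

        foreach (Thread worker in workers) worker.Join();
    }

    static string ParseFile(string path) { /* extract the value you need */ return ""; }
}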

#5


As the others have pointed out, remember to lock your table before updating. C#:

private readonly object tableLock = new object();   // must be initialized, or lock(tableLock) will throw

/*
Later in code.
*/

private void UpdateDataTable(object data)
{
    lock(tableLock)
    {
          //Add or update table rows
    }
}

As for methods of actually controlling and keeping the threads in line, just use the ThreadPool, set the maximum threads to your limit, and the queuing can take care of things. For additional control, you can toss in some logic that uses an array of WaitHandle objects. In fact, that might actually be a good idea, considering that you want to queue up 300 separate objects.
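
For illustration, a rough sketch along those lines (ParseFile, the table schema, and the method names are placeholders). One caveat: WaitHandle.WaitAll accepts at most 64 handles, so with 300 work items this sketch signals completion through a single ManualResetEvent and a countdown rather than an array of per-item handles:

using System.Data;
using System.IO;
using System.Threading;

class ThreadPoolApproach
{
    static readonly object tableLock = new object();
    static readonly DataTable table = new DataTable("ParsedValues");   // shared table, guarded by tableLock

    static ThreadPoolApproach()
    {
        table.Columns.Add("FileName", typeof(string));
        table.Columns.Add("Value", typeof(string));
    }

    static void ProcessWithThreadPool(string[] fileNames)
    {
        // Process-wide setting; it returns false (and is ignored) if 4 is below the
        // number of processors or the pool's configured minimum, so treat it as a hint.
        ThreadPool.SetMaxThreads(4, 4);

        if (fileNames.Length == 0)
            return;

        int pending = fileNames.Length;             // countdown of unfinished work items
        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            foreach (string fileName in fileNames)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    string file = (string)state;
                    string value = ParseFile(file); // placeholder for the actual parsing

                    lock (tableLock)                // only one thread touches the table at a time
                    {
                        table.Rows.Add(Path.GetFileName(file), value);
                    }

                    if (Interlocked.Decrement(ref pending) == 0)
                        allDone.Set();              // the last work item signals completion
                }, fileName);
            }

            allDone.WaitOne();                      // block until all work items have finished
        }
    }

    static string ParseFile(string path) { /* extract the value you need */ return ""; }
}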
