关于任务同步执行的多线程问题

时间:2022-09-23 07:51:13
本人要编一个边计算边发送的循环程序,本来想用多线程,可是由于变量同步的问题,多线程事实上只能是串行执行,即计算完发送或者发送完计算,因此时间没有压缩,考虑到发送的数据与计算得到的数据允许一定的时间延迟(即发送可以是之前某次生成的数据),能不能采用队列或者其他缓冲的手段来实现并行,这样在双CPU机器上,采用多线程应该能真正做到并行执行。
不知道小弟交代的清楚不,麻烦有相关经验的高手帮帮我,在此表示诚挚的谢意!

11 个解决方案

#1


用队列加锁储存任务信息,使用信号量或事件实现线程同步。

#2


如果是单cpu,而且使用阻塞方式发送的话,多线程还是很有好处的

#3


引用 1 楼 cnzdgs 的回复:
用队列加锁储存任务信息,使用信号量或事件实现线程同步。

谢谢。队列加锁以后,能够实现并行(边读边写,读前面的,写后面的)吗?谢谢!

#4


是不是这样:两个线程分别在队列pop与push时加锁,这样保证同步。找了一个CCriticalQueue 的类。可是没有使用例子。不知道能不能给出例程,谢谢!!!

#5


BOOL WINAPI QueueUserWorkItem(
  __in          LPTHREAD_START_ROUTINE Function,
  __in          PVOID Context,
  __in          ULONG Flags
);

#6


队列加锁就是在插入和移出元素的时候加锁,防止线程间冲突,各种队列模板都可以用。我一般是自定义一个双向链表,再加上一个同步对象,封装在一个类里面。CCriticalQueue没有用过。

#7


队列实现如下:

#include <windows.h>
#include <list>

using ::std::list;


template<class T>
class SyncQueue
{
public:
SyncQueue();
~SyncQueue();
void Push(T& item); // 入队
void Pop(); // 出队
T GetFront(); // 得到队头,队空则阻塞
BOOL TryGetFront(T *pRtnItem); // 得到队头,队空则直接返回FALSE,非阻塞。pRtnItem接收返回值
size_t GetSize(); // 得到队列当前长度

protected:
list<T> queue_;
CRITICAL_SECTION lock_;
HANDLE hEvent_;
};


template<class T>
SyncQueue<T>::SyncQueue()
{
::InitializeCriticalSection(&lock_);
hEvent_ = ::CreateEvent(NULL, FALSE, FALSE, NULL);
}


template<class T>
SyncQueue<T>::~SyncQueue()
{
queue_.clear();
::CloseHandle(hEvent_);
::DeleteCriticalSection(&lock_);
}


template<class T>
void SyncQueue<T>::Push(T& item) // 入队
{
::EnterCriticalSection(&lock_);
queue_.push_back(item);
::SetEvent(hEvent_);
::LeaveCriticalSection(&lock_);
}

template<class T>
void SyncQueue<T>::Pop() // 出队
{
::EnterCriticalSection(&lock_);
if (queue_.size() > 0)
{
queue_.pop_front();
}
::LeaveCriticalSection(&lock_);
}


template<class T>
T SyncQueue<T>::GetFront() // 得到队头,队空则阻塞
{
::EnterCriticalSection(&lock_);
while (queue_.size() == 0)
{
::LeaveCriticalSection(&lock_);
WaitForSingleObject(hEvent_, INFINITE);
::EnterCriticalSection(&lock_);
}

T headItem = queue_.front();
::LeaveCriticalSection(&lock_);

return headItem;
}


template<class T>
BOOL SyncQueue<T>::TryGetFront(T *pRtnItem) // 得到队头,队空则直接返回FALSE,非阻塞。pRtnItem接收返回值
{
assert(pRtnItem != NULL);

::EnterCriticalSection(&lock_);
if  (queue_.size() == 0)
{
::LeaveCriticalSection(&lock_);
return FALSE;
}

T headItem = queue_.front();
::LeaveCriticalSection(&lock_);

return headItem;
}


template<class T>
size_t SyncQueue<T>::GetSize() // 得到队列当前长度
{
::EnterCriticalSection(&lock_);
size_t size = queue_.size();
::LeaveCriticalSection(&lock_);
return size;
}

#8


测试程序如下:

#include "SyncQueue.h"
#include <stdio.h>


DWORD WINAPI WriteThread(LPVOID lpParam)
{
SyncQueue<int> *pSyncQ = (SyncQueue<int>*) lpParam;
int i;
int upBound = (1 << 30);
for(i = 0; i < upBound; ++ i)
{
pSyncQ->Push(i);
}
return 0;
}


DWORD WINAPI ReadThread(LPVOID lpParam)
{
SyncQueue<int> *pSyncQ = (SyncQueue<int>*) lpParam;
int i;
int upBound = (1 << 30);
for(i = 0; i < upBound; ++ i)
{
int itemFromQ = pSyncQ->GetFront();
pSyncQ->Pop();
if (i != itemFromQ)
{
printf("Error!\n");
}
if (i % 10000 == 0)
{
printf("Processed : %d\n", i);
}
}
return 0;
}


void main()
{
SyncQueue<int> syncQ;
HANDLE hWriteThread = ::CreateThread(NULL, NULL, WriteThread, &syncQ, NULL, NULL);
HANDLE hReadThread = ::CreateThread(NULL, NULL, ReadThread, &syncQ, NULL, NULL);
::WaitForSingleObject(hWriteThread, INFINITE);
::WaitForSingleObject(hReadThread, INFINITE);
}

#9


好铁需要人气!加油

#10


很有道理值得学习

#11


#1


用队列加锁储存任务信息,使用信号量或事件实现线程同步。

#2


如果是单cpu,而且使用阻塞方式发送的话,多线程还是很有好处的

#3


引用 1 楼 cnzdgs 的回复:
用队列加锁储存任务信息,使用信号量或事件实现线程同步。

谢谢。队列加锁以后,能够实现并行(边读边写,读前面的,写后面的)吗?谢谢!

#4


是不是这样:两个线程分别在队列pop与push时加锁,这样保证同步。找了一个CCriticalQueue 的类。可是没有使用例子。不知道能不能给出例程,谢谢!!!

#5


BOOL WINAPI QueueUserWorkItem(
  __in          LPTHREAD_START_ROUTINE Function,
  __in          PVOID Context,
  __in          ULONG Flags
);

#6


队列加锁就是在插入和移出元素的时候加锁,防止线程间冲突,各种队列模板都可以用。我一般是自定义一个双向链表,再加上一个同步对象,封装在一个类里面。CCriticalQueue没有用过。

#7


队列实现如下:

#include <windows.h>
#include <list>

using ::std::list;


template<class T>
class SyncQueue
{
public:
SyncQueue();
~SyncQueue();
void Push(T& item); // 入队
void Pop(); // 出队
T GetFront(); // 得到队头,队空则阻塞
BOOL TryGetFront(T *pRtnItem); // 得到队头,队空则直接返回FALSE,非阻塞。pRtnItem接收返回值
size_t GetSize(); // 得到队列当前长度

protected:
list<T> queue_;
CRITICAL_SECTION lock_;
HANDLE hEvent_;
};


template<class T>
SyncQueue<T>::SyncQueue()
{
::InitializeCriticalSection(&lock_);
hEvent_ = ::CreateEvent(NULL, FALSE, FALSE, NULL);
}


template<class T>
SyncQueue<T>::~SyncQueue()
{
queue_.clear();
::CloseHandle(hEvent_);
::DeleteCriticalSection(&lock_);
}


template<class T>
void SyncQueue<T>::Push(T& item) // 入队
{
::EnterCriticalSection(&lock_);
queue_.push_back(item);
::SetEvent(hEvent_);
::LeaveCriticalSection(&lock_);
}

template<class T>
void SyncQueue<T>::Pop() // 出队
{
::EnterCriticalSection(&lock_);
if (queue_.size() > 0)
{
queue_.pop_front();
}
::LeaveCriticalSection(&lock_);
}


template<class T>
T SyncQueue<T>::GetFront() // 得到队头,队空则阻塞
{
::EnterCriticalSection(&lock_);
while (queue_.size() == 0)
{
::LeaveCriticalSection(&lock_);
WaitForSingleObject(hEvent_, INFINITE);
::EnterCriticalSection(&lock_);
}

T headItem = queue_.front();
::LeaveCriticalSection(&lock_);

return headItem;
}


template<class T>
BOOL SyncQueue<T>::TryGetFront(T *pRtnItem) // 得到队头,队空则直接返回FALSE,非阻塞。pRtnItem接收返回值
{
assert(pRtnItem != NULL);

::EnterCriticalSection(&lock_);
if  (queue_.size() == 0)
{
::LeaveCriticalSection(&lock_);
return FALSE;
}

T headItem = queue_.front();
::LeaveCriticalSection(&lock_);

return headItem;
}


template<class T>
size_t SyncQueue<T>::GetSize() // 得到队列当前长度
{
::EnterCriticalSection(&lock_);
size_t size = queue_.size();
::LeaveCriticalSection(&lock_);
return size;
}

#8


测试程序如下:

#include "SyncQueue.h"
#include <stdio.h>


DWORD WINAPI WriteThread(LPVOID lpParam)
{
SyncQueue<int> *pSyncQ = (SyncQueue<int>*) lpParam;
int i;
int upBound = (1 << 30);
for(i = 0; i < upBound; ++ i)
{
pSyncQ->Push(i);
}
return 0;
}


DWORD WINAPI ReadThread(LPVOID lpParam)
{
SyncQueue<int> *pSyncQ = (SyncQueue<int>*) lpParam;
int i;
int upBound = (1 << 30);
for(i = 0; i < upBound; ++ i)
{
int itemFromQ = pSyncQ->GetFront();
pSyncQ->Pop();
if (i != itemFromQ)
{
printf("Error!\n");
}
if (i % 10000 == 0)
{
printf("Processed : %d\n", i);
}
}
return 0;
}


void main()
{
SyncQueue<int> syncQ;
HANDLE hWriteThread = ::CreateThread(NULL, NULL, WriteThread, &syncQ, NULL, NULL);
HANDLE hReadThread = ::CreateThread(NULL, NULL, ReadThread, &syncQ, NULL, NULL);
::WaitForSingleObject(hWriteThread, INFINITE);
::WaitForSingleObject(hReadThread, INFINITE);
}

#9


好铁需要人气!加油

#10


很有道理值得学习

#11