C++并发编程框架Theron(5)——File reader(1)

时间:2022-02-13 17:58:37

1 前言
  在上一篇博文,我主要通过Hello world!的示例,介绍了actors,frameworks,messages和receivers几个构建Theron框架程序的要点。但是Hello world!实例只是一个再简单不过的单actor的应用程序,我们学习Theron框架自然是希望多个actor相互协作来达到多线程开发的目的。在本篇博文中,我们会学习到一个更复杂,更有实际用处的程序示例。
2 File reader
  File reader示例是创建一个文件服务器,用来为客户端读取来自磁盘文件中的数据。我们要实现的是这样一个消息传递的交互:发送给actor一条消息,告诉它去读取一个文件数据到缓存区,并且随后再返回一条读取完毕的消息。
  我们可以看到,使用Theron创建这样一个文件服务器可以带来下面几点优势:
  ①、因为文件阅读被表达成actors,它可以与系统剩下的程序同时执行,没有额外的花费。因为使用消息传递机制。所以文件读取时异步而非堵塞的;
  ②、将文件阅读器放置到一个framework中,我们可以创建一个线程池(多个线程)阻塞文件阅读,从而不会饿死系统剩下程序。
  ③、支持更多的并行文件阅读只是意味着增加了更多的相同actor的拷贝,程序实现很简单,但是可以多线程完成很多文件的读取。
 2.1 actor
  首先,我们创建一个通用的多线程系统形式“Worker”actor,如下:

#include <Theron/Theron.h> 

template <class WorkMessage>
class Worker : public Theron::Actor
{
public:
// 构造函数
Worker(Theron::Framework &framework) : Theron::Actor(framework)
{
RegisterHandler(this, &Worker::Handler);
}
private:
// 消息处理函数
void Handler(const WorkMessage &message, const Theron::Address from)

{
// 消息参数是const,搜易我们需要拷贝它来改变它的性质
WorkMessage result(message);
result.Process();
// 返回线程消息给发送者
Send(result, from);
}
};

  通过它自己,这个actor模板不能做太多事情。当实例化一个具体的WorkMessage类型,它会变成该消息类型的一个处理者(也就是一个线程)。Worker从一个叫作Handler()的消息处理函数中获得处理WorkMessage消息的能力。这个处理函数通过拷贝消息来响应WorkMessage消息,并且在其上执行Process()方法,最后将其返回。重要的一点是,我们将Process()放在一个actor里面被调用,从而使得它能与外面程序一起运行。
  这块模板类的作用是,使得Worker能够处理许多不一样的消息类型,即重复使用。
 2.2 消息类定义
  现在让我们写一个ReadRequest消息类型来表示文件读取的请求,ReadRequest对象可以调用一个Process()方法,所以可以作为WorkMesssage被一个Worker所处理。它既会用来请求文件读取,也会用来返回结果,所以它封装关于从磁盘文件读取数据的任何内容,包括实际中数据怎样被读取:

// 数据读取请求: 读取一个磁盘文件的内容到缓存区.
struct ReadRequest
{
public:

explicit ReadRequest(
const Theron::Address client = Theron::Address(),
const char *const fileName = 0,
unsigned char *const buffer = 0,
const unsigned int bufferSize = 0) :
mClient(client),
mFileName(fileName),
mProcessed(false),
mBuffer(buffer),
mBufferSize(bufferSize),
mFileSize(0)
{
}
void Process()
{
mProcessed = true;
mFileSize = 0;
// 尝试打开文件
FILE *const handle = fopen(mFileName, "rb");
if (handle != 0)
{
// 读取数据文件,设置实际读取长度
mFileSize = (uint32_t) fread(
mBuffer,
sizeof(unsigned char),
mBufferSize,
handle);

fclose(handle);
}
}
Theron::Address mClient; // 请求客户的地址
const char *mFileName; // 请求文件的名称
bool mProcessed; // 文件是否被读取过
unsigned char *mBuffer; // 文件内容的缓存区
unsigned int mBufferSize; // 缓存区的大小
unsigned int mFileSize; // 文件的字节长度
};

  我们可以使用任何class或者struct作为Theron中的消息,唯一需要注意的是需要严格要求消息类型必须安全地可拷贝。当一个消息被发送,Theron会首先创建一个新的消息拷贝,以此发送端和接收端可以看到不同的拷贝数据,从而避免共用一个消息内存。
  实际上消息可拷贝是有重要的隐含信息的。它意味着我们消息必须是轻量级的,否则性能会大大的降低。消息中通过指针指向文件名和内存缓冲器,而不是缓冲器本身,这样可以避免大量的数据的拷贝(也就是所谓的浅拷贝)。
  无论什么时候我们发送一个消息中的指针,我们会访问actor接收消息来接近依靠指针指向的内存地址。因为发送者也可以接近这块内存,其实也是潜在的共享内存。这看似与我们一直强调的避免共享内存相违背,因为这样做非常危险,但是我们仍然在这里这样做是因为使用消息同步接近共享缓存区,可以确保发送端和接收端并不是同一时刻区接近这块缓存区。
 2.3 主函数
  消息与actor已经创立完毕,下面我们来创建一个简单的main程序来串联起来运行。首先,我们需要创建一个Worker actor,然后发送给它一系列ReadRequest类型的消息(使用命令行读取的),等待处理结果,最后打印输出所有读取文件的细节:

static const int MAX_FILES = 16; 
static const int MAX_FILE_SIZE = 16384;

int main(int argc, char *argv[])
{
if (argc < 2)
{
printf("Expected up to 16 file name arguments.\n");
}
// 创建一个worker去处理工作
Theron::Framework framework;
Worker<ReadRequest> worker(framework);
// 注册一个receiver来捕(catcher)获返回的结果
Theron::Receiver receiver;
Theron::Catcher<ReadRequest> resultCatcher;
receiver.RegisterHandler(&resultCatcher, &Theron::Catcher<ReadRequest>::Push);
// 命令行上每个文件名称作为请求消息
for (int i = 0; i < MAX_FILES && i + 1 < argc; ++i)
{
unsigned char *const buffer = new unsigned char[MAX_FILE_SIZE];
const ReadRequest message(
receiver.GetAddress(),
argv[i + 1],
buffer,
MAX_FILE_SIZE);
framework.Send(message, receiver.GetAddress(), worker.GetAddress());
}
// 等待所有结果
for (int i = 1; i < argc; ++i)
{
receiver.Wait();
}
// 处理结果,我们仅打印文件的名称
ReadRequest result;
Theron::Address from;
while (!resultCatcher.Empty())
{
resultCatcher.Pop(result, from);
printf("Read %d bytes from file '%s'\n", result.FileSize(), result.FileName());
// 释放申请的空间
delete [] result.Buffer();
}
}

  我们再详细的理一遍整个过程,受我们创建了一个Theron::Framework,这个Framework前面介绍过就是一个管理类来主持actors。
  接着我们楚江了一个Worker actor模板实例,实例化ReadRequest消息类型。我们将该Worker actor和framework进行绑定,从而可以有效的管理framework中的worker。
  再接着我们创建了一个Theron::Receiver。这钱Hello world!中已经提及过Receiver是一个帮助类,拥有自己的地址,可以让非actor的代码能够接收来自actors的消息(main函数自然不是actor模式创建的,所以需要一个具体地址来接收来自actor的消息)。Wait()方法可以用来同步等待来自actors的消息。
  和同步消息一样,Receiver也允许我们处理和检查消息。我们可以像传统类一样注册一个公共的方法作为消息的处理函数。但是在这个案例中我们使用的是Theron::Catcher,它是另一个帮助者,是一个线程安全的队列可以捕获被Receiver收到的所有消息。Catcher的Push()方法可以被注册为一个Receiver的消息处理函数。
3 小结
  这篇博文我们主要完成了一个读取文件功能的应用,我们接触了三个Theron框架的核心概念:管理actors的Frameworks,让非actor的代码可以接收来自actors的消息的Receivers;以及存储接收自Receivers的消息。
  但是这个实例到目前为止有一个缺陷,就是尽管Worker可以在一个独立线程异步处理ReadRequests消息,但是它仍然是串联来处理所有的消息。当Worker收到一系列ReadRequests消息的时候,它的Handler()消息处理函数会严格按照顺序依次被执行。实际上,这些请求在Worker的内部消息队列中排列着。下一篇博文我会扩展这个样例,通过写一个Dispatcher actor(actor调度员)来创建和控制一系列Workers来同时并行处理多个请求。
  以上是个人学习记录,由于能力和时间有限,如果有错误望读者纠正,谢谢!
  转载请注明出处:http://blog.csdn.net/FX677588/article/details/75201088


  参考文献:
  Theron框架官网File reader章节http://www.theron-library.com/index.php?t=page&p=lesson01