在多线程应用程序中使用循环缓冲区高效地进行日志记录

时间:2022-06-09 19:49:29
在多线程应用程序中使用循环缓冲区高效地进行日志记录

在关键的计算机应用程序的生存期中,日志记录是一件非常重要的活动,特别是当故障的症状并不十分明显时。日志记录提供了故障前应用程序状态的详细信息,如变量的值、函数的返回值等等。在一段时间的运行过程中,将不断地产生大量的跟踪数据,并持续地将其写入到磁盘上的文本文件中。要进行有效的日志记录,需要使用大量的磁盘空间,并且在多线程环境中,所需的磁盘空间会成倍地增加,因为大量的线程都在记录它们的跟踪信息。

使用常规文件进行日志记录的两个主要问题是:硬盘空间的可用性,以及在对一个文件写入数据时磁盘 I/O 的速度较慢。持续地对磁盘进行写入操作可能会极大地降低程序的性能,导致其运行速度缓慢。通常,可以通过使用日志轮换策略来解决空间问题,将日志保存在几个文件中,当这些文件大小达到某个预定义的字节数时,对它们进行截断和覆盖。

要克服空间问题并实现磁盘 I/O 的最小化,某些程序可以将它们的跟踪数据记录在内存中,仅当请求时才转储些数据。这个循环的、内存中的缓冲区称为循环缓冲区。本文讨论了循环缓冲区的一些常见实现,并对多线程程序中循环缓冲区的启用机制提出了一些观点。

循环缓冲区是一种用于应用程序的日志记录技术,它可以将相关的数据保存在内存中,而不是每次都将其写入到磁盘上的文件中。在需要的时候(比如当用户请求将内存数据转储到文件中时、程序检测到一个错误时,或者由于非法的操作或者接收到的信号而引起程序崩溃时)可以将内存中的数据转储到磁盘。循环缓冲区日志记录由一个固定大小的内存缓冲区构成,进程使用这个内存缓冲区进行日志记录。顾名思义,该缓冲区采用循环的方式进行实现。当该缓冲区填满了数据时,无需为新的数据分配更多的内存,而是从缓冲区开始的位置对其进行写操作,因此将覆盖以前的内容。请参见下图的示例。


在多线程应用程序中使用循环缓冲区高效地进行日志记录 

上图显示了将两个条目写入到循环缓冲区后该缓冲区的状态。在写入了第一个日志条目(用蓝色表示)之后,当该进程尝试写入第二个条目(用红色表示)时,该缓冲区中已经没有足够的剩余空间。该进程写入数据,一直到达缓冲区的末尾,然后将剩余的数据复制到缓冲区的开始位置,覆盖以前的日志条目。

通过保存一个读指针,可以实现对循环缓冲区的读操作;相应地移动读指针和写指针,以确保在进行读操作期间,读指针不会越过写指针。为了提高效率,一些应用程序可以将原始数据(而不是经过格式化的数据)保存到该缓冲区。在这种情况下需要一个解析器,该解析器可以根据这些内存转储生成有意义的日志消息。

当您可以简单地对一个文件进行写入操作时,为什么要使用循环缓冲区呢?因为您覆盖了循环缓冲区中以前的内容,所以在完成该操作后,您将丢失以前的数据。与传统的文件日志记录机制相比,循环缓冲区提供了下列优势。

  • 速度快。与磁盘的 I/O 操作相比,内存的写操作要快得多。仅当需要的时候才刷新数据。
  • 持续的日志记录可能会填满系统中的空间,从而导致其他程序也耗尽空间并且执行失败。在这样的情况下,您有两种选择,要么手动地删除日志信息,要么实现日志轮换策略。
  • 一旦您启用了日志记录,无论您是否需要它,该进程都将持续地填充硬盘上的空间。
  • 有时,您仅仅需要程序崩溃之前的相关数据,而不是该进程完整的历史数据。
  • 有一些常见的调试函数,如 printf、write 等,可能会在多线程应用程序的情况下更改一个程序的行为,使得它们难以调试。使用这些函数会导致应用程序隐藏某些平时可能表现出来的错误。这些函数都是可撤销点,并且可能导致在线程环境中产生一个该程序并不期望的挂起信号。

有时,当其他传统的日志记录方法失败时,可以使用循环缓冲区日志记录。这个部分介绍了在多线程应用程序中使用循环缓冲区启用日志记录时需要考虑的一些重要方面。

在访问一个公共的资源时,同步 始终是多线程程序不可缺少的部分,日志记录也不例外。因为每个线程都试图对全局空间进行写操作,所以必须确保它们同步地写入内存,否则消息就会遭到破坏。通常,每个线程在写入缓冲区之前都持有一个锁,在完成操作时释放该锁。您可以下载一个使用锁对内存进行写操作的循环缓冲区示例。

这种方法具有以下的缺点:如果您的应用程序中包含几个线程,并且每个线程都在进行详细地日志记录,那么该进程的整体性能将会受到影响,因为这些线程将在获得和释放锁上花费了大部分的时间。

通过使得每个线程将数据写入到它自己的内存块,就可以完全避免同步问题。当收到来自用户的转储数据的请求时,每个线程获得一个锁,并将其转储到中心位置。因为仅在将数据刷新到磁盘时获得锁,所以性能并不会受到很大的影响。在这样的情况下,您可能需要一个附加的程序对日志信息进行排序(按照线程 ID 或者时间戳的顺序),以便对其进行分析。您还可以选择仅在内存中写入消息代码,而不是完整的格式化消息,稍后使用一个外部的实用工具解析转储数据,将消息代码转换为有意义的文本。

另一种避免同步问题的方法是,分配一个很大的全局内存块,并将其划分为较小的槽位,其中每个槽位都可由一个线程用来进行日志记录。每个线程只能够读写它自己的槽位,而不是整个缓冲区。当每个线程第一次尝试写入数据时,它会尝试寻找一个空的内存槽位,并将其标记为忙碌。当线程获得了一个特定的槽位时,可以将跟踪槽位使用情况的位图中相应的位设置为 1,当该线程退出时,重新将这个位设置为 0。同时需要维护当前使用的槽位编号的全局列表,以及正在使用它的线程的线程信息。

要避免出现这样的情况,即一个线程已经死亡,但是却没有将其槽位在位图中对应的位设置为 0,您需要一个垃圾收集器线程,它遍历全局列表,并根据线程 ID 以固定的时间间隔进行轮询。它负责释放槽位并修改全局列表。


网上找的一个实现多线程环境下循环缓冲区的实现代码:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>

/*
* The program reads from a text file, each of the threads takes that message
* and writes it to the Ring Buffer after attaching its thread-id to it. Finally,
* the Ring Buffer is dumped to the file out.txt. The command line argument is the name of
* the input message file.
*/

struct ring_buffer
{
char * write_pointer;
char * buff_start;
int wrapped;
unsigned long size;
};

/** Function Prototypes **/
void init_buffer(unsigned long);
int load_data(void*, unsigned int*);
void print_buffer_state();
char* mem_allocate(unsigned long);
int dump_buffer(const char*);

/** Global Variables **/
struct ring_buffer global_ring;
pthread_mutex_t mutex;
char file_contents[300000];

/** Fucntions **/
char* mem_allocate(unsigned long mem_size)
{
char* data;
data = (char*)calloc(mem_size, sizeof(unsigned char));
if(data == NULL)
{
perror( "calloc" );
exit(1);
}
return data;
}

void init_buffer(unsigned long mem_size)
{
if(pthread_mutex_init(&mutex, 0) != 0)
{
perror("pthread_mutex_init");
exit(1);
}
global_ring.size = mem_size;
global_ring.buff_start = mem_allocate(mem_size);
global_ring.write_pointer = global_ring.buff_start;
global_ring.wrapped = 0;
}

int load_data(void* data, unsigned int* len)
{
int temp = 0;

if(len == NULL || *len == 0)
{
return -1;
}

if(*len > global_ring.size)
{
*len = global_ring.size;
}

if(pthread_mutex_lock(&mutex) != 0)
{
perror("pthread_mutex_lock");
exit(1);
}
temp = (global_ring.buff_start + global_ring.size) - global_ring.write_pointer;
if(*len > temp)
{
memcpy(global_ring.write_pointer, data, temp);
memcpy(global_ring.buff_start, (char*)data+temp, *len-temp);
global_ring.write_pointer = global_ring.buff_start + *len-temp;
global_ring.wrapped = 1;
}
else
{
memcpy(global_ring.write_pointer, data, *len);
global_ring.write_pointer += *len;
}

if(pthread_mutex_unlock(&mutex) != 0)
{
perror("pthread_mutex_lock");
exit(1);
}
return 0;
}

int dump_buffer(const char* file_to_dump)
{
FILE* fp ;
int temp = 0;
int len;

if ((fp = fopen(file_to_dump, "a")) == NULL)
return -1;

pthread_mutex_lock(&mutex);

if(global_ring.wrapped == 1)
{
fwrite( global_ring.buff_start, sizeof(unsigned char), global_ring.size, fp);
}
else
{
len = global_ring.write_pointer - global_ring.buff_start + 1;
fwrite(global_ring.buff_start, sizeof(unsigned char), len, fp);
}
fclose(fp);
pthread_mutex_unlock(&mutex);
return 0;
}

void print_buffer_state()
{
printf("\n/***** Printing the buffer before dump****/\n");
printf("Buffer data from Start pointer:\n%s\n", global_ring.buff_start);
printf("/******************************/\n\n");
}

int read_input(char* filename)
{
FILE *inputFilePtr;
char *iReturn;
inputFilePtr = fopen(filename, "r");
if(inputFilePtr == NULL)
{
printf("ERROR: File Not Found\n");
return -1;
}
iReturn = fgets(file_contents, 209600, inputFilePtr);

if (iReturn == NULL) /* End of file reached */
return -1;
return 0;
}

void* loader_function(void* a)
{
unsigned int len, current_thread, old_state = 0;
char* local_msg;

current_thread = pthread_self();//得到当前的线程
local_msg = (char*)calloc(sizeof(file_contents)+32 , sizeof(unsigned char));

if(sprintf(local_msg, "%s:%d", file_contents, current_thread) <0)
{
printf(":(\n");
}
printf("Message loaded by thread %d to the buffer = \"%s\"\n", pthread_self(), local_msg);
len = strlen(local_msg);
load_data(local_msg, &len);
pthread_exit(0);
}

void threaded_test(char* filename)
{
pthread_t threads[2];
int i,j, ret,toDump = 0;

int data_read = read_input(filename);

if(data_read < 0)
exit(1);
printf("FILE_CONTENTS = %s\n", file_contents);

if(pthread_mutex_init(&mutex, 0) != 0)
{
perror("pthread_mutex_init");
exit(1);
}
init_buffer(20);

/***** Threads Created for to load the data *****/

for(j = 0; j<2; j++)
{
pthread_create (&threads[j], NULL, &loader_function, NULL);
}

for(j = 0; j<2; j++)
{
pthread_join(threads[j], NULL);
}

print_buffer_state();
dump_buffer("out.txt");
printf("Buffer dumped to file \"out.txt\"\n\n");
}

int main(int argc, char** argv)
{
if(argc!=2)
{
printf("Usage: ./test <input-file>\n");
exit(1);
}
else
{
threaded_test(argv[1]);
}
}


参考:http://www.ibm.com/developerworks/cn/aix/library/au-buffer/index.html#download