muduo的日志库分析四之AsyncLogging类

时间:2022-05-10 21:55:40
   用一个背景线程负责收集日志消息并写入日志文件,其它业务线程只管往这个“日志线程”发送日志消息,这称为"异步日志"。在多线程服务程序中,异步日志(也可叫“非阻塞日志”)是必需的。假如在网络IO线程或业务线程中直接写日志,写操作偶尔可能阻塞一会儿,业务中的响应请求可能会超时,或者耽误发送心跳包,在分布式系统中可能造成多米骨牌效应,例如误报死锁引发自动failover等。因此,在其它业务线程中应该彻底避免磁盘IO,阻塞会影响服务程序的性能。
    缓冲区技术,在服务端编程中经常用到的。muduo日志库是用双缓冲技术。

    很多项目服务端最开始的性能瓶颈不出现在语言也不出现在框架,通常是出现在数据库上,具体表现为数据库慢查询、死锁、数据库连接数占满等。当遇到数据库读写频繁的业务,缓存是必不可少的了。但是缓存该怎么做才能使利用率更高,减少或避免脏读呢?


从需求、现有的硬件等限制,学习如何设计并实现一个合格的日志库?
     (1)需求分为功能需求、性能需求。
        功能需求主要考虑: 接口设计是否合理、易用?需要支持哪些功能?muduo日志库的设计原则是:尽量提供最精简的日志设施,不必要的就不提供。封闭的接口尽量便于程序员阅读、查错。
        性能需求考虑单位时间内可写入的最大日志量,且要求它不会阻塞正常的业务处理流程。muduo的设计原则:性能只需要“足够好”,即能达到现代硬盘的最大写入带宽即可。且在实现时,要考虑减少多线程的锁争用,尽量不阻塞正常的业务处理逻辑。
     (2)如何做异常处理、性能调优?
     (3)日志库的实现逻辑基本大同小异,逻辑基本是:在各个业务线程中拼装日志串,然后将日志串存入一个Buffer(访问时需要加锁),另外有一个日志线程不停地从Buffer中取数据,然后将它输出到日志文件中。实现的时候会考虑:
         *Buffer如何设计?什么时候唤醒日志线程从Buffer中取数据?
         *如何减少 业务线程、日志线程 访问Buffer时的锁竞争?
        *日志串如何组装,才能使它组装速度足够快、且要兼顾接口设计的易用性?
        *要考虑线程间的竞争、写入速度等各种情况,保证不会丢失每条日志串。
       *什么时候切换写到另一个日志文件?什么时候flush到日志文件?
       *若日志串写入过多,日志线程来不及消费,怎么办?


   muduo日志库的性能很高,大概可以达到每秒200多万条,非常快。代码中做了很多性能调优,比如实现了一个memory output stream,来加快各种类型转换成字符串。利用线程私有变量缓存了一些变量值来加快日志串的组装。用双缓冲技术来减少线程之间的锁竞争、最大化一次性输出日志的吞吐量。阅读时注意每个优化细节,然后在平时做性能调优时 模仿它。
   使得前端的业务线程与后端的日志线程能够并发,并且,写日志不太频繁,提高效率.

const int kLargeBuffer = 4000*1000;
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;

const int flushInterval_; // 超时时间,在flushInterval_秒内,缓冲区没写满,仍将缓冲区的数据写到文件中
std::atomic<bool> running_; // 是否正在运行
string basename_; // 日志名字
size_t rollSize_; // 预留的日志大小
muduo::Thread thread_; // 执行该异步日志记录器的线程
muduo::CountDownLatch latch_; // 倒数计数,用于指示什么时候日志记录器才能开始正常工作,用于等待线程启动
muduo::MutexLock mutex_;
muduo::Condition cond_;

BufferPtr currentBuffer_; // 当前的缓冲区
BufferPtr nextBuffer_; // 预备缓冲区
BufferVector buffers_; // 缓冲区队列,待写入文件
缓冲区Buffer是模版类FixedBuffer的一份具体实现,其大小为4M。
为什么vector容器中存储的是std::unique_ptr呢?
用std::unique_ptr智能指针管理对象资源,持有对对象的独有权 —— 两个 unique_ptr 不能指向一个对象,不能进行复制操作只能进行移动操作,在 unique_ptr 的生命期结束后释放该资源。
std::vector的push_back函数。

void push_back( const T& value ); // (1)
void push_back( T&& value ); // (2)
很明显,左值调用1,右值调用2。如果你要往容器内放入超大对象,那么版本2自然是不2选择。
vector<vector<string>> vv;

vector<string> v = {"123", "456"};
v.push_back("789"); // 临时构造的string类型右值被移动进容器v
vv.push_back(move(v)); // 显式将v移动进vv

std::vector的增长又一个隐蔽的优化。
当vector的存储容量需要增长时,通常会重新申请一块内存,并把原来的内容一个个复制过去并删除。复制并删除,降低性能。改用移动就够了。对于像vector<string>这样的容器,如果频繁插入造成存储容量不可避免的增长时,移动语义可以带来悄无声息而且美好的优化。std::unique_ptr放入容器,由于vector增长时会复制对象,像std::unique_ptr这样不可复制的对象是无法放入容器的。但实际上vector并不复制对象,而只是“移动”对象。所以随着移动语义的引入,std::unique_ptr放入std::vector成为理所当然的事情。
std::unique_ptr的自动管理对象生命期。想必每个人都写过这样的代码:
MyObj::MyObj() {
for (...) {
vec.push_back(new T());
}
// ...
}
MyObj::~MyObj() {
for (vector<T*>::iterator iter = vec.begin(); iter != vec.end(); ++iter) {
if (*iter) delete *iter;
}
// ...
}
繁琐暂且不说,异常安全也是大问题。使用vector<unique_ptr<T>>,完全无需显式析构,unqiue_ptr自会打理一切。

value_type 就是 vector<T> 的元素类型,也就是 T。当写通用的算法处理任意类型的 vector<> 或其他容器类型时是很有用的。

// 开始启动异步日志  
void start()
{
running_ = true;

// 在构造函数中latch_的值为1
// 线程运行之后将latch_的减为0
thread_.start();

// 必须等到latch_变为0才能从start函数中返回,这表明初始化已经完成,等待日志线程启动
latch_.wait();
}

//所有LOG_*最终都会调用append函数。
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
// 如果当前buffer还有空间,就添加到当前日志
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else// 当前buffer已满
{
// 把当前buffer添加到buffer列表中
buffers_.push_back(std::move(currentBuffer_));

// 重新设置当前buffer
if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
//如果前端写入速度太快了,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作当前缓冲,这是极少发生的情况
}
currentBuffer_->append(logline, len);
// 通知日志线程,有数据可写
// 也就是说,只有当缓冲区满了之后才会将数据写入日志文件中
cond_.notify();
}
}
可以看到前台线程所做的工作比较简单,如果currentBuffer_够用,就把日志内容写入到currentBuffer_中,如果不够用(就认为其满了),就把currentBuffer_放到已满buffer数组中,等待消费者线程(即后台线程)来取。并且把currentBuffer_指向nextBuffer_的buffer。

    接着看后端是怎样把日志内容写入文件。

// 线程调用的函数,主要用于周期性的flush数据到日志文件中
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);
BufferPtr newBuffer1(new Buffer); //这两个是后台线程的buffer
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite; //用来和前台线程的buffers_进行swap.
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
// 加锁
muduo::MutexLockGuard lock(mutex_);
// 如果buffer为空,那么表示没有数据需要写入文件,那么就等待指定的时间(注意这里没有用倒数计数器)
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
//无论cond是因何而醒来,都要将currentBuffer_放到buffers_中。
//如果是因为时间到而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。
//如果已经有一个前台buffer满了,那么在前台线程中就已经把一个前台buffer放到buffers_中
//了。此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,
//因为在前台线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)
buffers_.push_back(std::move(currentBuffer_));
/*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区
currentBuffer_ = std::move(newBuffer1);
// buffers_和buffersToWrite交换数据,此时buffers_所有的数据存放在buffersToWrite,而buffers_变为空
buffersToWrite.swap(buffers_);
if (!nextBuffer_) //append currentBuffer_ = std::move(nextBuffer_);
{//the nextBuffer_ is still free. notuse for swap with currentBuffer_
nextBuffer_ = std::move(newBuffer2);/*-----假如需要,归还第二个----*/
}
}

assert(!buffersToWrite.empty());

// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除
// 消息堆积
//前端陷入死循环,拼命发送日志消息,超过后端的处理能力
//这是典型的生产速度超过消费速度,会造成数据在内存中的堆积
//严重时引发性能问题(可用内存不足),
//或程序崩溃(分配内存失败)
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
// 丢掉多余日志,以腾出内存,仅保留两块缓冲区
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
// 将buffersToWrite的数据写入到日志中
for (size_t i = 0; i < buffersToWrite.size(); ++i)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffersToWrite[i]->data(), buffersToWrite[i]->length());
}
// 重新调整buffersToWrite的大小
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
// 仅保留两个buffer,用于newBuffer1和newBuffer2
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
// 从buffersToWrite中弹出一个作为newBuffer1
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
// 清理newBuffer1
newBuffer1->reset();
}
//前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台buffer
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}

buffersToWrite.clear();
output.flush(); //flush to drive. less than3 mins a time
}
output.flush();
}
从代码可以看到后台线程的主要如下:
        先用线程同步中的条件变量进行睡眠,睡眠时间为日志库的flush时间。所以,当条件变量的条件满足(即前台线程把一个已经满了的buffer放到了buffers_中),或者超时。无论是哪种情况,都还会有一个currentBuffer_前台buffer正在使用。将这个currentBuffer_放到已满buffers_数组中。这样buffers_就有了待进行IO的buffer了。
        将bufferToWrite和buffers_进行swap。这就完成了将写了日志记录的buffer从前台线程到后台线程的转变。后台线程慢慢进行IO即可

为了及时将日志消息写入文件,即使buffer A未满,日志库也会每3秒执行一次交换写入操作.
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
在多处理器中,对于某种pthread_cond_signal的实现,不可能避免解除阻塞,多个等待条件变量的线程。
在多处理器中,pthread_cond_signal可能唤醒多个等待条件变量的线程