缓冲区技术,在服务端编程中经常用到的。muduo日志库是用双缓冲技术。
很多项目服务端最开始的性能瓶颈不出现在语言也不出现在框架,通常是出现在数据库上,具体表现为数据库慢查询、死锁、数据库连接数占满等。当遇到数据库读写频繁的业务,缓存是必不可少的了。但是缓存该怎么做才能使利用率更高,减少或避免脏读呢?
从需求、现有的硬件等限制,学习如何设计并实现一个合格的日志库?
(1)需求分为功能需求、性能需求。
功能需求主要考虑: 接口设计是否合理、易用?需要支持哪些功能?muduo日志库的设计原则是:尽量提供最精简的日志设施,不必要的就不提供。封闭的接口尽量便于程序员阅读、查错。
性能需求考虑单位时间内可写入的最大日志量,且要求它不会阻塞正常的业务处理流程。muduo的设计原则:性能只需要“足够好”,即能达到现代硬盘的最大写入带宽即可。且在实现时,要考虑减少多线程的锁争用,尽量不阻塞正常的业务处理逻辑。
(2)如何做异常处理、性能调优?
(3)日志库的实现逻辑基本大同小异,逻辑基本是:在各个业务线程中拼装日志串,然后将日志串存入一个Buffer(访问时需要加锁),另外有一个日志线程不停地从Buffer中取数据,然后将它输出到日志文件中。实现的时候会考虑:
*Buffer如何设计?什么时候唤醒日志线程从Buffer中取数据?
*如何减少 业务线程、日志线程 访问Buffer时的锁竞争?
*日志串如何组装,才能使它组装速度足够快、且要兼顾接口设计的易用性?
*要考虑线程间的竞争、写入速度等各种情况,保证不会丢失每条日志串。
*什么时候切换写到另一个日志文件?什么时候flush到日志文件?
*若日志串写入过多,日志线程来不及消费,怎么办?
muduo日志库的性能很高,大概可以达到每秒200多万条,非常快。代码中做了很多性能调优,比如实现了一个memory output stream,来加快各种类型转换成字符串。利用线程私有变量缓存了一些变量值来加快日志串的组装。用双缓冲技术来减少线程之间的锁竞争、最大化一次性输出日志的吞吐量。阅读时注意每个优化细节,然后在平时做性能调优时 模仿它。
使得前端的业务线程与后端的日志线程能够并发,并且,写日志不太频繁,提高效率.
const int kLargeBuffer = 4000*1000;缓冲区Buffer是模版类FixedBuffer的一份具体实现,其大小为4M。
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;
const int flushInterval_; // 超时时间,在flushInterval_秒内,缓冲区没写满,仍将缓冲区的数据写到文件中
std::atomic<bool> running_; // 是否正在运行
string basename_; // 日志名字
size_t rollSize_; // 预留的日志大小
muduo::Thread thread_; // 执行该异步日志记录器的线程
muduo::CountDownLatch latch_; // 倒数计数,用于指示什么时候日志记录器才能开始正常工作,用于等待线程启动
muduo::MutexLock mutex_;
muduo::Condition cond_;
BufferPtr currentBuffer_; // 当前的缓冲区
BufferPtr nextBuffer_; // 预备缓冲区
BufferVector buffers_; // 缓冲区队列,待写入文件
为什么vector容器中存储的是std::unique_ptr呢?
用std::unique_ptr智能指针管理对象资源,持有对对象的独有权 —— 两个 unique_ptr 不能指向一个对象,不能进行复制操作只能进行移动操作,在 unique_ptr 的生命期结束后释放该资源。
std::vector的push_back函数。
void push_back( const T& value ); // (1)很明显,左值调用1,右值调用2。如果你要往容器内放入超大对象,那么版本2自然是不2选择。
void push_back( T&& value ); // (2)
vector<vector<string>> vv;
vector<string> v = {"123", "456"};
v.push_back("789"); // 临时构造的string类型右值被移动进容器v
vv.push_back(move(v)); // 显式将v移动进vv
MyObj::MyObj() {
for (...) {
vec.push_back(new T());
}
// ...
}
MyObj::~MyObj() {
for (vector<T*>::iterator iter = vec.begin(); iter != vec.end(); ++iter) {
if (*iter) delete *iter;
}
// ...
}
// 开始启动异步日志
void start()
{
running_ = true;
// 在构造函数中latch_的值为1
// 线程运行之后将latch_的减为0
thread_.start();
// 必须等到latch_变为0才能从start函数中返回,这表明初始化已经完成,等待日志线程启动
latch_.wait();
}
//所有LOG_*最终都会调用append函数。
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
// 如果当前buffer还有空间,就添加到当前日志
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else// 当前buffer已满
{
// 把当前buffer添加到buffer列表中
buffers_.push_back(std::move(currentBuffer_));
// 重新设置当前buffer
if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
//如果前端写入速度太快了,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作当前缓冲,这是极少发生的情况
}
currentBuffer_->append(logline, len);
// 通知日志线程,有数据可写
// 也就是说,只有当缓冲区满了之后才会将数据写入日志文件中
cond_.notify();
}
}
// 线程调用的函数,主要用于周期性的flush数据到日志文件中
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);
BufferPtr newBuffer1(new Buffer); //这两个是后台线程的buffer
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite; //用来和前台线程的buffers_进行swap.
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
// 加锁
muduo::MutexLockGuard lock(mutex_);
// 如果buffer为空,那么表示没有数据需要写入文件,那么就等待指定的时间(注意这里没有用倒数计数器)
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
//无论cond是因何而醒来,都要将currentBuffer_放到buffers_中。
//如果是因为时间到而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。
//如果已经有一个前台buffer满了,那么在前台线程中就已经把一个前台buffer放到buffers_中
//了。此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,
//因为在前台线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)
buffers_.push_back(std::move(currentBuffer_));
/*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区
currentBuffer_ = std::move(newBuffer1);
// buffers_和buffersToWrite交换数据,此时buffers_所有的数据存放在buffersToWrite,而buffers_变为空
buffersToWrite.swap(buffers_);
if (!nextBuffer_) //append currentBuffer_ = std::move(nextBuffer_);
{//the nextBuffer_ is still free. notuse for swap with currentBuffer_
nextBuffer_ = std::move(newBuffer2);/*-----假如需要,归还第二个----*/
}
}
assert(!buffersToWrite.empty());
// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除
// 消息堆积
//前端陷入死循环,拼命发送日志消息,超过后端的处理能力
//这是典型的生产速度超过消费速度,会造成数据在内存中的堆积
//严重时引发性能问题(可用内存不足),
//或程序崩溃(分配内存失败)
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
// 丢掉多余日志,以腾出内存,仅保留两块缓冲区
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
// 将buffersToWrite的数据写入到日志中
for (size_t i = 0; i < buffersToWrite.size(); ++i)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffersToWrite[i]->data(), buffersToWrite[i]->length());
}
// 重新调整buffersToWrite的大小
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
// 仅保留两个buffer,用于newBuffer1和newBuffer2
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
// 从buffersToWrite中弹出一个作为newBuffer1
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
// 清理newBuffer1
newBuffer1->reset();
}
//前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台buffer
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush(); //flush to drive. less than3 mins a time
}
output.flush();
}
先用线程同步中的条件变量进行睡眠,睡眠时间为日志库的flush时间。所以,当条件变量的条件满足(即前台线程把一个已经满了的buffer放到了buffers_中),或者超时。无论是哪种情况,都还会有一个currentBuffer_前台buffer正在使用。将这个currentBuffer_放到已满buffers_数组中。这样buffers_就有了待进行IO的buffer了。
将bufferToWrite和buffers_进行swap。这就完成了将写了日志记录的buffer从前台线程到后台线程的转变。后台线程慢慢进行IO即可
muduo::MutexLockGuard lock(mutex_);在多处理器中,对于某种pthread_cond_signal的实现,不可能避免解除阻塞,多个等待条件变量的线程。
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
在多处理器中,pthread_cond_signal可能唤醒多个等待条件变量的线程