__memp_alloc()
注: MPOOL_ALLOC_SEARCH_DYN 没有 出现在 bdb document上, 也没出现在 除了mp_alloc外的代码里. 先删了 以便代码清楚.
按 mpool初始化代码来看, 一个hash bucket上 假定为 2.5个buffer.
查找有 三层嵌套:
遍历mpool region所有的hash bucket
遍历 此bucket的 buffer list
遍历此buffer的 version chain
用了 两个 栈内变量 标记 mtx分配的情况: h_locked, b_lock
mpool->last_checked
存了 上一次 buffer checked for free
全选复制放进笔记
MPOOL_REGION_LOCK(env, infop);
// 首先 直接去 alloc.
alloc: if ((ret = __env_alloc(infop, len, &p)) == 0) {
if (mfp != NULL) {
MVCC_BHALIGN(p);
bhp = (BH *)p;
if ((ret = __mutex_alloc(env, MTX_MPOOL_BH, DB_MUTEX_SHARED, &bhp->mtx_buf)) != 0) {
MVCC_BHUNALIGN(bhp);
__env_alloc_free(infop, bhp);
goto search;
}
c_mp->pages++;
}
MPOOL_REGION_UNLOCK(env, infop);
found: if (offsetp != NULL)
*offsetp = R_OFFSET(infop, p);
*(void **)retp = p;
goto done; // 成功退出情况: 1. 可以从free mem 分配; 2. 找到某个 buffer 可以重用.
} else if (giveup || c_mp->pages == 0) {
MPOOL_REGION_UNLOCK(env, infop);
__db_errx(env, DB_STR("3017", "unable to allocate space from the buffer cache"));
if (ret == ENOMEM && write_error != 0)
ret = EIO;
goto done;
}
search: // 保证有 mpool region的 lock
cache_reduction = c_mp->pages / 10;
high_priority = aggressive ? MPOOL_LRU_MAX : c_mp->lru_priority - cache_reduction;
lru_generation = c_mp->lru_generation;
ret = 0;
freed_space = 0;
total_buckets += buckets;
buckets = 0; // 考察过的buffer数
for (;;) { // 对hash bucket 的遍历
if (c_mp->pages == 0)
goto alloc;
hp = &dbht[c_mp->last_checked++]; // 下一个hash bucket
if (hp >= hp_end) { // wrap around
c_mp->last_checked = 0;
hp = &dbht[c_mp->last_checked++];
}
/*
* Aggressive:
* a: flush所有的buffer, 不论priority;
* b: 每一个hash bucket都考虑, 不会只考虑 两个;
* c: 考虑 放弃的 情况.
*
* 到此3次后, sync 内存池.
*/
if (buckets++ == c_mp->htab_buckets) { // 扫完一遍 hash bucket
if (freed_space > 0)
goto alloc;
MPOOL_REGION_UNLOCK(env, infop);
/* Refresh the list of mvcc reader transactions. */
if (snapshots != NULL)
__os_free(env, snapshots);
if ((ret = __txn_get_readers(
env, &snapshots, &n_snapshots)) != 0)
goto err;
aggressive++;
high_priority = MPOOL_LRU_MAX; // aggressive, 考虑所有 buffer
switch (aggressive) {
case 1:
break;
case 2:
put_counter = c_mp->put_counter; // 考虑 放弃的 情况
break;
case 3:
case 4:
case 5:
case 6:
(void)__memp_sync_int( // sync mpool,
env, NULL, 0, DB_SYNC_ALLOC, NULL, NULL);
__os_yield(env, 1, 0);
break;
default:
aggressive = 1;
if (put_counter == c_mp->put_counter)
giveup = 1;
break;
}
MPOOL_REGION_LOCK(env, infop);
goto alloc;
}
if (SH_TAILQ_FIRST(&hp->hash_bucket, __bh) == NULL) // 空 的 hash bucket
continue;
MPOOL_REGION_UNLOCK(env, infop); // 这里, 先释放 mpool region mtx; 再 加 hash bucket的 读mtx
MUTEX_READLOCK(env, hp->mtx_hash);
h_locked = 1; // hash lock?
b_lock = 0; // buffer lock?
if (buckets > MPOOL_ALLOC_SEARCH_LIMIT && aggressive == 0) {
aggressive = 1; // 进入 aggresive的 一种情况, 考虑了 足够量的 bucket.
high_priority = MPOOL_LRU_MAX;
if (snapshots == NULL && (ret = __txn_get_readers(
env, &snapshots, &n_snapshots)) != 0)
goto err;
}
retry_search: // retry_search, 对某个hash bucket的查找
bhp = NULL; // 当前bucket中 最合适的 candidate
bucket_priority = high_priority; // 当前bucket中, 可以考虑的 buffer 的最小priority. 找 当前bucket中可以考虑的最小priority的buffer
obsolete = 0;
if (n_snapshots > 0 && LOG_COMPARE(&snapshots[n_snapshots - 1], &hp->old_reader) > 0)
hp->old_reader = snapshots[n_snapshots - 1]; // 缓存 当前 最旧的 reader(即最老的那个trans 可以读到的lsn)
SH_TAILQ_FOREACH(current_bhp, &hp->hash_bucket, hq, __bh) { // 遍历当前的 hash bucket的 buffer列表
if (SH_CHAIN_SINGLETON(current_bhp, vc)) { // 每一个buffer 初始化时, vc->next,pre都设为-1(__memp_fget).
// 即为singleton(此buffer没有别的版本)
if (BH_REFCOUNT(current_bhp) != 0) // 正在使用.不考虑
continue; // 继续遍历当前bucket
buffers++; // 表示我们 考虑过的 buffer数
if (bucket_priority > current_bhp->priority) { // 当前 buffer的priority 前面的 buffers的priority 都要小.
bucket_priority = current_bhp->priority; // 记录priority最小值
if (bhp != NULL)
atomic_dec(env, &bhp->ref); // 舍弃 上一个保存的 buffer candidate
bhp = current_bhp; // 当前buffer 为candidate
atomic_inc(env, &bhp->ref); // 防止 当前buffer被 别的thread 从mpool中 移除
}
continue; // 继续遍历当前bucket
}
// 到了这里, 表示当前buffer 有别的mvcc version; 当前buffer为 最新的version.
for (mvcc_bhp = oldest_bhp = current_bhp;
mvcc_bhp != NULL;
oldest_bhp = mvcc_bhp,
mvcc_bhp = SH_CHAIN_PREV(mvcc_bhp, vc, __bh)) { // 遍历vc chain. 沿vc 链向前, buffer越来越旧.
DB_ASSERT(env, mvcc_bhp !=
SH_CHAIN_PREV(mvcc_bhp, vc, __bh));
if (n_snapshots > 0 &&
__memp_bh_unreachable(env,
mvcc_bhp, snapshots, n_snapshots)) {
oldest_bhp = mvcc_bhp; // 当前mvcc buffer不可见, 找到obsolete
goto is_obsolete;
}
// 当前buffer 可以 被mvcc reader trans 看到
if (bhp != NULL &&
mvcc_bhp->priority >= bhp->priority)
continue; // 当前mvcc buffer 比当前bucket中的candidate buffer priority 高; 继续遍历当前mvcc chain
if (BH_REFCOUNT(mvcc_bhp) != 0)
continue; // 正在使用, 继续遍历当前mvcc chain
if (aggressive < 2 && ++versions < (buffers >> 2))
continue; // aggressive 不够高; mpool里面的 mvcc buffer比值不高. 继续遍历当前mvcc chain
buffers++;
if (F_ISSET(mvcc_bhp, BH_FROZEN))
continue; // 继续遍历当前mvcc chain. frozen为什么不考虑? - frozen的page占内存很少,榨不出油
// 当前mvcc buffer 为candidate
if (bhp != NULL)
atomic_dec(env, &bhp->ref);
bhp = mvcc_bhp;
atomic_inc(env, &bhp->ref);
}
// 到了这里, oldest一定是 mvcc chain最老的一个.
if (BH_REFCOUNT(oldest_bhp) != 0)
continue;
if (BH_OBSOLETE(oldest_bhp, hp->old_reader, vlsn)) {
// 确定能到这里么? BH_OBSOLETE. 1). oldest_bph为最新(没有vc.next), 即为singleton, 不可能; 2). 有vc.next, 会在
// 前面的__memp_bh_unreachable()调用 那里 排除. 除非 n_snapshots == 0, 可能么?
if (aggressive < 2)
buffers++;
is_obsolete: // 可以从 前面的__memp_bh_unreachable()调用 那里过来
obsolete = 1;
// oldest_bhp 为 找到的 obsolete的buffer; bhp为前面的candidate
if (bhp != NULL)
atomic_dec(env, &bhp->ref);
bhp = oldest_bhp;
atomic_inc(env, &bhp->ref);
goto this_buffer;
}
}
// 到这里 对 当前 hash bucket遍历结束;
if (bhp == NULL)
goto next_hb; // next_hb 在方法最后, 可能 遍历 下一个 hb; 或者 retry alloc.
priority = bhp->priority;
// 缓存当前hb, 进入下一个hb遍历.
if (hp_saved == NULL) {
if (aggressive > 1 && n_snapshots > 1)
goto this_buffer;
hp_saved = hp;
priority_saved = priority;
goto next_hb;
}
// 到了这里, 表示 有了 两个hash bucket备选(当前bucket CB, 以前bucket PB). 若当前的 bucket好, 直接用;
// 否则 交换 两个bucket, retry_search 前面的bucket (PB). why: 我们只对当前bucket加mtx, 以前的bucket
// 仅记录 (bucket地址, candidate buffer的 priority) 作为参考. 所以要加mtx后, 重新search一遍.
// 对以前的 bucket (PB)扫后, 若没有 candidate buffer, next_hb 扫 CB之后的bucket.
// PB未找到buffer: 1). 以前的 candidate buffer 被移除了(which is good); 2). 以前的 candidate buffer priority 增大了.
if (priority > priority_saved && hp != hp_saved) {
MUTEX_UNLOCK(env, hp->mtx_hash);
hp_tmp = hp_saved;
hp_saved = hp;
hp = hp_tmp;
priority_saved = priority;
MUTEX_READLOCK(env, hp->mtx_hash);
h_locked = 1;
DB_ASSERT(env, BH_REFCOUNT(bhp) > 0);
atomic_dec(env, &bhp->ref);
goto retry_search; // 重新扫当前的bucket(其实是 存的以前的那个)
}
if (lru_generation != c_mp->lru_generation) { // lru 可能被别的thread 重设.
DB_ASSERT(env, BH_REFCOUNT(bhp) > 0);
atomic_dec(env, &bhp->ref);
MUTEX_UNLOCK(env, hp->mtx_hash);
MPOOL_REGION_LOCK(env, infop);
hp_saved = NULL;
goto search; // 重头开始 扫 所有的hash bucket. 重设lru_generation
}
this_buffer:
// 到这里, 表示 1. 找到一个obsolete buffer; 2. 找到bucket中最小priority的buffer, 其为singleton或mvcc最老版本. 且
// 2.1. 现在 aggressive > 1 && n_snapshots > 1; 2.2. 比较过两个bucket 中的candidate, got a winner.
// 尝试重用这个buffer, 或者释放此buffer的内存.
hp_saved = NULL;
MUTEX_UNLOCK(env, hp->mtx_hash);
h_locked = 0;
if (BH_REFCOUNT(bhp) > 1) // buffer refcount 为db_atomic_t, 没有mtx 保护
goto next_hb;
if ((ret = MUTEX_TRYLOCK(env, bhp->mtx_buf)) != 0) { // 去hash mtx, 加buffer mtx
if (ret != DB_LOCK_NOTGRANTED) {
goto err;
}
ret = 0;
goto next_hb;
}
F_SET(bhp, BH_EXCLUSIVE);
if (obsolete)
F_SET(bhp, BH_UNREACHABLE);
b_lock = 1;
if (BH_REFCOUNT(bhp) != 1)
goto next_hb;
bh_mfp = R_ADDR(dbmp->reginfo, bhp->mf_offset); // MPOOLFILE
ret = 0;
dirty_eviction = 0;
if (F_ISSET(bhp, BH_DIRTY)) {
DB_ASSERT(env, atomic_read(&hp->hash_page_dirty) > 0);
ret = __memp_bhwrite(dbmp, hp, bh_mfp, bhp, 0); // 写脏页
DB_ASSERT(env, atomic_read(&bhp->ref) > 0);
if (ret != 0) { // 写 失败
if (ret != EPERM && ret != EAGAIN) {
write_error++;
__db_errx(env, DB_STR_A("3018"...);
}
bhp->priority = MPOOL_LRU_REDZONE; // priority设为最大, 则 下次不会选它.
// (那此 buffer 的priority 什么时候被 重设?? 也许 在下次写的时候?)
goto next_hb;
}
dirty_eviction = 1;
}
if (SH_CHAIN_HASPREV(bhp, vc) ||
(SH_CHAIN_HASNEXT(bhp, vc) && !obsolete)) { // 要做mvcc freeze情况, 有磁盘io, 代价大
if (!aggressive ||
F_ISSET(bhp, BH_DIRTY | BH_FROZEN)) // 这里怎么会有 BH_DIRTY? 刚写过脏页了. 而且标dirty 需要 buffer mtx保护.
goto next_hb;
ret = __memp_bh_freeze(
dbmp, infop, hp, bhp, &alloc_freeze); // 对此buffer做freeze. 即将page 写入磁盘, 内存仅保留少量信息.
if (ret == EIO)
write_error++;
if (ret == EBUSY || ret == EIO ||
ret == ENOMEM || ret == ENOSPC) {
ret = 0;
goto next_hb;
} else if (ret != 0) {
DB_ASSERT(env, BH_REFCOUNT(bhp) > 0);
atomic_dec(env, &bhp->ref);
DB_ASSERT(env, b_lock);
F_CLR(bhp, BH_EXCLUSIVE);
MUTEX_UNLOCK(env, bhp->mtx_buf);
DB_ASSERT(env, !h_locked);
goto err;
}
}
MUTEX_LOCK(env, hp->mtx_hash); // 注: 此时我们 还有 buffer的mtx
h_locked = 1;
// 刚才我们 release了 hash bucket mtx. 所以buffer 可能被修改了.
if (BH_REFCOUNT(bhp) != 1 || F_ISSET(bhp, BH_DIRTY) ||
(SH_CHAIN_HASNEXT(bhp, vc) &&
SH_CHAIN_NEXTP(bhp, vc, __bh)->td_off != bhp->td_off &&
!(obsolete || BH_OBSOLETE(bhp, hp->old_reader, vlsn)))) {
if (FLD_ISSET(env->dbenv->verbose, DB_VERB_MVCC))
__db_msg(env,
"memp_alloc next_hb past bhp %lx flags %x ref %d %lx/%lx",
(u_long)R_OFFSET(infop, bhp), bhp->flags,
BH_REFCOUNT(bhp),
(u_long)R_OFFSET(infop, SH_CHAIN_NEXTP(bhp, vc, __bh)),
(u_long)R_OFFSET(infop, SH_CHAIN_PREVP(bhp, vc, __bh)));
goto next_hb;
}
/*
* If the buffer is frozen, thaw it and look for another one
* we can use. (Calling __memp_bh_freeze above will not mark
* this bhp BH_FROZEN; it creates another frozen one.)
*/
if (F_ISSET(bhp, BH_FROZEN)) {
DB_ASSERT(env, SH_CHAIN_SINGLETON(bhp, vc) || // 到达这里的情况. singleton; obsolete;
obsolete || BH_OBSOLETE(bhp, hp->old_reader, vlsn));
DB_ASSERT(env, BH_REFCOUNT(bhp) > 0);
if (!F_ISSET(bhp, BH_THAWED)) {
if ((ret = __memp_bh_thaw(dbmp,
infop, hp, bhp, NULL)) != 0) // 最后一个参数为NULL, 即 此 bhp 被移除.
goto done;
MUTEX_READLOCK(env, hp->mtx_hash); // hash bucket mtx在 thaw 时被释放; 重新拿.
} else {
need_free = atomic_dec(env, &bhp->ref) == 0;
F_CLR(bhp, BH_EXCLUSIVE);
MUTEX_UNLOCK(env, bhp->mtx_buf);
if (need_free) {
MPOOL_REGION_LOCK(env, infop);
SH_TAILQ_INSERT_TAIL(&c_mp->free_frozen,
bhp, hq);
MPOOL_REGION_UNLOCK(env, infop);
}
}
bhp = NULL;
b_lock = alloc_freeze = 0;
goto retry_search; // thaw 过了 此buffer, 再当前 bucket 中再扫一次
}
/*
* If we need some empty buffer headers for freezing, turn the
* buffer we've found into frozen headers and put them on the
* free list. Only reset alloc_freeze if we've actually
* allocated some frozen buffer headers.
*/
if (alloc_freeze) { // 这段干嘛的? 好像是处理 MPOOL 的 free_frozen列表
/* __memp_ bhfree(..., 0) unlocks both hp & bhp. */
h_locked = 0;
b_lock = 0;
if ((ret = __memp_bhfree(dbmp,
infop, bh_mfp, hp, bhp, 0)) != 0)
goto err;
DB_ASSERT(env, bhp->mtx_buf != MUTEX_INVALID);
if ((ret = __mutex_free(env, &bhp->mtx_buf)) != 0)
goto err;
MVCC_MPROTECT(bhp->buf, bh_mfp->pagesize,
PROT_READ | PROT_WRITE | PROT_EXEC);
MPOOL_REGION_LOCK(env, infop);
SH_TAILQ_INSERT_TAIL(&c_mp->alloc_frozen,
(BH_FROZEN_ALLOC *)bhp, links);
frozen_bhp = (BH_FROZEN_PAGE *)
((BH_FROZEN_ALLOC *)bhp + 1);
endp = (u_int8_t *)bhp->buf + bh_mfp->pagesize;
while ((u_int8_t *)(frozen_bhp + 1) < endp) {
frozen_bhp->header.mtx_buf = MUTEX_INVALID;
SH_TAILQ_INSERT_TAIL(&c_mp->free_frozen,
(BH *)frozen_bhp, hq);
frozen_bhp++;
}
MPOOL_REGION_UNLOCK(env, infop);
alloc_freeze = 0;
MUTEX_READLOCK(env, hp->mtx_hash);
h_locked = 1;
goto retry_search; // 还得在 当前bucket 再扫一次
}
if (mfp != NULL && mfp->pagesize == bh_mfp->pagesize) {
/* __memp_ bhfree(..., 0) unlocks both hp & bhp. */
h_locked = 0;
b_lock = 0;
if ((ret = __memp_bhfree(dbmp, // bhfree 会assert refcount == 1. 这里距上一次判 refcount 有几条if, the window exists, right?
infop, bh_mfp, hp, bhp, 0)) != 0)
goto err;
p = bhp;
goto found; // 终于. bph的size 和我们要的size一样, 可以重用.
}
freed_space += sizeof(*bhp) + bh_mfp->pagesize; // bhp 的size 和 实际数据页的 page size
/* __memp_ bhfree(.., BH_FREE_FREEMEM) also unlocks hp & bhp. */
h_locked = 0;
b_lock = 0;
if ((ret = __memp_bhfree(dbmp,
infop, bh_mfp, hp, bhp, BH_FREE_FREEMEM)) != 0) // bhp 整个被干掉
goto err;
/* Reset "aggressive" and "write_error" if we free any space. */
if (aggressive > 1)
aggressive = 1;
write_error = 0;
if (0) {
next_hb: if (bhp != NULL) {
DB_ASSERT(env, BH_REFCOUNT(bhp) > 0);
atomic_dec(env, &bhp->ref);
if (b_lock) {
F_CLR(bhp, BH_EXCLUSIVE);
MUTEX_UNLOCK(env, bhp->mtx_buf);
b_lock = 0;
}
}
if (h_locked)
MUTEX_UNLOCK(env, hp->mtx_hash);
h_locked = 0;
}
obsolete = 0;
MPOOL_REGION_LOCK(env, infop);
if (freed_space >= 3 * len) // free的 空间大于需要的三倍, retry; 但是可能空间是不连续的.
goto alloc;
}
err:
if (h_locked) {
MUTEX_UNLOCK(env, hp->mtx_hash);
h_locked = 0;
}
done:
if (snapshots != NULL)
__os_free(env, snapshots);
return (ret);
}