Binder中的ProcessState和IPCThreadState分析

时间:2021-05-13 15:51:55

<span style="font-family: 宋体; font-size: 14px; text-indent: 28px;">ProcessState和IPCThreadState最为binder通信的基础,我们从MediaPlayer来开始分析这两个类</span>
 
int main(int argc, char** argv){......sp<ProcessState> proc(ProcessState::self());        sp<IServiceManager> sm = defaultServiceManager();        ALOGI("ServiceManager: %p", sm.get());        AudioFlinger::instantiate();// 绑定media服务        MediaPlayerService::instantiate();        CameraService::instantiate();        AudioPolicyService::instantiate();        registerExtensions();        ProcessState::self()->startThreadPool();        IPCThreadState::self()->joinThreadPool();}

在进程的开始,创建了一个singleton的ProcessState

sp<ProcessState> ProcessState::self()
{
Mutex::Autolock _l(gProcessMutex);
if (gProcess != NULL) {
return gProcess;
}
gProcess = new ProcessState;
return gProcess;
}

ProcessState::ProcessState()
: mDriverFD(open_driver())
, mVMStart(MAP_FAILED)
, mManagesContexts(false)
, mBinderContextCheckFunc(NULL)
, mBinderContextUserData(NULL)
, mThreadPoolStarted(false)
, mThreadPoolSeq(1)
{
if (mDriverFD >= 0) {
// XXX Ideally, there should be a specific define for whether we
// have mmap (or whether we could possibly have the kernel module
// availabla).
#if !defined(HAVE_WIN32_IPC)
// mmap the binder, providing a chunk of virtual address space to receive transactions.
mVMStart = mmap(0, BINDER_VM_SIZE, PROT_READ, MAP_PRIVATE | MAP_NORESERVE, mDriverFD, 0);
if (mVMStart == MAP_FAILED) {
// *sigh*
ALOGE("Using /dev/binder failed: unable to mmap transaction memory.\n");
close(mDriverFD);
mDriverFD = -1;
}
#else
mDriverFD = -1;
#endif
}

LOG_ALWAYS_FATAL_IF(mDriverFD < 0, "Binder driver could not be opened. Terminating.");
}

static int open_driver()
{
int fd = open("/dev/binder", O_RDWR);
if (fd >= 0) {
fcntl(fd, F_SETFD, FD_CLOEXEC);
int vers;
status_t result = ioctl(fd, BINDER_VERSION, &vers);
if (result == -1) {
ALOGE("Binder ioctl to obtain version failed: %s", strerror(errno));
close(fd);
fd = -1;
}
if (result != 0 || vers != BINDER_CURRENT_PROTOCOL_VERSION) {
ALOGE("Binder driver protocol does not match user space protocol!");
close(fd);
fd = -1;
}
size_t maxThreads = 15;
result = ioctl(fd, BINDER_SET_MAX_THREADS, &maxThreads);
if (result == -1) {
ALOGE("Binder ioctl to set max threads failed: %s", strerror(errno));
}
} else {
ALOGW("Opening '/dev/binder' failed: %s\n", strerror(errno));
}
return fd;
}

跟踪代码知,这个ProcessState singleton实例对象打开了binder驱动,并映射了地址空间用于接收binder的传输数据,同时也设定binder最大的线程为15

查看获取ServiceManager代码

sp<IServiceManager> defaultServiceManager()
{
if (gDefaultServiceManager != NULL) return gDefaultServiceManager;

{
AutoMutex _l(gDefaultServiceManagerLock);
if (gDefaultServiceManager == NULL) {
// Bpbinder 转为 IServiceManager (BpServiceManager)
gDefaultServiceManager = interface_cast<IServiceManager>(
ProcessState::self()->getContextObject(NULL));
}
}

return gDefaultServiceManager;
}

从上面知ProcessState提供了getContextObject,getContextObject是调用了getStrongProxyForHandle,实际上getContextObject就是获取ServiceManager的Binder对象

sp<IBinder> ProcessState::getStrongProxyForHandle(int32_t handle)
{
sp<IBinder> result;

AutoMutex _l(mLock);

handle_entry* e = lookupHandleLocked(handle);

if (e != NULL) {
// We need to create a new BpBinder if there isn't currently one, OR we
// are unable to acquire a weak reference on this current one. See comment
// in getWeakProxyForHandle() for more info about this.
IBinder* b = e->binder;
if (b == NULL || !e->refs->attemptIncWeak(this)) {
b = new BpBinder(handle);
e->binder = b;
if (b) e->refs = b->getWeakRefs();
result = b;
} else {
// This little bit of nastyness is to allow us to add a primary
// reference to the remote proxy when this team doesn't have one
// but another team is sending the handle to us.
result.force_set(b);
e->refs->decWeak(this);
}
}

return result;
}

在这一步ProcessState创建了Binder对象,而在new BpBinder的时候同时创建IPCThreadState

BpBinder::BpBinder(int32_t handle)
: mHandle(handle)
, mAlive(1)
, mObitsSent(0)
, mObituaries(NULL)
{
ALOGV("Creating BpBinder %p handle %d\n", this, mHandle);

extendObjectLifetime(OBJECT_LIFETIME_WEAK);
IPCThreadState::self()->incWeakHandle(handle);
}

IPCThreadState* IPCThreadState::self()
{
if (gHaveTLS) {
restart:
const pthread_key_t k = gTLS;
IPCThreadState* st = (IPCThreadState*)pthread_getspecific(k);
if (st) return st;
return new IPCThreadState;
}

if (gShutdown) return NULL;

pthread_mutex_lock(&gTLSMutex);
if (!gHaveTLS) {
// 函数 pthread_key_create() 用来创建线程私有数据。该函数从 TSD 池中分配一项,将其地址值赋给 key 供以后访问使用
if (pthread_key_create(&gTLS, threadDestructor) != 0) {
pthread_mutex_unlock(&gTLSMutex);
return NULL;
}
gHaveTLS = true;
}
pthread_mutex_unlock(&gTLSMutex);
goto restart;
}

IPCThreadState创建了一个线程,并带入线程析构


void IPCThreadState::threadDestructor(void *st)
{
IPCThreadState* const self = static_cast<IPCThreadState*>(st);
if (self) {
self->flushCommands();
#if defined(HAVE_ANDROID_OS)
if (self->mProcess->mDriverFD > 0) {
ioctl(self->mProcess->mDriverFD, BINDER_THREAD_EXIT, 0);
}
#endif
delete self;
}
}

Pthread可看百科http://baike.baidu.com/view/974776.htm?fr=aladdin

当线程结束时,flushCommands调用talkWithDriver告诉不read,只把数据写完到binder,告诉向Binder发出Binder_Thread_EXIT

 

 

在回头看创建BpBinder后,ProcessState的mHandleToObject已经有了这个ServiceManager binder对象,同时BpServiceManager也有ServiceManager binder对象,记住这些

 

接着我们看MediaPlayerService::instantiate();

void MediaPlayerService::instantiate() {
defaultServiceManager()->addService(
String16("media.player"), new MediaPlayerService());
}

这是在调用BpServiceManager的addService


virtual status_t addService(const String16& name, const sp<IBinder>& service,
bool allowIsolated)
{
Parcel data, reply;
data.writeInterfaceToken(IServiceManager::getInterfaceDescriptor());
data.writeString16(name);
data.writeStrongBinder(service);
data.writeInt32(allowIsolated ? 1 : 0);
status_t err = remote()->transact(ADD_SERVICE_TRANSACTION, data, &reply);
return err == NO_ERROR ? reply.readExceptionCode() : err;
}

remote()->transact(ADD_SERVICE_TRANSACTION, data, &reply);这里的remote为前面所说的BpServiceManager所记录下来的BpBinder(0),我们跟踪代码到BpBinder的transact,看看

 

status_t BpBinder::transact(
uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
// Once a binder has died, it will never come back to life.
if (mAlive) {
status_t status = IPCThreadState::self()->transact(
mHandle, code, data, reply, flags);
if (status == DEAD_OBJECT) mAlive = 0;
return status;
}

return DEAD_OBJECT;
}


这里的transact是调用了IPCThreadState::self()的transact,此时mHandler为0,code为ADD_SERVICE_TRANSACTION,flags为0,Parcel data为如下图

Binder中的ProcessState和IPCThreadState分析
Binder中的ProcessState和IPCThreadState分析

进入IPCThreadtransact函数

status_t IPCThreadState::transact(int32_t handle,
uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags)
{
status_t err = data.errorCheck();

flags |= TF_ACCEPT_FDS;
// TF_ACCEPT_FDS位设置为1,表示允许Server进程在返回结果中携带文件描述符

IF_LOG_TRANSACTIONS() {
TextOutput::Bundle _b(alog);
alog << "BC_TRANSACTION thr " << (void*)pthread_self() << " / hand "
<< handle << " / code " << TypeCode(code) << ": "
<< indent << data << dedent << endl;
}

if (err == NO_ERROR) {
LOG_ONEWAY(">>>> SEND from pid %d uid %d %s", getpid(), getuid(),
(flags & TF_ONE_WAY) == 0 ? "READ REPLY" : "ONE WAY");

// 把data,handle等填充入binder_transaction_data,在binder_transaction_data前面加入BC_TRANSACTION,写到Parcel mOut中。
err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, NULL);
}

if (err != NO_ERROR) {
if (reply) reply->setError(err);
return (mLastError = err);
}

if ((flags & TF_ONE_WAY) == 0) { // 不是异步的话
#if 0
if (code == 4) { // relayout
ALOGI(">>>>>> CALLING transaction 4");
} else {
ALOGI(">>>>>> CALLING transaction %d", code);
}
#endif
if (reply) {
err = waitForResponse(reply);
} else {
Parcel fakeReply;
err = waitForResponse(&fakeReply);
}
#if 0
if (code == 4) { // relayout
ALOGI("<<<<<< RETURNING transaction 4");
} else {
ALOGI("<<<<<< RETURNING transaction %d", code);
}
#endif

IF_LOG_TRANSACTIONS() {
TextOutput::Bundle _b(alog);
alog << "BR_REPLY thr " << (void*)pthread_self() << " / hand "
<< handle << ": ";
if (reply) alog << indent << *reply << dedent << endl;
else alog << "(none requested)" << endl;
}
} else {
err = waitForResponse(NULL, NULL);
}

return err;
}

Transact主要是把datahandle等填充入binder_transaction_data,在binder_transaction_data前面加入BC_TRANSACTION,写到Parcel mOut中。

status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
binder_transaction_data tr;

tr.target.handle = handle;
tr.code = code;
tr.flags = binderFlags;
tr.cookie = 0;
tr.sender_pid = 0;
tr.sender_euid = 0;

const status_t err = data.errorCheck();
if (err == NO_ERROR) {
tr.data_size = data.ipcDataSize();
tr.data.ptr.buffer = data.ipcData();
tr.offsets_size = data.ipcObjectsCount()*sizeof(size_t);
tr.data.ptr.offsets = data.ipcObjects();
} else if (statusBuffer) {
tr.flags |= TF_STATUS_CODE;
*statusBuffer = err;
tr.data_size = sizeof(status_t);
tr.data.ptr.buffer = statusBuffer;
tr.offsets_size = 0;
tr.data.ptr.offsets = NULL;
} else {
return (mLastError = err);
}

mOut.writeInt32(cmd);
mOut.write(&tr, sizeof(tr));

return NO_ERROR;
}

然后判断是否同步,是同步的话,就调用


waitForResponse(reply);
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
int32_t cmd;
int32_t err;

while (1) {
if ((err=talkWithDriver()) < NO_ERROR) break;
err = mIn.errorCheck();
if (err < NO_ERROR) break;
if (mIn.dataAvail() == 0) continue;

cmd = mIn.readInt32();

IF_LOG_COMMANDS() {
alog << "Processing waitForResponse Command: "
<< getReturnString(cmd) << endl;
}

switch (cmd) {
case BR_TRANSACTION_COMPLETE:
if (!reply && !acquireResult) goto finish;
break;

case BR_DEAD_REPLY:
err = DEAD_OBJECT;
goto finish;

case BR_FAILED_REPLY:
err = FAILED_TRANSACTION;
goto finish;

case BR_ACQUIRE_RESULT:
{
ALOG_ASSERT(acquireResult != NULL, "Unexpected brACQUIRE_RESULT");
const int32_t result = mIn.readInt32();
if (!acquireResult) continue;
*acquireResult = result ? NO_ERROR : INVALID_OPERATION;
}
goto finish;

case BR_REPLY:
{
binder_transaction_data tr;
err = mIn.read(&tr, sizeof(tr));
ALOG_ASSERT(err == NO_ERROR, "Not enough command data for brREPLY");
if (err != NO_ERROR) goto finish;

if (reply) { if ((tr.flags & TF_STATUS_CODE) == 0) {
reply->ipcSetDataReference(
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t),
freeBuffer, this);
} else {
err = *static_cast<const status_t*>(tr.data.ptr.buffer);
freeBuffer(NULL,
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), this);
}
} else {
freeBuffer(NULL,
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), this);
continue;
}
}
goto finish;

default:
err = executeCommand(cmd);
if (err != NO_ERROR) goto finish;
break;
}
}

finish:
if (err != NO_ERROR) {
if (acquireResult) *acquireResult = err;
if (reply) reply->setError(err);
mLastError = err;
}

return err;
}

可以看到是调用了talkWithDriver(),把数据传给了Binder,并读取返回的数据到Parcel mIn

根据mIn读到的值执行操作。


接着进程调用

ProcessState::self()->startThreadPool();

void ProcessState::startThreadPool()
{
AutoMutex _l(mLock);
if (!mThreadPoolStarted) {
mThreadPoolStarted = true;
spawnPooledThread(true);
}
}



oid ProcessState::spawnPooledThread(bool isMain)
{
if (mThreadPoolStarted) {
String8 name = makeBinderThreadName();
ALOGV("Spawning new pooled thread, name=%s\n", name.string());
sp<Thread> t = new PoolThread(isMain);
t->run(name.string());
}
}

status_t Thread::run(const char* name, int32_t priority, size_t stack)
{
Mutex::Autolock _l(mLock);

if (mRunning) {
// thread already started
return INVALID_OPERATION;
}

// reset status and exitPending to their default value, so we can
// try again after an error happened (either below, or in readyToRun())
mStatus = NO_ERROR;
mExitPending = false;
mThread = thread_id_t(-1);

// hold a strong reference on ourself
mHoldSelf = this;

mRunning = true;

bool res;
if (mCanCallJava) {
res = createThreadEtc(_threadLoop,
this, name, priority, stack, &mThread);
} else {
res = androidCreateRawThreadEtc(_threadLoop,
this, name, priority, stack, &mThread);
}

if (res == false) {
mStatus = UNKNOWN_ERROR; // something happened!
mRunning = false;
mThread = thread_id_t(-1);
mHoldSelf.clear(); // "this" may have gone away after this.

return UNKNOWN_ERROR;
}

// Do not refer to mStatus here: The thread is already running (may, in fact
// already have exited with a valid mStatus result). The NO_ERROR indication
// here merely indicates successfully starting the thread and does not
// imply successful termination/execution.
return NO_ERROR;

// Exiting scope of mLock is a memory barrier and allows new thread to run
}

开启了一个主线程,然后再调用函数

IPCThreadState::self()->joinThreadPool();

把当前线程加入线程池,用于接收客户端过来的命令,实际上就是在读取binder过来的信息

void IPCThreadState::joinThreadPool(bool isMain)
{
LOG_THREADPOOL("**** THREAD %p (PID %d) IS JOINING THE THREAD POOL\n", (void*)pthread_self(), getpid());

mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);

// This thread may have been spawned by a thread that was in the background
// scheduling group, so first we will make sure it is in the foreground
// one to avoid performing an initial transaction in the background.
set_sched_policy(mMyThreadId, SP_FOREGROUND);

status_t result;
do {
int32_t cmd;

// When we've cleared the incoming command queue, process any pending derefs
if (mIn.dataPosition() >= mIn.dataSize()) {
size_t numPending = mPendingWeakDerefs.size();
if (numPending > 0) {
for (size_t i = 0; i < numPending; i++) {
RefBase::weakref_type* refs = mPendingWeakDerefs[i];
refs->decWeak(mProcess.get());
}
mPendingWeakDerefs.clear();
}

numPending = mPendingStrongDerefs.size();
if (numPending > 0) {
for (size_t i = 0; i < numPending; i++) {
BBinder* obj = mPendingStrongDerefs[i];
obj->decStrong(mProcess.get());
}
mPendingStrongDerefs.clear();
}
}

// now get the next command to be processed, waiting if necessary
result = talkWithDriver(); // 读取binder信息
if (result >= NO_ERROR) {
size_t IN = mIn.dataAvail();
if (IN < sizeof(int32_t)) continue;
cmd = mIn.readInt32();
IF_LOG_COMMANDS() {
alog << "Processing top-level Command: "
<< getReturnString(cmd) << endl; }


result = executeCommand(cmd);
} else if (result != TIMED_OUT && result != -ECONNREFUSED && result != -EBADF) {
ALOGE("talkWithDriver(fd=%d) returned unexpected error %d, aborting",
mProcess->mDriverFD, result);
abort();
}

// After executing the command, ensure that the thread is returned to the
// foreground cgroup before rejoining the pool. The driver takes care of
// restoring the priority, but doesn't do anything with cgroups so we
// need to take care of that here in userspace. Note that we do make
// sure to go in the foreground after executing a transaction, but
// there are other callbacks into user code that could have changed
// our group so we want to make absolutely sure it is put back.
set_sched_policy(mMyThreadId, SP_FOREGROUND);

// Let this thread exit the thread pool if it is no longer
// needed and it is not the main process thread.
if(result == TIMED_OUT && !isMain) {
break;
}
} while (result != -ECONNREFUSED && result != -EBADF);

LOG_THREADPOOL("**** THREAD %p (PID %d) IS LEAVING THE THREAD POOL err=%p\n",
(void*)pthread_self(), getpid(), (void*)result);

mOut.writeInt32(BC_EXIT_LOOPER);
talkWithDriver(false);
}

并通过executeCommand(cmd) 处理

status_t IPCThreadState::executeCommand(int32_t cmd)
{
BBinder* obj;
RefBase::weakref_type* refs;
status_t result = NO_ERROR;

switch (cmd) {
case BR_ERROR:
result = mIn.readInt32();
break;
。。。。。
case BR_TRANSACTION:
{
binder_transaction_data tr;
result = mIn.read(&tr, sizeof(tr));
ALOG_ASSERT(result == NO_ERROR,
"Not enough command data for brTRANSACTION");
if (result != NO_ERROR) break;

Parcel buffer;
buffer.ipcSetDataReference(
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), freeBuffer, this);

const pid_t origPid = mCallingPid;
const uid_t origUid = mCallingUid;

mCallingPid = tr.sender_pid;
mCallingUid = tr.sender_euid;

int curPrio = getpriority(PRIO_PROCESS, mMyThreadId);
if (gDisableBackgroundScheduling) {
if (curPrio > ANDROID_PRIORITY_NORMAL) {
// We have inherited a reduced priority from the caller, but do not
// want to run in that state in this process. The driver set our
// priority already (though not our scheduling class), so bounce
// it back to the default before invoking the transaction.
setpriority(PRIO_PROCESS, mMyThreadId, ANDROID_PRIORITY_NORMAL);
}
} else {
if (curPrio >= ANDROID_PRIORITY_BACKGROUND) {
// We want to use the inherited priority from the caller.
// Ensure this thread is in the background scheduling class,
// since the driver won't modify scheduling classes for us.
// The scheduling group is reset to default by the caller
// once this method returns after the transaction is complete.
set_sched_policy(mMyThreadId, SP_BACKGROUND);
}
}

//ALOGI(">>>> TRANSACT from pid %d uid %d\n", mCallingPid, mCallingUid);

Parcel reply;
IF_LOG_TRANSACTIONS() {
TextOutput::Bundle _b(alog);
alog << "BR_TRANSACTION thr " << (void*)pthread_self()
<< " / obj " << tr.target.ptr << " / code "
<< TypeCode(tr.code) << ": " << indent << buffer
<< dedent << endl
<< "Data addr = "
<< reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer)
<< ", offsets addr="
<< reinterpret_cast<const size_t*>(tr.data.ptr.offsets) << endl;
}
if (tr.target.ptr) {
sp<BBinder> b((BBinder*)tr.cookie);
const status_t error = b->transact(tr.code, buffer, &reply, tr.flags);
if (error < NO_ERROR) reply.setError(error);

} else {
const status_t error = the_context_object->transact(tr.code, buffer, &reply, tr.flags);
if (error < NO_ERROR) reply.setError(error);
}

//ALOGI("<<<< TRANSACT from pid %d restore pid %d uid %d\n",
// mCallingPid, origPid, origUid);

if ((tr.flags & TF_ONE_WAY) == 0) {
LOG_ONEWAY("Sending reply to %d!", mCallingPid);
sendReply(reply, 0);
} else {
LOG_ONEWAY("NOT sending reply to %d!", mCallingPid);
}

mCallingPid = origPid;
mCallingUid = origUid;

IF_LOG_TRANSACTIONS() {
TextOutput::Bundle _b(alog);
alog << "BC_REPLY thr " << (void*)pthread_self() << " / obj "
<< tr.target.ptr << ": " << indent << reply << dedent << endl;
}

}
break;

。。。。
default:
printf("*** BAD COMMAND %d received from Binder driver\n", cmd);
result = UNKNOWN_ERROR;
break;
}

if (result != NO_ERROR) {
mLastError = result;
}

return result;
}
在 if (tr.target.ptr) {
sp<BBinder> b((BBinder*)tr.cookie);
const status_t error = b->transact(tr.code, buffer, &reply, tr.flags);
if (error < NO_ERROR) reply.setError(error);

}
可以看出调用BBinder,而这里的BBinder为MediaPlayerService,最后是调用了MediaPlayerService的父类BnMediaPlayerService的onTransact函数,实现功能
status_t BBinder::transact(
uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
data.setDataPosition(0);

status_t err = NO_ERROR;
switch (code) {
case PING_TRANSACTION:
reply->writeInt32(pingBinder());
break;
default:
err = onTransact(code, data, reply, flags);
break;
}

if (reply != NULL) {
reply->setDataPosition(0);
}

return err;
}


status_t BnMediaPlayerService::onTransact(
uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
switch (code) {
case CREATE: {
CHECK_INTERFACE(IMediaPlayerService, data, reply);
sp<IMediaPlayerClient> client =
interface_cast<IMediaPlayerClient>(data.readStrongBinder());
int audioSessionId = data.readInt32();
sp<IMediaPlayer> player = create(client, audioSessionId);
reply->writeStrongBinder(player->asBinder());
return NO_ERROR;
} break;
。。。。。。
}
}

到这里可以看出ProcessState负责打开Binder驱动,建立线程池,让其进程里面的所有线程都能通过Binder通信。而IPCthreadState则就是这样的线程,负责着Binder的读取,写入和请求处理框架,并实现Service的功能。