Java并发学习(五)-LockSupport里面的park和unpark

时间:2023-02-14 20:14:42

学习AQS源码时候,发现当判断队列需要入队挂起时,都是调用LockSupport里面的park和unpark方法,例如:

//park并且检查是否中断
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    ...
//而需要唤醒线程进行竞争的时候,则为:
LockSupport.unpark(node.thread);
    ...

LockSupport

LockSupport其实是一个简单的代理类,它里面的代码都是使用Unsafe类里面的native方法,这里可以简单看看sun.misc.Unsafe 本文主要学习里面的park和unpark方法。

在Unsafe里面,park和unpark分别如下定义:

    public native void park(boolean isAbsolute, long time);
    public native void unpark(Object thread);

其中park中变量isAbsolute代表传入的time是绝对时间还是相对时间。

unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。可以理解为设置一个变量0,1之间的切换。

如果线程B连续调用了多次次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A第二次调用park,则进入等待状态。

注意,unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行,也就是不会阻塞。

而如果线程A处于等待许可状态,再次调用park,则会永远等待下去,调用unpark也无法唤醒。

下面先看LockSupport里面的park和unpark定义:

    /**  @param blocker the synchronization object responsible for this thread parking */
    public static void park(Object blocker) {
        //获取当前运行线程
        Thread t = Thread.currentThread();
        //设置t中的parkBlockerOffset的值为blocker,即设置锁遍历
        setBlocker(t, blocker);
        //阻塞线程
        UNSAFE.park(false, 0L);
        //线程被释放,则将parkBlockerOffset设为null
        setBlocker(t, null);
    }

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

其中在park中,jdk文档注释为以下几种情况时,线程会被唤醒unpark:

  • Some other thread invokes {@link #unpark unpark} with the current thread as the target (调用unpark方法)
  • Some other thread {@linkplain Thread#interrupt interrupts} the current thread (被中断interrupts)
  • The call spuriously (that is, for no reason) returns.(posix condition里的”Spurious wakeup”)

park和unpark的c++实现

首先,包含park和unpark的头函数在:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/runtime/park.hpp

而其具体实现函数则在:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/95e9083cf4a7/src/os/solaris/vm/os_solaris.cpp

park的c++代码:

void Parker::park(bool isAbsolute, jlong time) {
//判断是否有信号量_counter是否大于0
  if (_counter > 0) {
      _counter = 0 ;
      OrderAccess::fence();
      //直接返回
      return ;
  }
  //获取当前线程
  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;
  //如果中途已经是interrupt了,那么立刻返回,不足色
  if (Thread::is_interrupted(thread, false)) {
    return;
  }
  //记录当前绝对时间戳
  timespec absTime;
  //如果park的超时时间已到,则返回
  if (time < 0) { // don't wait at all
    return;
  }
  if (time > 0) {
  //更换时间戳
    unpackTime(&absTime, isAbsolute, time);
  }
   //进入安全点,利用该thread构造一个ThreadBlockInVM
  ThreadBlockInVM tbivm(jt);
  if (Thread::is_interrupted(thread, false) ||
      os::Solaris::mutex_trylock(_mutex) != 0) {
    return;
  }
  //记录等待状态
  int status ;
  //中途再次给予了许可,则直接返回不等带。
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = os::Solaris::mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }
#ifdef ASSERT
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals();
  thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
#if defined(__sparc) && defined(COMPILER2)
  if (ClearFPUAtPark) { _mark_fpu_nosave() ; }
#endif
  if (time == 0) {
    status = os::Solaris::cond_wait (_cond, _mutex) ;
  } else {
    //time不为0,则继续等待。
    status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime);
  }
   assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");
#ifdef ASSERT
  thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL);
#endif
  _counter = 0 ;
  status = os::Solaris::mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock") ;
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
  OrderAccess::fence();
}

上面代码只列出了单独的park方法,上述代码中,主要通过一个_counter作为判断标志,当_counter大于0时候,就说明了拥有了“许可”,由此可以大概推断出,unpark方法则是在_counter变量上做文章。
并且,当出现了中断时,一方面,线程会被unpark,但是并不会抛出任何异常!
注:_counter是Parker里面的一个私有全局变量。

unpark的c++代码:

void Parker::unpark() {
//定义两个变量,staus用于判断是否获取锁
  int s, status ;
  //获取锁
  status = os::Solaris::mutex_lock (_mutex) ;
  //判断是否成功
  assert (status == 0, "invariant") ;
  //存储原先变量_counter
  s = _counter;
  //把_counter设为1
  _counter = 1;
  //释放锁
  status = os::Solaris::mutex_unlock (_mutex) ;
  assert (status == 0, "invariant") ;
  if (s < 1) {
  //如果原先_counter信号量小于1,即为0,则进行signal操作,唤醒操作
    status = os::Solaris::cond_signal (_cond) ;
    assert (status == 0, "invariant") ;
  }
}

简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。

当然在阅读park的c++函数时,还有多处地方没有结合其他类来理解,如果哪里有问题,欢迎提出来。

参考文章:
http://blog.csdn.net/hengyunabc/article/details/28126139
http://blog.csdn.net/wilsonpeng3/article/details/46387835
http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/runtime/park.hpp
http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/runtime/park.cpp
http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/95e9083cf4a7/src/os/solaris/vm/os_solaris.cpp#l5801