模仿Semaphore自定义自己的 信号量

时间:2021-03-03 15:12:41

简介

这里模仿Semaphore,自定义自己的信号量,利用AQS共享模式

1、MySemaphore.java

package com.jacky;

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
* Created by jacky on 2018/2/13.
*/
public class MySemaphore {

private Sync sync;

public MySemaphore(int permits){
sync
= new NonFairSync(permits);
}

public MySemaphore(int permits,boolean fair){
sync
= fair ? new FairSync(permits) : new NonFairSync(permits);
}

static class Sync extends AbstractQueuedSynchronizer{
Sync(
int permits) {
setState(permits);
}

@Override
protected boolean tryReleaseShared(int arg) {
for (;;){
int oldState = getState();
int newState = oldState+arg;
if (compareAndSetState(oldState,newState)){
return true;
}

}
}
}

static final class FairSync extends Sync{
FairSync(
int permits) {
super(permits);
}

@Override
protected int tryAcquireShared(int arg) {
for(;;){
if (hasQueuedPredecessors()){
return -1;
}
int oldState = getState();
int newState = oldState-arg;
if (newState <0 || compareAndSetState(oldState,newState)){
return newState;
}
}
}
}

static final class NonFairSync extends Sync{

NonFairSync(
int permits) {
super(permits);
}

@Override
protected int tryAcquireShared(int arg) {
for(;;){
int oldState = getState();
int newState = oldState-arg;
if (newState <0 || compareAndSetState(oldState,newState)){
return newState;
}
}
}
}

/**
* 获取许可证
*/
public void acquire(){
try {
sync.acquireSharedInterruptibly(
1);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 释放许可证
*/
public void release(){
sync.releaseShared(
1);
}


}

2、测试

package com.jacky;

import java.util.concurrent.Semaphore;

/**
* Created by jacky on 2018/2/12.
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//Semaphore semaphore = new Semaphore(2, true);
MySemaphore semaphore = new MySemaphore(2, true);
Runnable runnable
= new Runnable() {

@Override
public void run() {
Thread thread
= Thread.currentThread();
System.out.println(
"semaphore start:"+thread.getName());
try {
semaphore.acquire();
Thread.sleep(
2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
System.out.println(
"semaphore end:"+thread.getName());

}
};

for (int i=0;i<10;i++){
Thread thread
= new Thread(runnable, "t" + i);
thread.start();
try {
Thread.sleep(
100);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}