Active Object[接收异步消息的对象]
一:Active Object的参与者
--->客户端线程(发起某种操作请求处理)
--->代理角色(工头)
--->实际执行者(工人)
--->主动对象接口(工人和工头)
--->生产端线程(加工产品的线程)
--->存放生产请求的队列(存放请求的队列)
--->请求实例化(将方法的启动和执行分离的实例化包含)
--->订单
--->产品
--->订单产品的共同接口
二:Active Object模式什么时候使用
--->大型模式,适合处理大并发量的业务场景
三:Active Object思考
--->
四进阶说明
--->
Active Object例子
工厂大量需要制造文字
一:订单和产品的接口
package com.yeepay.sxf.thread12;
/**
* 订单和产品的接口
* @author sxf
*
*/
public abstract class Result {
//获取结果的方法
public abstract Object getResultValue(); }
二:订单类
package com.yeepay.sxf.thread12;
/**
* 订单类
* @author sxf
*
*/
public class FutureResult extends Result{
//真正的产品
private Result result;
//产品是否生产好
private boolean ready=false; //供生产线程使用,将生产好的产品放入订单
public synchronized void setResult(Result result){
this.result=result;
this.ready=true;
notifyAll();
} //从订单里获取真正的产品结果
@Override
public synchronized Object getResultValue() {
while(!ready){
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return result.getResultValue();
} }
三:产品类
package com.yeepay.sxf.thread12; /**
* 产品类
* @author sxf
*
*/
public class RealResult extends Result {
private final Object resultValue; //制造产品的方法
public RealResult(Object resultValue) {
// TODO Auto-generated constructor stub
this.resultValue=resultValue;
} //获得产品
@Override
public Object getResultValue() { return resultValue;
} }
四:工头和工人的接口
package com.yeepay.sxf.thread12; /**
* 工头和工人的接口
* @author sxf
*
*/
public interface ActiveObject {
//制造字符串
public abstract Result makeString(int count,char fillchar); //显示字符串
public abstract void displayString(String string); }
五:工头类
package com.yeepay.sxf.thread12; /**
* 发起制作操作的代理(工头)
* @author sxf
*
*/
public class Poxy implements ActiveObject{
//制造操作的线程
private final SchedulerTHread schedulerTHread;
//制造操作的工人
private final Servant servant;
//构造器
public Poxy(SchedulerTHread schedulerTHread,Servant servant){
this.schedulerTHread=schedulerTHread;
this.servant=servant;
} /**
* 发起制造操作(开启制造操作的单据,并将制造原材料和制造工人打包,放入制造线程的队列中)
*/
@Override
public Result makeString(int count, char fillchar) {
//制作一张订单
FutureResult future=new FutureResult();
//将原材料和工人和订单放入队列
schedulerTHread.Invoke(new MakeStringRequset(servant, future, count, fillchar));
//返回订单
return future;
} /**
* 查看字符串
*/
@Override
public void displayString(String string) {
//这个制造不需要订单
schedulerTHread.Invoke(new DisplayStringRequest(servant, string));
} }
六:工人类
package com.yeepay.sxf.thread12;
/**
* 执行者(工人)
* @author sxf
*
*/ public class Servant implements ActiveObject { /**
* 生产字符串
*/
@Override
public Result makeString(int count, char fillchar) {
char[] buffer=new char[count];
for(int i=0;i<count;i++){
buffer[i]=fillchar;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return new RealResult(buffer.toString());
} /**
* 打印字符串
*/
@Override
public void displayString(String string) {
System.out.println("Servant.displayString()"+string);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} } }
七:工作请求抽象的类
package com.yeepay.sxf.thread12;
/**
* 执行请求的实例(工人,订单,原材料)
* @author sxf
*
*/
public abstract class MethodRequest {
//工人
protected final Servant servant;
//订单
protected final FutureResult future; protected MethodRequest(Servant servant,FutureResult future) {
this.servant=servant;
this.future=future;
}
//工人的具体操作
public abstract void execute();
}
八:制造文字的请求
package com.yeepay.sxf.thread12;
/**
* 制造字符串请求实例
* 其做用:将制造的发起者和制造的执行者分离。
* ==》方法的启动和方法的执行进行分离
* ==》分离的本质:(1)启动者将执行请求转换成实例(原材料,执行者,订单)
* (2)执行者用原材料生产完产品,将产品存入订单,供发起者使用
* @author sxf
*
*/
public class MakeStringRequset extends MethodRequest{
private final int count;
private final char fillchar;
//构造器(制造工人,订单,产品原材料)
public MakeStringRequset(Servant servant,FutureResult futureResult,int count,char fillchar) {
super(servant, futureResult);
this.count=count;
this.fillchar=fillchar;
} @Override
public void execute() {
//制造工人制造出产品
RealResult realResult=(RealResult) servant.makeString(count, fillchar);
//再将产品放入订单
future.setResult(realResult);
} }
九:打印文字的请求
package com.yeepay.sxf.thread12;
/**
* 打印字符串的请求实例
* @author sxf
*
*/
public class DisplayStringRequest extends MethodRequest{
//打印原材料
private final String string;
public DisplayStringRequest(Servant servant,String string){
super(servant, null);
this.string=string;
}
//执行打印的方法
@Override
public void execute() {
servant.displayString(string);
} }
十:十:客户线程(1)(发起制造字符串请求的线程)
package com.yeepay.sxf.thread12; /**
* 发起制造字符串请求的线程
* @author sxf
*
*/
public class MakerClientThread implements Runnable{
//代理类(工头)
private final ActiveObject activeObject;
//生产原材料
private final char fillChar; public MakerClientThread(ActiveObject activeObject,char fillChar) {
this.activeObject=activeObject;
this.fillChar=fillChar;
} @Override
public void run() {
for(int i=0;true;i++){
//(工头)发起制造,得到订单
Result result=activeObject.makeString(i, fillChar);
//忙别的事情
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//从订单中获取产品结果
String value=(String) result.getResultValue();
//打印产品
System.out.println(Thread.currentThread().getName()+"=>value:["+value+"]");
} } }
十一:客户线程(2)发起打印字符串的请求线程
package com.yeepay.sxf.thread12;
/**
* 发起打印字符串请求的线程
* @author sxf
*
*/
public class DisplayClientThread implements Runnable{
//工头
private final ActiveObject activeObject; public DisplayClientThread(ActiveObject activeObject){
this.activeObject=activeObject;
} @Override
public void run() {
for(int i=0;true;i++){
//打印原材料
String string=Thread.currentThread().getName()+" "+i;
//工头进行找人打印
activeObject.displayString(string);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} } }
十二:生产线程
package com.yeepay.sxf.thread12; /**
* 制造线程(制造原材料==>原材料+工人)
* @author sxf
*
*/
public class SchedulerTHread implements Runnable{
//制造线程中的队列
private final ActivationQueue activatonQueue;
public SchedulerTHread(ActivationQueue activationQueue) {
this.activatonQueue=activationQueue;
} //发起制造的线程,调用该方法,该方法是将制造请求(原材料+工人),放入队列
public void Invoke(MethodRequest request){
activatonQueue.putRequest(request);
} //制造线程体
@Override
public void run() {
while(true){
//从队列中取出制造请求
MethodRequest request=activatonQueue.takeRequest();
//制造请求被执行
request.execute();
} } }
十三:客户线程和生产线程的队列
package com.yeepay.sxf.thread12; /**
* 队列
* @author sxf
*
*/
public class ActivationQueue {
private static final int MAX_METHOD_REQUEST=100;
private final MethodRequest[] requestQueue;
private int tail;//下一个put Request的地方
private int head;//下一个take Request的地方
private int countRequest;//request的数量 public ActivationQueue() {
this.requestQueue=new MethodRequest[MAX_METHOD_REQUEST];
this.head=0;
this.tail=0;
this.countRequest=0;
} //放入请求
public synchronized void putRequest(MethodRequest request){
while(countRequest>=requestQueue.length){
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
requestQueue[tail]=request;
//计算下一个放请求的位置
tail=(tail+1)%requestQueue.length;
//请求数加1
countRequest++;
//唤醒其他线程
notifyAll();
} //取出请求
public synchronized MethodRequest takeRequest(){
while(countRequest<=0){
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//取出
MethodRequest methodRequest=requestQueue[head];
//计算下一个取出请求的位置
head=(head+1)%requestQueue.length;
//请求数减去1
countRequest--;
//唤醒其他线程
notifyAll();
return methodRequest;
}
}
十四:产生工头的工厂类
package com.yeepay.sxf.thread12; /**
* 工头对象的工厂类
* @author sxf
*
*/
public class ActiveObjectFactory { //生产工头的对象
public static ActiveObject createActiveObject(){
//执行制造操作(工人)
Servant servant=new Servant();
//存放制造请求实例
ActivationQueue queue=new ActivationQueue();
//制造操作的线程
SchedulerTHread schedulerTHread=new SchedulerTHread(queue);
//发起制造动作(工头)
Poxy poxy=new Poxy(schedulerTHread, servant);
//启动制造操作的线程
new Thread(schedulerTHread).start();
return poxy;
}
}
十五:测试类
package com.yeepay.sxf.thread12;
/**
* 测试类
* @author sxf
*
*/
public class Test { public static void main(String[] args) {
ActiveObject activeObject=ActiveObjectFactory.createActiveObject();
new Thread(new MakerClientThread(activeObject, 's')).start();;
new Thread(new DisplayClientThread(activeObject)).start(); }
}