将HystrixCommand通过Observable来实现响应式执行方式

时间:2024-04-14 08:00:15

     在《spring cloud 微服务实战》第152页最下面,书上写除了传统的同步执行与异步执行之外,还可以将HystrixCommand通过Observable来实现响应式执行方式。通过调用observe()和toObservable()方法可以返回Observable对象,书上举的例子泛型是错误的,应该是User,而不是String:

Observable<User> ho=new UserCommand(com.netflix.hystrix.HystrixCommand.Setter.withGroupKey(
        HystrixCommandGroupKey.Factory.asKey("")),new RestTemplate(),0L).observe();

Observable<User> co=new UserCommand(com.netflix.hystrix.HystrixCommand.Setter.withGroupKey(
        HystrixCommandGroupKey.Factory.asKey("")),new RestTemplate(),0L).toObservable();

    书上说observe()和toObservable()虽然都返回了Observable对象,但是observe()返回的是Hot Observable,所以我们用ho作为引用名,该命令会在observe()调用的时候立即执行,当Observable每次被订阅的时候会重放他的行为;而toObservable()返回的是Cold Observable,我们用co作为引用名,toObservable()执行之后,命令不会被立即执行,只有当所有订阅者都订阅它之后才会执行。

    那么通过以上两种方式都能获得Observable对象,接下来该如何处理才能获得你想要的User对象呢?

    回到书上第5章服务容错保护-----原理分析第138页在分析命令执行的时候,提到在Hystrix的底层实现中大量的使用了RxJava响应式编程(观察者---订阅者),对于第一种observe()返回Observable对象,我们可以这样取得User对象:

Observable<User> ho=new UserCommand(com.netflix.hystrix.HystrixCommand.Setter.withGroupKey(
        HystrixCommandGroupKey.Factory.asKey("")),new RestTemplate(),0L).observe();
List<User> list=new ArrayList<>();
//注意:因为执行是异步的,所以要想看到输出结果这里就要阻塞一下
Thread.sleep(3000);
//订阅
ho.subscribe(new Observer<User>() {
    @Override
    public void onCompleted() {

        System.out.println(list.toString());
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onNext(User user) {
        list.add(user);
    }
});

        对于第二种toObservable()返回Observable对象,我们参考书中第143页这个关系图:

将HystrixCommand通过Observable来实现响应式执行方式

    书上写execute()、queue()也都使用了RxJava来实现,并且queue()是通过toObservable()来获得一个Cold Observable,并且通过toBlocking()将该Observable转换成BlockingObservable,它可以把数据以阻塞的方式发出来,而toFuture方法则是把BlockingObservable转换成一个Future,该方法只是创建一个Future返回,并不会阻塞,这使得消费者可以自己决定如何处理异步操作。所以第二种可以这样取得User对象:

Observable<User> co=new UserCommand(com.netflix.hystrix.HystrixCommand.Setter.withGroupKey(
        HystrixCommandGroupKey.Factory.asKey("")),new RestTemplate(),0L).toObservable();
BlockingObservable<User> blockingObservable=co.toBlocking();
Future<User> future=blockingObservable.toFuture();
User user=future.get();//注意捕获异常
    感觉这本书想要传授很多知识,但是逻辑上安排不够合理,内容也不够仔细,可能是出版太匆忙了吧,欢迎看过这本书的一起交流。

    参考博客http://blog.****.net/liuchuanhong1/article/details/73293318