在前面提到了RAC的订阅原理,虽然中间忽略了一些其它东西(比如说RACSchedule,RACDisposable),但任何一个RACSignal的订阅事件的value的流向都是如此。既然是一步步的流,那我们在使用过程中,必然会出现想要改变这个流,或者对这个流进行一些包装。简单的说,从一个RACSignal在经过中间某一步的操作,已经变成另外一个RACSignal;
1. RACSignal bind概况
RACSignal的bind的主要过程如下:
- 创建一个RACSignal的数组,把初始signal添加进去,如果该数组为空,发送complete;数组中任意的signal出现error,都会发送error;数组中任意的signal出现complete,都会让该数组删除该signal。
- 订阅初始Signal,获得初始Signal中所有的value,error,complete信息。
- 接受到value之后,通过RACStreamBindBlock这个block执行获得一个新的Signal,将这个Signal添加到数组中,并订阅这个signal。这个RACStreamBindBlock执行中会带一个BOOL值,如果BOOL值变成YES,初始Signal的订阅就结束了。
2. RACSignal bind源码分析
(1)如下代码段,首先是创建一个RACSignal的数组signals,初始只有
self也就是最初的signal。
NSMutableArray
*signals = [
NSMutableArray
arrayWithObject
:
self
];
(2)如下代码段,在往这个数组添加一个RACSignal时,这个signal都会被订阅:value值都会被传递的send出去,error会终止整体的订阅,complete会将这个signal从数组signals中删除。
void
(^addSignal)(
RACSignal
*) = ^(
RACSignal
*signal) {
@synchronized (signals) {
[signals addObject :signal];
}
RACSerialDisposable *selfDisposable = [[ RACSerialDisposable alloc ] init ];
[compoundDisposable addDisposable :selfDisposable];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext :x];
} error :^( NSError *error) {
[compoundDisposable dispose ];
[subscriber sendError :error];
} completed :^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];
selfDisposable. disposable = disposable;
@synchronized (signals) {
[signals addObject :signal];
}
RACSerialDisposable *selfDisposable = [[ RACSerialDisposable alloc ] init ];
[compoundDisposable addDisposable :selfDisposable];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext :x];
} error :^( NSError *error) {
[compoundDisposable dispose ];
[subscriber sendError :error];
} completed :^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];
selfDisposable. disposable = disposable;
};
(3)如下代码段,在这个数组中删除一个RACSignal时,判断数组中是否为空,为空的话,就sendCompleted
void
(^completeSignal)(
RACSignal
*,
RACDisposable
*) = ^(
RACSignal
*signal,
RACDisposable
*finishedDisposable) {
BOOL removeDisposable = NO ;
@synchronized (signals) {
[signals removeObject :signal];
if (signals. count == 0 ) {
[subscriber sendCompleted ];
[compoundDisposable dispose ];
} else {
removeDisposable = YES ;
}
}
if (removeDisposable) [compoundDisposable removeDisposable :finishedDisposable];
BOOL removeDisposable = NO ;
@synchronized (signals) {
[signals removeObject :signal];
if (signals. count == 0 ) {
[subscriber sendCompleted ];
[compoundDisposable dispose ];
} else {
removeDisposable = YES ;
}
}
if (removeDisposable) [compoundDisposable removeDisposable :finishedDisposable];
};
(4)如下代码段,在bind中,我们自己订阅最初的signal,然后将获得的值,通过block变成一个新的signal,如果*stop变成YES,就终止这次订阅,不再接受后面的数据。
RACDisposable
*bindingDisposable = [
self
subscribeNext
:^(
id
x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable. disposed ) return ;
BOOL stop = NO ;
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil ) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose ];
completeSignal( self , selfDisposable);
}
}
} error :^( NSError *error) {
[compoundDisposable dispose ];
[subscriber sendError :error];
} completed :^{
@autoreleasepool {
completeSignal( self , selfDisposable);
}
// Manually check disposal to handle synchronous errors.
if (compoundDisposable. disposed ) return ;
BOOL stop = NO ;
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil ) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose ];
completeSignal( self , selfDisposable);
}
}
} error :^( NSError *error) {
[compoundDisposable dispose ];
[subscriber sendError :error];
} completed :^{
@autoreleasepool {
completeSignal( self , selfDisposable);
}
}];
3. RACSignal bind Demo
RACSignal
*signal = [
RACSignal
createSignal
:^
RACDisposable
*(
id
<
RACSubscriber
> subscriber) {
[subscriber sendNext : @"first value" ];
[subscriber sendNext : @"second value" ];
[subscriber sendNext : @"third value" ];
[subscriber sendCompleted ];
return nil ;
}];
RACSignal *bindSignal = [signal bind :^ RACStreamBindBlock {
return ^ RACStream *( id value, BOOL *stop) {
NSString *oValue = value;
if ([oValue isEqualToString : @"first value" ]) {
return [ RACSignal createSignal :^ RACDisposable *( id < RACSubscriber > subscriber) {
[subscriber sendNext : @"first value bind after" ];
[subscriber sendCompleted ];
return nil ;
}];
}
if ([oValue isEqualToString : @"second value" ]) {
*stop = YES ;
return [ RACSignal createSignal :^ RACDisposable *( id < RACSubscriber > subscriber) {
[subscriber sendNext : @"second value bind after" ];
[subscriber sendCompleted ];
return nil ;
}];
}
if ([oValue isEqualToString : @"third value" ]) {
return [ RACSignal createSignal :^ RACDisposable *( id < RACSubscriber > subscriber) {
[subscriber sendNext : @"third value bind after" ];
[subscriber sendCompleted ];
return nil ;
}];
}
return nil ;
};
[subscriber sendNext : @"first value" ];
[subscriber sendNext : @"second value" ];
[subscriber sendNext : @"third value" ];
[subscriber sendCompleted ];
return nil ;
}];
RACSignal *bindSignal = [signal bind :^ RACStreamBindBlock {
return ^ RACStream *( id value, BOOL *stop) {
NSString *oValue = value;
if ([oValue isEqualToString : @"first value" ]) {
return [ RACSignal createSignal :^ RACDisposable *( id < RACSubscriber > subscriber) {
[subscriber sendNext : @"first value bind after" ];
[subscriber sendCompleted ];
return nil ;
}];
}
if ([oValue isEqualToString : @"second value" ]) {
*stop = YES ;
return [ RACSignal createSignal :^ RACDisposable *( id < RACSubscriber > subscriber) {
[subscriber sendNext : @"second value bind after" ];
[subscriber sendCompleted ];
return nil ;
}];
}
if ([oValue isEqualToString : @"third value" ]) {
return [ RACSignal createSignal :^ RACDisposable *( id < RACSubscriber > subscriber) {
[subscriber sendNext : @"third value bind after" ];
[subscriber sendCompleted ];
return nil ;
}];
}
return nil ;
};
}];
如上代码,首先创建了一个简单的signal,这个流中发送了三个值对象,经过我们bind方法中RACStreamBindBlock,变成了三个RACsignal,分别对初始流中的值对象进行了替换加工,这样我们订阅新的流,就会获得加工之后的值对象。然而,我们在second value的值处理中,我们将stop的值为YES,这意味着新的Signal不再接受初始Signal中的值,所以就不会有第三个bind转化后的Signal。PS:初始信号的sendNext(third value)之后的代码还是会继续走的,只是因为订阅被终止了
4. RACSignal bind 扩展
之前在提到过,RACStream中有很多都是基于bind的改变而封装的更上层的改变方法。总的来说,是在RACStreamBindBlock的实现内部对value转化成新的RACSignal的过程的封装和简化。
flattenMap:忽略stop对bind的影响,只是将value通过入参block执行变成了新的Signal,同时对nil做了兼容。彻底的实现了value和signal的一一对应。代码如下:
- (
instancetype
)flattenMap:(
RACStream
* (^)(
id
value))block {
Class class = self.class;
return
[[
self
bind
:^{
return ^( id value, BOOL *stop) {
id stream = block(value) ?: [class empty ];
NSCAssert ([stream isKindOfClass:], @"Value returned from -flattenMap: is not a stream: %@" , stream);
return stream;
};
}] setNameWithFormat : @"[%@] -flattenMap:" , self . name ];
return ^( id value, BOOL *stop) {
id stream = block(value) ?: [class empty ];
NSCAssert ([stream isKindOfClass:], @"Value returned from -flattenMap: is not a stream: %@" , stream);
return stream;
};
}] setNameWithFormat : @"[%@] -flattenMap:" , self . name ];
}
flatten:在flattenMap的基础进行了简单了封装,情况是:如果初始Signal中的传递值对象也是一个Signal,这样就不做啥转换,直接把这个值对象变成新的RACSignal。代码如下:
- (
instancetype
)flatten {
__weak RACStream *stream __attribute__ ((unused)) = self ;
return [[ self flattenMap :^( id value) {
return value;
}] setNameWithFormat : @"[%@] -flatten" , self . name ];
__weak RACStream *stream __attribute__ ((unused)) = self ;
return [[ self flattenMap :^( id value) {
return value;
}] setNameWithFormat : @"[%@] -flatten" , self . name ];
}
map:在flattenMap的基础进行了简单了封装,情况是:对初始Signal中的传递值对象进行加工变成一个新的值对象,map就是将这个新的值对象变成了新的流对象的值对象。代码如下:
- (
instancetype
)map:(
id
(^)(
id
value))block {
NSCParameterAssert
(block !=
nil
);
Class class =
self
.
class
;
return
[[
self
flattenMap
:^(
id
value) {
return [class return :block(value)];
}] setNameWithFormat : @"[%@] -map:" , self . name ];
return [class return :block(value)];
}] setNameWithFormat : @"[%@] -map:" , self . name ];
}
mapReplace:在map的基础进行的封装,一股脑的将初始Signal中的传递值对象都变成传入的同一个对象
filter/ignore:过滤掉初始Signal中不满足条件的值对象
take/skip : take运用了stop这个属性,在传递值的次数到达一定数量,stop被置为YES,后续值不被接收;skip没有运用stop这个属性,在最初的一定数量的值都被转化成空流,然后才变为正常。
distinctUntilChanged:在bind基础封装的改变方法,当流中后一次的值和前一次的值不同的时候,才会返回当前值的流,否则返回空流(第一次默认被忽略)
takeUntilBlock:在bind基础封装的改变方法,取当前流的对象值,直到当前值满足提供的block,就会将当前流变为空(不是空流)
takeWhileBlock:在bind基础封装的改变方法,取当前流的对象值,直到当前值不满足提供的block,就会将当前流变为空(不是空流)
skipUntilBlock:在bind基础封装的改变方法,忽略当前流的对象值(变为空流),直到当前值满足提供的block。
skipWhileBlock:在bind基础封装的改变方法,忽略当前流的对象值(变为空流),直到当前值不满足提供的block
5. RACSignal empty和nil的自己看法
从看源码中,我们发现,empty空流和nil空对象,在RACSignal中具有完全不同的作用。empty:在RACSignal的加工包装中,empty意味着忽略,而nil则意味着结束。返回empty,原始的RACSignal还是继续走下去,经过一步步,最终还是会到达订阅的怀抱中;返回nil,则说明在流中的加工链,直接就被断掉了,前面的一部分还是在源源不断的继续到达,但是后面再也接收不到了。