/// Demonstrates `catchError`: when the source sequence fails, the subscriber is
/// handed over to a fallback observable instead of receiving the error.
func catchErrorAndReconver() {
    let failingSubject = PublishSubject<Int>()
    let fallback = Observable.of(100, 200, 300, 400)

    // Whatever the error is, continue with `fallback`.
    let recovered = failingSubject.catchError { _ in fallback }
    _ = recovered.subscribe { event in
        print(event)
    }

    for value in [1, 2] {
        failingSubject.on(.Next(value))
    }
    failingSubject.on(.Error(NSError(domain: "Test", code: 0, userInfo: nil)))
    // Sent after the error; per the recorded output below these never reach the subscriber.
    for value in [3, 4] {
        failingSubject.on(.Next(value))
    }

    // Output recorded by the original author:
    //   Next(1)
    //   Next(2)
    //   -- an error occurs, so another observable is returned --
    //   Next(100)
    //   Next(200)
    //   Next(300)
    //   Next(400)
    //   Completed
}
/// Demonstrates `catchErrorJustReturn`: an error from the source is replaced by
/// a single fallback element (100 here).
func catchErorAndReturnJust() {
    let failingSubject = PublishSubject<Int>()

    let withDefault = failingSubject.catchErrorJustReturn(100)
    _ = withDefault.subscribe { event in
        print(event)
    }

    // Alternative kept from the original (disabled): emit the error first.
    // failingSubject.on(.Error(NSError(domain: "Test", code: 0, userInfo: nil)))
    for value in 1...3 {
        failingSubject.on(.Next(value))
    }
    failingSubject.on(.Error(NSError(domain: "Test", code: 0, userInfo: nil)))
    // Sent after the error; absent from the recorded output below.
    failingSubject.on(.Next(4))

    // Output recorded by the original author:
    //   Next(1)
    //   Next(2)
    //   Next(3)
    //   Next(100)
    //   Completed
}
/// Demonstrates `retry`: the sequence fails during its first subscription only,
/// and `retry()` subscribes again so the second run can finish.
func retryAgain() {
    // Shared across subscriptions so that only the first run emits the error.
    var attempt = 1

    let flakySequence = Observable<Int>.create { observer -> Disposable in
        let failure = NSError(domain: "text", code: 0, userInfo: nil)
        for value in 0...2 {
            observer.on(.Next(value))
        }
        if attempt < 2 {
            observer.on(.Error(failure))
            attempt += 1
        }
        for value in [4, 5] {
            observer.on(.Next(value))
        }
        observer.on(.Completed)
        return NopDisposable.instance
    }

    _ = flakySequence.retry().subscribeNext { value in
        print("\(value)")
    }

    // Output recorded by the original author:
    //   0
    //   1
    //   2
    //   -- an error was sent, so the sequence is retried --
    //   0
    //   1
    //   2
    //   4
    //   5
}
/**
The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.
Observable -> emits items
Observer   -> receives items
A typical implementation of the Subscribe operator may accept one to three methods (which then constitute the observer), or it may accept an object (sometimes called an Observer or Subscriber) that implements the interface which includes those three methods: onNext, onError, and onCompleted.
*/
/// Demonstrates `subscribeNext`: the handler is called with each element only.
func subscribeNext() {
    let ints = PublishSubject<Int>()

    _ = ints.subscribeNext { element in
        print(element)
    }

    ints.on(.Next(1))
    ints.on(.Completed)

    // Output recorded by the original author:
    //   1
}
/// Demonstrates `subscribeCompleted`: the handler is called when the sequence completes.
func subScribeCompleted() {
    let ints = PublishSubject<Int>()

    let completion = ints.subscribeCompleted {
        print("It's completed")
    }
    _ = completion

    ints.on(.Next(1))
    ints.on(.Completed)

    // Output recorded by the original author:
    //   It's completed
}
/// Demonstrates `doOn`: a handler that sees every event passing through the
/// chain, so each one can be inspected or handled along the way.
func doOn() {
    let ints = PublishSubject<Int>()

    let tapped = ints.doOn { event in
        print("Intercepted event \(event)")
        print(".......")
    }
    _ = tapped.subscribeNext { element in
        print("para =\(element)")
    }

    ints.on(.Next(1))
    ints.on(.Completed)

    // Output recorded by the original author for an earlier variant of this demo:
    //   Intercepted event Next(1)
    //   Next(1)
    //   Intercepted event Completed
    //   Completed
    //
    // Output with `subscribeNext`, as written here:
    //   Intercepted event Next(1)
    //   .......
    //   para =1
    //   Intercepted event Completed
    //   .......
}
/// Demonstrates `takeUntil`: discard any items emitted by an Observable after a
/// second Observable emits an item or terminates.
func takeUntil() {
    let source = PublishSubject<Int>()
    let stopSignal = PublishSubject<Int>()

    let untilStopped = source.takeUntil(stopSignal)
    _ = untilStopped.subscribe { event in
        print(event)
    }

    for value in 1...4 {
        source.on(.Next(value))
    }
    // Once the second observable emits, later items from `source` are dropped.
    stopSignal.on(.Next(1))
    source.on(.Next(5))

    // Output recorded by the original author:
    //   Next(1)
    //   Next(2)
    //   Next(3)
    //   Next(4)
    //   Completed
}
/// Demonstrates `takeWhile`: mirror items emitted by an Observable until a
/// specified condition becomes false.
func takewhile() {
    let source = PublishSubject<Int>()

    // Stops as soon as the predicate returns false.
    let belowFour = source.takeWhile { $0 < 4 }
    _ = belowFour.subscribe { event in
        print(event)
    }

    for value in 1...5 {
        source.on(.Next(value))
    }

    // Output recorded by the original author:
    //   Next(1)
    //   Next(2)
    //   Next(3)
    //   Completed
}
/// Demonstrates `concat` on an observable of observables: the inner sequences
/// are consumed one after another, in the order they were emitted.
func cancat() {
    let first = BehaviorSubject(value: 0)
    let second = BehaviorSubject(value: 200)
    // `outer` is like an Observable<Observable<Int>>.
    let outer = BehaviorSubject(value: first)

    // The subscription is discarded explicitly, like the other demos in this
    // file, instead of being bound to an unused local.
    _ = outer
        .concat()
        .subscribe { event in
            print(event)
        }

    for value in 1...4 {
        first.on(.Next(value))
    }

    // Queue up `second` while `first` is still active.
    outer.on(.Next(second))
    second.on(.Next(201))

    for value in 5...7 {
        first.on(.Next(value))
    }
    first.on(.Completed)

    for value in 202...204 {
        second.on(.Next(value))
    }

    // Output recorded by the original author. Note that Next(200) is absent and
    // Next(201) only shows up after `first` completes -- presumably `second` is
    // subscribed at that point and replays its latest value; confirm against
    // the RxSwift documentation.
    //   Next(0)
    //   Next(1)
    //   Next(2)
    //   Next(3)
    //   Next(4)
    //   Next(5)
    //   Next(6)
    //   Next(7)
    //   Next(201)
    //   Next(202)
    //   Next(203)
    //   Next(204)
}
/// Demonstrates `reduce`: apply a function to each item emitted by an
/// Observable, sequentially, and emit the final value once the sequence
/// completes. It works much like Swift's `reduce` on sequences: every item is
/// visited, the accumulator is called once per item, and the last result is
/// what gets emitted.
func reduce() {
    // Shorter variant kept from the original (disabled):
    // _ = Observable.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    //     .reduce(0, accumulator: +)
    //     .subscribe {
    //         print($0)
    //     }

    let digits = Observable.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

    // Logs every accumulation step before returning the running sum.
    let total = digits.reduce(0, accumulator: { (running, next) -> Int in
        print("v1 =\(running) v2= \(next)")
        return running + next
    })

    _ = total.subscribeNext { sum in
        print("\(sum)")
    }

    // Output recorded by the original author:
    //   v1 =0 v2= 0
    //   v1 =0 v2= 1
    //   v1 =1 v2= 2
    //   v1 =3 v2= 3
    //   v1 =6 v2= 4
    //   v1 =10 v2= 5
    //   v1 =15 v2= 6
    //   v1 =21 v2= 7
    //   v1 =28 v2= 8
    //   v1 =36 v2= 9
    //   45
}
// "网友评论" ("Reader comments") -- stray page text, not Swift code; kept as a comment so the file parses.