整理学习过程
Binder
extension Reactive where Base: UILabel {
    /// Bindable sink that assigns every received `UIFont` to the wrapped label's `font`.
    public var fontSignal: Binder<UIFont> {
        return Binder(base) { label, font in
            label.font = font
        }
    }
}
// Usage example: push a 30-point system font into the label through `fontSignal`.
Observable
    .of(UIFont.systemFont(ofSize: 30))
    .bind(to: self.label.rx.fontSignal)
    .disposed(by: bag)
常用操作符详解
/// Demo: feeds a one-second timer, mapped to text, into `label` through a hand-built `Binder`.
@objc dynamic func testBinder() -> Void {
    // Sink that writes each incoming string into the label's text.
    let textSink = Binder<String>(label) { target, value in
        target.text = value
    }
    let ticks = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
    ticks
        .map { tick in "选中的index\(tick)" }
        .bind(to: textSink)
        .disposed(by: bag)
}
/// Demo: binds a one-second timer, mapped to text, to a hand-built `AnyObserver`
/// that updates `label`.
@objc dynamic func testBindto() -> Void {
    let textObserver = AnyObserver<String> { [weak self] event in
        guard let owner = self else { return }
        // Only `.next` carries text; every other event is just logged.
        guard case .next(let text) = event else {
            print("default")
            return
        }
        owner.label.text = text
    }
    Observable<Int>
        .interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
        .map { "this is index of \($0)" }
        .bind(to: textObserver)
        .disposed(by: bag)
}
/// Demo: prints the thread on which each stage of a chain runs.
///
/// `subscribeOn` selects the scheduler on which the subscription work (the `create`
/// closure) runs, and events keep flowing on it through the following operators.
/// `observeOn` only switches the scheduler for the operators/observers after it.
///
/// The sequence never completes, so the subscription lives in `bag` for as long as
/// the owner does. The closures therefore capture `self` weakly; a strong capture
/// would form a retain cycle (self -> bag -> subscription -> closure -> self).
/// If the owner is already gone, the thread name falls back to "queue".
func subscribeOnAndObserveOn() {
    Observable<Int>.create { [weak self] observer in
        print("产生事件 -> \(self?.currentQueueName() ?? "queue")")
        observer.onNext(1)
        return Disposables.create()
    }
    .subscribeOn(SerialDispatchQueueScheduler(qos: .background))
    .map { [weak self] element -> Int in
        print("操作事件1 -> \(self?.currentQueueName() ?? "queue")")
        return element + 1
    }
    .map { [weak self] element -> Int in
        print("操作事件2 -> \(self?.currentQueueName() ?? "queue")")
        return element + 1
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] element in
        print("响应事件 -> \(self?.currentQueueName() ?? "queue"), element -> \(element)\n\n")
    })
    .disposed(by: bag)
}
/// Returns the debug description of the thread this method is called on.
@objc dynamic func currentQueueName() -> String {
    let callingThread = Thread.current
    return callingThread.debugDescription
}
/// Demo: merges several subjects into a single sequence.
///
/// Elements from every source are forwarded through one subscription. For example,
/// with several image downloads you can create one sequence per download that emits
/// when it finishes, merge them, and receive all images in one subscriber.
@objc dynamic func merge() -> Void {
    let first = PublishSubject<String>()
    let second = PublishSubject<String>()
    let third = PublishSubject<String>()

    let merged = Observable.of(first, second, third).merge()
    merged
        .subscribe(onNext: { value in
            print(value)
            print(Thread.current)
        })
        .disposed(by: bag)

    first.onNext("1")
    second.onNext("2")
    third.onNext("3")
}
/// Demo: a connectable (`publish()`) timer with three subscribers attached at
/// 0 s, 4 s and 6 s; `connect()` is called after 8 s.
///
/// Every subscription and the connection itself are stored in `bag`, so the endless
/// timer is torn down together with the owner instead of running forever.
@objc dynamic func connect() -> Void {
    let intSequence = Observable<Int>
        .interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
        .publish()

    intSequence
        .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
        .disposed(by: bag)

    DispatchQueue.main.asyncAfter(deadline: .now() + 4) { [weak self] in
        // Nothing to demonstrate once the owner has been released.
        guard let self = self else { return }
        intSequence
            .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
            .disposed(by: self.bag)
    }

    DispatchQueue.main.asyncAfter(deadline: .now() + 6) { [weak self] in
        guard let self = self else { return }
        intSequence
            .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
            .disposed(by: self.bag)
    }

    // connect() is called after 8 seconds; only then do the earlier subscriptions
    // start receiving elements.
    DispatchQueue.main.asyncAfter(deadline: .now() + 8) { [weak self] in
        guard let self = self else { return }
        intSequence.connect().disposed(by: self.bag)
    }
}
/// Demo: concatenates two subjects and prints whatever the combined sequence emits.
@objc dynamic func concat() -> Void {
    let fruits = BehaviorSubject(value: "🍎")
    let animals = BehaviorSubject(value: "🐶")

    let chained = fruits.concat(animals)
    chained
        .subscribe(onNext: { print($0) })
        .disposed(by: bag)

    fruits.onNext("🍐")
    fruits.onNext("🍊")
    // The second subject is only subscribed to once the first one has completed.
    fruits.onCompleted()
    animals.onNext("B")
}
/// Demo: combines two string subjects with `combineLatest`, printing the
/// concatenation of the values handed to the result selector.
@objc dynamic func combineLatest() -> Void {
    let disposeBag = DisposeBag()
    let digits = PublishSubject<String>()
    let letters = PublishSubject<String>()

    let combined = Observable.combineLatest(digits, letters) { digit, letter in
        digit + letter
    }
    combined
        .subscribe(onNext: { value in print(value) })
        .disposed(by: disposeBag)

    digits.onNext("1")
    letters.onNext("A")
    digits.onNext("2")
    letters.onNext("B")
    letters.onNext("C")
    letters.onNext("D")
    digits.onNext("3")
    digits.onNext("4")
}
/// Demo: `flatMapLatest` over a relay whose value is itself a subject.
///
/// The inner sequences are flattened into one sequence. After the relay switches to
/// a new inner subject, emissions of the previous inner subject are no longer observed.
@objc dynamic func testflatMapLatest() -> Void {
    let disposeBag = DisposeBag()
    let first = BehaviorSubject(value: "👦🏻")
    let second = BehaviorSubject(value: "🅰️")
    let current = BehaviorRelay(value: first)

    let flattened = current.asObservable().flatMapLatest { inner in inner }
    flattened
        .subscribe(onNext: { value in print(value) })
        .disposed(by: disposeBag)

    first.onNext("🐱")
    current.accept(second)
    second.onNext("🅱️")
    first.onNext("🐶")
    second.onNext("C")
    first.onNext("first")
}
/// Demo: `flatMap` over a relay whose value is itself a subject.
///
/// The inner sequences are flattened into one sequence. Unlike `flatMapLatest`,
/// earlier inner subjects stay subscribed after the relay switches to a new one.
@objc dynamic func testFlatMap() -> Void {
    let disposeBag = DisposeBag()
    let first = BehaviorSubject(value: "first")
    let second = BehaviorSubject(value: "second")
    let current = BehaviorRelay(value: first)

    let flattened = current.asObservable().flatMap { inner -> Observable<String> in
        inner.asObservable()
    }
    flattened
        .subscribe(onNext: { value in print(value) })
        .disposed(by: disposeBag)

    first.onNext("first-1")
    current.accept(second)
    second.onNext("second-1")
    first.onNext("first-2")
}
/// Demo: `map` transforms each `Int` element into its `String` form before printing.
@objc dynamic func testMap() -> Void {
    let numbers = Observable<Int>.of(1, 3, 4, 5)
    numbers
        .map { number -> String in "\(number)" }
        .subscribe(onNext: { print($0) })
        .disposed(by: bag)
}
/// Demo: `ignoreElements` drops every element, so only the terminating event
/// reaches the subscriber, which prints "complete" for whatever event it receives.
@objc dynamic func testignoreElements() -> Void {
    let numbers = Observable<Int>.of(1, 3, 4, 5)
    numbers
        .ignoreElements()
        .subscribe { _ in
            print("complete")
        }
        .disposed(by: bag)
}
/// Demo: installs `textField` and prints its text events two seconds late,
/// skipping consecutive duplicates.
@objc dynamic func testdelay() -> Void {
    view.addSubview(textField)
    textField.frame = CGRect(x: 0, y: 200, width: 414, height: 70)

    let delayedText = textField.rx.text
        .delay(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance)
        .distinctUntilChanged()
    delayedText
        .subscribe { event in
            print(event.element)
        }
        .disposed(by: bag)
}
/// Demo: `distinctUntilChanged` followed by `throttle`.
///
/// `distinctUntilChanged` only forwards an element when it differs from the previous
/// one. `throttle` (with `latest: true`) forwards the first element of each time
/// window and the last one received during it.
///
/// The window is given as a `DispatchTimeInterval`, like the `delay` demo in this
/// file, rather than through the deprecated `Double` overload.
@objc dynamic func testdistinctUntilChanged() -> Void {
    Observable<Int>.of(1, 1, 2, 3, 3, 4)
        .distinctUntilChanged()
        .throttle(DispatchTimeInterval.milliseconds(500), latest: true, scheduler: MainScheduler.instance)
        .subscribe { (event) in
            print(event.element)
        }
        .disposed(by: bag)
}
/// Demo: `takeUntil` forwards elements of the source only until the trigger emits.
@objc dynamic func takeUntil() -> Void {
    let source = PublishSubject<String>()
    let stopper = PublishSubject<String>()

    let limited = source.takeUntil(stopper)
    limited
        .subscribe { event in
            print(event.element)
        }
        .disposed(by: bag)

    source.onNext("1")
    source.onNext("2")
    source.onNext("3")
    // The trigger fires here; "5" below is sent after it.
    stopper.onNext("4")
    source.onNext("5")
}
/// Demo: takes leading elements while a condition on both the value and its index holds.
///
/// Uses `enumerated().takeWhile().map()`, the documented replacement for the
/// deprecated `takeWhileWithIndex` operator. Elements are forwarded while they are
/// even AND their index is below 2.
@objc dynamic func takeWhileIndex() -> Void {
    Observable.of(2, 2, 4, 4, 6, 6)
        // Pair every element with its zero-based index.
        .enumerated()
        .takeWhile { pair in
            pair.element % 2 == 0 && pair.index < 2
        }
        // Drop the index again so subscribers see plain integers.
        .map { $0.element }
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: bag)
}
/// Demo: `skipUntil` suppresses elements of the source until the gate sequence emits.
@objc dynamic func testSkipUntil() -> Void {
    let values = PublishSubject<Int>()
    let gate = PublishSubject<Int>()

    let gated = values.skipUntil(gate)
    gated
        .subscribe { event in
            print(event.element)
        }
        .disposed(by: bag)

    values.onNext(1)
    values.onNext(2)
    values.onNext(3)
    // Opening the gate: elements sent after this point are delivered.
    gate.onNext(99)
    values.onNext(4)
    values.onNext(5)
}
/// Demo: `skipWhile` drops leading elements for as long as the predicate is true.
/// From the first element for which it is false (here the first `4`), nothing is
/// skipped any more.
///
/// The subscription is stored in the shared `bag`; the unused local `DisposeBag`
/// that the earlier version created has been removed.
@objc dynamic func testskipWhile() -> Void {
    Observable.of(2, 2, 3, 4, 4, 5)
        .skipWhile { integer in
            return integer != 4
        }
        .subscribe(onNext: {
            print($0)
        })
        .disposed(by: bag)
}
/// Demo: `skip(3)` drops the first three elements and prints the rest.
@objc dynamic func testSkipOperator() -> Void {
    let letters = Observable.of("A", "B", "C", "D", "E", "F")
    letters
        .skip(3)
        .subscribe(onNext: { letter in print(letter) })
        .disposed(by: bag)
}
/// Demo: `ReplaySubject` with a buffer of two elements, subscribed to before and
/// after completion.
@objc dynamic func testRePlaySubject() -> Void {
    // bufferSize is the number of elements kept for replay.
    let subject = ReplaySubject<String>.create(bufferSize: 2)
    // Shared handler: prints the (optional) element of every event.
    let logEvent: (Event<String>) -> Void = { event in
        print(event.element)
    }

    subject.onNext("first")
    subject.onNext("second")
    subject.onNext("third")
    subject.subscribe(logEvent).disposed(by: bag)

    // Once onCompleted has been called, later subscribers only receive the
    // sequence as it was before completion.
    subject.onCompleted()
    subject.onNext("forth")
    subject.subscribe(logEvent).disposed(by: bag)

    subject.onNext("666")
    subject.subscribe(logEvent).disposed(by: bag)

    subject.onNext("777")
}
/// Demo: subscribes to a `BehaviorRelay` seeded with "first" and then accepts "second".
///
/// Notes carried over from the original author: the relay wraps a `BehaviorSubject`,
/// a subscriber gets the stored value as well as newer ones, and a relay cannot send
/// completed or error events.
@objc dynamic func testBehaviorRelay() -> Void {
    let relay = BehaviorRelay(value: "first")
    relay
        .subscribe { event in
            print(event.element)
        }
        .disposed(by: bag)
    relay.accept("second")
}
/// Demo: a `BehaviorSubject` hands its latest element to a new subscriber,
/// then delivers later ones.
@objc dynamic func teseBehaviorSubject() -> Void {
    let subject = BehaviorSubject<Int>(value: 10)
    subject
        .subscribe { event in
            print(event.element)
        }
        .disposed(by: bag)
    subject.onNext(99)
}
/// Demo: `PublishSubject` only delivers events sent after subscription, and delivers
/// no further `.next` events once it has terminated with an error (or completion).
///
/// The subscription result is not stored in a local: `disposed(by:)` returns `Void`,
/// so the previous `let` binding held nothing useful and was never read.
@objc dynamic func testPublishiSubject() -> Void {
    let sub = PublishSubject<String>()
    // Sent before anyone subscribed.
    sub.onNext("first")
    sub.subscribe { (event) in
        print(event.element)
    }.disposed(by: bag)
    sub.onNext("second")
    // After onCompleted has run, later events are no longer emitted.
    // sub.onCompleted()
    // After onError has run, later events are no longer emitted.
    sub.onError(MyError.normalError)
    sub.onNext("third")
    // Once onCompleted or onError has happened, no further .next is emitted,
    // not even to subscribers that arrive afterwards.
    sub.subscribe { (event) in
        print("ok\(event.element)")
    }.disposed(by: bag)
    sub.onNext("forth")
}
/// Demo: builds an observable with `create` whose observer is driven by a repeating
/// one-second timer, then subscribes to it.
///
/// `subscribe(onNext:)` builds an observer for us. `subscribe` is the bridge between
/// the observable sequence (the observed side) and the observer: our closure is
/// handed to the observer, and when the observer receives an event through
/// `on(_ event: Event<Element>)` the closure is invoked.
@objc dynamic func testCreateObserver() -> Void {
    let ele = Observable<Int>.create { (anyObserver) -> Disposable in
        // Each timer tick triggers the observer's `on`, so the subscribe closure
        // below fires repeatedly.
        let timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
            anyObserver.on(Event<Int>.next(10))
        }
        return Disposables.create {
            // Stop the timer on disposal; otherwise it keeps firing, and keeps the
            // observer alive, after the subscription is gone.
            // NOTE(review): assumes disposal happens on the thread that scheduled
            // the timer (presumably main) - confirm.
            timer.invalidate()
            print("cancle")
        }
    }
    ele.subscribe(onNext: { (element) in
        print(element)
    }).disposed(by: bag)
}
网友评论