映射
map
map
用于将可观察序列中的元素进行新的转换,返回新的观察序列。
let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
return number*2
}
.subscribe{
print("\($0)")
}
.disposed(by: disposeBag)
运行结果如下:
next(2)
next(4)
next(6)
next(8)
completed
那么map
是如何实现呢?我们来看看源码:
首先调用map
,会调用以下函数:
public func map<Result>(_ transform: @escaping (Element) throws -> Result)
-> Observable<Result> {
return self.asObservable().composeMap(transform)
}
经过一系列的调用之后,最终会来到Map
这个类中调用其的初始化方法,这个时候source
就是观察序列,而transform
则是map
后面的闭包。
final private class Map<SourceType, ResultType>: Producer<ResultType> {
typealias Transform = (SourceType) throws -> ResultType
private let _source: Observable<SourceType>
private let _transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
self._source = source
self._transform = transform
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
let subscription = self._source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
Map
继承自Producer
,当map
出来的结果调用subscribe
方法的时候,会调用Producer
的subscribe
方法,然后在回调中会调用Map.run()
,最终在MapSink
的on
方法中进行响应。
public func subscribe(_ on: @escaping (Event<Element>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = Observer.Element
typealias Element = SourceType
// map的尾随闭包
private let _transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self._transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
// 先使用_transform对element进行变换
// 然后发送事件
let mappedElement = try self._transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}
这里的处理是先使用之前保存的_transform
对当前元素进行变化,然后将事件发送出去。
flatMap
将可观察序列的每个元素映射转换成一个可观察序列,并将所得的可观察序列合并为一个可观察序列。一般用在当Observable
的元素本身拥有其他的Observable
时,可以将所有子Observables
的元素作为事件发送出来。
func flatMapTest() {
let first = BehaviorSubject.init(value: "1")
let second = BehaviorSubject.init(value: "A")
let rable = BehaviorRelay.init(value: first) // 切换监听哪一个序列
rable.asObservable()
.flatMap { $0 }
.subscribe(onNext: { (element) in
print("==\(element)==")
}).disposed(by: disposeBag)
first.onNext("2")
rable.accept(second)
second.onNext("B")
first.onNext("3")
}
控制台输出:
==1==
==2==
==A==
==B==
==3==
flatMapFirst
将可观察序列的每个元素映射转换成一个可观察序列Observables
,然后取这些Observables
中的第一个。
func flatMapTest() {
let first = BehaviorSubject.init(value: "1")
let second = BehaviorSubject.init(value: "A")
let rable = BehaviorRelay.init(value: first)
rable.asObservable()
.flatMapFirst { $0 }
.subscribe(onNext: { (element) in
print("==\(element)==")
}).disposed(by: disposeBag)
first.onNext("2")
rable.accept(second)
second.onNext("B")
first.onNext("3")
}
控制台输出:
==1==
==2==
==3==
flatMapLatest
将可观察序列的每个元素映射转换成一个可观察序列Observables
,然后取这些Observables
中的最后一个。
func flatMapTest() {
let first = BehaviorSubject.init(value: "1")
let second = BehaviorSubject.init(value: "A")
let rable = BehaviorRelay.init(value: first)
rable.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { (element) in
print("==\(element)==")
}).disposed(by: disposeBag)
first.onNext("2")
rable.accept(second)
second.onNext("B")
first.onNext("3")
}
控制台输出:
==1==
==2==
==A==
==B==
组合
startWith
startWith
用于将元素插入可观察序列的头部
func startWithTest() {
Observable.of("1", "2", "3", "4")
.startWith("5")
.startWith("6", "7")
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==6==
==7==
==5==
==1==
==2==
==3==
==4==
源码实现:
final private class StartWith<Element>: Producer<Element> {
let elements: [Element]
let source: Observable<Element>
init(source: Observable<Element>, elements: [Element]) {
self.source = source
self.elements = elements
super.init()
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
// 遍历发出
for e in self.elements {
observer.on(.next(e))
}
return (sink: Disposables.create(), subscription: self.source.subscribe(observer))
}
}
concat
连接所有内部可观察序列,只要先前的可观察序列成功终止即可。也就是说前面的序列只要成功调用onCompleted
方法,就可以连接后面的序列。concat
连接是按顺序执行的。
func concatTest() {
let first = BehaviorSubject.init(value: "1")
let second = BehaviorSubject.init(value: "A")
let rable = BehaviorRelay.init(value: first)
rable.asObservable()
.concat()
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
first.onNext("2")
second.onNext("B")
rable.accept(second)
first.onCompleted()
second.onNext("C")
}
控制台输出:
==1==
==2==
==B==
==C==
当我们把second.onNext("B")
注释掉,控制台输出:
==1==
==2==
==A==
==C==
为什么会有这个区别呢?这是因为concat
连接的时候需要等前一个序列执行完毕,也就是说后面序列的发送事件如果在前面序列的onCompleted
事件之前,那就只会保留一个,因此会出现上述情况。
merge
将源可观察序列中的元素组合成一个新的可观察序列,如果某一个Observable
发出一个onError
事件,那么被合并的Observable
也会将它发出,并且立即终止序列。
func mergeTest() {
let sub1 = PublishSubject<String>()
let sub2 = PublishSubject<String>()
Observable.of(sub1, sub2)
.merge()
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
sub1.onNext("1")
sub2.onNext("A")
sub1.onNext("2")
sub1.onNext("3")
sub2.onNext("B")
}
控制台输出:
==1==
==A==
==2==
==3==
==B==
zip
通过一个函数将多个Observables
的元素组合起来,然后将每一个组合的结果作为一个元素发出来。zip
能够组合的可观察序列是有限制的,最多不超过8个。最后发出的素数量等于源Observable
中元素数量最少的那个,也就是每一个元素的组成必须是和所有源Observable
一一对应,如果缺少一个源Observable
则不会发出元素。
func zipTest() {
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
// 合并的方式
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
stringSubject.onNext("A")
stringSubject.onNext("B")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("C")
intSubject.onNext(3)
stringSubject.onNext("D")
}
控制台输出:
==A 1==
==B 2==
==C 3==
combineLatest
当多个Observables
中任何一个发出一个元素,就发出一个元素。这个元素是由这些Observables
中最新的元素,通过一个函数组合起来的。
func combineLatestTest() {
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
// 组合的方式
"\(strElement)+\(intElement)"
}
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
stringSub.onNext("A")
stringSub.onNext("B") // B会覆盖A
intSub.onNext(1)
intSub.onNext(2)
stringSub.onNext("C")
intSub.onNext(3)
stringSub.onNext("D")
}
控制台输出:
==B+1==
==B+2==
==C+2==
==C+3==
==D+3==
可以看出combineLatest
和zip
的相同点是,两者都是子Observables
有值才会发出元素,但是combineLatest
只要有值过,就会发出新值,并且当只是某一个子Observables
有值时,后面的元素会覆盖前面的元素;而zip
不会覆盖,必须是成对出现。
switchLatest
将可观察序列发出的元素转换为可观察序列,并从最新的内部可观察序列发出元素。
func switchLatestTest() {
let sub1 = PublishSubject<String>()
let sub2 = PublishSubject<String>()
Observable.of(sub1, sub2)
.switchLatest()
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
sub1.onNext("1")
sub2.onNext("A")
sub1.onNext("2")
sub1.onNext("3")
sub2.onNext("B")
}
控制台输出:
==A==
==B==
scan
在可观察序列上应用累加器函数并返回每个中间结果,指定的种子值用作初始累加器值。
func scanTest() {
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
}
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==12==
==112==
==1112==
reduce
将可观察序列中的元素和初始种子值在累加器中累加,输出累加之后的结果。也就是对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果。
func reduceTest() {
Observable.of(10, 100, 1000)
.reduce(2, accumulator: +) // 2+10+100+1000
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==1112==
和scan
不同的是,scan
每一个元素计算出来都会发送一次,reduce
只会发出最终结果。
toArray
将可观察序列转换为Single
信号,也就是将可观察序列中的元素整合之后作为一个元素发出,然后终止。
func toArrayTest() {
Observable.range(start: 1, count: 5)
.toArray()
.subscribe(onSuccess: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出
==[1, 2, 3, 4, 5]==
过滤
filter
根据谓词过滤可观察序列的元素。
func filterTest() {
Observable.of(1,2,3,4,5,6,7,8,9)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==2==
==4==
==6==
==8==
源码如下:
extension ObservableType {
public func filter(_ predicate: @escaping (Element) throws -> Bool)
-> Observable<Element> {
return Filter(source: self.asObservable(), predicate: predicate)
}
}
final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
do {
// 谓词过滤
let satisfies = try self._predicate(value)
if satisfies {
self.forwardOn(.next(value))
}
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .completed, .error:
self.forwardOn(event)
self.dispose()
}
}
}
elementAt
将可观察序列中指定索引数的元素发出。
func elementAtTest () {
Observable.of("A", "B", "C", "D")
.elementAt(2)
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==C==
distinctUntilChanged
阻止可观察序列发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。
func distinctUntilChangedTest() {
Observable.of("1", "2", "2", "2", "3", "3", "4")
.distinctUntilChanged()
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出
==1==
==2==
==3==
==4==
single
限制可观察序列只发出一个元素,如果没有元素或者元素数量大于1,它将产生一个error
事件。
func singleTest() {
Observable.of("1", "2", "3","4")
.single()
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==1==
Unhandled error happened: Sequence contains more than one element.
subscription called from:
take
从可观察序列的开始位置返回指定数量的连续元素。
func takeTest() {
Observable.of("1", "2", "3","4")
.take(2)
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==1==
==2==
takeLast
从可观察序列的末尾返回指定数量的连续元素,可能会存在延迟的情况。
func takeLastTest() {
Observable.of("1", "2", "3","4")
.takeLast(2)
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==3==
==4==
takeWhile
只要指定条件的值为true
,就从可观察序列的开始发出元素。一旦为false
则直接发送完成事件。
func takeWhileTest() {
Observable.of("1", "2", "3","4")
.takeWhile({ (item) -> Bool in
return item != "2"
})
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==1==
上述例子中,虽然我们后面的元素还是不等于"2"
,但是前面等于"2"
的时候就已经发送了完成信息,结束了订阅观察。
takeUntil
忽略掉在第二个可观察序列产生事件后发出的那部分元素。当我们在订阅一个观察序列的时候,一旦参考观察序列发出一个元素或者产生一个终止事件,原理订阅的观察序列会立即终止。
func takeUntilTest() {
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
sourceSequence.onNext("A")
sourceSequence.onNext("B")
sourceSequence.onNext("C")
referenceSequence.onNext("1") // 会终止原来的订阅
sourceSequence.onNext("D")
sourceSequence.onNext("E")
}
控制台输出:
==A==
==B==
==C==
skip
从可观察序列的开始位置跳过指定数量的元素。
func skipTest() {
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
==3==
==4==
==5==
==6==
skipWhile
判断可观察元素的开始位置,只要指定条件为true
,就绕过可观察序列中对应的元素,然后返回其余元素,如果为false
,则正常返回剩余的所有元素。
func skipTest() {
Observable.of(1, 2, 3, 4, 5, 6)
.skipWhile { $0 < 3 }
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
}
控制台输出:
Observable.of(1, 2, 3, 4, 5, 6) // 3456
Observable.of(6, 5, 4, 3, 2, 1) // 654321
skipUntil
跳过可观察序列开始位置的几个元素,直到参考的可观察序列发出一个元素才会接着发出原来可观察序列的元素。
func skipUntilTest() {
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.skipUntil(referenceSequence)
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
sourceSequence.onNext("A")
sourceSequence.onNext("B")
sourceSequence.onNext("C")
referenceSequence.onNext("1") // 只发出条件下面的元素
sourceSequence.onNext("D")
sourceSequence.onNext("E")
}
控制台输出:
==D==
==E==
可以看出,skipUntil
和takeUntil
正好相反。
错误处理
catchErrorJustReturn
可观察序列发送错误事件,catchErrorJustReturn
会返回一个正常的事件,然后再终止观察序列。
func catchErrorJustReturnTest() {
let errorSub = PublishSubject<String>()
errorSub.catchErrorJustReturn("这是我们自定义的错误")
.subscribe(onNext: { (element) in
print("==\(element)==")
})
.disposed(by: disposeBag)
errorSub.onNext("A")
errorSub.onNext("B")
errorSub.onError(NSError.init(domain: "错误", code: 10000, userInfo: nil)) // 发error就会调用我们自定义的错误
}
控制台输出:
==A==
==B==
==这是我们自定义的错误==
catchError
拦截一个error
事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得可观察序列正常运行。
func catchErrorTest() {
let sub1 = PublishSubject<String>()
let sub2 = PublishSubject<String>()
sub1.catchError { (element) -> Observable<String> in
print("==Error==\(element)==")
return sub2
}
.subscribe { (element) in
print("==\(element)==")
}
.disposed(by: disposeBag)
sub1.onNext("A")
sub1.onNext("B")
sub1.onError(NSError.init(domain: "错误", code: 10000, userInfo: nil))
sub2.onNext("1")
sub2.onNext("2")
}
控制台输出:
==next(A)==
==next(B)==
==Error==Error Domain=错误 Code=10000 "(null)"==
==next(1)==
==next(2)==
多播、链接
publish & connect
publish
会将可观察序列转换为可被共享的观察序列,connect
则通知可被共享的观察序列可以开始发出元素了。
被共享的观察序列在被订阅后不会发出元素,直到应用链接connect
为止,也就是说publish
将多次订阅共享链接之后,才会发出事件。这样一来我们就可以控制可观察序列在什么时候开始发出元素。
func multicastTest() {
// *** multicast : 将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
let pOb = Observable<Any>.create { (observer) -> Disposable in
print("==开始发送事件了==")
observer.onNext("发送广播事件")
return Disposables.create {
print("销毁回调了")
}
}.publish()
pOb.subscribe(onNext: { (element) in
print("订阅1==\(element)==")
})
.disposed(by: disposeBag)
pOb.subscribe(onNext: { (element) in
print("订阅2==\(element)==")
})
.disposed(by: disposeBag)
_ = pOb.connect()
}
控制台输出:
==开始发送事件了==
订阅1==发送广播事件==
订阅2==发送广播事件==
源码如下:
在我们创建可观察序列,调用publish
方法的时候会进入以下方法:
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
返回的是一个ConnectableObservableAdapter
对象,该对象继承自ConnectableObservable
,而ConnectableObservable
则是直接继承自Observable
,而不是producer
,这个继承关系和前面的一些方法是不一样的。那么其订阅方法也就不是producer
中的subscribe
方法了。
public class ConnectableObservable<Element>
: Observable<Element>
, ConnectableObservableType {
public func connect() -> Disposable {
rxAbstractMethod()
}
}
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
typealias ConnectionType = Connection<Subject>
fileprivate let _source: Observable<Subject.Observer.Element>
fileprivate let _makeSubject: () -> Subject
fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?
// state
fileprivate var _connection: ConnectionType?
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source
self._makeSubject = makeSubject
self._subject = nil
self._connection = nil
}
// 使用懒加载方法将我们传入的PublishSubject()对象返回
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
// 自己实现的订阅方法
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
}
可以看出,ConnectableObservableAdapter
自己实现了subscribe
方法,例子中pOb
调用的方法就是该方法了,我们再来看看这个方法做了什么。这其中lazySubject
就是我们之前外界传入的_makeSubject()
创建的对象,也就是PublishSubject()
。
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
// 订阅
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
}
我们再来看看pOb
调用connect()
方法的相关实现:
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
// 创建connection观察者
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
// 订阅观察
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
}
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
typealias Element = Subject.Observer.Element
private var _lock: RecursiveLock
// state
private var _parent: ConnectableObservableAdapter<Subject>?
private var _subscription : Disposable?
private var _subjectObserver: Subject.Observer
private let _disposed = AtomicInt(0)
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
// 观察者发送事件的时候 最终会来到这里
// 从这里进入到PublishSubject.on 然后再给订阅者分发
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
// 发送事件
self._subjectObserver.on(event)
}
}
在调用connect()
方法的时候也做了一层判断,一旦创建过connection
就再也不会创建了。而且在第一次创建的时候,会把connection
作为观察者传入订阅的方法中。按照之前订阅流程,发送事件的时候,必然会调用Connection
的on
方法,而最终会让我们之前懒加载的lazySubject
对象调用on
方法。也就是发送事件的观察者只有一个,也就是lazySubject
。
上述例子中,如果我们去掉publish()
和connect()
,则就成为了普通的可观察序列,此时运行程序的结果如下:
==开始发送事件了==
订阅1==发送广播事件==
==开始发送事件了==
订阅2==发送广播事件==
因为我们在订阅方法中,每一次订阅都创建了一个观察者,所以创建观察序列的回调会走两次。而在多播中,lazySubject
只有一个,也就是说观察者是唯一的,事件也就只会发送一次。这样就是多播的含义,一人发送,多人接收。
网友评论