/**
This method creates a new observable instance
with a variable number of elements.
see also:http://reactivex.io/documentation/operators/from.html
- parameter elements: Elements to generate
- parameter scheduler: Scheduler to send elements on.
If 'nil', elements are sent immediately on subscription
- returns: The observable sequence whose elements are pulled from the given arguments.
*/
public static func of(elements: E ..., scheduler: ImmediateSchedulerType? = nil)->Observable<E> {
return Sequence(elements: elements, scheduler: scheduler)
}
class Sequence<E>: Producer<E> {
private let _elements: [E]
private let _scheduler: ImmediateSchedulerType?
init(elements: [E], scheduler: ImmediateSchedulerType?) {
_elements = elements
_scheduler = scheduler
}
override func subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
// optimized version without scheduler
guard _scheduler != nil else {
for element in _elements {
observer.on(.Next(element))
}
observer.on(.Completed)
return NoDisposable.instance
}
let sink = SequenceSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
class SequenceSink<O: ObserverType>: Sink<O> {
typealias Parent = Sequence<O.E>
private let _parent: Parent
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer)
}
func run()->Disposable {
return _parent._scheduler!.scheduleRecursive( (0, _parent._elements)) { (state, recurse) in
if state.0 < state.1.count {
self.forwardOn(.Next(state.1[state.0]))
recurse((state.0 + 1, state.1))
}else {
self.forwardOn(.Completed)
}
}
}
}
网友评论