视频讲解
这儿B站上有个大神的Stream的讲解视频
点击查看【bilibili】
文字讲解
Stream字面上可以理解成流,类似水滴
final stream = Stream.periodic(Duration(seconds: 1), (_) => 43 );
使用场景
StreamBuilder
File file = new File('/test.txt');Stream fileStream = file.openRead();StreamBuilder(stream: fileStream,builder: (context, snapshot) {switch (snapshot.connectionState) {case ConnectionState.none:return Text('没有数据流');break;case ConnectionState.waiting:return Text('等待数据流');break;case ConnectionState.active:if (snapshot.hasError) {return Text('数据流错误');}else {return Text('数据流正常'+snapshot.data);}break;case ConnectionState.done:return Text('数据流已经关闭');break;default:}return Container();},)
然后需要我们手动的去添加流,那就需要用到
StreamController controller = StreamController();void onTap() {controller.sink.add(10);}void addErrorStream() {controller.sink.addError('error');}StreamBuilder(stream: controller.stream,builder: (context, snapshot) {switch (snapshot.connectionState) {case ConnectionState.none:return Text('没有数据流');break;case ConnectionState.waiting:return Text('等待数据流');break;case ConnectionState.active:if (snapshot.hasError) {return Text('数据流错误');}else {return Text('数据流正常'+snapshot.data);}break;case ConnectionState.done:return Text('数据流已经关闭');break;default:}return Container();},)
想要流结束时可以用
controller.sink.close();
页面注销掉时
controller.close();
这是通过StreamBuilder的方式去处理数据流,但有的时候并不能满足业务需求,比如流事件延时触发,这个时候就需要通过监听的方式去处理。
listen
void listenStream() {Future.delayed(Duration(seconds: 1), () {controller.stream.listen((event) {},onError: (err) {},onDone: () {});});}
EventBus
对象的创建
EventBus({bool sync = false}): _streamController = StreamController.broadcast(sync: sync);EventBus.customController(StreamController controller): _streamController = controller;
两种方式对象的创建,最终的目的就是创建一个流控制器StreamController,也就是用数据流的思维去解决事件监听。
事件的开端
void fire(event) {streamController.add(event);}
每一个事件event当作一个数据流,添加到controller中,这是事件的触发点,类似发布者。
事件的监听和响应
Stream<T> on<T>() {if (T == dynamic) {return streamController.stream as Stream<T>;} else {return streamController.stream.where((event) => event is T).cast<T>();}}
就是获取流控制器中的stream,接着通过listen的方式,做出监听后的响应
eventBus.on().listen((event) {//do something});
当然如果再往深层去看listen和add的方法实现,可以看看StreamController的代码实现
StreamController源码
对象创建
factory StreamController({void onListen()?,void onPause()?,void onResume()?,FutureOr<void> onCancel()?,bool sync = false}) {return sync? _SyncStreamController<T>(onListen, onPause, onResume, onCancel): _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);}
说明对象有几个方法,但listen的触发是stream对象的,那么就要去Stream中看看
StreamSubscription<T> listen(void onData(T event)?,{Function? onError, void onDone()?, bool? cancelOnError});
发现这里面确实有几个回调方法,但stream的回调怎么和controller的add方法有什么联系呢?
子对象
class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>implements SynchronousStreamController<T> {_SyncBroadcastStreamController(void onListen()?, void onCancel()?): super(onListen, onCancel);void _sendData(T data) {if (_isEmpty) return;if (_hasOneListener) {_state |= _BroadcastStreamController._STATE_FIRING;_BroadcastSubscription<T> firstSubscription =_firstSubscription as dynamic;firstSubscription._add(data);_state &= ~_BroadcastStreamController._STATE_FIRING;if (_isEmpty) {_callOnCancel();}return;}_forEachListener((_BufferingStreamSubscription<T> subscription) {subscription._add(data);});}}//在_BroadcastStreamController发现void add(T data) {if (!_mayAddEvent) throw _addEventError();_sendData(data);}
这样就发现两者就联系起来了,通过_BroadcastStreamController中的add方法添加事件,然后通过_sendData方法对应到controller中的此方法,之后就是把这个事件action了
任务的调度
void _forEachListener(void action(_BufferingStreamSubscription<T> subscription)) {if (_isFiring) {throw new StateError("Cannot fire new event. Controller is already firing an event");}if (_isEmpty) return;int id = (_state & _STATE_EVENT_ID);_state ^= _STATE_EVENT_ID | _STATE_FIRING;_BroadcastSubscription<T>? subscription = _firstSubscription;while (subscription != null) {if (subscription._expectsEvent(id)) {subscription._eventState |= _BroadcastSubscription._STATE_FIRING;action(subscription);subscription._toggleEventId();_BroadcastSubscription<T>? next = subscription._next;if (subscription._removeAfterFiring) {_removeListener(subscription);}subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;subscription = next;} else {subscription = subscription._next;}}_state &= ~_STATE_FIRING;if (_isEmpty) {_callOnCancel();}}
会发现action(subscription)这行代码,说明在执行上面的subscription._add(data);,然后在看看这个_add(data);
void _add(T data) {assert(!_isClosed);if (_isCanceled) return;if (_canFire) {_sendData(data);} else {_addPending(new _DelayedData<T>(data));}}void _addPending(_DelayedEvent event) {_StreamImplEvents<T>? pending = _pending as dynamic;pending ??= _StreamImplEvents<T>();_pending = pending;pending.add(event);if (!_hasPending) {_state |= _STATE_HAS_PENDING;if (!_isPaused) {pending.schedule(this);}}}void schedule(_EventDispatch<T> dispatch) {if (isScheduled) return;assert(!isEmpty);if (_eventScheduled) {assert(_state == _STATE_CANCELED);_state = _STATE_SCHEDULED;return;}scheduleMicrotask(() {int oldState = _state;_state = _STATE_UNSCHEDULED;if (oldState == _STATE_CANCELED) return;handleNext(dispatch);});_state = _STATE_SCHEDULED;}void handleNext(_EventDispatch<T> dispatch);
会发现scheduleMicrotask把这个dispatch放到微任务日程队列中,剩下的就是系统去执行那个任务了。
