从collector到buffer
下面我们从数据源出开始分析数据是如何写入到Flink缓存中的。
NonTimestampContext.collect方法。该方法位于数据源(SourceFunction)中。
@Overridepublic void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}}
这里调用的是output对象的collect方法。Output对象是Output
CountingOutput仅仅是一个包装类型,包装了一个Output。相比于其他Output而言多出了收集元素数量的监控。CountingOutput维护了一个计数器类型监控变量:
private final Counter numRecordsOut;
在collect元素的时候调用了numRecordsOut.inc()方法,实现了对收集元素数量的监控。
NoTimestampContext的CountingOuput封装的output是什么类型的呢?我们通过debug查看发现内层的类型为RecordWriterOutput。
RecordWriterOutput的collect方法如下所示:
@Overridepublic void collect(StreamRecord<OUT> record) {if (this.outputTag != null) {// we are not responsible for emitting to the main output.return;}pushToRecordWriter(record);}
pushToRecordWriter方法使用序列化代理,将record传递给recordWriter。代码如下:
private <X> void pushToRecordWriter(StreamRecord<X> record) {serializationDelegate.setInstance(record);try {recordWriter.emit(serializationDelegate);}catch (Exception e) {throw new RuntimeException(e.getMessage(), e);}}
RecordWriter负责把数据序列化,然后写入到缓存中。它有两个实现类:
- BroadcastRecordWriter: 维护了多个下游channel,发送数据到下游所有的channel中。
- ChannelSelectorRecordWriter: 通过channelSelector对象判断数据需要发往下游的哪个channel。keyBy算子用的正是这个RecordWriter。
这里我们分析下ChannelSelectorRecordWriter的emit方法:
public void emit(T record) throws IOException {emit(record, channelSelector.selectChannel(record));}
很明显这里使用了channelSelector.selectChannel方法。该方法为record和对应下游channel id的函数关系。
接下来我们又回到了父类RecordWriter。
protected void emit(T record, int targetSubpartition) throws IOException {checkErroneous();// 序列化record,并向指定的目标分区发送targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);// 判断是否进行刷新操作if (flushAlways) {targetPartition.flush(targetSubpartition);}}
关键的逻辑在emitRecord方法内,进一步分析该方法。
ResultPartition用于保存单个task产生的数据。它有四个实现类。
�BufferWritingResultPartition:负责将buffers直接ResultSubpartition。BufferWritingResultPartition有两个实现类:
BoundedBlockingResultPartition:保存单个任务的输出数据结果。用于批量数据。
PipelinedResultPartition:用于流式数据。
SortMergeResultPartition:用于排序合并结果数据。
主要查看一下BufferWritingResultPartition的emitRecord方法
@Overridepublic void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {// 将record追加到缓存中BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);while (record.hasRemaining()) {// full buffer, partial recordfinishUnicastBufferBuilder(targetSubpartition);buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);}if (buffer.isFull()) {// full buffer, full recordfinishUnicastBufferBuilder(targetSubpartition);}// partial buffer, full record}
进一步分析appendUnicastDataForNewRecord方法。
�
