BufferedInputStream和BufferedOutputStream分别是FilterInputStream和FilterOutputStream的子类,它们实现了输入输出流的缓存功能,我们来看它们是怎样实现的。
为什么要实现缓存功能呢?我们可以想一下,InputStream中的read和OutputStream中的write方法,是针对每个字节都会执行一次,如果是这种发送方式的话,会有很大的网络开销。BufferedInputStream和BufferedOutputStream能让我们避免对每个字节都执行read和write方法调用。
BufferedOutputStream
内部的缓存数组
BufferedOutputStream会将write等操作映射到内部的OutputStream中,但是它会加上一个字节数组来进行缓存。对于这个字节数组,BufferedOutputStream有两个字段比较重要:
/*** The internal buffer where data is stored.*/protected byte buf[];/*** The number of valid bytes in the buffer. This value is always* in the range <tt>0</tt> through <tt>buf.length</tt>; elements* <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid* byte data.*/protected int count;
第一个buf就是字节数组,第二个是数组中有效字节的数量,因为刚开始初始化时数组肯定是空的,然后会一个一个地添加,每添加一个count就会加1。所以count的范围是从0到buf.length。
构造方法
因为有了缓存数组,所以BufferedOutputStream的构造方法除了需要内部的OutputStream,还需要初始化内部的缓冲字节数组,它有两个构造方法:
public BufferedOutputStream(OutputStream out) {this(out, 8192);}/*** Creates a new buffered output stream to write data to the* specified underlying output stream with the specified buffer* size.** @param out the underlying output stream.* @param size the buffer size.* @exception IllegalArgumentException if size <= 0.*/public BufferedOutputStream(OutputStream out, int size) {super(out);if (size <= 0) {throw new IllegalArgumentException("Buffer size <= 0");}buf = new byte[size];}
第一个是默认内部缓存字节数组的数量是8192,第二个可以自定义这个字节数组的长度。
单字节write方法
然后我们来看BufferedOutputStream是怎么执行write方法的:
public synchronized void write(int b) throws IOException {if (count >= buf.length) {flushBuffer();}buf[count++] = (byte)b;}
首先,它会判断count是不是大于等于缓冲数组的长度,也就是缓冲数组是否已满,如果没满,就直接将参数中的字节b添加到缓冲数组,如果缓冲数组已满,就调用flushBuffer方法,flushBuffer方法的如下:
/** Flush the internal buffer */private void flushBuffer() throws IOException {if (count > 0) {out.write(buf, 0, count);count = 0;}}
它的逻辑也很简单,就是调用内部的OutputStream的write方法将内部缓存数组中的字节全部写入,然后将count重置为0。
所以BufferedOutputStream的write方法逻辑就是首先填充内部的缓存字节数组,字节数组填充满之后直接一次性将这些字节写入,然后重新再填充缓存。
多字节write方法
public synchronized void write(byte b[], int off, int len) throws IOException {if (len >= buf.length) {/* If the request length exceeds the size of the output buffer,flush the output buffer and then write the data directly.In this way buffered streams will cascade harmlessly. */flushBuffer();out.write(b, off, len);return;}if (len > buf.length - count) {flushBuffer();}System.arraycopy(b, off, buf, count, len);count += len;}
多字节的write方法的逻辑也是首先判断缓冲字节数组是否已满,未满就将参数字节数组添加到缓冲字节数组中,前提是保证能添加进去,也就是缓冲字节数组当前的剩余长度大于要添加的字节数组的长度,如果不能添加,也要先调用flushBuffer方法来刷新缓存。
整体来说,BufferedOutputStream的缓存方式比较简单也容易理解,相比较而言,BufferedInputStream就相对要复杂一些,我们来看一下它是怎么实现缓存的。
BufferedInputStream
内部的缓存数组
BufferedInputStream与缓存数组相关的有下面三个字段:
private static int DEFAULT_BUFFER_SIZE = 8192;/*** The internal buffer array where the data is stored. When necessary,* it may be replaced by another array of* a different size.*/protected volatile byte buf[];/*** The index one greater than the index of the last valid byte in* the buffer.* This value is always* in the range <code>0</code> through <code>buf.length</code>;* elements <code>buf[0]</code> through <code>buf[count-1]* </code>contain buffered input data obtained* from the underlying input stream.*/protected int count;
第一个是默认的缓存大小,第二个就是缓存数组,第三个count是缓存数组中有效字节的数量。
read方法的实现
public synchronized int read() throws IOException {if (pos >= count) {fill();if (pos >= count)return -1;}return getBufIfOpen()[pos++] & 0xff;}
它首先判断pos是否大于等于count,也就是读到的位置是否大于等于缓存数组中字节的数量,如果是,说明缓存数组中的字节已经读完,需要重新填充缓存数组,填充缓存数组是通过调用fill方法实现的,我们来看一下它的逻辑:
private void fill() throws IOException {byte[] buffer = getBufIfOpen();if (markpos < 0)pos = 0; /* no mark: throw away the buffer */else if (pos >= buffer.length) /* no room left in buffer */if (markpos > 0) { /* can throw away early part of the buffer */int sz = pos - markpos;System.arraycopy(buffer, markpos, buffer, 0, sz);pos = sz;markpos = 0;} else if (buffer.length >= marklimit) {markpos = -1; /* buffer got too big, invalidate mark */pos = 0; /* drop buffer contents */} else if (buffer.length >= MAX_BUFFER_SIZE) {throw new OutOfMemoryError("Required array size too large");} else { /* grow buffer */int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?pos * 2 : MAX_BUFFER_SIZE;if (nsz > marklimit)nsz = marklimit;byte nbuf[] = new byte[nsz];System.arraycopy(buffer, 0, nbuf, 0, pos);if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {// Can't replace buf if there was an async close.// Note: This would need to be changed if fill()// is ever made accessible to multiple threads.// But for now, the only way CAS can fail is via close.// assert buf == null;throw new IOException("Stream closed");}buffer = nbuf;}count = pos;int n = getInIfOpen().read(buffer, pos, buffer.length - pos);if (n > 0)count = n + pos;}
逻辑有点多,自己看吧。
