RedCloud Help

8 Data Buffers and Codecs

Java NIO提供了ByteBuffer,但许多库都在此基础上构建了自己的字节缓冲区API,尤其是对于重复使用缓冲区或使用直接缓冲区有利于提供性能的网络操作。例如,Netty具有ByteBuf层次结构,Undertow使用XNIO,Jetty使用带有释放回调的池化字节缓冲区,等等。Spring-core模块提供了一组抽象,可与各种字节缓冲区API配置使用,具体如下:

  • DataBufferFactory抽象了缓冲区的创建。

  • DataBuffer表示字节缓冲区,可以池化。

  • DataBufferUtils为数据缓冲区提供适用方法。

  • Codecs:将数据缓冲流解码或编码成更高级别的对象。

8.1 DataBufferFactory

DataBufferFactory用于以两种方式之一创建数据缓冲区:

  1. 及时DataBuffer的实现可以按需增大或缩小,这种方法也更有效。

  2. 对现有的byte[]或java.nio.ByteBuffer进行包装,用DataBuffer实现对给定数据进行装饰,且不涉及分配。

请注意,WebFlux应用程序不会直接创建DataBufferFactory,而是通过客户端的ServerHttpResponse或ClientHttpRequest来访问它。工厂的类型取决于底层客户端或服务器,例如,Reactor Netty使用NetttyDataBufferFactory,而其他客户端或服务器使用DefaultDataBufferFactory。

8.2 DataBuffer

DataBuffer界面提供了与java.nio.ByteBuffer类似的操作,单也带来了一些额外的好处,其中的一些事收到Netty ByteBuf的启发。以下是部分优点列表:

  • 读取和写入位置独立,即不需要调用flip()来交替读取和写入。

  • 与java.lang.StringBuilder一样,可根据需求扩展容量。

  • 通过PooledDataBuffer实现池化缓冲区和引用计数。

  • 以java.nio.ByteBuffer、InputStream或OutputStream查看缓冲区。

  • 确定给定字节的索引或最后索引。

8.3 PooledDataBuffer

正如ByteBuffer的javadoc所解释的,字节缓冲区可以是直接或非直接的。直接缓冲区可以驻留在java堆之外,这样就需要复制本地I/O操作。这使得直接缓冲区在通过套接字接受和发送数据时特别有用,但它们的创建和释放成本也更高,这就产生了池化缓冲区的想法。 PooledDataBuffer是DataBuffer的扩展,有助于进行引用计数,这对字节缓冲区池至关重要。它是如何工作的?分配PooledDataBuffer时,引用计数为1.调用retain() 会使计数递增,而调用release()则会使计数递减。只要计数大于0,缓冲区就不会被释放。当计数减少到0时,池中的缓冲区就可以释放,这实际上意味着缓冲区的预留内存将返回内存池。 请注意,在大多数情况下,最好不要直接堆PooledDataBuffer进行操作,而是使用DataBufferUtils中的便捷方法,只有当DataBuffer是PooledDataBuffer的实例时,才对其应用释放或保留。

8.4 DataBufferUtils

DataBufferUtils提供了许多实用方法来操作数据缓冲区:

  • 如果底层字节缓冲区应用程序接口支持,可通过复合缓冲区等方式,将数据缓冲区流合并为一个缓冲区,且可能零拷贝。

  • 将 InputStream或NIO Channel变成Flux ,反之亦然,将Publisher 变成OutputStream或NIO channel。

  • 如果缓冲区是PooledDataBuffer的实例,则释放或保留DataBuffer的方法。

  • 从字节流中跳过或取走特定的字节数。

8.5 Codecs

org.springframework.core.codec软件包提供以下策略接口:

  • Encoder将Publisher 编码为数据缓冲区流。

  • Decoder将Publisher 解码为更高级对象流。

spring-core模块提供byte[]、ByteBuffer、DataBuffer、Resource合String编码器和解码器实现。spring-web模块添加了Jackson Json、jackson Smile、JAXB2、Protocol Buffer和其他编码器合解码器。请参见WebFlux部分中的编解码器。

8.6 Using DataBuffer

在处理数据缓冲区时,必须特别注意确保缓冲区被释放,因为它们可能会被池化。我们将使用解码器来说明其工作原理,单这些概念也适用于更广泛的情况。让我们来看看编码器内部必须如何管理数据缓冲区。 Decoder是最后一个读取输入数据缓冲区的对象,然后才创建更高级别的对象,因此它必须按也歘下方式释放缓冲区:

  1. 如果Decoder只需读取每个输入缓冲区,并准备立即释放,则可以通过DataBufferUtils.release(dataBuffer)来实现。

如果Decoder正在使用Flux或Mono操作符(如flatMap、reduce及其他在内部预取和缓存数据项的操作符),或者正在使用filter、skip及其他遗漏项的操作符,则必须将doOnDiscard( PooledDataBuffer.class,DataBufferUtils::relaese)添加到组合链中,以确保此类缓冲区在被丢弃之前被释放,也可能是错误或取消信号的结果。

  1. 如果Decoder以任何其他方式保留一个或多个数据缓冲区,则必须确保在完全读取时释放这些缓冲区,或者在缓存数据缓冲区读取和释放之前发生错误或取消信号时释放这些缓冲区。

请注意,DataBufferUtils#join提供了数据缓冲区聚合到单个数据缓冲区的安全高效方法。同时,skipUtilByteCount和takeUtilByteCount也是供解码器使用的其他安全方法。 Encoder分配了数据缓冲区,其他人必须读取(并释放)这些数据缓冲区。因此,Encoder并没有太多工作要做。但是,如果在向数据缓冲区填充数据时发生序列化错误,Encoder必须注意释放数据缓冲区。例如:

DataBuffer buffer = factory.allocateBuffer(); boolean release = true; try { // serialize and populate buffer.. release = false; } finally { if (release) { DataBufferUtils.release(buffer); } } return buffer;

Encoder的消费者负责释放其接受的数据缓冲区。在WebFlux应用程序中,Encoder的输出用于写入HTTP服务器响应或客户端HTTP请求,在这种情况下,释放数据缓冲区是写入服务器响应或客户端请求的代码的责任。 请注意,在Netty上运行时,有一些调试选项可用于排除缓冲泄露的故障。

03 May 2025