博客 / 詳情

返回

NIO的零拷貝如何實現高效數據傳輸?

Java NIO零拷貝

在 Java NIO 中的通道(Channel)就相當於操作系統的內核空間(kernel space)的緩衝區,而緩衝區(Buffer)對應的相當於操作系統的用户空間(user space)中的用户緩衝區(user buffer)。

  • 通道(Channel)是全雙工的(雙向傳輸),它既可能是讀緩衝區(read buffer),也可能是網絡緩衝區(socket buffer)。
  • 緩衝區(Buffer)分為堆內存(HeapBuffer)和堆外內存(DirectBuffer),這是通過 malloc() 分配出來的用户態內存。

堆外內存(DirectBuffer)在使用後需要應用程序手動回收,而堆內存(HeapBuffer)的數據在 GC 時可能會被自動回收。因此,在使用 HeapBuffer 讀寫數據時,為了避免緩衝區數據因為 GC 而丟失,NIO 會先把 HeapBuffer 內部的數據拷貝到一個臨時的 DirectBuffer 中的本地內存(native memory),這個拷貝涉及到 sun.misc.Unsafe.copyMemory() 的調用,背後的實現原理與 memcpy() 類似。 最後,將臨時生成的 DirectBuffer 內部的數據的內存地址傳給 I/O 調用函數,這樣就避免了再去訪問 Java 對象處理 I/O 讀寫。

內存映射文件

內存映射文件 I/O 是一種讀和寫文件數據的方法,它可以比常規的基於流或者基於通道的 I/O 快得多。

向內存映射文件寫入可能是危險的,只是改變數組的單個元素這樣的簡單操作,就可能會直接修改磁盤上的文件。修改數據與將數據保存到磁盤是沒有分開的。

下面代碼行將文件的前 1024 個字節映射到內存中,map() 方法返回一個 MappedByteBuffer,它是 ByteBuffer 的子類。因此,可以像使用其他任何 ByteBuffer 一樣使用新映射的緩衝區,操作系統會在需要時負責執行映射。

MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, 1024);

MappedByteBuffer

MappedByteBuffer 是 NIO 基於內存映射(mmap)這種零拷貝方式的提供的一種實現,可以減少一次數據拷貝的過程。它繼承自 ByteBuffer。FileChannel 定義了一個 map() 方法,它可以把一個文件從 position 位置開始的 size 大小的區域映射為內存映像文件。抽象方法 map() 方法在 FileChannel 中的定義如下:

public abstract MappedByteBuffer map(MapMode mode, long position, long size)
        throws IOException;
  • mode:限定內存映射區域(MappedByteBuffer)對內存映像文件的訪問模式,包括只可讀(READ_ONLY)、可讀可寫(READ_WRITE)和寫時拷貝(PRIVATE)三種模式。
  • position:文件映射的起始地址,對應內存映射區域(MappedByteBuffer)的首地址。
  • size:文件映射的字節長度,從 position 往後的字節數,對應內存映射區域(MappedByteBuffer)的大小。

MappedByteBuffer 相比 ByteBuffer 新增了 fore()、load() 和 isLoad() 三個重要的方法:

  • fore():對於處於 READ_WRITE 模式下的緩衝區,把對緩衝區內容的修改強制刷新到本地文件。
  • load():將緩衝區的內容載入物理內存中,並返回這個緩衝區的引用。
  • isLoaded():如果緩衝區的內容在物理內存中,則返回 true,否則返回 false。

下面給出一個利用 MappedByteBuffer 對文件進行讀寫的使用示例:

private final static String CONTENT = "Zero copy implemented by MappedByteBuffer";
private final static String FILE_NAME = "/mmap.txt";
private final static String CHARSET = "UTF-8";
  • 寫文件數據:打開文件通道 fileChannel 並提供讀權限、寫權限和數據清空權限,通過 fileChannel 映射到一個可寫的內存緩衝區 mappedByteBuffer,將目標數據寫入 mappedByteBuffer,通過 force() 方法把緩衝區更改的內容強制寫入本地文件。
@Test
public void writeToFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_WRITE, 0, bytes.length);
        if (mappedByteBuffer != null) {
            mappedByteBuffer.put(bytes);
            mappedByteBuffer.force();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 讀文件數據:打開文件通道 fileChannel 並提供只讀權限,通過 fileChannel 映射到一個只可讀的內存緩衝區 mappedByteBuffer,讀取 mappedByteBuffer 中的字節數組即可得到文件數據。
@Test
public void readFromFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    int length = CONTENT.getBytes(Charset.forName(CHARSET)).length;
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_ONLY, 0, length);
        if (mappedByteBuffer != null) {
            byte[] bytes = new byte[length];
            mappedByteBuffer.get(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            assertEquals(content, "Zero copy implemented by MappedByteBuffer");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

下面介紹 map() 方法的底層實現原理map() 方法是 java.nio.channels.FileChannel 的抽象方法,由子類 sun.nio.ch.FileChannelImpl.java 實現,下面是和內存映射相關的核心代碼:

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
    int pagePosition = (int)(position % allocationGranularity);
    long mapPosition = position - pagePosition;
    long mapSize = size + pagePosition;
    try {
        addr = map0(imode, mapPosition, mapSize);
    } catch (OutOfMemoryError x) {
        System.gc();
        try {
            Thread.sleep(100);
        } catch (InterruptedException y) {
            Thread.currentThread().interrupt();
        }
        try {
            addr = map0(imode, mapPosition, mapSize);
        } catch (OutOfMemoryError y) {
            throw new IOException("Map failed", y);
        }
    }

    int isize = (int)size;
    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
    if ((!writable) || (imode == MAP_RO)) {
        return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
    } else {
        return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
    }
}

map() 方法通過本地方法 map0() 為文件分配一塊虛擬內存,作為它的內存映射區域,然後返回這塊內存映射區域的起始地址。

  • 文件映射需要在 Java 堆中創建一個 MappedByteBuffer 的實例。如果第一次文件映射導致 OOM,則手動觸發垃圾回收,休眠 100ms 後再嘗試映射,如果失敗則拋出異常。
  • 通過 Util 的 newMappedByteBuffer (可讀可寫)方法或者 newMappedByteBufferR(僅讀) 方法方法反射創建一個 DirectByteBuffer 實例,其中 DirectByteBuffer 是 MappedByteBuffer 的子類。

map() 方法返回的是內存映射區域的起始地址,通過(起始地址 + 偏移量)就可以獲取指定內存的數據。這樣一定程度上替代了 read()write() 方法,底層直接採用 sun.misc.Unsafe類的 getByte()putByte() 方法對數據進行讀寫。

private native long map0(int prot, long position, long mapSize) throws IOException;

上面是本地方法(native method)map0 的定義,它通過 JNI(Java Native Interface)調用底層 C 的實現,這個 native 函數(Java_sun_nio_ch_FileChannelImpl_map0)的實現位於 JDK 源碼包下的 native/sun/nio/ch/FileChannelImpl.c這個源文件裏面。

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        protections =  PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }

    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    if (mapAddress == MAP_FAILED) {
        if (errno == ENOMEM) {
            JNU_ThrowOutOfMemoryError(env, "Map failed");
            return IOS_THROWN;
        }
        return handle(env, -1, "Map failed");
    }

    return ((jlong) (unsigned long) mapAddress);
}

可以看出 map0() 函數最終是通過 mmap64() 這個函數對 Linux 底層內核發出內存映射的調用, mmap64() 函數的原型如下:

#include <sys/mman.h>

void *mmap64(void *addr, size_t len, int prot, int flags, int fd, off64_t offset);

下面詳細介紹一下 mmap64() 函數各個參數的含義以及參數可選值:

  • addr:文件在用户進程空間的內存映射區中的起始地址,是一個建議的參數,通常可設置為 0 或 NULL,此時由內核去決定真實的起始地址。當 + flags 為 MAP_FIXED 時,addr 就是一個必選的參數,即需要提供一個存在的地址。
  • len:文件需要進行內存映射的字節長度
  • prot :控制用户進程對內存映射區的訪問權限
    • PROT_READ:讀權限
    • PROT_WRITE:寫權限
    • PROT_EXEC:執行權限
    • PROT_NONE:無權限
  • flags:控制內存映射區的修改是否被多個進程共享
    • MAP_PRIVATE:對內存映射區數據的修改不會反映到真正的文件,數據修改發生時採用寫時複製機制
    • MAP_SHARED:對內存映射區的修改會同步到真正的文件,修改對共享此內存映射區的進程是可見的
    • MAP_FIXED:不建議使用,這種模式下 addr 參數指定的必須的提供一個存在的 addr 參數
  • fd:文件描述符。每次 map 操作會導致文件的引用計數加 1,每次 unmap 操作或者結束進程會導致引用計數減 1
  • offset:文件偏移量。進行映射的文件位置,從文件起始地址向後的位移量

下面總結一下 MappedByteBuffer 的特點和不足之處:

  • MappedByteBuffer 使用是堆外的虛擬內存,因此分配(map)的內存大小不受 JVM 的 -Xmx 參數限制,但是也是有大小限制的。 如果當文件超出 Integer.MAX_VALUE 字節限制時,可以通過 position 參數重新 map 文件後面的內容。
  • MappedByteBuffer 在處理大文件時性能的確很高,但也存內存佔用、文件關閉不確定等問題,被其打開的文件只有在垃圾回收的才會被關閉,而且這個時間點是不確定的。
  • MappedByteBuffer 提供了文件映射內存的 mmap() 方法,也提供了釋放映射內存的 unmap() 方法。然而 unmap() 是 FileChannelImpl 中的私有方法,無法直接顯示調用。因此,用户程序需要通過 Java 反射的調用 sun.misc.Cleaner 類的 clean() 方法手動釋放映射佔用的內存區域
public static void clean(final Object buffer) throws Exception {
    AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
        try {
            Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
            getCleanerMethod.setAccessible(true);
            Cleaner cleaner = (Cleaner) getCleanerMethod.invoke(buffer, new Object[0]);
            cleaner.clean();
        } catch(Exception e) {
            e.printStackTrace();
        }
    });
}

DirectByteBuffer

DirectByteBuffer 的對象引用位於 Java 內存模型的堆裏面,JVM 可以對 DirectByteBuffer 的對象進行內存分配和回收管理,一般使用 DirectByteBuffer 的靜態方法 allocateDirect() 創建 DirectByteBuffer 實例並分配內存。

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

DirectByteBuffer 內部的字節緩衝區位在於堆外的(用户態)直接內存,它是通過 Unsafe 的本地方法 allocateMemory() 進行內存分配,底層調用的是操作系統的 malloc() 函數,因此DirectByteBuffer 使用的是操作系統內存。

DirectByteBuffer(int cap) {
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

使用 DirectByteBuf 將堆外內存映射到 jvm 內存中來直接訪問使用

  • 這塊內存不受 jvm 垃圾回收的影響,因此內存地址固定,有助於 IO 讀寫
  • java 中的 DirectByteBuf 對象僅維護了此內存的虛引用,內存回收分成兩步
    • DirectByteBuf 對象被垃圾回收,將虛引用加入引用隊列
    • 通過專門線程訪問引用隊列,根據虛引用釋放堆外內存
  • 減少了一次數據拷貝,用户態與內核態的切換次數沒有減少

除此之外,初始化 DirectByteBuffer 時還會創建一個 Deallocator 線程,並通過 Cleaner 的 freeMemory() 方法來對直接內存進行回收操作,freeMemory() 底層調用的是操作系統的 free() 函數。

private static class Deallocator implements Runnable {
    private static Unsafe unsafe = Unsafe.getUnsafe();

    private long address;
    private long size;
    private int capacity;

    private Deallocator(long address, long size, int capacity) {
        assert (address != 0);
        this.address = address;
        this.size = size;
        this.capacity = capacity;
    }

    public void run() {
        if (address == 0) {
            return;
        }
        unsafe.freeMemory(address);
        address = 0;
        Bits.unreserveMemory(size, capacity);
    }
}

由於使用 DirectByteBuffer 分配的是系統本地的內存,不在 JVM 的管控範圍之內,因此直接內存的回收和堆內存的回收不同,直接內存如果使用不當,很容易造成 OutOfMemoryError。

説了這麼多,那麼 DirectByteBuffer 和零拷貝有什麼關係?前面有提到在 MappedByteBuffer 進行內存映射時,它的 map() 方法會通過 Util.newMappedByteBuffer() 來創建一個緩衝區實例,初始化的代碼如下:

static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd,
                                            Runnable unmapper) {
    MappedByteBuffer dbb;
    if (directByteBufferConstructor == null)
        initDBBConstructor();
    try {
        dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
            new Object[] { new Integer(size), new Long(addr), fd, unmapper });
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new InternalError(e);
    }
    return dbb;
}

private static void initDBBRConstructor() {
    AccessController.doPrivileged(new PrivilegedAction<Void>() {
        public Void run() {
            try {
                Class<?> cl = Class.forName("java.nio.DirectByteBufferR");
                Constructor<?> ctor = cl.getDeclaredConstructor(
                    new Class<?>[] { int.class, long.class, FileDescriptor.class,
                                    Runnable.class });
                ctor.setAccessible(true);
                directByteBufferRConstructor = ctor;
            } catch (ClassNotFoundException | NoSuchMethodException |
                     IllegalArgumentException | ClassCastException x) {
                throw new InternalError(x);
            }
            return null;
        }});
}

DirectByteBuffer 是 MappedByteBuffer 的具體實現類,也就是基於底層操作系統的 mmap 技術。實際上,Util.newMappedByteBuffer() 方法通過反射機制獲取 DirectByteBuffer 的構造器,然後創建一個 DirectByteBuffer 的實例,對應的是一個單獨用於內存映射的構造方法:

protected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper) {
    super(-1, 0, cap, cap, fd);
    address = addr;
    cleaner = Cleaner.create(this, unmapper);
    att = null;
}

因此,除了允許分配操作系統的直接內存以外,DirectByteBuffer 本身也具有文件內存映射的功能,這裏不做過多説明。我們需要關注的是,DirectByteBuffer 在 MappedByteBuffer 的基礎上提供了內存映像文件的隨機讀取 get() 和寫入 write() 的操作。

  • 內存映像文件的隨機讀操作
public byte get() {
    return ((unsafe.getByte(ix(nextGetIndex()))));
}

public byte get(int i) {
    return ((unsafe.getByte(ix(checkIndex(i)))));
}
  • 內存映像文件的隨機寫操作
public ByteBuffer put(byte x) {
    unsafe.putByte(ix(nextPutIndex()), ((x)));
    return this;
}

public ByteBuffer put(int i, byte x) {
    unsafe.putByte(ix(checkIndex(i)), ((x)));
    return this;
}

內存映像文件的隨機讀寫都是藉助 ix() 方法實現定位的, ix() 方法通過內存映射空間的內存首地址(address)和給定偏移量 i 計算出指針地址,然後由 unsafe 類的 get() 和 put() 方法和對指針指向的數據進行讀取或寫入。

private long ix(int i) {
    return address + ((long)i << 0);
}

FileChannel

FileChannel 是一個用於文件讀寫、映射和操作的通道,同時它在併發環境下是線程安全的,基於 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 getChannel() 方法可以創建並打開一個文件通道。FileChannel 定義了 transferFrom() 和 transferTo() 兩個抽象方法,它通過在通道和通道之間建立連接實現數據傳輸的。

  • transferTo():通過 FileChannel 把文件裏面的源數據寫入一個 WritableByteChannel 的目的通道。
public abstract long transferTo(long position, long count, WritableByteChannel target)
        throws IOException;
  • transferFrom():把一個源通道 ReadableByteChannel 中的數據讀取到當前 FileChannel 的文件裏面。
public abstract long transferFrom(ReadableByteChannel src, long position, long count)
        throws IOException;

下面介紹 transferTo() 和 transferFrom() 方法的底層實現原理,這兩個方法也是 java.nio.channels.FileChannel 的抽象方法,由子類 sun.nio.ch.FileChannelImpl.java 實現。transferTo() 和 transferFrom() 底層都是基於sendfile的方式 實現數據傳輸的,其中 FileChannelImpl.java 定義了 3 個常量,用於標示當前操作系統的內核是否支持 sendfile 以及 sendfile 的相關特性。

private static volatile boolean transferSupported = true;
private static volatile boolean pipeSupported = true;
private static volatile boolean fileSupported = true;
  • transferSupported:用於標記當前的系統內核是否支持 sendfile() 調用,默認為 true。
  • pipeSupported:用於標記當前的系統內核是否支持文件描述符(fd)基於管道(pipe)的 sendfile() 調用,默認為 true。
  • fileSupported:用於標記當前的系統內核是否支持文件描述符(fd)基於文件(file)的 sendfile() 調用,默認為 true。

來分析一下其中原理:

  • transferTo()方法直接將當前通道內容傳輸到另一個通道,沒有涉及到Buffer的任何操作,NIO中的Buffer是JVM堆或者堆外內存,但不論如何他們都是操作系統內核空間的內存。也就是説這種方式不會有內核緩衝區和用户緩衝區之間的拷貝問題。
  • transferTo()的實現方式就是通過系統調用sendfile()(當然這是Linux中的系統調用),根據我們上面所寫説這個過程是效率遠高於從內核緩衝區到用户緩衝區的讀寫的。
  • 同理transferFrom()也是這種實現方式。

下面以 transferTo() 的源碼實現為例。FileChannelImpl 首先執行 transferToDirectly() 方法,以 sendfile 的零拷貝方式嘗試數據拷貝。如果系統內核不支持 sendfile,進一步執行 transferToTrustedChannel() 方法,以 mmap 的零拷貝方式進行內存映射,這種情況下目的通道必須是 FileChannelImpl 或者 SelChImpl 類型。如果以上兩步都失敗了,則執行 transferToArbitraryChannel() 方法,基於傳統的 I/O 方式完成讀寫,具體步驟是初始化一個臨時的 DirectBuffer,將源通道 FileChannel 的數據讀取到 DirectBuffer,再寫入目的通道 WritableByteChannel 裏面。

public long transferTo(long position, long count, WritableByteChannel target)
        throws IOException {
    // 計算文件的大小
    long sz = size();
    // 校驗起始位置
    if (position > sz)
        return 0;
    int icount = (int)Math.min(count, Integer.MAX_VALUE);
    // 校驗偏移量
    if ((sz - position) < icount)
        icount = (int)(sz - position);

    long n;

    if ((n = transferToDirectly(position, icount, target)) >= 0)
        return n;

    if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
        return n;

    return transferToArbitraryChannel(position, icount, target);
}

接下來重點分析一下 transferToDirectly() 方法的實現,也就是 transferTo() 通過 sendfile 實現零拷貝的精髓所在。可以看到,transferToDirectlyInternal() 方法先獲取到目的通道 WritableByteChannel 的文件描述符 targetFD,獲取同步鎖然後執行 transferToDirectlyInternal() 方法。

private long transferToDirectly(long position, int icount, WritableByteChannel target)
        throws IOException {
    // 省略從target獲取targetFD的過程
    if (nd.transferToDirectlyNeedsPositionLock()) {
        synchronized (positionLock) {
            long pos = position();
            try {
                return transferToDirectlyInternal(position, icount,
                        target, targetFD);
            } finally {
                position(pos);
            }
        }
    } else {
        return transferToDirectlyInternal(position, icount, target, targetFD);
    }
}

最終由 transferToDirectlyInternal() 調用本地方法 transferTo0() ,嘗試以 sendfile 的方式進行數據傳輸。如果系統內核完全不支持 sendfile,比如 Windows 操作系統,則返回 UNSUPPORTED 並把 transferSupported 標識為 false。如果系統內核不支持 sendfile 的一些特性,比如説低版本的 Linux 內核不支持 DMA gather copy 操作,則返回 UNSUPPORTED_CASE 並把 pipeSupported 或者 fileSupported 標識為 false。

private long transferToDirectlyInternal(long position, int icount,
                                        WritableByteChannel target,
                                        FileDescriptor targetFD) throws IOException {
    assert !nd.transferToDirectlyNeedsPositionLock() ||
            Thread.holdsLock(positionLock);

    long n = -1;
    int ti = -1;
    try {
        begin();
        ti = threads.add();
        if (!isOpen())
            return -1;
        do {
            n = transferTo0(fd, position, icount, targetFD);
        } while ((n == IOStatus.INTERRUPTED) && isOpen());
        if (n == IOStatus.UNSUPPORTED_CASE) {
            if (target instanceof SinkChannelImpl)
                pipeSupported = false;
            if (target instanceof FileChannelImpl)
                fileSupported = false;
            return IOStatus.UNSUPPORTED_CASE;
        }
        if (n == IOStatus.UNSUPPORTED) {
            transferSupported = false;
            return IOStatus.UNSUPPORTED;
        }
        return IOStatus.normalize(n);
    } finally {
        threads.remove(ti);
        end (n > -1);
    }
}

本地方法(native method)transferTo0() 通過 JNI(Java Native Interface)調用底層 C 的函數,這個 native 函數(Java_sun_nio_ch_FileChannelImpl_transferTo0)同樣位於 JDK 源碼包下的 native/sun/nio/ch/FileChannelImpl.c 源文件裏面。JNI 函數 Java_sun_nio_ch_FileChannelImpl_transferTo0() 基於條件編譯對不同的系統進行預編譯,下面是 JDK 基於 Linux 系統內核對 transferTo() 提供的調用封裝。

#if defined(__linux__) || defined(__solaris__)
#include <sys/sendfile.h>
#elif defined(_AIX)
#include <sys/socket.h>
#elif defined(_ALLBSD_SOURCE)
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define lseek64 lseek
#define mmap64 mmap
#endif

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jobject srcFDO,
                                            jlong position, jlong count,
                                            jobject dstFDO)
{
    jint srcFD = fdval(env, srcFDO);
    jint dstFD = fdval(env, dstFDO);

#if defined(__linux__)
    off64_t offset = (off64_t)position;
    jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
    return n;
#elif defined(__solaris__)
    result = sendfilev64(dstFD, &sfv, 1, &numBytes);    
    return result;
#elif defined(__APPLE__)
    result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);
    return result;
#endif
}

對 Linux、Solaris 以及 Apple 系統而言,transferTo0() 函數底層會執行 sendfile64 這個系統調用完成零拷貝操作,sendfile64() 函數的原型如下:

#include <sys/sendfile.h>

ssize_t sendfile64(int out_fd, int in_fd, off_t *offset, size_t count);

下面簡單介紹一下 sendfile64() 函數各個參數的含義:

  • out_fd:待寫入的文件描述符
  • in_fd:待讀取的文件描述符
  • offset:指定 in_fd 對應文件流的讀取位置,如果為空,則默認從起始位置開始
  • count:指定在文件描述符 in_fd 和 out_fd 之間傳輸的字節數

在 Linux 2.6.3 之前,out_fd 必須是一個 socket,而從 Linux 2.6.3 以後,out_fd 可以是任何文件。也就是説,sendfile64() 函數不僅可以進行網絡文件傳輸,還可以對本地文件實現零拷貝操作。

其它的零拷貝實現

Netty零拷貝

Netty 中的零拷貝和上面提到的操作系統層面上的零拷貝不太一樣, 我們所説的 Netty 零拷貝完全是基於(Java 層面)用户態的,它的更多的是偏向於數據操作優化這樣的概念,具體表現在以下幾個方面:

Netty 通過 DefaultFileRegion 類對 java.nio.channels.FileChannel 的 tranferTo() 方法進行包裝,在文件傳輸時可以將文件緩衝區的數據直接發送到目的通道(Channel)

ByteBuf 可以通過 wrap 操作把字節數組、ByteBuf、ByteBuffer 包裝成一個 ByteBuf 對象, 進而避免了拷貝操作 ByteBuf 支持 slice 操作, 因此可以將 ByteBuf 分解為多個共享同一個存儲區域的 ByteBuf,避免了內存的拷貝 Netty 提供了 CompositeByteBuf 類,它可以將多個 ByteBuf 合併為一個邏輯上的 ByteBuf,避免了各個 ByteBuf 之間的拷貝 其中第 1 條屬於操作系統層面的零拷貝操作,後面 3 條只能算用户層面的數據操作優化。

RocketMQ和Kafka對比

RocketMQ 選擇了 mmap + write 這種零拷貝方式,適用於業務級消息這種小塊文件的數據持久化和傳輸;而 Kafka 採用的是 sendfile 這種零拷貝方式,適用於系統日誌消息這種高吞吐量的大塊文件的數據持久化和傳輸。但是值得注意的一點是,Kafka 的索引文件使用的是 mmap + write 方式,數據文件使用的是 sendfile 方式。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.