博客 / 詳情

返回

從0構建深度學習框架——揭秘深度學習框架的黑箱

引言

你有沒有好奇過,當你在 PyTorch 或 TensorFlow 中調用 .backward() 計算梯度時,框架到底在背後做了什麼?

我們每天都在使用這些成熟的深度學習工具,但很少有人真正去探索它們的底層實現——自動微分的魔法、計算圖的構建、張量運算的優化……這些隱藏在API背後的核心原理,才是深度學習的真正基石。

今天,我將向大家介紹一個從零開始實現的輕量級深度學習框架——Needle。它不是為了替代現有的成熟框架,而是為了幫你揭開深度學習的神秘面紗

  • 你將親手見證自動微分如何通過計算圖和鏈式法則實現
  • 你將理解張量運算的底層邏輯和實現原理
  • 你將學會如何從0到1構建自己的深度學習組件

Needle的代碼結構清晰,註釋詳細,所有功能都從零實現,是學習深度學習核心原理的最佳實踐項目。

Needle 的核心功能

Needle 雖然是一個輕量級框架,但卻實現了深度學習框架的所有核心功能:

🧮 自動微分系統

自動微分是深度學習的核心技術之一,Needle 實現了基於計算圖的自動微分系統。當你執行張量運算時,框架會自動構建計算圖,記錄所有運算操作和依賴關係。當需要計算梯度時,框架會沿着計算圖反向傳播,自動計算每個參數的梯度,無需手動推導。

➕ 豐富的張量運算

Needle 支持各種基本的張量運算,包括:

  • 元素級運算(Add, Mul, Div, Pow等)
  • 矩陣運算(MatMul, Transpose, Dot等)
  • 歸約運算(Sum, Mean, Max等)
  • 變形運算(Reshape, Broadcast, Concatenate等)

這些運算與 PyTorch 等主流框架的 API 設計非常相似,易於上手,同時保持了代碼的簡潔性和可理解性。

🔌 多後端支持

Needle 支持兩種後端:

  • NumPy 後端:用於快速原型開發和 CPU 計算,便於調試和學習
  • 自定義 NDArray 後端:支持 CPU 和 CUDA 加速,提供了更高的性能

通過環境變量 NEEDLE_BACKEND 可以輕鬆切換後端,滿足不同場景的需求。

📦 神經網絡模塊

Needle 提供了完整的神經網絡模塊支持,包括:

  • 線性層(Linear)、卷積層(Conv)等基礎組件
  • ReLU、Sigmoid、Softmax 等激活函數
  • 損失函數(CrossEntropyLoss, MSELoss等)

這些模塊與自動微分系統無縫集成,方便構建各種複雜的神經網絡模型。

🎯 優化算法

Needle 實現了多種常用的優化算法,包括:

  • 隨機梯度下降(SGD)
  • Momentum
  • RMSprop
  • Adam

這些優化算法可以直接用於訓練神經網絡模型,支持權重衰減和學習率調度等功能。

📊 數據加載

Needle 提供了數據加載和預處理支持,包括:

  • 自定義 Dataset 接口
  • DataLoader 用於批量加載數據
  • 常用的數據變換操作

這些功能可以幫助用户輕鬆處理各種數據集,提高訓練效率。

接下來,讓我們深入瞭解 Needle 背後的深度學習原理。我將用簡單的例子和代碼來説明自動微分、計算圖等核心概念。

計算圖 & 自動微分的從零實現

早期的 ML 通常從手動計算梯度開始,但是隨着網絡變得越來越複雜,手動計算梯度變得越來越困難。

為了解決這個問題,自動微分被提出。自動微分是一種計算梯度的方法,它可以自動計算函數在任意點的梯度。

自動微分通過將所有計算操作表示為計算圖,其中值(包括張量)為計算圖中的節點,計算(op)是圖中的邊(一個 op 可能是多條邊)。將梯度計算轉換為計算圖上的反向傳播。最終利用鏈式法的迭代計算,一步步算出最終的完整梯度。

我們來從零實現一個自動微分計算框架,完整代碼參考.

先定義計算圖的邊,也就是計算(op)本身,其抽象類非常簡單:

class Op:
    """Operator definition."""
    def __call__(self, *args):
        raise NotImplementedError()

    def compute(self, *args: Tuple[NDArray]):
        """Calculate forward pass of operator.

        Parameters
        ----------
        input: np.ndarray
            A list of input arrays to the function

        Returns
        -------
        output: nd.array
            Array output of the operation

        """
        raise NotImplementedError()

    def gradient(
        self, out_grad: "Value", node: "Value"
    ) -> Union["Value", Tuple["Value"]]:
        """Compute partial adjoint for each input value for a given output adjoint.

        Parameters
        ----------
        out_grad: Value
            The adjoint wrt to the output value.

        node: Value
            The value node of forward evaluation.

        Returns
        -------
        input_grads: Value or Tuple[Value]
            A list containing partial gradient adjoints to be propagated to
            each of the input node.
        """
        raise NotImplementedError()

compute 和 gradient 分別是這條邊上兩個方向的計算,compute 是前向計算,gradient 是反向計算。
compute 依賴與這個計算的所有輸入作為入參,並計算出輸出節點的值;gradient 依賴與這個計算的所有輸入和最終輸出的 adjoint 作為入參,並計算出這個計算的所有輸入節點的 adjoint。

比起單值,深度學習更通常處理張量值,因此我們再定義一組專門處理張量的計算(op).

class TensorOp(Op):
    """Op class specialized to output tensors, will be alternate subclasses for other structures"""
    def __call__(self, *args):
        return Tensor.make_from_op(self, args)

class TensorTupleOp(Op):
    """Op class specialized to output TensorTuple"""
    def __call__(self, *args):
        return TensorTuple.make_from_op(self, args)

接着定義計算圖的節點,也就是值(value)本身,其抽象類也非常簡單:

class Value:
    """A value in the computational graph."""

    # trace of computational graph
    op: Optional[Op]
    inputs: List["Value"]
    # The following fields are cached fields for
    # dynamic computation
    cached_data: NDArray
    requires_grad: bool

    def realize_cached_data(self):
        """Run compute to realize the cached data"""
        # avoid recomputation
        if self.cached_data is not None:
            return self.cached_data
        # note: data implicitly calls realized cached data
        self.cached_data = self.op.compute(
            *[x.realize_cached_data() for x in self.inputs]
        )
        return self.cached_data

    def is_leaf(self):
        return self.op is None

    def __del__(self):
        global TENSOR_COUNTER
        TENSOR_COUNTER -= 1

    def _init(
        self,
        op: Optional[Op],
        inputs: List["Tensor"],
        *,
        num_outputs: int = 1,
        cached_data: List[object] = None,
        requires_grad: Optional[bool] = None,
    ):
        global TENSOR_COUNTER
        TENSOR_COUNTER += 1
        if requires_grad is None:
            requires_grad = any(x.requires_grad for x in inputs)
        self.op = op
        self.inputs = inputs
        self.num_outputs = num_outputs
        self.cached_data = cached_data
        self.requires_grad = requires_grad

    @classmethod
    def make_const(cls, data, *, requires_grad=False):
        value = cls.__new__(cls)
        value._init(
            None,
            [],
            cached_data=data,
            requires_grad=requires_grad,
        )
        return value

    @classmethod
    def make_from_op(cls, op: Op, inputs: List["Value"]):
        value = cls.__new__(cls)
        value._init(op, inputs)

        if not LAZY_MODE:
            if not value.requires_grad:
                return value.detach()
            value.realize_cached_data()
        return value

Value 的定義串聯起來了整個計算圖,每個值節點中記錄了這個值是由哪個計算(op)得到的,以及這個計算的所有輸入節點(也是值節點)。
為了避免重複計算定義了緩存區域,以及是否需要計算梯度的 flag。
is_leaf 方法用於判斷這個值節點是否是葉子節點,也就是是否沒有輸入節點。通過這個信息可以對計算圖進行拓撲排序,從而完成前向和反向傳播的計算。

接下來進入主角,Tensor這種最常用 Value 的定義。

class Tensor(Value):
    grad: "Tensor"

    def __init__(
        self,
        array,
        *,
        device: Optional[Device] = None,
        dtype=None,
        requires_grad=True,
        **kwargs,
    ):
        if isinstance(array, Tensor):
            if device is None:
                device = array.device
            if dtype is None:
                dtype = array.dtype
            if device == array.device and dtype == array.dtype:
                cached_data = array.realize_cached_data()
            else:
                # fall back, copy through numpy conversion
                cached_data = Tensor._array_from_numpy(
                    array.numpy(), device=device, dtype=dtype
                )
        else:
            device = device if device else cpu()
            cached_data = Tensor._array_from_numpy(array, device=device, dtype=dtype)

        self._init(
            None,
            [],
            cached_data=cached_data,
            requires_grad=requires_grad,
        )

    @staticmethod
    def make_const(data, requires_grad=False):
        tensor = Tensor.__new__(Tensor)
        tensor._init(
            None,
            [],
            cached_data=(
                data if not isinstance(data, Tensor) else data.realize_cached_data()
            ),
            requires_grad=requires_grad,
        )
        return tensor

    @property
    def data(self):
        return self.detach()

    @property
    def shape(self):
        return self.realize_cached_data().shape

    @property
    def dtype(self):
        return self.realize_cached_data().dtype

    @property
    def device(self):
        data = self.realize_cached_data()
        # numpy array always sits on cpu
        if array_api is numpy:
            return cpu()
        return data.device

    def backward(self, out_grad=None):
        out_grad = (
            out_grad
            if out_grad
            else init.ones(*self.shape, dtype=self.dtype, device=self.device)
        )
        compute_gradient_of_variables(self, out_grad)

    def __add__(self, other):
        if isinstance(other, Tensor):
            return needle.ops.EWiseAdd()(self, other)
        else:
            return needle.ops.AddScalar(other)(self)

這裏 device 是用來指定計算設備的,支持 cpu 和 gpu,我們後續為分別基於 CPU 和 GPU 實現不同的計算後端。
Tensor 本身需要支持各類矩陣數值計算,包括加減乘除、矩陣乘法、求和、廣播、reshape 等。這裏為了簡單隻定義了加法和加法標量的實現。完整代碼請參考 autograd.py。

上面的compute_gradient_of_variables函數用於計算一個張量的反向傳播。我們需要遍歷這個張量所在計算圖,並按照拓撲排序進行反向傳播,主要實現如下:

def compute_gradient_of_variables(output_tensor, out_grad):
    """Take gradient of output node with respect to each node in node_list.

    Store the computed result in the grad field of each Variable.
    """
    # a map from node to a list of gradient contributions from each output node
    node_to_output_grads_list: Dict[Tensor, List[Tensor]] = {}
    # Special note on initializing gradient of
    # We are really taking a derivative of the scalar reduce_sum(output_node)
    # instead of the vector output_node. But this is the common case for loss function.
    node_to_output_grads_list[output_tensor] = [out_grad]

    # Traverse graph in reverse topological order given the output_node that we are taking gradient wrt.
    reverse_topo_order = list(reversed(find_topo_sort([output_tensor])))

    for tenser in reverse_topo_order:
        tensor_grad = sum_node_list(node_to_output_grads_list[tenser])
        tenser.grad = tensor_grad
        # leaf node
        if tenser.op is None:
            continue
        inputs_out_grads = tenser.op.gradient_as_tuple(tensor_grad, tenser)
        for input, out_grad in zip(tenser.inputs, inputs_out_grads):
            if input not in node_to_output_grads_list:
                node_to_output_grads_list[input] = []
            node_to_output_grads_list[input].append(out_grad)


def find_topo_sort(node_list: List[Value]) -> List[Value]:
    """Given a list of nodes, return a topological sort list of nodes ending in them.

    A simple algorithm is to do a post-order DFS traversal on the given nodes,
    going backwards based on input edges. Since a node is added to the ordering
    after all its predecessors are traversed due to post-order DFS, we get a topological
    sort.
    """
    visited = set()
    order = []
    for node in node_list:
        topo_sort_dfs(node, visited, order)
    return order


def topo_sort_dfs(node, visited, topo_order):
    """Post-order DFS"""
    if node not in visited:
        visited.add(node)
        for n in node.inputs:
            topo_sort_dfs(n, visited, topo_order)
        topo_order.append(node)
        """Post-order DFS"""

上面我們已經完成了整個計算圖的構建,接下來便是細化上面提到的張量需要支持的各類計算(op)的正向傳播和反向傳播的邏輯。
張量內部的數據是矩陣,因此支持各類矩陣的常用運算,這裏的實現主要是體力活,通過底層提供的矩陣計算來實現正向和反向傳播,具體完整實現可以參考ops_logarithmic.py和ops_mathematic.py。

矩陣計算的從零實現

矩陣的表示:無論任何維度(shape)的矩陣,我們都可以將其表示為一個一維數組,只是需要根據 strides 來計算每個元素的位置。

理解strides的含義是實現高效矩陣計算的關鍵。strides 表示每個維度上的步長,即每個元素在內存中的偏移量。
例如,對於一個 $2 \times 3$ 的矩陣,其 strides 為 $[3, 1]$,表示在內存中每個元素的偏移量為 $3$ 個元素大小(如果元素大小為 $4$ 字節),而在第二個維度上每個元素的偏移量為 $1$ 個元素大小。
如果需要對這個 $2 \times 3$ 的矩陣進行轉置操作得到一個 $3 \times 2$ 的矩陣,只需要對原矩陣的 strides 進行交換即可,即 $[3, 1] \rightarrow [1, 3]$。完全不需要對底層矩陣元素進行復制。類似的操作包括但不限於:

  • transpose
  • reshape
  • broadcast
  • view
  • squeeze/unsqueeze
  • slice
  • flip
  • reverse
  • permute

無論是CPU還是 GPU 介質,優化矩陣運算的關鍵都是如何利用好更告訴的存儲。

  • 對 CPU 來説是如何實現 cache 友好的矩陣運算,減少 cache miss帶來的對 main memory 的訪問。
  • 對 GPU 來説是如何利用好 shared memory,減少對 GPU global memory 的訪問。
    我們將分別在下面的 CPU 和 GPU 矩陣運算實現中詳細介紹。

CPU 矩陣運算實現

完整代碼參考ndarray_backend_cpu.cc.

使用矩陣分塊乘法,將矩陣乘法分解為多個小的矩陣乘法,每個小的矩陣乘法可以利用 cache 友好的方式進行計算,減少 cache miss 帶來的對 main memory 的訪問。
每個小的矩陣乘法的大小為 $TILE \times TILE$,其中 $TILE$ 是一個超參數,一般取 $8$ 或 $16$。 取 $TILE$ 為 $8$ 或 $16$ 是因為現代 CPU 的 cache 行大小為 $64$ 字節,即兩個 $8$ 個 $float32$ 類型的元素。

#define ALIGNMENT 256
#define TILE 8
typedef float scalar_t;
const size_t ELEM_SIZE = sizeof(scalar_t);

/**
 * This is a utility structure for maintaining an array aligned to ALIGNMENT
 * boundaries in memory.  This alignment should be at least TILE * ELEM_SIZE,
 * though we make it even larger here by default.
 */
struct AlignedArray {
  AlignedArray(const size_t size) {
    int ret = posix_memalign((void **)&ptr, ALIGNMENT, size * ELEM_SIZE);
    if (ret != 0)
      throw std::bad_alloc();
    this->size = size;
  }
  ~AlignedArray() { free(ptr); }
  size_t ptr_as_int() { return (size_t)ptr; }
  scalar_t *ptr;
  size_t size;
};


void MatmulTiled(const AlignedArray &a, const AlignedArray &b,
                 AlignedArray *out, uint32_t m, uint32_t n, uint32_t p) {
  /**
   * Matrix multiplication on tiled representations of array.  In this
   * setting, a, b, and out are all *4D* compact arrays of the appropriate
   * size, e.g. a is an array of size a[m/TILE][n/TILE][TILE][TILE] You should
   * do the multiplication tile-by-tile to improve performance of the array
   * (i.e., this function should call `AlignedDot()` implemented above).
   *
   * Note that this function will only be called when m, n, p are all
   * multiples of TILE, so you can assume that this division happens without
   * any remainder.
   *
   * Args:
   *   a: compact 4D array of size m/TILE x n/TILE x TILE x TILE
   *   b: compact 4D array of size n/TILE x p/TILE x TILE x TILE
   *   out: compact 4D array of size m/TILE x p/TILE x TILE x TILE to write to
   *   m: rows of a / out
   *   n: columns of a / rows of b
   *   p: columns of b / out
   *
   */
  for (size_t i = 0; i < m * p; ++i) {
    out->ptr[i] = 0.;
  }
  for (size_t i = 0; i < m / TILE; i++) {
    for (size_t j = 0; j < p / TILE; j++) {
      float *_out = out->ptr + i * p * TILE + j * TILE * TILE;
      for (size_t k = 0; k < n / TILE; k++) {
        const float *_a = a.ptr + i * n * TILE + k * TILE * TILE;
        const float *_b = b.ptr + k * p * TILE + j * TILE * TILE;
        AlignedDot(_a, _b, _out);
      }
    }
  }
}

GPU 矩陣運算實現

完整代碼參考ndarray_backend_cuda.cu.

使用矩陣分塊乘法,將矩陣乘法分解為多個小的矩陣乘法,每個小的矩陣乘法可以利用 shared memory 友好的方式進行計算,減少對 GPU global memory 的訪問。
每個小的矩陣乘法的大小為 $TILE \times TILE$,其中 $TILE$ 是一個超參數,一般取 $8$ 或 $16$。 取 $TILE$ 為 $8$ 或 $16$ 是因為現代 GPU 的 shared memory 行大小為 $128$ 字節,即兩個 $8$ 個 $float32$ 類型的元素。

#define BASE_THREAD_NUM 256

#define TILE 4
typedef float scalar_t;
const size_t ELEM_SIZE = sizeof(scalar_t);

struct CudaArray {
  CudaArray(const size_t size) {
    cudaError_t err = cudaMalloc(&ptr, size * ELEM_SIZE);
    if (err != cudaSuccess)
      throw std::runtime_error(cudaGetErrorString(err));
    this->size = size;
  }
  ~CudaArray() { cudaFree(ptr); }
  size_t ptr_as_int() { return (size_t)ptr; }

  scalar_t *ptr;
  size_t size;
};

struct CudaDims {
  dim3 block, grid;
};

CudaDims CudaOneDim(size_t size) {
  /**
   * Utility function to get cuda dimensions for 1D call
   */
  CudaDims dim;
  size_t num_blocks = (size + BASE_THREAD_NUM - 1) / BASE_THREAD_NUM;
  dim.block = dim3(BASE_THREAD_NUM, 1, 1);
  dim.grid = dim3(num_blocks, 1, 1);
  return dim;
}

#define MAX_VEC_SIZE 8
struct CudaVec {
  uint32_t size;
  int32_t data[MAX_VEC_SIZE];
};

CudaVec VecToCuda(const std::vector<int32_t> &x) {
  CudaVec shape;
  if (x.size() > MAX_VEC_SIZE)
    throw std::runtime_error("Exceeded CUDA supported max dimesions");
  shape.size = x.size();
  for (size_t i = 0; i < x.size(); i++) {
    shape.data[i] = x[i];
  }
  return shape;
}


__global__ void MatmulKernel(const scalar_t *a, const scalar_t *b,
                             scalar_t *out, uint32_t M, uint32_t N,
                             uint32_t P) {
  __shared__ scalar_t a_tile[TILE][TILE];
  __shared__ scalar_t b_tile[TILE][TILE];

  size_t thread_x = threadIdx.x;
  size_t thread_y = threadIdx.y;

  size_t block_x = blockIdx.x;
  size_t block_y = blockIdx.y;

  size_t x = thread_x + block_x * blockDim.x;
  size_t y = thread_y + block_y * blockDim.y;

  size_t cnt = (N + TILE - 1) / TILE;
  scalar_t sum = 0;
  for (size_t i = 0; i < cnt;
       i++) // 遍歷a的某一行和b某一列的TILE塊,對應位置累加得到這個塊的out
  {
    if ((i * TILE + thread_y) < N) // 防越界
    {
      a_tile[thread_x][thread_y] = a[x * N + i * TILE + thread_y];
    }
    if ((i * TILE + thread_x) < N) {
      b_tile[thread_x][thread_y] = b[(i * TILE + thread_x) * P + y];
    }

    __syncthreads();

    if (x < M && y < P) {
      for (size_t j = 0; j < TILE; j++)
        if (i * TILE + j < N)
          sum += a_tile[thread_x][j] * b_tile[j][thread_y];
    }

    __syncthreads();
  }

  if (x < M && y < P)
    out[x * P + y] = sum;
}

void Matmul(const CudaArray &a, const CudaArray &b, CudaArray *out, uint32_t M,
            uint32_t N, uint32_t P) {
  /**
   * Multiply two (compact) matrices into an output (also comapct) matrix.  You
   * will want to look at the lecture and notes on GPU-based linear algebra to
   * see how to do this.  Since ultimately mugrade is just evaluating
   * correctness, you _can_ implement a version that simply parallelizes over
   * (i,j) entries in the output array.  However, to really get the full benefit
   * of this problem, we would encourage you to use cooperative fetching, shared
   * memory register tiling, and other ideas covered in the class notes.  Note
   * that unlike the tiled matmul function in the CPU backend, here you should
   * implement a single function that works across all size matrices, whether or
   * not they are a multiple of a tile size.  As with previous CUDA
   * implementations, this function here will largely just set up the kernel
   * call, and you should implement the logic in a separate MatmulKernel() call.
   *
   *
   * Args:
   *   a: compact 2D array of size m x n
   *   b: comapct 2D array of size n x p
   *   out: compact 2D array of size m x p to write the output to
   *   M: rows of a / out
   *   N: columns of a / rows of b
   *   P: columns of b / out
   */

  dim3 block = dim3(TILE, TILE, 1);
  size_t grid_x = (M + TILE - 1) / TILE;
  size_t grid_y = (P + TILE - 1) / TILE;
  dim3 grid = dim3(grid_x, grid_y, 1);
  MatmulKernel<<<grid, block>>>(a.ptr, b.ptr, out->ptr, M, N, P);
}

Triton 矩陣運算實現

也可以使用triton實現矩陣乘法,triton是一個基於python的編譯器,用於將python代碼編譯為cuda kernel。
如果你對triton感興趣,可以參考我之前關於的一些其他文章:

  • 突破性能瓶頸:深入解析 Flash Attention 內核優化技術
  • 深入解析:使用 Triton 實現 Flash Attention2 - 讓大模型訓練飛起來

深度學習框架搭建

以上內容串聯起了一個完整的深度學習框架的基本底層要素:包括自動微分計算,矩陣計算的 CPU、GPU實現。
接下來補齊深度學習框架中其他的基本要素,從而構建起一個基本開箱可用的深度學習框架。

model parameter

本質是Tensor,封裝一層方便 nn Module 管理。

class Parameter(Tensor):
    """A special kind of tensor that represents parameters."""

nn module

nn.Module 是所有神經網絡模塊的基類。Module 是封裝模型結構、參數和計算的統一單元。

每個基礎模型或者自定義模型都通過繼承 nn.Module 來定義。

它可以包含可學習參數(通過 nn.Parameter 或子模塊如 nn.Linear 自動註冊)和前向計算邏輯。

Module 支持層級嵌套(一個 Module 可包含其他 Module),自動管理參數、狀態(如訓練/評估模式)和設備(CPU/

關鍵代碼如下,完整代碼參考nn_basic.py。

class Module:
    def __init__(self):
        self.training = True

    def parameters(self) -> List[Tensor]:
        """Return the list of parameters in the module."""
        return _unpack_params(self.__dict__)

    def _children(self) -> List["Module"]:
        return _child_modules(self.__dict__)

    def eval(self):
        self.training = False
        for m in self._children():
            m.training = False

    def train(self):
        self.training = True
        for m in self._children():
            m.training = True

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

def _unpack_params(value: object) -> List[Tensor]:
    if isinstance(value, Parameter):
        return [value]
    elif isinstance(value, Module):
        return value.parameters()
    elif isinstance(value, dict):
        params = []
        for k, v in value.items():
            params += _unpack_params(v)
        return params
    elif isinstance(value, (list, tuple)):
        params = []
        for v in value:
            params += _unpack_params(v)
        return params
    else:
        return []


def _child_modules(value: object) -> List["Module"]:
    if isinstance(value, Module):
        modules = [value]
        modules.extend(_child_modules(value.__dict__))
        return modules
    if isinstance(value, dict):
        modules = []
        for k, v in value.items():
            modules += _child_modules(v)
        return modules
    elif isinstance(value, (list, tuple)):
        modules = []
        for v in value:
            modules += _child_modules(v)
        return modules
    else:
        return []

常見內置模塊

為了做到開箱即用,我們直接 cosplay pytorch,實現一些常見的神經網絡模塊,如 Linear、ReLU、Softmax 等。

最簡單的 Linear:

class Linear(Module):
    def __init__(
        self, in_features, out_features, bias=True, device=None, dtype="float32"
    ):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = Parameter(
            init.kaiming_uniform(
                in_features,
                out_features,
                device=device,
                dtype=dtype,
                requires_grad=True,
            )
        )
        if bias:
            self.bias = Parameter(
                init.kaiming_uniform(
                    out_features, 1, device=device, dtype=dtype, requires_grad=True
                ).transpose()
            )
        else:
            self.bias = None

    def forward(self, X: Tensor) -> Tensor:
        ret = X @ self.weight
        if self.bias:
            b = self.bias.broadcast_to(ret.shape)
            ret = ret + b
        return ret


class Flatten(Module):
    def forward(self, X):
        s = X.shape
        batch = s[0]
        other = 1
        for i, x in enumerate(s):
            if i == 0:
                continue
            other = other * x
        return X.reshape((batch, other))


class ReLU(Module):
    def forward(self, x: Tensor) -> Tensor:
        return ops.relu(x)


class Sequential(Module):
    def __init__(self, *modules):
        super().__init__()
        self.modules = modules

    def forward(self, x: Tensor) -> Tensor:
        input = x
        for module in self.modules:
            input = module(input)
        return input


class SoftmaxLoss(Module):
    def forward(self, logits: Tensor, y: Tensor):
        one_hot_y = init.one_hot(logits.shape[-1], y)
        z_y = ops.summation(logits * one_hot_y, axes=(-1,))
        log_sum = ops.logsumexp(logits, axes=(-1,))
        return ops.summation((log_sum - z_y) / logits.shape[0])


class BatchNorm1d(Module):
    def __init__(self, dim, eps=1e-5, momentum=0.1, device=None, dtype="float32"):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.momentum = momentum
        self.weight = Parameter(init.ones(dim, device=device, requires_grad=True))
        self.bias = Parameter(init.zeros(dim, device=device, requires_grad=True))
        self.running_mean = init.zeros(dim)
        self.running_var = init.ones(dim)

    def forward(self, x: Tensor) -> Tensor:
        if self.training:
            batch_mean = x.sum((0,)) / x.shape[0]
            batch_var = ((x - batch_mean.broadcast_to(x.shape)) ** 2).sum(
                (0,)
            ) / x.shape[0]
            self.running_mean = (
                1 - self.momentum
            ) * self.running_mean + self.momentum * batch_mean.data
            self.running_var = (
                1 - self.momentum
            ) * self.running_var + self.momentum * batch_var.data
            norm = (x - batch_mean.broadcast_to(x.shape)) / (
                batch_var.broadcast_to(x.shape) + self.eps
            ) ** 0.5
            return self.weight.broadcast_to(x.shape) * norm + self.bias.broadcast_to(
                x.shape
            )
        else:
            norm = (x - self.running_mean.broadcast_to(x.shape)) / (
                self.running_var.broadcast_to(x.shape) + self.eps
            ) ** 0.5
            return self.weight.broadcast_to(x.shape) * norm + self.bias.broadcast_to(
                x.shape
            )


class LayerNorm1d(Module):
    def __init__(self, dim, eps=1e-5, device=None, dtype="float32"):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.weights = Parameter(init.ones(dim, requires_grad=True))
        self.bias = Parameter(init.zeros(dim, requires_grad=True))

    def forward(self, x: Tensor) -> Tensor:
        mean = (
            (ops.summation(x, axes=(-1,)) / x.shape[-1])
            .reshape((x.shape[0], 1))
            .broadcast_to(x.shape)
        )
        var = (
            (ops.summation((x - mean) ** 2, axes=(-1,)) / x.shape[-1])
            .reshape((x.shape[0], 1))
            .broadcast_to(x.shape)
        )
        deno = (var + self.eps) ** 0.5
        return self.weights.broadcast_to(x.shape) * (
            (x - mean) / deno
        ) + self.bias.broadcast_to(x.shape)


class Dropout(Module):
    def __init__(self, p=0.5):
        super().__init__()
        self.p = p

    def forward(self, x: Tensor) -> Tensor:
        if self.training:
            mask = init.randb(*x.shape, p=1 - self.p) / (1 - self.p)
            x = x * mask
        return x


class Residual(Module):
    def __init__(self, fn: Module):
        super().__init__()
        self.fn = fn

    def forward(self, x: Tensor) -> Tensor:
        return self.fn(x) + x

Module 參數初始化時,我們需要實現一些常見的參數初始化方法,如 kaiming_uniform 等。具體代碼參考init_initializers.py,這裏就不展開了。

還有一些更復雜的Module,如RNN、CNN,這裏暫時沒有實現了。

如果你對大語言模型中常見的 Module,比如 transformer 的實現感興趣,可以參考我的文章

  • 從 0 搭建 LLM 不再難!這個 PyTorch 項目幫你吃透大模型底層邏輯
  • 以及對應的代碼倉庫:llm-from-scratch

optimizer

optimizer 用於更新模型參數以最小化損失函數。
這裏實現了 SGD 和 Adam 兩種 optimizer。

class Optimizer:
    def __init__(self, params):
        self.params = params

    def step(self):
        raise NotImplementedError()

    def reset_grad(self):
        for p in self.params:
            p.grad = None


class SGD(Optimizer):
    def __init__(self, params, lr=0.01, momentum=0.0, weight_decay=0.0):
        super().__init__(params)
        self.lr = lr
        self.momentum = momentum
        self.u = defaultdict(float)
        self.weight_decay = weight_decay

    def step(self):
        for p in self.params:
            if self.weight_decay > 0:
                grad = p.grad.data + self.weight_decay * p.data
            else:
                grad = p.grad.data
            self.u[p] = self.momentum * self.u[p] + (1 - self.momentum) * grad
            p.data = p.data - ndl.Tensor(self.lr * self.u[p], dtype=p.dtype)
            

class Adam(Optimizer):
    def __init__(
        self,
        params,
        lr=0.01,
        beta1=0.9,
        beta2=0.999,
        eps=1e-8,
        weight_decay=0.0,
    ):
        super().__init__(params)
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.weight_decay = weight_decay
        self.t = 0

        self.m = defaultdict(float)
        self.v = defaultdict(float)

    def step(self):
        self.t += 1
        for p in self.params:
            if self.weight_decay > 0:
                grad = p.grad.data + self.weight_decay * p.data
            else:
                grad = p.grad.data
            self.m[p] = self.beta1 * self.m[p] + (1 - self.beta1) * grad
            self.v[p] = self.beta2 * self.v[p] + (1 - self.beta2) * (grad**2)
            unbiased_m = self.m[p] / (1 - self.beta1**self.t)
            unbiased_v = self.v[p] / (1 - self.beta2**self.t)
            p.data = p.data - ndl.Tensor(
                self.lr * unbiased_m / (unbiased_v**0.5 + self.eps),
                dtype=p.dtype,
            )

DataLoader

DataLoader 用於批量加載數據集,支持 shuffle、並行加載等功能。
這裏實現了一個簡單的Dataset 和對應的 DataLoader,具體代碼參考data_basic.py。以及常見的數據轉化方法

總結

通過本文的介紹,我們從零構建了一個完整的輕量級深度學習框架 Needle,它包含了深度學習框架的所有核心功能:

  1. 自動微分系統:基於計算圖實現了自動微分功能,支持動態構建計算圖和反向傳播。
  2. 張量運算系統:實現了豐富的張量操作,包括元素級運算、矩陣運算、歸約運算和變形運算等。
  3. 多後端支持:提供了 NumPy 後端用於快速原型開發和 CPU 計算,以及自定義 NDArray 後端用於更高性能的 CPU 和 CUDA 加速。
  4. 神經網絡模塊:實現了線性層、卷積層等基礎組件,以及 ReLU、Sigmoid 等激活函數,支持構建複雜的神經網絡模型。
  5. 優化算法:實現了 SGD、Momentum、RMSprop 和 Adam 等常用優化算法,支持權重衰減和學習率調度等功能。
  6. 數據加載系統:提供了 Dataset 接口和 DataLoader,支持批量加載數據和數據預處理。

如果你對深度學習框架的實現感興趣,可以訪問項目的 GitHub 倉庫獲取完整代碼:https://github.com/fangpin/dl-framework。如有任何疑問歡迎與我交流,你也可以參考獲取更多理論知識。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.