此分類用於記錄吳恩達深度學習課程的學習筆記。
課程相關信息鏈接如下:
- 原課程視頻鏈接:[雙語字幕]吳恩達深度學習deeplearning.ai
- github課程資料,含課件與筆記:吳恩達深度學習教學資料
- 課程配套練習(中英)與答案:吳恩達深度學習課後習題與答案
本篇為第五課第三週的課後習題和代碼實踐部分。
1.理論習題
【中英】【吳恩達課後測驗】Course 5 - 序列模型 - 第三週測驗
在這一週的習題內容中有一道題的相關內容需要展開,先看一下題面:
網絡通過學習注意力分數來決定「把注意力放在哪裏」,這些值是由一個小神經網絡計算得到的。
這裏,我們不能把 \(s^{<t-1>}\) 替換成 \(s^{<t>}\) 作為這個神經網絡的輸入,
原因如下:因為 \(s^{<t>}\) 依賴於當前的注意力權重,而當前的注意力權重又依賴於注意力分數,因此在需要計算注意力分數的時刻,\(s^{<t>}\) 尚未被計算出來, 此時只能使用前一時刻的隱藏狀態 \(s^{<t-1>}\),而無法使用當前的 \(s^{<t>}\)。
答案:正確
這道題一眼看下來是很迷惑的,這是因為這道題講的是最早的注意力機制的邏輯,和我們在理論部分介紹的主流邏輯有所不同。
在理論部分,我們介紹的注意力機制,學習注意力分數的網絡的輸入就是 \(s^{<t>}\) ,但在最早的注意力機制中,它的輸入是 \(s^{<t-1>}\)。
其傳播邏輯可以用一句話總結:在當前時間步中,原始注意力機制將由上一時刻解碼狀態計算得到的上下文向量 \(c^{<t>}\) 作為解碼器輸入的一部分,用於計算當前解碼狀態 \(s^{<t>}\)。
我們展開如下:
14 年的原論文使用的就是這種傳播方式,具有開創性價值。但在現代,這種將上下文向量直接納入狀態遞推的做法更多隻具有歷史意義,瞭解即可。
2.代碼實踐
【中文】【吳恩達課後編程作業】Course 5 - 序列模型 - 第三週作業
還是先擺鏈接,這篇裏博主就機器翻譯和觸發詞檢測兩部分內容,非常詳細地演示了本週的內容,但還是要注意 Keras 的導包更新問題。
我們同樣使用框架來演示這部分內容,主要內容如下:
- 使用編碼解碼框架進行日期格式翻譯
- 在編碼解碼框架下對比貪心解碼和束搜索性能
- 使用帶注意力機制的編碼解碼框架進行日期格式翻譯
2.1 數據準備
需要説明的是,由於 seq2seq 模型自身的編碼–解碼結構 以及 序列數據的時序特性,這類任務相比許多常規監督學習任務,通常對計算資源有更高的要求。
以機器翻譯任務為例,網絡上存在大量公開數據集,如英法翻譯、中英翻譯等。由於這類數據獲取相對容易,同時自然語言本身具有詞彙規模大、表達形式多樣的特點,模型在訓練過程中往往需要同時維護源語言詞典和目標語言詞典,其規模通常至少達到 數萬級別。這直接導致模型在參數規模、計算量以及顯存佔用等方面的成本顯著提升。
因此,這次我們選用一種相對簡化的機器翻譯任務——日期格式翻譯,該類任務的主要特點在於詞典規模小、輸入輸出結構清晰、語義歧義極少,可以較高效率地演示本週內容。
這次我們不使用網上公開的數據集,而是拓展吳恩達老師在編程作業裏的邏輯,進行人工合成數據。
我們通過設計好的腳本,生成 10000 條 以下格式的相關數據和對應標籤並保存為文件:
HUMAN_TEMPLATES = [
"{day} {month} {year}",
"{month} {day}, {year}",
"{day} of {month}, {year}",
"{month} {day} {year}",
"{day}/{month_num}/{year_short}",
"{month_num}/{day}/{year_short}",
"{weekday}, {day} {month_abbr} {year}",
"{month_abbr} {day}th {year_short}",
"{year}-{month_num}-{day}",
"The {day}{day_suffix} of {month}, {year}",
"{day}th {month_abbr}, {year}",
"{month} the {day}{day_suffix}, {year}",
"Date: {year}/{month_num}/{day}",
"{day} in Roman: {roman_day} {month} {year}",
"{month_abbr}. {day}, '{year_short}",
]
完整代碼放在附錄,打印幾條生成數據如下:
[1] The 28th of April, 1975 ==> 1975-04-28
[2] Date: 2017/05/5 ==> 2017-05-05
[3] Jan 10th 44 ==> 1944-01-10
[4] Tuesday, 6 Nov 2096 ==> 2096-11-06
[5] The 26th of August, 2089 ==> 2089-08-26
[6] Date: 2061/08/18 ==> 2061-08-18
[7] 08/4/35 ==> 1935-08-04
[8] 27 of May, 1948 ==> 1948-05-27
[9] 8 of April, 1907 ==> 1907-04-08
[10] November 11, 1937 ==> 1937-11-11
準備好數據後,我們便可以進行下一步內容。
2.2 模型設計
在本週第一篇內容裏我們就提到過:編碼解碼框架下的網絡結構實際上是分別設計的兩個子網絡,在代碼邏輯中,就是先分別定義設計編碼器和解碼器,再在整體網絡中調用。我們分點展開如下:
(1)編碼器
class Encoder(nn.Module):
def __init__(self, vocab_size, emb_dim, hidden_dim):
super().__init__()
# 嵌入層
self.embedding = nn.Embedding(vocab_size, emb_dim)
# 使用雙向 LSTM
self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True, bidirectional=True)
def forward(self, x):
embedded = self.embedding(x)
outputs, (hidden, cell) = self.lstm(embedded)
# outputs: LSTM 對每個時間步的輸出,用於注意力的相關計算。
# (hidden,cell): 最後時間步的隱藏狀態和細胞狀態,是解碼器的初始輸入。
return outputs, (hidden, cell)
(2)解碼器
class Decoder(nn.Module):
def __init__(self, vocab_size, emb_dim, hidden_dim, use_attention=False):
super().__init__()
# 設置參數,默認不使用注意力機制
self.use_attention = use_attention
# 嵌入層
self.embedding = nn.Embedding(vocab_size, emb_dim)
# LSTM
self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True)
if use_attention:
# 注意力評分層,將解碼器 hidden 與編碼器 outputs 拼接計算注意力分數
self.attn = nn.Linear(hidden_dim + hidden_dim * 2, 1)
# 注意力結合層:將解碼器 hidden 與上下文向量融合為最終 LSTM 輸出
self.attn_combine = nn.Linear(hidden_dim + hidden_dim * 2, hidden_dim)
# 輸出層:將 LSTM 輸出映射到詞表維度,預測下一個 token
self.out = nn.Linear(hidden_dim, vocab_size)
def forward(self, input_step, hidden_cell, encoder_outputs):
embedded = self.embedding(input_step)
# LSTM 前向計算,得到當前時間步輸出和新的 hidden/cell
lstm_out, hidden_cell = self.lstm(embedded, hidden_cell)
# 去掉時間維度,方便注意力計算
query = lstm_out.squeeze(1)
if self.use_attention:
# 擴展 query,實際上就是把當前解碼隱藏狀態複製到和編碼隱藏狀態數相同。
query_exp = query.unsqueeze(1).repeat(1, encoder_outputs.size(1), 1)
# 計算注意力分數
attn_scores = self.attn(torch.cat((query_exp, encoder_outputs), dim=-1)).squeeze(-1)
# 轉為注意力權重
attn_weights = F.softmax(attn_scores, dim=-1).unsqueeze(1)
# 計算上下文向量 bmm:批量矩陣乘法
context = torch.bmm(attn_weights, encoder_outputs).squeeze(1)
# 將 query 與上下文向量融合
combined = torch.cat((query, context), dim=1)
output = torch.tanh(self.attn_combine(combined))
else:
# 不使用注意力時,直接用 LSTM 輸出預測
output = query
# 輸出層
output = self.out(output)
return output, hidden_cell
(3)整體網絡
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super().__init__()
# 編碼器和解碼器直接作為初始化參數,在主函數中創建傳入
self.encoder = encoder
self.decoder = decoder
def forward(self, src, tgt):
# src:樣本 tgt:標籤
# 編碼器
enc_outputs, (hidden, cell) = self.encoder(src)
# 這裏是合併雙向 LSTM 的 hidden/cell
# 將兩個方向的 hidden/cell 在層維度上分開,再求和或拼接作為解碼器初始狀態
hidden = hidden.view(2, -1, hidden.size(-1)).sum(dim=0, keepdim=True)
cell = cell.view(2, -1, cell.size(-1)).sum(dim=0, keepdim=True)
dec_hidden = (hidden, cell) # 解碼器初始狀態
outputs = [] # 存儲每個時間步的輸出
# 解碼器
for t in range(tgt.size(1)):
# 訓練邏輯:每步使用真實標籤作為輸入
out, dec_hidden = self.decoder(
tgt[:, t].unsqueeze(1), dec_hidden, enc_outputs
)
# tgt[:, t] 因為批次訓練,每次要去取第 t 列
# out: 當前時間步預測的 token 概率分佈 [batch_size, vocab_size]
# dec_hidden: 更新後的解碼器隱藏狀態
outputs.append(out) # 保存當前輸出
# 將所有時間步輸出堆疊為最終結果。
return torch.stack(outputs, dim=1)
這樣我們就完成了模型的設計,繼續下一部分內容。
2.3 解碼函數
還是理論部分提到的,在推理階段,解碼器的輸出其實是一系列的概率分佈,我們使用解碼函數,就是設計相應的搜索策略來尋找最大的聯合概率,得到最終輸出。
這裏,我們定義完整的解碼函數,主邏輯為貪心解碼,補充束搜索的相關邏輯並通過參數選擇是否使用:
def decode(model, input_tensor, use_beam_search=False, beam_width=3, max_len=20):
"""
參數:
- model: 已訓練的 Seq2Seq 模型(包含 encoder 和 decoder)
- input_tensor: 輸入序列的 tensor(一個樣本)
- use_beam_search: 是否使用束搜索
- beam_width: 束搜索時的候選數量
- max_len: 解碼最大長度
"""
model.eval() # 評估模式
with torch.no_grad():
input_tensor = input_tensor.unsqueeze(0)
# 編碼器傳播
enc_outputs, (hidden, cell) = model.encoder(input_tensor)
hidden = hidden.view(2, -1, hidden.size(-1)).sum(dim=0, keepdim=True)
cell = cell.view(2, -1, cell.size(-1)).sum(dim=0, keepdim=True)
dec_hidden = (hidden, cell) # 解碼器初始狀態
# 貪心解碼
if not use_beam_search:
# 初始解碼輸入:SOS:初始符,這裏是簡化為了 \t
dec_input = torch.tensor([[target_token_index['\t']]], device=device)
result = []
for _ in range(max_len):
# 解碼器傳播
out, dec_hidden = model.decoder(dec_input, dec_hidden, enc_outputs)
# 取當前時間步最大概率的 token
pred = out.argmax(dim=-1)
token = pred.item()
if token == target_token_index['\n']: # 遇到 EOS 停止,同樣簡化為\n
break
result.append(reverse_target_char_index[token]) # 保存字符
dec_input = pred.unsqueeze(0) # 下一步輸入為當前預測,即自迴歸。
return ''.join(result) # 返回解碼後的字符串
# 束搜索
candidates = [([target_token_index['\t']], 0.0, dec_hidden)] # 初始化候選序列
for _ in range(max_len):
new_cands = []
for seq, score, h in candidates:
# 如果序列已遇到 EOS,保留候選
if seq[-1] == target_token_index['\n']:
new_cands.append((seq, score, h))
continue
# 當前時間步輸入
inp = torch.tensor([[seq[-1]]], device=device)
out, new_h = model.decoder(inp, h, enc_outputs)
# 計算 log 概率
log_probs = F.log_softmax(out, dim=-1).squeeze(0)
# 取 top-k 候選
top_vals, top_idx = log_probs.topk(beam_width)
for v, idx in zip(top_vals, top_idx):
new_seq = seq + [idx.item()] # 擴展序列
new_score = score + v.item() # 更新累積 log 概率
new_cands.append((new_seq, new_score, new_h))
# 按累積概率排序,保留前 beam_width 個序列
candidates = sorted(new_cands, key=lambda x: x[1], reverse=True)[:beam_width]
# 最終選擇概率最高的序列
best = candidates[0][0]
# 去掉控制符,轉換為最終字符
return ''.join(reverse_target_char_index.get(i, '')
for i in best[1:] if i != target_token_index['\n'])
解碼的邏輯也被使用在之後的評估環節中。
2.4 評估指標:BELU 和完全匹配率
自然,在完成訓練後,我們要有相應的指標來進行評估,首先就是我們在理論部分介紹的 BELU ,它通過計算輸出和標籤在不同尺度上的匹配度來評估模型效果。
但是,我們説:BELU 這類語言指標使用的原因是因為存在”同義不同形“的情況。
而在日期翻譯裏,翻譯結果是唯一的:錯一個數字就是完全不同的一天。
因此,我們再手工定義一個完全匹配率:輸出和標籤只有相同和不相同兩種情況。
這樣,我們使用 BELU 來評估序列模型擬合的大致效果,再用完全匹配率來觀察在日期翻譯任務中模型的真正性能。
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
# BLEU
def evaluate_bleu(model, test_enc_input, test_targets, use_beam_search=False, beam_width=3):
total_bleu = 0
# BLEU 平滑方法,避免短句導致分數過低
smoother = SmoothingFunction().method4
for i in range(len(test_enc_input)):
# 使用 decode 函數生成預測序列
pred = decode(model, test_enc_input[i], use_beam_search, beam_width)
# 去掉目標序列的起始符和結束符
true = test_targets[i][1:-1]
# BLEU 接受列表形式:ref 是參考序列列表,hyp 是預測序列
ref = [list(true)]
hyp = list(pred)
# 計算當前樣本 BLEU 分數
bleu = sentence_bleu(ref, hyp, smoothing_function=smoother)
total_bleu += bleu
# 平均 BLEU 分數,並轉換為百分比
avg_bleu = total_bleu / len(test_enc_input) * 100
print(f"BLEU: {avg_bleu:.2f}")
return avg_bleu
# 完全匹配率
def evaluate_exact_match(model, test_enc_input, test_targets, use_beam_search=False, beam_width=3):
correct = 0
total = len(test_enc_input)
for i in range(total):
pred = decode(model, test_enc_input[i], use_beam_search, beam_width)
true = test_targets[i][1:-1]
# 判斷預測與真實是否完全一致
if pred == true:
correct += 1
acc = correct / total * 100
print(f"完全匹配率: {acc:.2f}%")
return acc
這裏分開寫兩個指標方便演示和單獨調用,實際上,把二者的前半部分邏輯合在一起是更高效的做法。
到此,需要強調的部分就全部定義完畢,我們開始正式運行。
2.5 運行效果
我們設置參數如下:
EMB_DIM = 64
HIDDEN_DIM = 256
EPOCHS = 10
BATCH_SIZE = 128
LR = 0.001
下面,就來分情況看看效果:
(1)不使用注意力機制,使用貪心解碼
在這個條件下,我們定義主函數如下:
if __name__ == "__main__":
EMB_DIM = 64
HIDDEN_DIM = 256
EPOCHS = 10
BATCH_SIZE = 128
LR = 0.001
# 關鍵設置
USE_ATTENTION = False
USE_BEAM_SEARCH = False
BEAM_WIDTH = 3
print(f"\n=== 配置 ===")
print(f"使用注意力: {USE_ATTENTION}")
print(f"使用束搜索: {USE_BEAM_SEARCH} (beam width = {BEAM_WIDTH if USE_BEAM_SEARCH else 'No'})")
encoder = Encoder(num_encoder_tokens, EMB_DIM, HIDDEN_DIM).to(device)
decoder = Decoder(num_decoder_tokens, EMB_DIM, HIDDEN_DIM, use_attention=USE_ATTENTION).to(device)
model = Seq2Seq(encoder, decoder).to(device)
print("\n開始訓練...")
train_time = train(model, epochs=EPOCHS, batch_size=BATCH_SIZE, lr=LR)
print("\n評估中...")
bleu = evaluate_bleu(model, test_encoder_input, test_targets,
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
exact_acc = evaluate_exact_match(model, test_encoder_input, test_targets,
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
print("\n=== 抽取測試樣本輸出 ===")
sample_indices = random.sample(range(len(test_encoder_input)), 10)
for i in sample_indices:
src = test_inputs[i]
true = test_targets[i][1:-1]
pred,_ = decode(model, test_encoder_input[i],
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
print(f"Human : {src} | True : {true} | Predicted : {pred}")
結果如下:
得益於任務本身和合成數據較為簡單,可以發現指標還不錯,我們不着急分析 ,繼續下一步設置進行對比。
(2)不使用注意力機制,使用束搜索
這次,設置主函數參數如下:
USE_ATTENTION = False
USE_BEAM_SEARCH = True # 使用束搜索
BEAM_WIDTH = 3 # 束寬
看看結果:
指標好像有所提升,但也可能是訓練中的偶然性,我們再增加束寬看看:
BEAM_WIDTH = 10
可以發現,隨着束寬的增加,解碼時間明顯增加,而指標浮動不大,這説明限制指標的關鍵因素可能不是搜索策略。
由此,我們進入下一步:
(3)使用注意力機制
現在,設置參數如下:
USE_ATTENTION = True # 使用注意力機制
USE_BEAM_SEARCH = True
BEAM_WIDTH = 3
看看結果:
好像也沒什麼用啊? 你會發現:使用注意力機制後,訓練時間因相關計算明顯增加,但是指標並沒有明顯提升。
實際上,這和我們的日期翻譯任務本身的特點有關:
- 輸入輸出序列較短:每個樣本通常只有 5~15 個字符左右,LSTM 很容易在沒有注意力的情況下就捕捉到序列整體信息。
- 信息對齊較簡單:日期翻譯本質上是字符級的對齊映射,例如 “Jan 5, 2003” → “2003-01-05”,幾乎不存在複雜的長距離依賴。
注意力機制最明顯的作用是在序列較長或輸入輸出關係複雜時,幫助解碼器聚焦到相關的編碼狀態。而對於現在這種短序列、規則性強的任務,其計算成本增加,而帶來的性能提升很有限。
在這個任務中,想要指標繼續增加,有一個很簡單的方法就是在腳本中增加我們人工合成的數據量,就不再展示了。
至此,吳恩達深度學習課程五課十五週的內容就全部結束了,之後會出一個完整的目錄。
3. 附錄
3.1 數據合成
```plaintext
import torch
import random
import numpy as np
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PAD_TOKEN = "<PAD>"
MONTHS_FULL = [
"January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December"
]
MONTHS_ABBR = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
HUMAN_TEMPLATES = [
"{day} {month} {year}",
"{month} {day}, {year}",
"{day} of {month}, {year}",
"{month} {day} {year}",
"{day}/{month_num}/{year_short}",
"{month_num}/{day}/{year_short}",
"{weekday}, {day} {month_abbr} {year}",
"{month_abbr} {day}th {year_short}",
"{year}-{month_num}-{day}",
"The {day}{day_suffix} of {month}, {year}",
"{day}th {month_abbr}, {year}",
"{month} the {day}{day_suffix}, {year}",
"Date: {year}/{month_num}/{day}",
"{day} in Roman: {roman_day} {month} {year}",
"{month_abbr}. {day}, '{year_short}",
]
def roman_numeral(n):
vals = (10, 9, 5, 4, 1)
nums = ('X', 'IX', 'V', 'IV', 'I')
result = ""
for v, r in zip(vals, nums):
while n >= v:
result += r
n -= v
return result
def random_date(start_year=1900, end_year=2100):
start = datetime(start_year, 1, 1)
end = datetime(end_year, 12, 31)
delta = end - start
return start + timedelta(days=random.randint(0, delta.days))
def human_readable_date(date):
template = random.choice(HUMAN_TEMPLATES)
day = date.day
month_idx = date.month - 1
month_full = MONTHS_FULL[month_idx]
month_abbr = MONTHS_ABBR[month_idx]
year = date.year
month_num = str(date.month).zfill(2)
year_short = str(year % 100).zfill(2)
weekday = date.strftime("%A")
day_suffix = "th"
if day % 10 == 1 and day != 11: day_suffix = "st"
elif day % 10 == 2 and day != 12: day_suffix = "nd"
elif day % 10 == 3 and day != 13: day_suffix = "rd"
day_str = str(day)
roman_day = roman_numeral(day) if random.random() < 0.2 else day_str
return template.format(
day=day_str,
day_suffix=day_suffix,
month=month_full,
month_abbr=month_abbr,
year=year,
month_num=month_num,
year_short=year_short,
weekday=weekday,
roman_day=roman_day
)
def machine_readable_date(date):
return date.strftime("%Y-%m-%d")
def generate_date_pair():
d = random_date()
return human_readable_date(d), machine_readable_date(d)
NUM_SAMPLES = 10000
input_texts, target_texts = [], []
for _ in range(NUM_SAMPLES):
human, machine = generate_date_pair()
input_texts.append(human)
target_texts.append('\t' + machine + '\n')
train_inputs, test_inputs, train_targets, test_targets = train_test_split(
input_texts, target_texts, test_size=0.2, random_state=42
)
all_input_chars = [PAD_TOKEN] + sorted(set(''.join(input_texts)))
all_target_chars = [PAD_TOKEN] + sorted(set(''.join(target_texts)))
input_token_index = {c: i for i, c in enumerate(all_input_chars)}
target_token_index = {c: i for i, c in enumerate(all_target_chars)}
reverse_input_char_index = {i: c for c, i in input_token_index.items()}
reverse_target_char_index = {i: c for c, i in target_token_index.items()}
max_encoder_len = max(len(txt) for txt in input_texts)
max_decoder_len = max(len(txt) for txt in target_texts)
num_encoder_tokens = len(all_input_chars)
num_decoder_tokens = len(all_target_chars)
print(f"max enc len: {max_encoder_len}, max dec len: {max_decoder_len}")
print(f"input vocab size: {num_encoder_tokens}, target vocab size: {num_decoder_tokens}")
def texts_to_tensor(texts, token_index, max_len):
pad_idx = token_index[PAD_TOKEN]
data = np.full((len(texts), max_len), pad_idx, dtype=np.int64)
for i, txt in enumerate(texts):
for t, char in enumerate(txt):
if t < max_len:
data[i, t] = token_index[char]
return torch.tensor(data, dtype=torch.long, device=device)
train_encoder_input = texts_to_tensor(train_inputs, input_token_index, max_encoder_len)
train_decoder_input = texts_to_tensor(train_targets, target_token_index, max_decoder_len)
test_encoder_input = texts_to_tensor(test_inputs, input_token_index, max_encoder_len)
test_decoder_input = texts_to_tensor(test_targets, target_token_index, max_decoder_len)
# ========= 保存 PyTorch 數據集 =========
dataset_pt = {
'train_encoder_input': train_encoder_input,
'train_decoder_input': train_decoder_input,
'test_encoder_input': test_encoder_input,
'test_decoder_input': test_decoder_input,
'input_token_index': input_token_index,
'target_token_index': target_token_index,
'reverse_input_char_index': reverse_input_char_index,
'reverse_target_char_index': reverse_target_char_index,
'max_encoder_len': max_encoder_len,
'max_decoder_len': max_decoder_len,
'num_encoder_tokens': num_encoder_tokens,
'num_decoder_tokens': num_decoder_tokens
}
torch.save(dataset_pt, "date_dataset.pt")
print("已保存:date_dataset.pt")
def tensor_to_numpy(t):
return t.detach().cpu().numpy()
dataset_npz = {
"train_encoder_input": tensor_to_numpy(train_encoder_input),
"train_decoder_input": tensor_to_numpy(train_decoder_input),
"test_encoder_input": tensor_to_numpy(test_encoder_input),
"test_decoder_input": tensor_to_numpy(test_decoder_input),
# 詞表與元信息(object 類型,需要 allow_pickle)
"input_token_index": np.array(input_token_index, dtype=object),
"target_token_index": np.array(target_token_index, dtype=object),
"reverse_input_char_index": np.array(reverse_input_char_index, dtype=object),
"reverse_target_char_index": np.array(reverse_target_char_index, dtype=object),
"max_encoder_len": max_encoder_len,
"max_decoder_len": max_decoder_len,
"num_encoder_tokens": num_encoder_tokens,
"num_decoder_tokens": num_decoder_tokens
}
np.savez("date_dataset.npz", **dataset_npz)
print("已保存:date_dataset.npz\n")
def decode(tensor_row, reverse_dict):
return ''.join(
reverse_dict[idx.item()]
for idx in tensor_row
if reverse_dict[idx.item()] != PAD_TOKEN
)
print("示例數據:")
for i in range(20):
src = decode(train_encoder_input[i], reverse_input_char_index)
tgt = decode(train_decoder_input[i], reverse_target_char_index)
print(f"[{i+1}] {src} ==> {tgt.strip()}")
3.2 日期格式翻譯-PyTorch版
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import time
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
data = torch.load("date_dataset.pt", map_location=device)
train_encoder_input = data['train_encoder_input']
train_decoder_input = data['train_decoder_input']
test_encoder_input = data['test_encoder_input']
test_decoder_input = data['test_decoder_input']
input_token_index = data['input_token_index']
target_token_index = data['target_token_index']
reverse_target_char_index = data['reverse_target_char_index']
num_encoder_tokens = data['num_encoder_tokens']
num_decoder_tokens = data['num_decoder_tokens']
max_encoder_len = data['max_encoder_len']
max_decoder_len = data['max_decoder_len']
PAD_TOKEN = "<PAD>"
reverse_input_char_index = {i: c for c, i in input_token_index.items()}
def tensor_to_text(tensor_row, reverse_dict):
return ''.join(
reverse_dict[idx.item()]
for idx in tensor_row
if reverse_dict[idx.item()] != PAD_TOKEN
)
test_inputs = [tensor_to_text(test_encoder_input[i], reverse_input_char_index)
for i in range(len(test_encoder_input))]
test_targets = [tensor_to_text(test_decoder_input[i], reverse_target_char_index)
for i in range(len(test_decoder_input))]
class Encoder(nn.Module):
def __init__(self, vocab_size, emb_dim, hidden_dim):
super().__init__()
self.embedding = nn.Embedding(vocab_size, emb_dim)
self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True, bidirectional=True)
def forward(self, x):
embedded = self.embedding(x)
outputs, (hidden, cell) = self.lstm(embedded)
return outputs, (hidden, cell)
class Decoder(nn.Module):
def __init__(self, vocab_size, emb_dim, hidden_dim, use_attention=False):
super().__init__()
self.use_attention = use_attention
self.embedding = nn.Embedding(vocab_size, emb_dim)
self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True)
if use_attention:
self.attn = nn.Linear(hidden_dim + hidden_dim * 2, 1)
self.attn_combine = nn.Linear(hidden_dim + hidden_dim * 2, hidden_dim)
self.out = nn.Linear(hidden_dim, vocab_size)
def forward(self, input_step, hidden_cell, encoder_outputs):
embedded = self.embedding(input_step)
lstm_out, hidden_cell = self.lstm(embedded, hidden_cell)
query = lstm_out.squeeze(1)
if self.use_attention:
query_exp = query.unsqueeze(1).repeat(1, encoder_outputs.size(1), 1)
attn_scores = self.attn(torch.cat((query_exp, encoder_outputs), dim=-1)).squeeze(-1)
attn_weights = F.softmax(attn_scores, dim=-1).unsqueeze(1)
context = torch.bmm(attn_weights, encoder_outputs).squeeze(1)
combined = torch.cat((query, context), dim=1)
output = torch.tanh(self.attn_combine(combined))
else:
output = query
output = self.out(output)
return output, hidden_cell
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super().__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, src, tgt):
enc_outputs, (hidden, cell) = self.encoder(src)
hidden = hidden.view(2, -1, hidden.size(-1)).sum(dim=0, keepdim=True)
cell = cell.view(2, -1, cell.size(-1)).sum(dim=0, keepdim=True)
dec_hidden = (hidden, cell)
outputs = []
for t in range(tgt.size(1)):
out, dec_hidden = self.decoder(tgt[:, t].unsqueeze(1), dec_hidden, enc_outputs)
outputs.append(out)
return torch.stack(outputs, dim=1)
def train(model, epochs=10, batch_size=128, lr=0.001):
enc_optimizer = optim.Adam(model.encoder.parameters(), lr=lr)
dec_optimizer = optim.Adam(model.decoder.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss(ignore_index=0)
start_time = time.time()
for epoch in range(epochs):
total_loss = 0
num_batches = 0
perm = torch.randperm(len(train_encoder_input))
for i in range(0, len(train_encoder_input), batch_size):
idx = perm[i:i+batch_size]
enc_in = train_encoder_input[idx]
dec_in_full = train_decoder_input[idx]
dec_in = dec_in_full[:, :-1]
dec_target = dec_in_full[:, 1:]
enc_optimizer.zero_grad()
dec_optimizer.zero_grad()
outputs = model(enc_in, dec_in)
loss = criterion(outputs.reshape(-1, num_decoder_tokens), dec_target.reshape(-1))
loss.backward()
enc_optimizer.step()
dec_optimizer.step()
total_loss += loss.item()
num_batches += 1
avg_loss = total_loss / num_batches
print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
train_time = time.time() - start_time
print(f"訓練總時間: {train_time:.2f} 秒")
return train_time
import time
def decode(model, input_tensor, use_beam_search=False, beam_width=3, max_len=20):
model.eval()
start_time = time.time()
with torch.no_grad():
input_tensor = input_tensor.unsqueeze(0)
enc_outputs, (hidden, cell) = model.encoder(input_tensor)
hidden = hidden.view(2, -1, hidden.size(-1)).sum(dim=0, keepdim=True)
cell = cell.view(2, -1, cell.size(-1)).sum(dim=0, keepdim=True)
dec_hidden = (hidden, cell)
if not use_beam_search:
dec_input = torch.tensor([[target_token_index['\t']]], device=device)
result = []
for _ in range(max_len):
out, dec_hidden = model.decoder(dec_input, dec_hidden, enc_outputs)
pred = out.argmax(dim=-1)
token = pred.item()
if token == target_token_index['\n']:
break
result.append(reverse_target_char_index[token])
dec_input = pred.unsqueeze(0)
decoded = ''.join(result)
else:
candidates = [([target_token_index['\t']], 0.0, dec_hidden)]
for _ in range(max_len):
new_cands = []
for seq, score, h in candidates:
if seq[-1] == target_token_index['\n']:
new_cands.append((seq, score, h))
continue
inp = torch.tensor([[seq[-1]]], device=device)
out, new_h = model.decoder(inp, h, enc_outputs)
log_probs = F.log_softmax(out, dim=-1).squeeze(0)
k = min(beam_width, log_probs.size(0))
top_vals, top_idx = log_probs.topk(k)
for v, idx in zip(top_vals, top_idx):
new_seq = seq + [idx.item()]
new_score = score + v.item()
new_cands.append((new_seq, new_score, new_h))
candidates = sorted(new_cands, key=lambda x: x[1], reverse=True)[:beam_width]
best = candidates[0][0]
decoded = ''.join(reverse_target_char_index.get(i, '') for i in best[1:] if i != target_token_index['\n'])
decode_time = time.time() - start_time
return decoded, decode_time
def evaluate_bleu(model, test_enc_input, test_targets, use_beam_search=False, beam_width=3):
total_bleu = 0
total_time = 0
smoother = SmoothingFunction().method4
for i in range(len(test_enc_input)):
pred, dt = decode(model, test_enc_input[i], use_beam_search, beam_width)
total_time += dt
true = test_targets[i][1:-1]
ref = [list(true)]
hyp = list(pred)
bleu = sentence_bleu(ref, hyp, smoothing_function=smoother)
total_bleu += bleu
avg_bleu = total_bleu / len(test_enc_input) * 100
avg_time = total_time / len(test_enc_input)
print(f"BLEU: {avg_bleu:.2f} | 平均解碼時間: {avg_time:.4f} 秒/樣本")
return avg_bleu, avg_time
def evaluate_exact_match(model, test_enc_input, test_targets, use_beam_search=False, beam_width=3):
correct = 0
total = len(test_enc_input)
for i in range(total):
pred, _ = decode(model, test_enc_input[i], use_beam_search, beam_width)
true = test_targets[i][1:-1]
if pred == true:
correct += 1
acc = correct / total * 100
print(f"完全匹配率: {acc:.2f}%")
return acc
if __name__ == "__main__":
EMB_DIM = 64
HIDDEN_DIM = 256
EPOCHS = 10
BATCH_SIZE = 128
LR = 0.001
USE_ATTENTION = True
USE_BEAM_SEARCH = True
BEAM_WIDTH = 3
print(f"\n=== 配置 ===")
print(f"使用注意力: {USE_ATTENTION}")
print(f"使用束搜索: {USE_BEAM_SEARCH} (beam width = {BEAM_WIDTH if USE_BEAM_SEARCH else 'No'})")
encoder = Encoder(num_encoder_tokens, EMB_DIM, HIDDEN_DIM).to(device)
decoder = Decoder(num_decoder_tokens, EMB_DIM, HIDDEN_DIM, use_attention=USE_ATTENTION).to(device)
model = Seq2Seq(encoder, decoder).to(device)
print("\n開始訓練...")
train_time = train(model, epochs=EPOCHS, batch_size=BATCH_SIZE, lr=LR)
print("\n評估中...")
bleu = evaluate_bleu(model, test_encoder_input, test_targets,
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
exact_acc = evaluate_exact_match(model, test_encoder_input, test_targets,
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
print("\n=== 抽取測試樣本輸出 ===")
sample_indices = random.sample(range(len(test_encoder_input)), 10)
for i in sample_indices:
src = test_inputs[i]
true = test_targets[i][1:-1]
pred,_ = decode(model, test_encoder_input[i],
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
print(f"Human : {src} | True : {true} | Predicted : {pred}")
3.3 日期格式翻譯-TF版
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense, Bidirectional
import random
import time
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
dataset = np.load("date_dataset.npz", allow_pickle=True)
train_encoder_input = dataset['train_encoder_input']
train_decoder_input = dataset['train_decoder_input']
test_encoder_input = dataset['test_encoder_input']
test_decoder_input = dataset['test_decoder_input']
input_token_index = dataset['input_token_index'].item()
target_token_index = dataset['target_token_index'].item()
reverse_target_char_index = dataset['reverse_target_char_index'].item()
num_encoder_tokens = int(dataset['num_encoder_tokens'])
num_decoder_tokens = int(dataset['num_decoder_tokens'])
max_encoder_len = int(dataset['max_encoder_len'])
max_decoder_len = int(dataset['max_decoder_len'])
reverse_input_char_index = {i: c for c, i in input_token_index.items()}
PAD_TOKEN = "<PAD>"
def tensor_to_text(tensor_row, reverse_dict):
return ''.join(
reverse_dict.get(int(idx), '')
for idx in tensor_row
if reverse_dict.get(int(idx), '') != PAD_TOKEN
)
test_inputs = [tensor_to_text(test_encoder_input[i], reverse_input_char_index)
for i in range(len(test_encoder_input))]
test_targets = [tensor_to_text(test_decoder_input[i], reverse_target_char_index)
for i in range(len(test_decoder_input))]
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, emb_dim, hidden_dim):
super(Encoder, self).__init__()
self.embedding = Embedding(vocab_size, emb_dim, mask_zero=True)
self.lstm = Bidirectional(
LSTM(hidden_dim, return_sequences=True, return_state=True)
)
def call(self, x):
embedded = self.embedding(x)
outputs, fw_h, fw_c, bw_h, bw_c = self.lstm(embedded)
hidden = tf.concat([fw_h, bw_h], axis=-1)
cell = tf.concat([fw_c, bw_c], axis=-1)
return outputs, (hidden, cell)
class Decoder(tf.keras.layers.Layer):
def __init__(self, vocab_size, emb_dim, hidden_dim, use_attention=False):
super(Decoder, self).__init__()
self.use_attention = use_attention
self.embedding = Embedding(vocab_size, emb_dim, mask_zero=True)
self.lstm = LSTM(hidden_dim, return_sequences=False, return_state=True)
self.out = Dense(vocab_size)
if use_attention:
self.attn_W1 = Dense(hidden_dim)
self.attn_W2 = Dense(hidden_dim)
self.attn_V = Dense(1)
self.attn_combine = Dense(hidden_dim)
def call(self, input_step, hidden_cell, encoder_outputs):
embedded = self.embedding(input_step)
lstm_out, h, c = self.lstm(embedded, initial_state=hidden_cell)
query = lstm_out
if self.use_attention:
score = self.attn_V(
tf.tanh(
self.attn_W1(encoder_outputs) +
self.attn_W2(query[:, tf.newaxis, :])
)
)
attn_weights = tf.nn.softmax(score, axis=1)
context = tf.reduce_sum(encoder_outputs * attn_weights, axis=1)
combined = tf.concat([query, context], axis=-1)
output = tf.tanh(self.attn_combine(combined))
else:
output = query
logits = self.out(output)
return logits, (h, c)
class Seq2Seq(tf.keras.Model):
def __init__(self, encoder, decoder):
super(Seq2Seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
def call(self, src, tgt, training=True):
enc_outputs, (hidden, cell) = self.encoder(src)
hidden_fw, hidden_bw = tf.split(hidden, num_or_size_splits=2, axis=-1)
cell_fw, cell_bw = tf.split(cell, num_or_size_splits=2, axis=-1)
hidden = hidden_fw + hidden_bw
cell = cell_fw + cell_bw
dec_hidden = (hidden, cell)
outputs = []
for t in range(tgt.shape[1]):
step_input = tgt[:, t:t + 1]
logits, dec_hidden = self.decoder(
step_input, dec_hidden, enc_outputs
)
outputs.append(logits)
return tf.stack(outputs, axis=1)
def train(model, epochs=10, batch_size=128, lr=0.001):
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
start_time = time.time()
n_samples = len(train_encoder_input)
for epoch in range(epochs):
total_loss = 0.0
num_batches = 0
indices = np.random.permutation(n_samples)
enc = train_encoder_input[indices]
dec = train_decoder_input[indices]
for i in range(0, n_samples, batch_size):
enc_batch = enc[i:i + batch_size]
dec_batch = dec[i:i + batch_size]
dec_in = dec_batch[:, :-1]
dec_tgt = dec_batch[:, 1:]
with tf.GradientTape() as tape:
logits = model(enc_batch, dec_in, training=True)
loss_value = loss_fn(dec_tgt, logits)
loss_value += tf.reduce_sum(model.losses)
grads = tape.gradient(loss_value, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
total_loss += float(loss_value)
num_batches += 1
avg_loss = total_loss / num_batches
print(f"Epoch [{epoch + 1}/{epochs}], Loss: {avg_loss:.4f}")
train_time = time.time() - start_time
print(f"訓練用時: {train_time:.2f} 秒")
return train_time
@tf.function
def greedy_decode_step(decoder, dec_input, dec_hidden, enc_outputs):
logits, dec_hidden = decoder(dec_input, dec_hidden, enc_outputs)
return logits, dec_hidden
def decode(model, input_tensor, use_beam_search=False, beam_width=3, max_len=20):
dummy_src = tf.zeros((1, max_encoder_len), dtype=tf.int32)
dummy_tgt = tf.zeros((1, 1), dtype=tf.int32)
_ = model(dummy_src, dummy_tgt, training=False)
start_time = time.time()
input_tensor = tf.expand_dims(input_tensor, 0)
enc_outputs, (hidden, cell) = model.encoder(input_tensor)
hidden_fw, hidden_bw = tf.split(hidden, num_or_size_splits=2, axis=-1)
cell_fw, cell_bw = tf.split(cell, num_or_size_splits=2, axis=-1)
hidden = hidden_fw + hidden_bw
cell = cell_fw + cell_bw
dec_hidden = (hidden, cell)
if not use_beam_search:
dec_input = tf.constant([[target_token_index['\t']]], dtype=tf.int32)
result = []
for _ in range(max_len):
logits, dec_hidden = greedy_decode_step(
model.decoder, dec_input, dec_hidden, enc_outputs
)
pred = tf.argmax(logits, axis=-1)
token = int(pred[0])
if token == target_token_index.get('\n', -1):
break
result.append(reverse_target_char_index.get(token, ''))
dec_input = pred[:, tf.newaxis]
decoded = ''.join(result)
else:
candidates = [([target_token_index['\t']], 0.0, dec_hidden)]
for _ in range(max_len):
new_cands = []
for seq, score, h_c in candidates:
if seq[-1] == target_token_index.get('\n', -1):
new_cands.append((seq, score, h_c))
continue
inp = tf.constant([[seq[-1]]], dtype=tf.int32)
logits, new_h_c = model.decoder(inp, h_c, enc_outputs)
log_probs = tf.nn.log_softmax(logits, axis=-1)[0]
top_vals, top_idx = tf.math.top_k(log_probs, k=beam_width)
for v, idx in zip(top_vals, top_idx):
new_seq = seq + [int(idx)]
new_score = score + float(v)
new_cands.append((new_seq, new_score, new_h_c))
candidates = sorted(new_cands, key=lambda x: x[1], reverse=True)[:beam_width]
best_seq = candidates[0][0]
decoded = ''.join(
reverse_target_char_index.get(i, '') for i in best_seq[1:] if i != target_token_index.get('\n', -1))
decode_time = time.time() - start_time
return decoded, decode_time
def evaluate_bleu(model, test_enc_input, test_targets, use_beam_search=False, beam_width=3):
total_bleu = 0
total_time = 0
smoother = SmoothingFunction().method4
for i in range(len(test_enc_input)):
pred, dt = decode(model, test_enc_input[i], use_beam_search, beam_width)
total_time += dt
true = test_targets[i][1:-1]
ref = [list(true)]
hyp = list(pred)
bleu = sentence_bleu(ref, hyp, smoothing_function=smoother)
total_bleu += bleu
avg_bleu = total_bleu / len(test_enc_input) * 100
avg_time = total_time / len(test_enc_input)
print(f"BLEU: {avg_bleu:.2f} | 平均解碼時間: {avg_time:.4f} 秒/樣本")
return avg_bleu, avg_time
def evaluate_exact_match(model, test_enc_input, test_targets, use_beam_search=False, beam_width=3):
correct = 0
total = len(test_enc_input)
for i in range(total):
pred, _ = decode(model, test_enc_input[i], use_beam_search, beam_width)
true = test_targets[i][1:-1]
if pred == true:
correct += 1
acc = correct / total * 100
print(f"完全匹配率: {acc:.2f}%")
return acc
if __name__ == "__main__":
EMB_DIM = 64
HIDDEN_DIM = 256
BATCH_SIZE = 128
LR = 0.001
USE_ATTENTION = True
USE_BEAM_SEARCH = True
BEAM_WIDTH = 3
print(f"\n=== 配置 ===")
print(f"使用注意力: {USE_ATTENTION}")
print(f"使用束搜索: {USE_BEAM_SEARCH} (beam width = {BEAM_WIDTH if USE_BEAM_SEARCH else 'No'})")
encoder = Encoder(num_encoder_tokens, EMB_DIM, HIDDEN_DIM)
decoder = Decoder(num_decoder_tokens, EMB_DIM, HIDDEN_DIM, use_attention=USE_ATTENTION)
model = Seq2Seq(encoder, decoder)
print("\n開始訓練...")
train_time = train(model, epochs=EPOCHS, batch_size=BATCH_SIZE, lr=LR)
print("\n評估中...")
bleu = evaluate_bleu(model, test_encoder_input, test_targets,
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
exact_acc = evaluate_exact_match(model, test_encoder_input, test_targets,
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
print("\n=== 抽取測試樣本輸出 ===")
sample_indices = random.sample(range(len(test_encoder_input)), 10)
for i in sample_indices:
src = test_inputs[i]
true = test_targets[i][1:-1]
pred, _ = decode(model, test_encoder_input[i],
use_beam_search=USE_BEAM_SEARCH, beam_width=BEAM_WIDTH)
print(f"Human : {src} | True : {true} | Predicted : {pred}")