緣起

多語言的大型網路遊戲 FF14,作為擁有豐富文本內容的 MMORPG,其文本涉及大量專有名詞與情感表達,很難用普通的機器翻譯做到精確的語境翻譯。

基於此,我萌生了開發 FF14 中文語系模型的念頭,希望能透過機器學習技術,翻譯遊戲內的專有名詞和劇情術語,從日文與英文與既存中文,推算未翻譯的文本為繁體中文。

這篇文章記錄了我在開發過程中如何從數據清理,到訓練翻譯模型,以及遇到的挑戰與解決方法。


環境設置

在開始之前,特別提醒:
使用 PyTorch 進行機器學習模型訓練時,若希望利用 GPU 加速,請注意 CUDA 版本的相容性。

截至目前,PyTorch 官方提供的預建二進位檔(prebuilt binaries)主要支援 CUDA 11.8 和 12.1 版本。 若你安裝了 CUDA 12.5 或更新版本,可能會遇到相容性問題,導致無法正確使用 GPU。 因此,建議將 CUDA 降級至 12.1 版本,並確保同時安裝相容的 cuDNN 版本(如 v8.8.1),以確保與 PyTorch 的良好相容性。

此外,請確認你的 NVIDIA 顯示卡驅動程式已更新至與所需 CUDA 版本相容的版本。需要注意的是,PyTorch 的預建二進位檔已包含所需的 CUDA 執行時(runtime),因此即使系統中未安裝完整的 CUDA 工具包(toolkit),也能正常運行。 但若你需要自行編譯 PyTorch 或自訂 CUDA 擴展,則需要安裝對應版本的 CUDA 工具包。

為獲取最新的相容性資訊,建議參考 PyTorch 官方網站的相容性支援表,以確保你的開發環境與所使用的軟體版本相容。


開始:數據清理與準備

遊戲內翻譯數據多來自 CSV 文件,每條數據包含日文、英文和繁體中文三種語言的對應文本。但這些數據格式可能參差不齊,因此數據清理是第一步。

資料處理是整個專案的基石。使用 pandas 對 CSV 文件進行處理,並應用正則表達式來清理數據。

 1import zipfile
 2import pandas as pd
 3import os
 4import re
 5
 6
 7def clean_text(text):
 8    if not isinstance(text, str):
 9        text = str(text)
10    text = re.sub(r'<.*?>', '', text)  # 移除HTML標記
11    text = re.sub(r'\[.*?\]', '', text)  # 移除中括號內的內容
12    text = re.sub(r'\(.*?\)', '', text)  # 移除括號內的內容
13    text = re.sub(r'ObjectParameter\(\d+\)', '', text)  # 移除 ObjectParameter
14    text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)  # 在小寫字母和大寫字母之間添加空格
15    text = re.sub(r'\s+', ' ', text)  # 移除多餘的空格
16    text = re.sub(r'nan', '', text)  # 移除 'nan'
17    text = re.sub(r'[^\w\s]', '', text)  # 移除所有非字母數字和空格字符
18    text = text.strip()  # 去除首尾空白字符
19    return text
20
21
22def read_csv_files(directory):
23    dataframes = []
24    for root, dirs, files in os.walk(directory):
25        print(f"Searching in directory: {root}")
26        for file in files:
27            if file.endswith('.csv'):
28                file_path = os.path.join(root, file)
29                try:
30                    df = pd.read_csv(file_path, low_memory=False)
31                    df.insert(0, 'ID', range(1, 1 + len(df)))
32                    # 忽略前兩行
33                    df = df.drop([0, 1])
34                    # 清洗數據
35                    for col in df.columns:
36                        df[col] = df[col].map(clean_text)
37                    dataframes.append(df)
38                    print(f"Successfully read {file_path} with {len(df)} rows")
39                except Exception as e:
40                    print(f"Error reading {file_path}: {e}")
41    return dataframes

這段程式負責讀取散亂的 CSV 語系文件,並對數據進行清理。清理過程包括移除標籤、多餘的空格、括號內的內容等。

 1def extract_files(zip_path, extract_path):
 2    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
 3        zip_ref.extractall(extract_path)
 4
 5
 6def ensure_columns(df, column_names):
 7    for col in column_names:
 8        if col not in df.columns:
 9            df[col] = ''
10    return df[column_names]
11
12
13def merge_translation_files(jp_dfs, en_dfs, tw_dfs, batch_size=10):
14    batch_count = 0
15    column_names = ['ID', 'jp_text1', 'jp_text2', 'en_text1', 'en_text2', 'tw_text1', 'tw_text2']
16
17    with open('data/languages/merged.csv', 'w', encoding='utf-8-sig', newline='') as output_file:
18        for i in range(0, len(jp_dfs), batch_size):
19            jp_batch = jp_dfs[i:i + batch_size]
20            en_batch = en_dfs[i:i + batch_size]
21            tw_batch = tw_dfs[i:i + batch_size]
22
23            for jp_df, en_df, tw_df in zip(jp_batch, en_batch, tw_batch):
24                try:
25                    # 檢查每個數據框的列數是否足夠
26                    if len(jp_df.columns) > 3:
27                        jp_df = ensure_columns(jp_df, ['ID', jp_df.columns[2], jp_df.columns[3]])
28                    else:
29                        jp_df = ensure_columns(jp_df, ['ID', jp_df.columns[2], ''])
30
31                    if len(en_df.columns) > 3:
32                        en_df = ensure_columns(en_df, ['ID', en_df.columns[2], en_df.columns[3]])
33                    else:
34                        en_df = ensure_columns(en_df, ['ID', en_df.columns[2], ''])
35
36                    if len(tw_df.columns) > 3:
37                        tw_df = ensure_columns(tw_df, ['ID', tw_df.columns[2], tw_df.columns[3]])
38                    else:
39                        tw_df = pd.DataFrame(
40                            {'ID': jp_df['ID'], 'tw_text1': [''] * len(jp_df), 'tw_text2': [''] * len(jp_df)})
41
42                    jp_df.columns = ['ID', 'jp_text1', 'jp_text2']
43                    en_df.columns = ['ID', 'en_text1', 'en_text2']
44                    tw_df.columns = ['ID', 'tw_text1', 'tw_text2']
45
46                    merged_df = jp_df.merge(en_df, on='ID').merge(tw_df, on='ID', how='left')
47                    valid_rows = merged_df.dropna(subset=['jp_text1', 'en_text1'])
48
49                    # 移除無效數據
50                    valid_rows = valid_rows[(valid_rows['jp_text1'] != '') & (valid_rows['en_text1'] != '')]
51
52                    if not valid_rows.empty:
53                        valid_rows.to_csv(output_file, index=False, header=batch_count == 0, encoding='utf-8-sig',
54                                          mode='a')
55                        batch_count += 1
56                        print(f"Processed batch {batch_count}")
57
58                except KeyError as e:
59                    print(f"Merge error: {e}")
60
61    return batch_count
62
63
64def concatenate_batches(batch_count):
65    all_batch_dfs = []
66    for i in range(batch_count):
67        batch_df = pd.read_csv(f'data/languages/merged_batch_{i}.csv', low_memory=False)
68        all_batch_dfs.append(batch_df)
69    if all_batch_dfs:
70        all_df = pd.concat(all_batch_dfs, ignore_index=True)
71        return all_df
72    else:
73        return pd.DataFrame()  # 返回空的DataFrame

這段程式負責解壓縮壓縮檔,並將日文、英文和繁體中文的數據合併到一個文件中。在合併過程中,我們檢查每個數據框的列數是否足夠,並對缺失的列進行填充。

 1# 使用
 2if __name__ == "__main__":
 3    zip_path = 'data/languages.zip'
 4    extract_path = 'data/languages'
 5
 6    extract_files(zip_path, extract_path)
 7
 8    jp_path = os.path.join(extract_path, 'jp')
 9    en_path = os.path.join(extract_path, 'en')
10    tw_path = os.path.join(extract_path, 'tw')
11
12    jp_dfs = read_csv_files(jp_path)
13    en_dfs = read_csv_files(en_path)
14    tw_dfs = read_csv_files(tw_path)
15
16    print(f"Read {len(jp_dfs)} Japanese files, {len(en_dfs)} English files, {len(tw_dfs)} Chinese files")
17
18    if jp_dfs and en_dfs:
19        batch_count = merge_translation_files(jp_dfs, en_dfs, tw_dfs)
20        print(f"合併的數據已保存到 data/languages/merged.csv")
21    else:
22        print("Japanese and English lists are empty, cannot merge files")

這段程式將上述函數組合在一起,並對數據進行合併。

檢查與驗證

  1. 壓縮檔解壓縮是否正常運行?
    程式使用 zipfile.ZipFile,可穩定解壓。

  2. 資料清洗是否全面?
    正規表示式處理了常見的雜訊格式,並將空白字串清除。

  3. 多檔案讀取有沒有潛在風險?
    加入了例外處理機制(try-except),能在出錯時記錄檔案名稱。

經過清理後,數據格式統一,為後續模型訓練打下基礎。


模型選擇與實現

翻譯模型選用 Hugging Face 提供的 MarianMTModel,這是一款專為多語言翻譯設計的模型。我們使用日文到英文的預訓練模型,並加入繁體中文進行微調。

模型訓練程式

訓練過程中,我將數據切塊(chunking),避免一次處理過多數據造成記憶體不足:

 1import pandas as pd
 2import torch
 3from transformers import MarianMTModel, MarianTokenizer, Seq2SeqTrainingArguments, Seq2SeqTrainer
 4from datasets import Dataset, DatasetDict
 5import evaluate
 6import os
 7
 8
 9def read_and_chunk_data(file_path, chunk_size=10000):
10    for chunk in pd.read_csv(file_path, chunksize=chunk_size, low_memory=False):
11        chunk = chunk.astype(str).dropna().reset_index(drop=True)
12        yield chunk
13
14
15def process_chunk(chunk):
16    jp_en_tw_pairs = chunk[['jp_text1', 'en_text1', 'tw_text1']].drop_duplicates()
17    jp_en_tw_pairs.columns = ['jp_text', 'en_text', 'tw_text']
18    jp_en_tw_pairs = jp_en_tw_pairs.dropna()
19    jp_en_tw_pairs.reset_index(drop=True, inplace=True)
20    dataset = Dataset.from_pandas(jp_en_tw_pairs)
21    return dataset

這段程式將數據切塊,並將每個切塊轉換為 Dataset 對象。

 1def train_chunk(dataset, model, tokenizer, training_args, checkpoint_dir):
 2    # 檢查數據集的大小,確保有足夠的樣本進行訓練和驗證
 3    if len(dataset) < 5:
 4        print(f"Skipping chunk with {len(dataset)} samples due to insufficient data")
 5        return
 6
 7    # 拆分數據集為訓練集和驗證集
 8    train_test_split = dataset.train_test_split(test_size=0.2)
 9    datasets = DatasetDict({'train': train_test_split['train'], 'test': train_test_split['test']})
10
11    # 編碼函數
12    def preprocess_function(examples):
13        inputs = [jp + " " + en for jp, en in zip(examples['jp_text'], examples['en_text'])]
14        targets = examples['tw_text']
15        model_inputs = tokenizer(inputs, max_length=128, truncation=True, padding='max_length')
16        labels = tokenizer(targets, max_length=128, truncation=True, padding='max_length')
17        model_inputs["labels"] = labels["input_ids"]
18        return model_inputs
19
20    # 預處理數據
21    tokenized_datasets = datasets.map(preprocess_function, batched=True)
22
23    # 加載評估指標
24    metric = evaluate.load("sacrebleu")
25
26    def compute_metrics(eval_pred):
27        predictions, labels = eval_pred
28        decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
29        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
30        labels = [[label] for label in decoded_labels]
31        result = metric.compute(predictions=decoded_preds, references=labels)
32        return result
33
34    # 初始化訓練器
35    trainer = Seq2SeqTrainer(
36        model=model,
37        args=training_args,
38        train_dataset=tokenized_datasets['train'],
39        eval_dataset=tokenized_datasets['test'],
40        tokenizer=tokenizer,
41        compute_metrics=compute_metrics
42    )
43
44    # 繼續訓練
45    if checkpoint_dir and os.path.isdir(checkpoint_dir) and len(os.listdir(checkpoint_dir)) > 0:
46        trainer.train(resume_from_checkpoint=checkpoint_dir)
47    else:
48        trainer.train()
49
50    # 保存模型檢查點
51    trainer.save_model(checkpoint_dir)

這段程式負責訓練模型,且為避免訓練到一半中斷,我們加入了檢查點機制,以便在中斷後繼續訓練。
在訓練過程中,我們將數據集拆分為訓練集和驗證集,並使用 Seq2SeqTrainer 進行訓練。

 1# 使用
 2# 初始化模型和分詞器
 3model_name = 'Helsinki-NLP/opus-mt-ja-en'
 4tokenizer = MarianTokenizer.from_pretrained(model_name)
 5model = MarianMTModel.from_pretrained(model_name)
 6
 7# 訓練參數設置
 8training_args = Seq2SeqTrainingArguments(
 9    output_dir='models/trained_model',
10    evaluation_strategy="epoch",
11    learning_rate=2e-5,
12    per_device_train_batch_size=16,
13    per_device_eval_batch_size=16,
14    weight_decay=0.01,
15    save_total_limit=3,
16    num_train_epochs=3,
17    predict_with_generate=True
18)
19
20# 檢查 CUDA 是否可用
21if torch.cuda.is_available():
22    device = torch.device('cuda')
23    print("CUDA is available. Using GPU.")
24else:
25    device = torch.device('cpu')
26    print("CUDA is not available. Using CPU.")
27model.to(device)
28
29# 讀取和處理數據的每個塊,並訓練模型
30file_path = 'data/languages/merged.csv'
31chunk_size = 10000  # 可以根據記憶體大小調整塊大小
32
33# 保存檢查點的目錄
34checkpoint_dir = 'models/trained_model/checkpoints'
35
36# 找到最新的檢查點
37latest_checkpoint = None
38if os.path.isdir(checkpoint_dir):
39    checkpoints = [os.path.join(checkpoint_dir, d) for d in os.listdir(checkpoint_dir) if d.startswith('checkpoint-')]
40    if checkpoints:
41        latest_checkpoint = max(checkpoints, key=os.path.getctime)
42    else:
43        latest_checkpoint = checkpoint_dir
44
45for i, chunk in enumerate(read_and_chunk_data(file_path, chunk_size)):
46    dataset = process_chunk(chunk)
47    train_chunk(dataset, model, tokenizer, training_args, latest_checkpoint)
48    print(f"Chunk {i} training complete")
49
50# 保存最終訓練好的模型
51model.save_pretrained('models/trained_model')
52tokenizer.save_pretrained('models/trained_model')
53
54print("Model training complete and saved to 'models/trained_model'")

這段程式將上述函數組合在一起,並訓練模型。

檢查與驗證

  1. 日文與英文的拼接是否合適?
    透過字串合併,構建多語言上下文,提高翻譯準確率。

  2. 預處理是否考慮到空值?
    程式中添加了 dropna(),避免空值導致錯誤。

  3. 訓練參數是否符合需求?
    批量大小和學習率經調整,適合於有限的 GPU 訓練資源。


測試與驗證

最後一步是測試訓練好的模型是否能滿足預期。

 1import torch
 2from transformers import MarianMTModel, MarianTokenizer
 3
 4# 加載已保存的模型和分詞器
 5model_path = 'models/trained_model'
 6model = MarianMTModel.from_pretrained(model_path)
 7tokenizer = MarianTokenizer.from_pretrained(model_path)
 8
 9# 檢查 CUDA 是否可用
10device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
11model.to(device)
12
13
14# 測試翻譯函數
15def translate(jp_text, en_text):
16    try:
17        text = jp_text + " " + en_text
18        print(f"Combined text: {text}")  # 調試信息:輸入的日文和英文結合後的文本
19        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(device)
20        print(f"Tokenized inputs: {inputs}")  # 調試信息:分詞後的輸入
21        translated = model.generate(**inputs, max_length=128, num_beams=4, early_stopping=True)
22        print(f"Translated tokens: {translated}")  # 調試信息:翻譯後的標記
23        translation = tokenizer.batch_decode(translated, skip_special_tokens=True)
24        print(f"Decoded translation: {translation}")  # 調試信息:解碼後的翻譯
25        return translation[0]
26    except Exception as e:
27        print(f"Error during translation: {e}")
28        return None
29
30
31# 示例翻譯
32jp_text = "こんにちは"
33en_text = "Hello"
34translation = translate(jp_text, en_text)
35print(f"Translation: {translation}")
36
37# 添加更多的測試例子
38test_cases = [
39    ("アルフィノ", "Alphinaud"),
40    ("アリゼー", "Alisaie"),
41    ("ヤ・シュトラ", "Y'shtola"),
42    ("はい", "Yes"),
43    ("いいえ", "No")
44]
45
46for jp, en in test_cases:
47    print(f"\nTesting with: Japanese='{jp}' and English='{en}'")
48    translation = translate(jp, en)
49    print(f"Translation: {translation}")

測試的結果揭示了模型的優化空間,例如詞彙上下文可能不足,或是輸出受限於原始模型的語言範圍。

檢查與驗證

1. 翻譯結果總是不準確

  • 問題:模型初次訓練後,生成的中文結果多為無意義的字符。
  • 解決方案:反覆調整資料集,且調整訓練參數,如增加 num_train_epochs,並確保數據清理的完整性。但因為手邊的 GPU 資源有限,無法進行更多的嘗試。

2. 記憶體不足

  • 問題:一次加載過多數據會導致訓練中斷。
  • 解決方案:使用 chunksize 分塊處理數據,並對數據集進行分批訓練。

3. 測試時翻譯錯誤

  • 問題:測試時日文和英文拼接格式影響翻譯效果。
  • 解決方案:對測試文本進行標準化,並確保訓練時使用相同格式。

結語

這次專案深刻體會到機器學習專案中數據清理的重要性,以及如何通過調整超參數來提升模型表現。
希望這篇文章能為有興趣開發遊戲語系翻譯模型的朋友提供一些啟發!