今日目標
- 掌握文件的打開、讀取和寫入操作
- 理解不同的文件打開模式
- 學習處理CSV、JSON等常見文件格式
- 掌握異常處理的基本語法
- 學習自定義異常
- 瞭解上下文管理器(with語句)的使用
詳細內容
1. 文件基礎操作(90分鐘)
文件操作的基本步驟
- 打開文件
- 讀取或寫入文件
- 關閉文件
打開和關閉文件
# 基本文件操作
# 打開文件
file = open('example.txt', 'r') # 以只讀模式打開
content = file.read() # 讀取文件內容
file.close() # 關閉文件
print(content)
# 使用try-finally確保文件關閉
file = None
try:
file = open('example.txt', 'r')
content = file.read()
print(content)
finally:
if file:
file.close()
# 使用with語句(推薦)
with open('example.txt', 'r') as file:
content = file.read()
print(content)
# 文件會自動關閉,無需手動調用close()
文件打開模式
"""
文件打開模式:
r - 只讀(默認)
w - 寫入,會覆蓋已有文件
a - 追加,在文件末尾添加內容
x - 創建新文件,如果文件已存在則失敗
b - 二進制模式
t - 文本模式(默認)
+ - 讀寫模式
組合模式:
r+ - 讀寫,文件指針在開頭
w+ - 讀寫,會覆蓋已有文件
a+ - 讀寫,文件指針在末尾
"""
文件寫入操作
# 寫入文件(覆蓋模式)
with open('output.txt', 'w') as file:
file.write("Hello, World!\n")
file.write("這是第二行\n")
file.write("這是第三行\n")
print("文件寫入完成")
# 追加模式
with open('output.txt', 'a') as file:
file.write("這是追加的內容\n")
file.write("繼續追加...\n")
print("內容追加完成")
# 寫入多行內容
lines = [
"第一行內容",
"第二行內容",
"第三行內容",
"第四行內容"
]
with open('multiline.txt', 'w') as file:
for line in lines:
file.write(line + '\n')
# 或者使用writelines方法
with open('multiline2.txt', 'w') as file:
file.writelines(line + '\n' for line in lines)
print("多行內容寫入完成")
文件讀取操作
# 讀取整個文件
with open('example.txt', 'r') as file:
content = file.read()
print("整個文件內容:")
print(content)
# 逐行讀取
print("\n逐行讀取:")
with open('example.txt', 'r') as file:
line = file.readline()
while line:
print(f"行: {line.strip()}") # strip()去除換行符
line = file.readline()
# 讀取所有行到列表
print("\n所有行:")
with open('example.txt', 'r') as file:
lines = file.readlines()
for i, line in enumerate(lines, 1):
print(f"第{i}行: {line.strip()}")
# 直接遍歷文件對象(推薦用於大文件)
print("\n直接遍歷:")
with open('example.txt', 'r') as file:
for line_num, line in enumerate(file, 1):
print(f"第{line_num}行: {line.strip()}")
2. 文件位置和編碼(45分鐘)
文件指針操作
# 文件指針操作
with open('example.txt', 'r') as file:
# 獲取當前指針位置
print(f"初始位置: {file.tell()}")
# 讀取前10個字符
content = file.read(10)
print(f"前10個字符: {content}")
print(f"讀取後位置: {file.tell()}")
# 移動指針到開頭
file.seek(0)
print(f"移動後位置: {file.tell()}")
# 讀取第一行
first_line = file.readline()
print(f"第一行: {first_line.strip()}")
# 移動到第5個字節
file.seek(5)
content = file.read(5)
print(f"從第5字節讀取5字節: {content}")
# 使用seek實現隨機訪問
with open('example.txt', 'r') as file:
file.seek(0, 2) # 移動到文件末尾
file_size = file.tell()
print(f"文件大小: {file_size} 字節")
# 讀取最後10個字符
if file_size > 10:
file.seek(-10, 2) # 從末尾向前移動10字節
last_chars = file.read()
print(f"最後10個字符: {last_chars}")
文件編碼處理
# 處理不同編碼的文件
# UTF-8編碼(推薦)
with open('utf8_file.txt', 'w', encoding='utf-8') as file:
file.write("這是UTF-8編碼的文件\n")
file.write("包含中文和English\n")
# 讀取UTF-8文件
with open('utf8_file.txt', 'r', encoding='utf-8') as file:
content = file.read()
print("UTF-8文件內容:")
print(content)
# GBK編碼(中文Windows常用)
with open('gbk_file.txt', 'w', encoding='gbk') as file:
file.write("這是GBK編碼的文件\n")
file.write("中文內容\n")
# 讀取GBK文件
with open('gbk_file.txt', 'r', encoding='gbk') as file:
content = file.read()
print("GBK文件內容:")
print(content)
# 處理編碼錯誤
try:
with open('gbk_file.txt', 'r', encoding='utf-8') as file:
content = file.read()
except UnicodeDecodeError as e:
print(f"編碼錯誤: {e}")
# 使用錯誤處理策略
with open('gbk_file.txt', 'r', encoding='utf-8', errors='ignore') as file:
content = file.read()
print("忽略錯誤後的內容:")
print(content)
3. CSV文件操作(60分鐘)
CSV文件基礎
import csv
# 寫入CSV文件
data = [
['姓名', '年齡', '城市', '職業'],
['張三', '25', '北京', '工程師'],
['李四', '30', '上海', '設計師'],
['王五', '28', '廣州', '教師'],
['趙六', '35', '深圳', '醫生']
]
with open('people.csv', 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerows(data)
print("CSV文件寫入完成")
# 讀取CSV文件
print("\n讀取CSV文件:")
with open('people.csv', 'r', encoding='utf-8') as file:
reader = csv.reader(file)
for row_num, row in enumerate(reader):
print(f"第{row_num}行: {row}")
# 使用字典方式讀寫CSV(推薦)
people = [
{'name': '張三', 'age': 25, 'city': '北京', 'job': '工程師'},
{'name': '李四', 'age': 30, 'city': '上海', 'job': '設計師'},
{'name': '王五', 'age': 28, 'city': '廣州', 'job': '教師'},
{'name': '趙六', 'age': 35, 'city': '深圳', 'job': '醫生'}
]
# 寫入字典CSV
with open('people_dict.csv', 'w', newline='', encoding='utf-8') as file:
fieldnames = ['name', 'age', 'city', 'job']
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader() # 寫入表頭
writer.writerows(people)
print("字典CSV寫入完成")
# 讀取字典CSV
print("\n讀取字典CSV:")
with open('people_dict.csv', 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for person in reader:
print(f"姓名: {person['name']}, 年齡: {person['age']}, 城市: {person['city']}, 職業: {person['job']}")
CSV數據處理
# CSV數據分析和處理
import csv
def analyze_csv(filename):
"""分析CSV文件"""
with open(filename, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
total_age = 0
count = 0
cities = {}
jobs = {}
for person in reader:
age = int(person['age'])
total_age += age
count += 1
# 統計城市
city = person['city']
cities[city] = cities.get(city, 0) + 1
# 統計職業
job = person['job']
jobs[job] = jobs.get(job, 0) + 1
if count > 0:
average_age = total_age / count
print(f"平均年齡: {average_age:.1f}")
print(f"總人數: {count}")
print("\n城市分佈:")
for city, num in cities.items():
print(f" {city}: {num}人")
print("\n職業分佈:")
for job, num in jobs.items():
print(f" {job}: {num}人")
# 測試分析函數
analyze_csv('people_dict.csv')
# 篩選和修改CSV數據
def filter_csv(input_file, output_file, condition_func):
"""篩選CSV數據"""
with open(input_file, 'r', encoding='utf-8') as infile:
reader = csv.DictReader(infile)
with open(output_file, 'w', newline='', encoding='utf-8') as outfile:
writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)
writer.writeheader()
for person in reader:
if condition_func(person):
writer.writerow(person)
# 篩選年齡大於28歲的人
def age_above_28(person):
return int(person['age']) > 28
filter_csv('people_dict.csv', 'filtered_people.csv', age_above_28)
print("\n篩選後的文件:")
with open('filtered_people.csv', 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for person in reader:
print(f"姓名: {person['name']}, 年齡: {person['age']}")
4. JSON文件操作(45分鐘)
JSON基礎操作
import json
# Python對象
person = {
"name": "張三",
"age": 25,
"city": "北京",
"hobbies": ["讀書", "游泳", "編程"],
"is_student": False,
"education": {
"degree": "本科",
"school": "清華大學"
}
}
# 將Python對象轉換為JSON字符串
json_string = json.dumps(person, ensure_ascii=False, indent=2)
print("JSON字符串:")
print(json_string)
# 將JSON字符串寫入文件
with open('person.json', 'w', encoding='utf-8') as file:
json.dump(person, file, ensure_ascii=False, indent=2)
print("JSON文件寫入完成")
# 從JSON字符串解析Python對象
parsed_person = json.loads(json_string)
print("\n解析後的Python對象:")
print(f"姓名: {parsed_person['name']}")
print(f"年齡: {parsed_person['age']}")
print(f"愛好: {', '.join(parsed_person['hobbies'])}")
# 從JSON文件讀取
with open('person.json', 'r', encoding='utf-8') as file:
loaded_person = json.load(file)
print("\n從文件加載的對象:")
print(f"學校: {loaded_person['education']['school']}")
複雜JSON數據處理
import json
# 處理包含多個對象的JSON
students = [
{
"id": 1,
"name": "小明",
"scores": {"數學": 90, "英語": 85, "物理": 88},
"active": True
},
{
"id": 2,
"name": "小紅",
"scores": {"數學": 95, "英語": 92, "物理": 90},
"active": True
},
{
"id": 3,
"name": "小剛",
"scores": {"數學": 78, "英語": 80, "物理": 75},
"active": False
}
]
# 寫入JSON文件
with open('students.json', 'w', encoding='utf-8') as file:
json.dump(students, file, ensure_ascii=False, indent=2)
print("學生JSON文件寫入完成")
# 讀取和分析JSON數據
def analyze_students(filename):
"""分析學生數據"""
with open(filename, 'r', encoding='utf-8') as file:
students_data = json.load(file)
total_students = len(students_data)
active_students = sum(1 for student in students_data if student['active'])
print(f"總學生數: {total_students}")
print(f"活躍學生數: {active_students}")
# 計算各科平均分
subjects = ['數學', '英語', '物理']
subject_totals = {subject: 0 for subject in subjects}
subject_counts = {subject: 0 for subject in subjects}
for student in students_data:
for subject in subjects:
if subject in student['scores']:
subject_totals[subject] += student['scores'][subject]
subject_counts[subject] += 1
print("\n各科平均分:")
for subject in subjects:
if subject_counts[subject] > 0:
average = subject_totals[subject] / subject_counts[subject]
print(f" {subject}: {average:.1f}")
# 測試分析函數
analyze_students('students.json')
# JSON數據查詢
def find_student_by_id(filename, student_id):
"""根據ID查找學生"""
with open(filename, 'r', encoding='utf-8') as file:
students_data = json.load(file)
for student in students_data:
if student['id'] == student_id:
return student
return None
# 查找學生
found_student = find_student_by_id('students.json', 2)
if found_student:
print(f"\n找到學生: {found_student['name']}")
print(f"數學成績: {found_student['scores']['數學']}")
else:
print("未找到該學生")
5. 異常處理基礎(60分鐘)
基本異常處理
# 基本try-except結構
try:
# 可能引發異常的代碼
number = int(input("請輸入一個數字: "))
result = 10 / number
print(f"10 / {number} = {result}")
except ValueError:
print("錯誤:請輸入有效的數字!")
except ZeroDivisionError:
print("錯誤:不能除以零!")
except Exception as e:
print(f"發生了未知錯誤: {e}")
else:
print("計算成功完成!")
finally:
print("程序執行結束。")
# 多個異常一起處理
try:
file = open('nonexistent.txt', 'r')
content = file.read()
number = int(content)
except (FileNotFoundError, ValueError) as e:
print(f"文件或數值錯誤: {e}")
常見的異常類型
# ValueError - 值錯誤
try:
int("abc") # 不能將字符串轉換為整數
except ValueError as e:
print(f"值錯誤: {e}")
# TypeError - 類型錯誤
try:
"2" + 2 # 字符串和數字不能直接相加
except TypeError as e:
print(f"類型錯誤: {e}")
# IndexError - 索引錯誤
try:
numbers = [1, 2, 3]
print(numbers[5]) # 索引超出範圍
except IndexError as e:
print(f"索引錯誤: {e}")
# KeyError - 鍵錯誤
try:
person = {'name': '張三'}
print(person['age']) # 不存在的鍵
except KeyError as e:
print(f"鍵錯誤: {e}")
# AttributeError - 屬性錯誤
try:
text = "hello"
text.append(" world") # 字符串沒有append方法
except AttributeError as e:
print(f"屬性錯誤: {e}")
# FileNotFoundError - 文件未找到
try:
open('nonexistent_file.txt', 'r')
except FileNotFoundError as e:
print(f"文件未找到: {e}")
主動拋出異常
# 使用raise拋出異常
def calculate_bmi(weight, height):
"""計算BMI"""
if weight <= 0:
raise ValueError("體重必須大於0")
if height <= 0:
raise ValueError("身高必須大於0")
bmi = weight / (height ** 2)
return bmi
# 測試BMI計算
try:
# 正常情況
bmi1 = calculate_bmi(70, 1.75)
print(f"正常BMI: {bmi1:.2f}")
# 異常情況
bmi2 = calculate_bmi(-70, 1.75)
except ValueError as e:
print(f"BMI計算錯誤: {e}")
# 重新拋出異常
def process_number(text):
"""處理數字字符串"""
try:
number = float(text)
return number * 2
except ValueError:
print("在process_number中捕獲到錯誤")
raise # 重新拋出異常
try:
result = process_number("abc")
except ValueError as e:
print(f"主程序中捕獲錯誤: {e}")
6. 自定義異常(30分鐘)
創建自定義異常
# 基礎自定義異常
class InvalidAgeError(Exception):
"""年齡無效異常"""
def __init__(self, age, message="年齡無效"):
self.age = age
self.message = message
super().__init__(self.message)
def __str__(self):
return f"{self.message}: {self.age}"
class UnderAgeError(InvalidAgeError):
"""未成年異常"""
def __init__(self, age):
super().__init__(age, "年齡太小")
class OverAgeError(InvalidAgeError):
"""超齡異常"""
def __init__(self, age):
super().__init__(age, "年齡太大")
# 使用自定義異常
def check_age(age):
"""檢查年齡是否有效"""
if age < 0:
raise InvalidAgeError(age)
elif age < 18:
raise UnderAgeError(age)
elif age > 100:
raise OverAgeError(age)
else:
return True
# 測試自定義異常
ages = [15, 25, 120, -5]
for age in ages:
try:
if check_age(age):
print(f"年齡 {age} 有效")
except UnderAgeError as e:
print(f"未成年錯誤: {e}")
except OverAgeError as e:
print(f"超齡錯誤: {e}")
except InvalidAgeError as e:
print(f"無效年齡錯誤: {e}")
複雜的自定義異常
# 銀行賬户相關的自定義異常
class BankAccountError(Exception):
"""銀行賬户基礎異常"""
pass
class InsufficientFundsError(BankAccountError):
"""餘額不足異常"""
def __init__(self, balance, amount):
self.balance = balance
self.amount = amount
super().__init__(f"餘額不足: 當前餘額{balance},嘗試取款{amount}")
class InvalidAmountError(BankAccountError):
"""無效金額異常"""
def __init__(self, amount):
self.amount = amount
super().__init__(f"無效金額: {amount}")
class AccountNotFoundError(BankAccountError):
"""賬户未找到異常"""
def __init__(self, account_id):
self.account_id = account_id
super().__init__(f"賬户未找到: {account_id}")
# 銀行賬户類
class BankAccount:
def __init__(self, account_id, owner, balance=0):
self.account_id = account_id
self.owner = owner
self.balance = balance
def deposit(self, amount):
"""存款"""
if amount <= 0:
raise InvalidAmountError(amount)
self.balance += amount
return self.balance
def withdraw(self, amount):
"""取款"""
if amount <= 0:
raise InvalidAmountError(amount)
if amount > self.balance:
raise InsufficientFundsError(self.balance, amount)
self.balance -= amount
return self.balance
def transfer(self, amount, to_account):
"""轉賬"""
if not isinstance(to_account, BankAccount):
raise ValueError("目標賬户無效")
self.withdraw(amount) # 可能拋出InsufficientFundsError
to_account.deposit(amount)
return True
# 測試銀行賬户異常
account1 = BankAccount("001", "小明", 1000)
account2 = BankAccount("002", "小紅", 500)
try:
# 正常操作
account1.deposit(200)
print(f"存款後餘額: {account1.balance}")
# 異常操作
account1.withdraw(2000) # 餘額不足
except InsufficientFundsError as e:
print(f"取款失敗: {e}")
try:
account1.transfer(300, account2)
print("轉賬成功")
print(f"賬户1餘額: {account1.balance}")
print(f"賬户2餘額: {account2.balance}")
except BankAccountError as e:
print(f"銀行操作失敗: {e}")
7. 上下文管理器(45分鐘)
with語句的工作原理
# 簡單的上下文管理器示例
class SimpleFileManager:
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
"""進入上下文時調用"""
self.file = open(self.filename, self.mode)
print(f"打開文件: {self.filename}")
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文時調用"""
if self.file:
self.file.close()
print(f"關閉文件: {self.filename}")
# 如果發生異常,返回True表示已處理,False表示繼續傳播
if exc_type:
print(f"發生異常: {exc_type.__name__}: {exc_val}")
return False # 不抑制異常
# 使用自定義上下文管理器
with SimpleFileManager('test.txt', 'w') as file:
file.write("Hello, Context Manager!")
print("文件寫入完成")
print("上下文管理器使用結束")
# 異常情況下的上下文管理器
try:
with SimpleFileManager('test.txt', 'r') as file:
content = file.read()
# 模擬一個錯誤
raise ValueError("測試異常")
except ValueError as e:
print(f"捕獲到異常: {e}")
使用contextlib創建上下文管理器
from contextlib import contextmanager
# 使用裝飾器創建上下文管理器
@contextmanager
def timer_context():
"""計時上下文管理器"""
import time
start_time = time.time()
try:
print("開始計時...")
yield # 在這裏執行代碼塊
finally:
end_time = time.time()
elapsed = end_time - start_time
print(f"代碼執行耗時: {elapsed:.4f}秒")
# 使用計時器
with timer_context():
# 模擬耗時操作
total = 0
for i in range(1000000):
total += i
print(f"計算結果: {total}")
# 帶參數的上下文管理器
@contextmanager
def open_file_context(filename, mode='r'):
"""文件操作上下文管理器"""
file = None
try:
file = open(filename, mode, encoding='utf-8')
print(f"成功打開文件: {filename}")
yield file
except Exception as e:
print(f"文件操作錯誤: {e}")
raise
finally:
if file:
file.close()
print(f"已關閉文件: {filename}")
# 使用帶參數的文件上下文管理器
with open_file_context('example.txt', 'r') as file:
content = file.read()
print(f"文件內容長度: {len(content)}")
8. 綜合示例(60分鐘)
示例1:日誌文件分析器
import re
from datetime import datetime
class LogAnalyzer:
def __init__(self, log_file):
self.log_file = log_file
self.stats = {
'total_entries': 0,
'error_count': 0,
'warning_count': 0,
'info_count': 0,
'start_time': None,
'end_time': None
}
self.errors = []
def analyze(self):
"""分析日誌文件"""
try:
with open(self.log_file, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
self._process_line(line.strip(), line_num)
self._generate_report()
except FileNotFoundError:
print(f"錯誤:日誌文件 {self.log_file} 未找到")
except Exception as e:
print(f"分析日誌時發生錯誤: {e}")
def _process_line(self, line, line_num):
"""處理單行日誌"""
if not line:
return
self.stats['total_entries'] += 1
# 簡單的日誌級別檢測
if 'ERROR' in line:
self.stats['error_count'] += 1
self.errors.append((line_num, line))
elif 'WARNING' in line:
self.stats['warning_count'] += 1
elif 'INFO' in line:
self.stats['info_count'] += 1
# 提取時間戳(簡化版)
timestamp_match = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line)
if timestamp_match:
timestamp_str = timestamp_match.group()
try:
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
if self.stats['start_time'] is None or timestamp < self.stats['start_time']:
self.stats['start_time'] = timestamp
if self.stats['end_time'] is None or timestamp > self.stats['end_time']:
self.stats['end_time'] = timestamp
except ValueError:
pass
def _generate_report(self):
"""生成分析報告"""
print("=" * 50)
print("日誌分析報告")
print("=" * 50)
print(f"總日誌條目: {self.stats['total_entries']}")
print(f"信息級別: {self.stats['info_count']}")
print(f"警告級別: {self.stats['warning_count']}")
print(f"錯誤級別: {self.stats['error_count']}")
if self.stats['start_time'] and self.stats['end_time']:
duration = self.stats['end_time'] - self.stats['start_time']
print(f"日誌時間範圍: {self.stats['start_time']} 到 {self.stats['end_time']}")
print(f"持續時間: {duration}")
if self.errors:
print(f"\n發現 {len(self.errors)} 個錯誤:")
for line_num, error in self.errors[:5]: # 只顯示前5個錯誤
print(f" 第{line_num}行: {error[:100]}...")
# 計算各級別佔比
if self.stats['total_entries'] > 0:
error_percent = (self.stats['error_count'] / self.stats['total_entries']) * 100
warning_percent = (self.stats['warning_count'] / self.stats['total_entries']) * 100
info_percent = (self.stats['info_count'] / self.stats['total_entries']) * 100
print(f"\n級別分佈:")
print(f" 錯誤: {error_percent:.1f}%")
print(f" 警告: {warning_percent:.1f}%")
print(f" 信息: {info_percent:.1f}%")
# 創建測試日誌文件
test_log_content = """
2024-01-15 10:00:01 INFO Application started
2024-01-15 10:00:15 INFO User login successful
2024-01-15 10:01:23 WARNING Disk space running low
2024-01-15 10:02:45 ERROR Database connection failed
2024-01-15 10:03:12 INFO Retrying connection
2024-01-15 10:03:45 ERROR Connection timeout
2024-01-15 10:04:20 WARNING High memory usage
2024-01-15 10:05:30 INFO Backup completed
"""
with open('test.log', 'w', encoding='utf-8') as file:
file.write(test_log_content)
# 分析日誌
analyzer = LogAnalyzer('test.log')
analyzer.analyze()
示例2:配置文件管理器
import json
import os
from typing import Dict, Any
class ConfigManager:
"""配置文件管理器"""
def __init__(self, config_file='config.json'):
self.config_file = config_file
self.config = self._load_default_config()
self._load_config()
def _load_default_config(self) -> Dict[str, Any]:
"""加載默認配置"""
return {
'app_name': 'My Application',
'version': '1.0.0',
'debug': False,
'database': {
'host': 'localhost',
'port': 5432,
'username': 'admin',
'password': 'password'
},
'logging': {
'level': 'INFO',
'file': 'app.log',
'max_size': '10MB'
}
}
def _load_config(self):
"""從文件加載配置"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as file:
file_config = json.load(file)
self._update_config(file_config)
print(f"從 {self.config_file} 加載配置")
else:
print("配置文件不存在,使用默認配置")
except (json.JSONDecodeError, IOError) as e:
print(f"加載配置文件失敗: {e},使用默認配置")
def _update_config(self, new_config: Dict[str, Any], current_config: Dict[str, Any] = None):
"""遞歸更新配置"""
if current_config is None:
current_config = self.config
for key, value in new_config.items():
if (key in current_config and
isinstance(current_config[key], dict) and
isinstance(value, dict)):
# 遞歸更新嵌套字典
self._update_config(value, current_config[key])
else:
# 更新或添加配置項
current_config[key] = value
def save_config(self):
"""保存配置到文件"""
try:
with open(self.config_file, 'w', encoding='utf-8') as file:
json.dump(self.config, file, ensure_ascii=False, indent=2)
print(f"配置已保存到 {self.config_file}")
return True
except IOError as e:
print(f"保存配置失敗: {e}")
return False
def get(self, key: str, default=None):
"""獲取配置值"""
keys = key.split('.')
current = self.config
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
def set(self, key: str, value):
"""設置配置值"""
keys = key.split('.')
current = self.config
# 導航到最後一個鍵的父級
for k in keys[:-1]:
if k not in current or not isinstance(current[k], dict):
current[k] = {}
current = current[k]
# 設置值
current[keys[-1]] = value
def show_config(self):
"""顯示當前配置"""
print("當前配置:")
print(json.dumps(self.config, ensure_ascii=False, indent=2))
# 使用配置管理器
def test_config_manager():
"""測試配置管理器"""
config_mgr = ConfigManager('my_app_config.json')
# 顯示初始配置
config_mgr.show_config()
# 修改一些配置
config_mgr.set('debug', True)
config_mgr.set('database.host', '192.168.1.100')
config_mgr.set('database.port', 3306)
config_mgr.set('new_setting.timeout', 30)
print("\n修改後的配置:")
config_mgr.show_config()
# 獲取特定配置
db_host = config_mgr.get('database.host')
timeout = config_mgr.get('new_setting.timeout')
nonexistent = config_mgr.get('nonexistent.key', '默認值')
print(f"\n數據庫主機: {db_host}")
print(f"超時設置: {timeout}")
print(f"不存在的鍵: {nonexistent}")
# 保存配置
config_mgr.save_config()
# 運行測試
if __name__ == "__main__":
test_config_manager()
9. 今日練習(60分鐘)
練習1:文件加密解密工具
import os
class FileEncryptor:
"""簡單的文件加密解密工具"""
def __init__(self, key=13):
self.key = key # 簡單的凱撒密碼偏移量
def _transform_text(self, text, encrypt=True):
"""轉換文本(加密或解密)"""
result = []
key = self.key if encrypt else -self.key
for char in text:
if char.isalpha():
# 處理字母字符
if char.isupper():
base = ord('A')
else:
base = ord('a')
# 應用凱撒密碼
shifted = (ord(char) - base + key) % 26 + base
result.append(chr(shifted))
else:
# 非字母字符保持不變
result.append(char)
return ''.join(result)
def encrypt_file(self, input_file, output_file=None):
"""加密文件"""
if output_file is None:
name, ext = os.path.splitext(input_file)
output_file = f"{name}_encrypted{ext}"
try:
with open(input_file, 'r', encoding='utf-8') as infile:
content = infile.read()
encrypted_content = self._transform_text(content, encrypt=True)
with open(output_file, 'w', encoding='utf-8') as outfile:
outfile.write(encrypted_content)
print(f"文件已加密: {input_file} -> {output_file}")
return True
except Exception as e:
print(f"加密失敗: {e}")
return False
def decrypt_file(self, input_file, output_file=None):
"""解密文件"""
if output_file is None:
if input_file.endswith('_encrypted'):
output_file = input_file.replace('_encrypted', '_decrypted')
else:
name, ext = os.path.splitext(input_file)
output_file = f"{name}_decrypted{ext}"
try:
with open(input_file, 'r', encoding='utf-8') as infile:
content = infile.read()
decrypted_content = self._transform_text(content, encrypt=False)
with open(output_file, 'w', encoding='utf-8') as outfile:
outfile.write(decrypted_content)
print(f"文件已解密: {input_file} -> {output_file}")
return True
except Exception as e:
print(f"解密失敗: {e}")
return False
# 測試加密解密工具
def test_encryptor():
"""測試文件加密解密"""
# 創建測試文件
test_content = """
這是一個測試文件。
This is a test file.
Hello, World!
1234567890
"""
with open('test_original.txt', 'w', encoding='utf-8') as file:
file.write(test_content)
# 創建加密器實例
encryptor = FileEncryptor(key=3)
# 加密文件
encryptor.encrypt_file('test_original.txt')
# 解密文件
encryptor.decrypt_file('test_original_encrypted.txt')
# 驗證結果
with open('test_original_decrypted.txt', 'r', encoding='utf-8') as file:
decrypted_content = file.read()
print("原始內容 == 解密內容:", test_content.strip() == decrypted_content.strip())
# 運行測試
test_encryptor()
練習2:學生成績文件管理器
import json
import csv
class StudentGradeManager:
"""學生成績文件管理器"""
def __init__(self, data_file='students.json'):
self.data_file = data_file
self.students = self._load_data()
def _load_data(self):
"""從文件加載數據"""
try:
with open(self.data_file, 'r', encoding='utf-8') as file:
return json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
return []
def _save_data(self):
"""保存數據到文件"""
try:
with open(self.data_file, 'w', encoding='utf-8') as file:
json.dump(self.students, file, ensure_ascii=False, indent=2)
return True
except IOError as e:
print(f"保存數據失敗: {e}")
return False
def add_student(self, student_id, name):
"""添加學生"""
# 檢查學號是否已存在
for student in self.students:
if student['id'] == student_id:
print(f"錯誤:學號 {student_id} 已存在")
return False
student = {
'id': student_id,
'name': name,
'grades': {}
}
self.students.append(student)
self._save_data()
print(f"成功添加學生: {name} (學號: {student_id})")
return True
def add_grade(self, student_id, subject, grade):
"""添加成績"""
student = self._find_student(student_id)
if not student:
print(f"錯誤:未找到學號 {student_id} 的學生")
return False
try:
grade = float(grade)
if grade < 0 or grade > 100:
raise ValueError("成績必須在0-100之間")
except ValueError as e:
print(f"錯誤:無效的成績 - {e}")
return False
student['grades'][subject] = grade
self._save_data()
print(f"成功為 {student['name']} 添加 {subject} 成績: {grade}")
return True
def _find_student(self, student_id):
"""根據學號查找學生"""
for student in self.students:
if student['id'] == student_id:
return student
return None
def show_student(self, student_id):
"""顯示學生信息"""
student = self._find_student(student_id)
if not student:
print(f"錯誤:未找到學號 {student_id} 的學生")
return
print(f"\n學生信息:")
print(f" 學號: {student['id']}")
print(f" 姓名: {student['name']}")
if student['grades']:
print(" 成績:")
total = 0
for subject, grade in student['grades'].items():
print(f" {subject}: {grade}")
total += grade
average = total / len(student['grades'])
print(f" 平均分: {average:.2f}")
else:
print(" 暫無成績")
def export_to_csv(self, csv_file='students.csv'):
"""導出為CSV文件"""
try:
with open(csv_file, 'w', newline='', encoding='utf-8') as file:
# 收集所有科目
subjects = set()
for student in self.students:
subjects.update(student['grades'].keys())
subjects = sorted(list(subjects))
fieldnames = ['學號', '姓名'] + subjects + ['平均分']
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
for student in self.students:
row = {
'學號': student['id'],
'姓名': student['name']
}
total = 0
count = 0
for subject in subjects:
grade = student['grades'].get(subject, '')
row[subject] = grade
if grade != '':
total += grade
count += 1
if count > 0:
row['平均分'] = f"{total/count:.2f}"
else:
row['平均分'] = ''
writer.writerow(row)
print(f"數據已導出到 {csv_file}")
return True
except Exception as e:
print(f"導出失敗: {e}")
return False
def show_statistics(self):
"""顯示統計信息"""
if not self.students:
print("沒有學生數據")
return
# 收集所有科目
subjects = set()
for student in self.students:
subjects.update(student['grades'].keys())
print("\n=== 成績統計 ===")
print(f"學生總數: {len(self.students)}")
for subject in sorted(subjects):
grades = []
for student in self.students:
if subject in student['grades']:
grades.append(student['grades'][subject])
if grades:
avg = sum(grades) / len(grades)
max_grade = max(grades)
min_grade = min(grades)
print(f"\n{subject}:")
print(f" 平均分: {avg:.2f}")
print(f" 最高分: {max_grade}")
print(f" 最低分: {min_grade}")
print(f" 參考人數: {len(grades)}")
# 測試學生成績管理器
def test_grade_manager():
"""測試學生成績管理器"""
manager = StudentGradeManager()
# 添加學生
manager.add_student('001', '張三')
manager.add_student('002', '李四')
manager.add_student('003', '王五')
# 添加成績
manager.add_grade('001', '數學', 85)
manager.add_grade('001', '英語', 92)
manager.add_grade('002', '數學', 78)
manager.add_grade('002', '英語', 88)
manager.add_grade('003', '數學', 95)
manager.add_grade('003', '英語', 90)
manager.add_grade('003', '物理', 87)
# 顯示學生信息
manager.show_student('001')
# 顯示統計信息
manager.show_statistics()
# 導出為CSV
manager.export_to_csv()
# 運行測試
if __name__ == "__main__":
test_grade_manager()
今日總結
掌握內容
- ✓ 文件的打開、讀取、寫入和關閉操作
- ✓ 不同文件模式的使用場景
- ✓ CSV和JSON文件的讀寫操作
- ✓ 異常處理的基本語法和常見異常類型
- ✓ 自定義異常的創建和使用
- ✓ 上下文管理器和with語句
重點概念
- 文件操作:始終使用with語句確保文件正確關閉
- 異常處理:精確捕獲特定異常,避免過於寬泛的except
- 編碼問題:處理文本文件時注意指定正確的編碼
- 數據持久化:使用JSON、CSV等格式保存和加載數據
最佳實踐
- 文件操作:使用with語句,避免手動管理文件關閉
- 異常處理:具體異常具體處理,記錄有用的錯誤信息
- 數據驗證:在寫入文件前驗證數據的有效性
- 資源管理:確保異常情況下也能正確釋放資源
- 編碼規範:統一使用UTF-8編碼處理文本文件
常見錯誤
- 忘記關閉文件導致資源泄漏
- 文件路徑錯誤或權限不足
- 編碼不匹配導致亂碼
- 異常處理過於寬泛,隱藏了真正的錯誤
- 在finally塊中再次拋出異常
明日預告 第六天:面向對象編程基礎
學習建議
- 多練習文件讀寫操作,理解不同模式的區別
- 嘗試處理各種異常情況,編寫健壯的程序
- 使用自定義異常提高代碼的可讀性
- 掌握上下文管理器的使用,簡化資源管理
- 實踐數據持久化,將程序數據保存到文件
第五天學習完成!你已經掌握了Python文件操作和異常處理的重要技能,這是編寫實用程序的基礎。繼續加油!