博客 / 詳情

返回

【AI辦公自動化】如何使用Python實現讀寫文件自動化

在日常工作中,文件操作是最基礎也是最常見的任務之一。無論是批量處理數據文件,還是整理工作文檔,掌握高效的文件讀寫技巧都能極大提升工作效率。本文將介紹幾種實用的Python文件操作方法,幫助你輕鬆應對各種文件處理需求。

使用pathlib庫操作文件

傳統的文件路徑處理往往依賴於os和os.path模塊,代碼繁瑣且平台兼容性差。而Python 3.4引入的pathlib庫提供了面向對象的文件系統路徑處理方式,使代碼更簡潔、更易讀。

基本路徑操作

from pathlib import Path

# 創建路徑對象
file_path = Path('工作報告.docx')
project_dir = Path('/Users/sunlei/projects')

# 路徑拼接(無需擔心斜槓問題)
doc_path = project_dir / 'documents' / file_path
print(doc_path)  # 輸出: /Users/sunlei/projects/documents/工作報告.docx

# 獲取路徑信息
print(doc_path.name)      # 輸出: 工作報告.docx
print(doc_path.suffix)    # 輸出: .docx
print(doc_path.stem)      # 輸出: 工作報告
print(doc_path.parent)    # 輸出: /Users/sunlei/projects/documents

文件重命名與刪除

對文件進行重命名是常見的需求,通過pathlib庫的Path.rename方法可以輕鬆實現對某文件的重命名操作。

from pathlib import Path
import datetime

# 獲取當前日期
today = datetime.date.today().strftime('%Y%m%d')

# 批量重命名文件(添加日期前綴)
def rename_with_date(directory, pattern='*.txt'):
    dir_path = Path(directory)
    for file_path in dir_path.glob(pattern):
        # 構建新文件名
        new_name = f"{today}_{file_path.name}"
        new_path = file_path.with_name(new_name)
        
        # 執行重命名
        file_path.rename(new_path)
        print(f"已重命名: {file_path.name} -> {new_name}")

# 使用示例
rename_with_date('./reports')

Path.unlink方法等價於os.remove方法,用於刪除已存在的文件;Path.rmdir方法等價於os.rmdir方法,用於刪除空的目錄,如果目錄非空,該方法會拋出異常。

# 安全刪除文件
def safe_delete(file_path):
    path = Path(file_path)
    if path.exists():
        if path.is_file():
            path.unlink()
            print(f"已刪除文件: {path}")
        elif path.is_dir() and not any(path.iterdir()):
            path.rmdir()
            print(f"已刪除空目錄: {path}")
        else:
            print(f"目錄非空,無法刪除: {path}")
    else:
        print(f"路徑不存在: {path}")

文件查找與遍歷

使用的listdir()函數返回的只是文件和子文件夾的名稱,而pathlib的glob()函數返回的則是文件和子文件夾的完整路徑對象,更加方便操作。

# 查找所有Excel文件並按修改時間排序
def find_excel_files(directory):
    dir_path = Path(directory)
    excel_files = list(dir_path.glob('**/*.xlsx')) + list(dir_path.glob('**/*.xls'))
    
    # 按修改時間排序
    excel_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    
    print(f"找到 {len(excel_files)} 個Excel文件:")
    for file in excel_files[:5]:  # 只顯示前5個
        mod_time = datetime.datetime.fromtimestamp(file.stat().st_mtime)
        print(f"{file.name} - 修改時間: {mod_time.strftime('%Y-%m-%d %H:%M')}")
    
    return excel_files

文件讀寫操作

pathlib還提供了簡便的文件讀寫方法,無需傳統的open()函數:

# 讀取文本文件
def read_log_file(log_path):
    path = Path(log_path)
    if path.exists() and path.is_file():
        # 直接讀取文本內容
        content = path.read_text(encoding='utf-8')
        lines = content.split('\n')
        print(f"日誌共 {len(lines)} 行")
        
        # 查找錯誤信息
        error_lines = [line for line in lines if 'ERROR' in line]
        if error_lines:
            print(f"發現 {len(error_lines)} 條錯誤記錄:")
            for line in error_lines[:3]:  # 只顯示前3條
                print(f"- {line}")
    else:
        print(f"日誌文件不存在: {path}")

# 寫入文本文件
def append_to_log(log_path, message):
    path = Path(log_path)
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {message}\n"
    
    # 追加內容到文件
    path.write_text(log_entry, encoding='utf-8') if not path.exists() else path.open('a', encoding='utf-8').write(log_entry)
    print(f"日誌已更新: {path}")

使用zipfile、tarfile壓縮解壓文件

在處理大量文件時,壓縮和解壓是常見的需求。Python中提供了zipfile與tarfile內置庫來分別實現對兩種常見壓縮文件格式的操作。

ZIP文件操作

import zipfile
from pathlib import Path
import os

# 創建ZIP壓縮文件
def create_zip_archive(directory, zip_name=None):
    dir_path = Path(directory)
    
    # 如果沒有指定壓縮包名稱,使用目錄名
    if zip_name is None:
        zip_name = f"{dir_path.name}.zip"
    
    zip_path = dir_path.parent / zip_name
    
    # 創建壓縮文件
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # 遍歷目錄下所有文件
        for file_path in dir_path.rglob('*'):
            if file_path.is_file():
                # 計算相對路徑作為壓縮包內路徑
                rel_path = file_path.relative_to(dir_path.parent)
                zipf.write(file_path, rel_path)
                print(f"已添加: {rel_path}")
    
    print(f"壓縮完成: {zip_path},大小: {zip_path.stat().st_size / 1024:.2f} KB")
    return zip_path

# 解壓ZIP文件
def extract_zip_archive(zip_path, extract_to=None):
    zip_path = Path(zip_path)
    
    # 如果沒有指定解壓目錄,使用當前目錄
    if extract_to is None:
        extract_to = zip_path.parent / zip_path.stem
    
    extract_path = Path(extract_to)
    extract_path.mkdir(exist_ok=True)
    
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        # 獲取壓縮包內文件列表
        file_list = zipf.namelist()
        print(f"壓縮包內共 {len(file_list)} 個文件")
        
        # 解壓所有文件
        zipf.extractall(extract_path)
        print(f"解壓完成: {extract_path}")
    
    return extract_path

TAR文件操作

import tarfile
from pathlib import Path

# 創建TAR壓縮文件
def create_tar_archive(directory, tar_name=None, compression='gz'):
    dir_path = Path(directory)
    
    # 如果沒有指定壓縮包名稱,使用目錄名
    if tar_name is None:
        tar_name = f"{dir_path.name}.tar.{compression}"
    
    tar_path = dir_path.parent / tar_name
    
    # 設置壓縮模式
    mode = f"w:{compression}" if compression else "w"
    
    # 創建壓縮文件
    with tarfile.open(tar_path, mode) as tarf:
        # 添加整個目錄
        tarf.add(dir_path, arcname=dir_path.name)
        print(f"已添加目錄: {dir_path}")
    
    print(f"壓縮完成: {tar_path},大小: {tar_path.stat().st_size / 1024:.2f} KB")
    return tar_path

# 解壓TAR文件
def extract_tar_archive(tar_path, extract_to=None):
    tar_path = Path(tar_path)
    
    # 如果沒有指定解壓目錄,使用當前目錄
    if extract_to is None:
        extract_to = tar_path.parent
    
    extract_path = Path(extract_to)
    extract_path.mkdir(exist_ok=True)
    
    # 自動檢測壓縮格式
    with tarfile.open(tar_path, 'r:*') as tarf:
        # 獲取壓縮包內文件列表
        file_list = tarf.getnames()
        print(f"壓縮包內共 {len(file_list)} 個文件/目錄")
        
        # 解壓所有文件
        tarf.extractall(extract_path)
        print(f"解壓完成: {extract_path}")
    
    return extract_path

實際應用場景

場景一:日誌文件自動歸檔

from pathlib import Path
import zipfile
import datetime
import shutil

def archive_logs(log_dir, days_to_keep=30):
    """自動歸檔超過指定天數的日誌文件"""
    log_path = Path(log_dir)
    today = datetime.datetime.now()
    archive_dir = log_path / 'archives'
    archive_dir.mkdir(exist_ok=True)
    
    # 獲取所有日誌文件
    log_files = list(log_path.glob('*.log'))
    archived_count = 0
    
    for log_file in log_files:
        # 獲取文件修改時間
        mtime = datetime.datetime.fromtimestamp(log_file.stat().st_mtime)
        days_old = (today - mtime).days
        
        # 如果文件超過保留天數,進行歸檔
        if days_old > days_to_keep:
            # 創建年月子目錄
            year_month = mtime.strftime('%Y-%m')
            month_dir = archive_dir / year_month
            month_dir.mkdir(exist_ok=True)
            
            # 創建壓縮文件
            zip_name = f"{log_file.stem}_{mtime.strftime('%Y%m%d')}.zip"
            zip_path = month_dir / zip_name
            
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.write(log_file, log_file.name)
            
            # 刪除原日誌文件
            log_file.unlink()
            archived_count += 1
            print(f"已歸檔: {log_file.name} -> {zip_path}")
    
    print(f"歸檔完成,共處理 {archived_count} 個日誌文件")
    return archived_count

場景二:批量文件格式轉換

from pathlib import Path
import csv
import json

def convert_csv_to_json(csv_dir, output_dir=None):
    """批量將CSV文件轉換為JSON格式"""
    csv_path = Path(csv_dir)
    
    # 如果沒有指定輸出目錄,在原目錄創建json子目錄
    if output_dir is None:
        output_dir = csv_path / 'json_output'
    
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)
    
    # 獲取所有CSV文件
    csv_files = list(csv_path.glob('*.csv'))
    converted_count = 0
    
    for csv_file in csv_files:
        # 讀取CSV文件
        data = []
        try:
            with open(csv_file, 'r', encoding='utf-8', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    data.append(row)
            
            # 創建對應的JSON文件
            json_file = output_path / f"{csv_file.stem}.json"
            with open(json_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            
            converted_count += 1
            print(f"已轉換: {csv_file.name} -> {json_file.name}")
        except Exception as e:
            print(f"轉換失敗: {csv_file.name} - {str(e)}")
    
    print(f"轉換完成,共處理 {converted_count} 個CSV文件")
    return converted_count

通過這些實用的代碼示例,你可以輕鬆實現各種文件操作自動化,大幅提高工作效率。無論是日常的文件整理,還是批量的數據處理,這些技巧都能幫你節省大量時間。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.