文章目錄
- 安裝
- 基本概念
- 基礎用法
- 1. 基本解析
- 2. 解析文件中的特定路徑
- 高級用法
- 1. 使用事件解析
- 2. 處理大型 JSON 數組
- 3. 處理嵌套結構
- 4. 錯誤處理和性能優化
- 實際應用場景
- 1. 處理 API 響應流
- 2. 日誌文件分析
本文詳細介紹 Python ijson 庫的用法。ijson 是一個用於流式(增量)解析大型 JSON 文件的工具。
安裝
pip install ijson
基本概念
ijson 的主要優勢在於它不需要將整個 JSON 文件加載到內存中,而是逐個讀取和解析 JSON 元素,因此內存佔用主要取決於當前正在處理的元素,而不是整個文件的大小。
基礎用法
1. 基本解析
import io

import ijson

# Sample JSON document shared by both examples below.
json_data = '''
{
"users": [
{"id": 1, "name": "Alice", "age": 25},
{"id": 2, "name": "Bob", "age": 30},
{"id": 3, "name": "Charlie", "age": 35}
],
"total": 3
}
'''

# Method 1: parse from an in-memory string.
# ijson consumes bytes, so the text is encoded once and wrapped in a
# file-like object rather than being handed over as a raw str.
items = ijson.items(io.BytesIO(json_data.encode('utf-8')), 'users.item')
for user in items:
    print(f"ID: {user['id']}, Name: {user['name']}, Age: {user['age']}")

# Method 2: parse from a file.
with open('data.json', 'w', encoding='utf-8') as f:
    f.write(json_data)

# Open in binary mode: ijson works on bytes, and a text-mode file forces it
# to re-encode every chunk (it also emits a deprecation warning for that).
with open('data.json', 'rb') as f:
    users = ijson.items(f, 'users.item')
    for user in users:
        print(f"User: {user}")
2. 解析文件中的特定路徑
import ijson
# Build a small stand-in for large_data.json so the example is runnable.
json_structure = '''
{
"company": "Tech Corp",
"departments": [
{
"name": "Engineering",
"employees": [
{"id": 101, "name": "John", "salary": 50000},
{"id": 102, "name": "Jane", "salary": 60000}
]
},
{
"name": "Marketing",
"employees": [
{"id": 201, "name": "Mike", "salary": 45000},
{"id": 202, "name": "Sarah", "salary": 55000}
]
}
]
}
'''
with open('large_data.json', 'w', encoding='utf-8') as f:
    f.write(json_structure)

# Extract specific paths.
# Each ijson.items() call consumes the stream it is given, so the file is
# reopened for every query. Binary mode is what ijson expects; text-mode
# files are deprecated and slower.
with open('large_data.json', 'rb') as f:
    # Company name: a single scalar at the top level.
    company = ijson.items(f, 'company')
    print(f"Company: {next(company)}")

with open('large_data.json', 'rb') as f:
    # Name of every department ("item" stands for an array element).
    dept_names = ijson.items(f, 'departments.item.name')
    print("Departments:")
    for name in dept_names:
        print(f" - {name}")

with open('large_data.json', 'rb') as f:
    # Every employee of every department.
    employees = ijson.items(f, 'departments.item.employees.item')
    print("\nAll Employees:")
    for emp in employees:
        print(f" - {emp['name']} (ID: {emp['id']})")
高級用法
1. 使用事件解析
import ijson
json_data = '''
{
"users": [
{"id": 1, "name": "Alice", "hobbies": ["reading", "swimming"]},
{"id": 2, "name": "Bob", "hobbies": ["gaming", "coding"]}
]
}
'''
with open('events_data.json', 'w', encoding='utf-8') as f:
    f.write(json_data)

# Label printed whenever a container opens or closes; events that are not
# listed here (scalars, map keys) only get the generic line below.
container_labels = {
    'start_array': '開始數組',
    'end_array': '結束數組',
    'start_map': '開始對象',
    'end_map': '結束對象',
}

print("解析事件流:")
# Binary mode: ijson reads bytes; text-mode files are deprecated.
with open('events_data.json', 'rb') as f:
    parser = ijson.parse(f)
    for prefix, event, value in parser:
        print(f"路徑: {prefix:20} 事件: {event:10} 值: {value}")
        # Event-specific handling can be hooked in here.
        label = container_labels.get(event)
        if label is not None:
            print(f"{label}: {prefix}")
2. 處理大型 JSON 數組
import ijson
import json
# Build a 1000-product catalogue and save it as the test fixture.
large_data = {
    "products": [
        {
            "id": idx + 1,
            "name": f"Product {idx + 1}",
            "price": idx * 10 + 5.99,
            "category": f"Category {idx % 5}",
            "in_stock": idx % 3 == 0,
        }
        for idx in range(1000)
    ]
}
with open('large_products.json', 'w') as out:
    json.dump(large_data, out)
# Stream the products with ijson instead of loading the whole file.
def process_expensive_products(price_threshold=500.0, file_path='large_products.json'):
    """Print every product priced above the threshold and count them.

    Args:
        price_threshold: Only products whose price is strictly greater than
            this value are reported.
        file_path: JSON file containing a top-level ``products`` array.

    Returns:
        The number of products above the threshold.
    """
    count = 0
    # Binary mode: ijson reads bytes; text-mode files are deprecated.
    with open(file_path, 'rb') as f:
        for product in ijson.items(f, 'products.item'):
            # By default ijson yields non-integer numbers as decimal.Decimal;
            # comparing and formatting them against floats works as-is.
            if product['price'] > price_threshold:
                count += 1
                print(f"高價商品: {product['name']} - ${product['price']:.2f}")
    return count


expensive_count = process_expensive_products(500.0)
print(f"\n高價商品總數: {expensive_count}")
# Count products per category.
def count_by_category(file_path='large_products.json'):
    """Return how many products fall into each category.

    Args:
        file_path: JSON file containing a top-level ``products`` array whose
            elements carry a ``category`` key.

    Returns:
        A dict mapping category name to product count, in first-seen order.
    """
    from collections import Counter

    # Binary mode: ijson reads bytes; text-mode files are deprecated.
    with open(file_path, 'rb') as f:
        counts = Counter(
            product['category'] for product in ijson.items(f, 'products.item')
        )
    # Hand back a plain dict so callers see the same type as before.
    return dict(counts)


category_stats = count_by_category()
print("\n按類別統計:")
for category, count in category_stats.items():
    print(f"{category}: {count} 個商品")
3. 處理嵌套結構
import ijson
complex_data = '''
{
"school": {
"name": "Central High",
"classes": [
{
"grade": "10A",
"students": [
{"name": "Alice", "scores": {"math": 95, "english": 88}},
{"name": "Bob", "scores": {"math": 87, "english": 92}}
]
},
{
"grade": "10B",
"students": [
{"name": "Charlie", "scores": {"math": 78, "english": 85}},
{"name": "Diana", "scores": {"math": 92, "english": 90}}
]
}
]
}
}
'''
with open('school_data.json', 'w', encoding='utf-8') as f:
    f.write(complex_data)

# Extract every student's math score.
print("學生數學成績:")
# Binary mode: ijson reads bytes; text-mode files are deprecated.
with open('school_data.json', 'rb') as f:
    # Each yielded value is one fully built student object.
    students = ijson.items(f, 'school.classes.item.students.item')
    for student in students:
        print(f"{student['name']}: 數學 {student['scores']['math']}分")
# Extract the scores stored at one specific path.
def get_english_scores(file_path='school_data.json'):
    """Collect (student name, English score) pairs from the event stream.

    The prefixes are matched exactly. A suffix match on ``name`` would also
    fire for ``school.name`` (and for any key that merely ends in "name"),
    briefly treating the school as a student.

    Args:
        file_path: JSON file shaped like ``school.classes[].students[]``,
            each student having ``name`` and ``scores.english``.

    Returns:
        A list of ``(name, score)`` tuples in document order.
    """
    student_prefix = 'school.classes.item.students.item'
    name_prefix = f'{student_prefix}.name'
    english_prefix = f'{student_prefix}.scores.english'

    scores = []
    current_student = None
    # Binary mode: ijson reads bytes; text-mode files are deprecated.
    with open(file_path, 'rb') as f:
        for prefix, event, value in ijson.parse(f):
            if prefix == name_prefix and event == 'string':
                current_student = value
            elif prefix == english_prefix and event == 'number':
                # NOTE: relies on "name" appearing before "scores" inside
                # each student object, as it does in the sample data.
                scores.append((current_student, value))
    return scores


english_scores = get_english_scores()
print("\n英語成績:")
for student, score in english_scores:
    print(f"{student}: {score}分")
4. 錯誤處理和性能優化
import ijson
import json
import time
def safe_json_parsing(file_path):
    """Count the elements of a top-level JSON array, reporting errors instead of raising.

    Progress is printed every 1000 elements and the total at the end.
    A missing file, an empty file, bad encoding and malformed JSON are each
    reported with their own message.

    Args:
        file_path: Path of a JSON file whose top-level value is an array.

    Returns:
        None. All results are printed.
    """
    try:
        # Binary mode: ijson reads bytes; text-mode files are deprecated.
        f = open(file_path, 'rb')
    except FileNotFoundError:
        print(f"文件不存在: {file_path}")
        return

    with f:
        try:
            # Detect an empty file directly instead of depending on how the
            # parser happens to report empty input.
            if not f.read(1):
                print("文件為空")
                return
            f.seek(0)

            count = 0
            for count, _item in enumerate(ijson.items(f, 'item'), 1):
                # Per-item processing goes here.
                if count % 1000 == 0:
                    print(f"已處理 {count} 個項目...")
            print(f"總共處理了 {count} 個項目")
        except UnicodeDecodeError:
            # Listed before the catch-all so this message is reachable.
            print("文件編碼錯誤")
        except ijson.JSONError as e:
            print(f"JSON 解析錯誤: {e}")
        except Exception as e:
            # Deliberate best-effort boundary: report and carry on.
            print(f"其他錯誤: {e}")
# Performance comparison
def performance_comparison():
    """Time the standard ``json`` module against ijson on the same file.

    Writes ``test_large.json`` (10000 small objects), loads it once with
    ``json.load`` and once by streaming with ``ijson.items``, then prints
    both durations and the number of items seen by ijson.
    """
    # Create the test data.
    test_data = [{"id": i, "data": "x" * 100} for i in range(10000)]
    with open('test_large.json', 'w', encoding='utf-8') as f:
        json.dump(test_data, f)

    # perf_counter() is the monotonic, high-resolution clock intended for
    # measuring short durations; time.time() can jump with the wall clock.
    start_time = time.perf_counter()
    with open('test_large.json', 'r', encoding='utf-8') as f:
        json.load(f)  # result discarded; only the load time matters
    std_time = time.perf_counter() - start_time

    start_time = time.perf_counter()
    # Binary mode: ijson reads bytes; text-mode files are deprecated and
    # would add re-encoding overhead to the measurement.
    with open('test_large.json', 'rb') as f:
        count = sum(1 for _ in ijson.items(f, 'item'))
    ijson_time = time.perf_counter() - start_time

    print(f"標準 json 庫: {std_time:.4f} 秒")
    print(f"ijson 流式解析: {ijson_time:.4f} 秒")
    print(f"處理項目數: {count}")
# Run the examples
if __name__ == "__main__":
    # Create the test file. (The third entry previously read
    # `"name": "name": "Item 3"`, which is a syntax error.)
    sample_data = [
        {"id": 1, "name": "Item 1", "value": 100},
        {"id": 2, "name": "Item 2", "value": 200},
        {"id": 3, "name": "Item 3", "value": 300},
    ]
    with open('sample.json', 'w', encoding='utf-8') as f:
        json.dump(sample_data, f)

    # Exercise the safe parser.
    safe_json_parsing('sample.json')

    # Run the timing comparison.
    performance_comparison()
實際應用場景
1. 處理 API 響應流
import ijson
import requests
def stream_large_api_response(url="https://api.example.com/large-data", timeout=30):
    """Stream a large JSON API response and process its items one at a time.

    Args:
        url: Endpoint returning a JSON object with a top-level ``items`` array.
        timeout: Seconds to wait for the server before giving up; without a
            timeout a stalled connection would block indefinitely.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # The context manager releases the connection even if parsing fails
    # part-way through the body.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        # response.raw is the undecoded transport stream; ask it to undo
        # gzip/deflate so ijson receives plain JSON bytes.
        response.raw.decode_content = True
        for item in ijson.items(response.raw, 'items.item'):
            # Handle items one by one so the whole body is never held in memory.
            process_item(item)
def process_item(item):
    """Print a one-line summary (id and name) of a single record."""
    identifier = item.get('id', 'N/A')
    label = item.get('name', 'Unnamed')
    print(f"處理: {identifier} - {label}")
2. 日誌文件分析
import ijson
def analyze_json_logs(log_file_path):
    """Count error and warning entries in a JSON-lines log file.

    Every non-blank line is expected to hold one standalone JSON object with
    an optional ``level`` and ``message``. Error entries are echoed with
    their line number; malformed lines are reported and skipped.

    Args:
        log_file_path: Path of the log file, one JSON object per line.

    Returns:
        None. The totals are printed.
    """
    import io

    error_count = 0
    warning_count = 0
    # Binary mode: each line is handed to ijson as bytes, which is the input
    # type it expects.
    with open(log_file_path, 'rb') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                # The empty prefix selects the top-level value itself.
                # 'item' would only match elements of a top-level array, so
                # with one object per line nothing was ever yielded.
                entry = next(ijson.items(io.BytesIO(line), ''), None)
            except (ijson.JSONError, UnicodeDecodeError):
                print(f"第 {line_num} 行 JSON 格式錯誤")
                continue
            # Skip lines holding valid JSON that is not an object.
            if not isinstance(entry, dict):
                continue
            # str() guards against a non-string level such as a number.
            level = str(entry.get('level', '')).lower()
            if level == 'error':
                error_count += 1
                print(f"錯誤在第 {line_num} 行: {entry.get('message')}")
            elif level == 'warning':
                warning_count += 1
    print("\n統計結果:")
    print(f"錯誤數: {error_count}")
    print(f"警告數: {warning_count}")


# Usage example
analyze_json_logs('app_logs.json')
ijson 特別適合處理以下幾種情況:
- 非常大的 JSON 文件(GB 級別)
- 網絡流式 JSON 數據
- 只需要提取部分數據的場景
- 內存受限的環境
記住在處理完成後及時關閉文件(建議使用 with 語句),並在生產環境中添加適當的錯誤處理。