一.項目背景
隨着全民健身的深入與健身文化的普及,以引體向上為代表的自重訓練,因其便捷性與高效性,成為衡量個人基礎力量與身體素質的重要標誌,廣泛應用於學校體測、軍事訓練及大眾健身。然而,傳統的動作評估高度依賴教練員的肉眼觀察與主觀經驗,存在標準不一、反饋延遲、難以量化等侷限性。在缺少專業指導的環境中,訓練者往往難以察覺自身動作模式的細微偏差,如借力、擺動、幅度不足等,這不僅影響訓練效果,長期更可能導致運動損傷。如何將人工智能與計算機視覺技術,轉化為每個人觸手可及的“AI教練”,提供客觀、即時、精準的動作反饋,已成為提升科學化訓練水平的一個迫切需求。
二.項目概述:
本項目旨在開發一套基於計算機視覺的智能引體向上動作分析與評估系統。系統通過訓練者上傳的視頻,運用先進的人體姿態估計算法,自動識別並追蹤身體關鍵點。針對引體向上動作的複雜性,我們創新性地構建了雙視角協同分析框架:正面視角專注於分析握距對稱性、身體穩定性和左右平衡,確保動作的規範與基礎架構;側面視角則着重評估動作的完整性、軀幹角度與發力模式,判斷動作幅度與效率。通過多維度量化指標,系統能夠自動分解動作週期、識別違規代償,並生成直觀的可視化報告與改進建議。最終,本項目致力於打造一個低成本、高精度的自動化評估工具,為個人訓練者、體育教育及專業機構提供一種數據驅動的科學訓練輔助解決方案。、
三.項目分工:
- 蔡駿:負責用户界面前端所需前端功能的構建。
- 趙雅萱:負責管理員系統構建。
- 薛雨晨:實現功能部署到服務器的使用,以及前後端接口的書寫修訂。
- 海米沙:墨刀進行原型設計,實時記錄市場調研結果並彙報分析需求,項目logo及產品名稱設計,進行軟件測試。
- 謝文傑:負責正面評分標準制定,搭建知識庫。
- 賴翊煊:負責側面評分標準制定,API接口接入AI
- 葉騁愷:負責數據庫方面創建與設計
- 尚子琪:負責進行爬蟲爬取對應相關視頻,進行軟件測
四.個人貢獻
本人主要負責後端代碼的編寫,包括功能接口編寫,資源路由,作業管理與數據庫調用。同時負責所有功能整合與調試,服務部署等工作。
4.1 後端服務
後端服務器使用了flask服務器進行功能開發,在部署階段使用了gunicorn服務器進行業務處理。使用了flask的藍圖對項目各個模塊進行劃分,便於開發調試。認證管理使用jwt工具對安全頁面進行保護。後端代碼結構:
│ gunicorn.conf.py
│ run.py
│
├─app
│ │ admin.py
│ │ auth.py
│ │ celery_app.py
│ │ config.py
│ │ config.template.py
│ │ database.py
│ │ example.py
│ │ feedback.py
│ │ history.py
│ │ redis_manager.py
│ │ requirements.txt
│ │ train_plan.py
│ │ upload.py
│ │ init.py
│ │
│ ├─api
│ │ celery_tasks.py
│ │ mysql.py
│ │ opengauss.py
│ │ process_front.py
│ │ process_front.py.back
│ │ process_side.py
│ │ tools.py
│ │ init.py
│
│
├─docker
│ │ Dockerfile
│ │
│ ├─celery
│ │ Dockerfile
│ │
│ └─server
│ Dockerfile
│
├─example
│ ├─thumbnail
│ └─video
└─uploads
4.1.1 登錄模塊
這個模塊註冊了/auth路由,所有涉及認證的內容都走這個路由,主要功能如下:
- login:負責接收登錄表單並進行身份認證,在認證成功後返回短期token。
- register:負責接收註冊表單,對信息進行核查,無誤後寫入數據庫。
- profile:負責個人信息的獲取,通過token進行身份驗證,通過後返回用户信息,前端自動登錄與自動跳轉是通過這個接口進行驗證的。
- change_password:用於用户密碼的修改,驗證用户信息通過後進行修改操作。
- update_simple_profile:用於用户基本信息的修改。
- refresh:刷新token使用,使用户在登錄活躍期不會因為token過期導致重新登錄。
這個模塊邏輯都比較簡單,就不多説明了。
auth.py
import datetime
import hashlib
from flask import Flask, request, jsonify,Blueprint
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from .api.tools import error_response, success_response, hash_password
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
from .database import get_db
from . import config
system=get_db(config.DataBase_Name)
@auth_bp.route('/login', methods=[ 'POST'])
def login():
try:
data = request.get_json()
if not data:
return error_response('請求數據不能為空')
username = data.get('username', '').strip()
password = data.get('password', '').strip()
if not username or not password:
return error_response('用户名和密碼不能為空')
# 驗證用户
user = system.user_manager.get_user_by_username(username)
if not user:
return error_response('用户不存在',400)
hashed_password = hash_password(password)
if user['password'] != hashed_password:
return error_response('密碼錯誤',401)
# 添加登錄記錄
system.login_manager.add_login_record(user['id'])
# 生成JWT令牌
access_token = create_access_token(identity=str(user['id']))
return success_response(
data={
'token': access_token,
'user': {
'id': user['id'],
'username': user['username'],
'role': user['role'],
'height': user['height'],
'weight': user['weight']
}
},
message='登錄成功'
)
except Exception as e:
return error_response(f'登錄失敗: {str(e)}', 500)
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户註冊"""
try:
data = request.get_json()
if not data:
return error_response('請求數據不能為空')
username = data.get('username', '').strip()
password = data.get('password', '').strip()
height = data.get('height')
weight = data.get('weight')
# 驗證必填字段
if not username or not password:
return error_response('用户名和密碼不能為空')
if len(username) < 3:
return error_response('用户名至少3位')
if len(password) < 6:
return error_response('密碼至少6位')
# 檢查用户是否已存在
existing_user = system.user_manager.get_user_by_username(username)
if existing_user:
return error_response('用户名已存在')
# 創建用户
hashed_password = hash_password(password)
if system.user_manager.add_user(username, hashed_password, height, weight):
return success_response(message='註冊成功')
else:
return error_response('註冊失敗')
except Exception as e:
return error_response(f'註冊失敗: {str(e)}', 500)
@auth_bp.route('/profile', methods=['GET'])
@jwt_required()
def get_profile():
"""獲取用户信息"""
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user=system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
return success_response(
data={
'id': user['id'],
'username': user['username'],
'role': user['role'],
'height': user['height'],
'weight': user['weight']
}
)
except Exception as e:
return error_response(f'獲取用户信息失敗: {str(e)}', 500)
@auth_bp.route('/change_password', methods=['PUT'])
@jwt_required()
def change_password():
try:
user_id = get_jwt_identity()
data = request.get_json()
if not data:
return error_response('請求數據不能為空')
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
password=data.get('password')
hashed_password = hash_password(password)
# print(password)
username=None
height =None
weight =None
if system.user_manager.update_user(user['id'],username, hashed_password, height, weight):
return success_response(message='更新成功')
return error_response('更新失敗')
except Exception as e:
return error_response(f'更新失敗: {str(e)}', 500)
@auth_bp.route('/update_simple_profile', methods=['PUT'])
@jwt_required()
def update_simple_profile():
"""更新用户信息"""
try:
user_id = get_jwt_identity()
data = request.get_json()
if not data:
return error_response('請求數據不能為空')
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
password=None
username=data.get('username', user['username']).strip()
height = data.get('height')
weight = data.get('weight')
if system.user_manager.update_user(user['id'],username, password, height, weight):
return success_response(message='更新成功')
return error_response('更新失敗')
except Exception as e:
return error_response(f'更新失敗: {str(e)}', 500)
@auth_bp.route('/refresh',methods=['GET'])
@jwt_required()
def refresh():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
system.login_manager.add_login_record(user['id'])
access_token = create_access_token(identity=str(user['id']))
return success_response(
data={
'token': access_token,
},
message='刷新token'
)
except Exception as e:
return error_response(f'服務錯誤: {str(e)}', 500)
4.1.2 示例視頻模塊
這個模塊負責精選視頻等靜態資源路由,父路由為/api。
- media/videos:負責獲取所有精選視頻信息,包括名稱,鏈接,縮略圖,時長等信息。
- video/<filename>: 負責獲取某個指定視頻。
- thumbnail/<filename>:負責獲取視頻對應的縮略圖。
主要問題:相對路徑難以控制,絕對路徑又不合適。通過定位到項目目錄再往下補目錄解決的。
example.py
import datetime
import hashlib
import os
from pathlib import Path
from . import config
from flask import Flask, request, jsonify, Blueprint, send_file,send_from_directory
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from .api.tools import error_response, success_response, hash_password
example_bp = Blueprint('example', __name__, url_prefix='/api')
from .database import get_db
system=get_db(config.DataBase_Name)
@example_bp.route('/media/videos', methods=['GET'])
@jwt_required()
def get_media_videos():
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
videos=system.video_manager.get_all_video_records()
return success_response(data=videos)
@example_bp.route('/video/<filename>')
def video(filename):
try:
# print(f"請求視頻文件: {filename}")
# 獲取項目根目錄(app 文件夾的父目錄)
base_dir = Path(__file__).parent.parent # 這會指向 app 的父目錄
video_path = base_dir / config.VIDEO_FOLDER / filename
# print(f"視頻完整路徑: {video_path}")
# print(f"路徑是否存在: {video_path.exists()}")
if not video_path.exists():
return jsonify({"error": "視頻文件不存在"}), 404
# 使用 send_file 自動處理範圍請求
# print("使用 send_file 自動處理範圍請求")
response = send_file(
str(video_path), # 轉換為字符串
as_attachment=False,
conditional=True,
mimetype='video/mp4'
)
# 添加必要的響應頭
response.headers['Accept-Ranges'] = 'bytes'
response.headers['Cache-Control'] = 'no-cache'
# print("文件發送成功")
return response
except Exception as e:
print(f"服務器錯誤詳情: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"error": f"服務器錯誤: {str(e)}"}), 500
@example_bp.route('/thumbnail/<filename>')
def thumbnail(filename):
try:
# print(f"請求視頻文件: {filename}")
# 獲取項目根目錄(app 文件夾的父目錄)
base_dir = Path(__file__).parent.parent # 這會指向 app 的父目錄
thumbnail_path = base_dir / config.THUMBNAIL_FOLDER / filename
print(f"查找文件路徑: {thumbnail_path}")
print(f"文件是否存在: {thumbnail_path.exists()}")
if not thumbnail_path.exists():
return jsonify({"error": "縮略圖不存在"}), 404
# 直接返回圖片文件
return send_file(str(thumbnail_path))
except Exception as e:
print(f"<UNK>: {str(e)}")
# 返回錯誤圖片
return send_file('static/error.jpg', mimetype='image/jpeg')
4.1.3 反饋信息模塊
這個模塊複用父路由/api,負責向服務器發送用户反饋信息並存入數據庫。
- feedback:向數據庫寫入用户反饋的內容。
feedback.py
from flask import Flask, request, jsonify,Blueprint
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from . import config
from .api.tools import error_response, success_response, hash_password
feedback_bp = Blueprint('feedback', __name__, url_prefix='/api')
from .database import get_db
system=get_db(config.DataBase_Name)
@feedback_bp.route('/feedback', methods=['POST'])
@jwt_required()
def feedback():
try:
data=request.get_json()
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
content=data.get('content')
email=data.get('email','')
system.feedback_manager.add_feedback(user['id'],content, email)
return success_response('')
except Exception as e:
return error_response( str(e),400)
4.1.4 歷史記錄模塊
該模塊複用/api路由,負責所有有關歷史記錄信息的增刪改查。
- history:獲取用户的所有歷史記錄。
- history/detail/<id>:獲取具體某條歷史記錄的內容。
history.py
import datetime
import hashlib
import json
import os
import threading
import time
import uuid
import requests
from flask import Flask, request, jsonify, Blueprint, Response
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from werkzeug.utils import secure_filename
from . import config
from .api.tools import error_response, success_response, hash_password
history_bp = Blueprint('history', __name__, url_prefix='/api')
from .database import get_db
system=get_db(config.DataBase_Name)
@history_bp.route('/history', methods=['GET'])
@jwt_required()
def history():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
historys=system.history_manager.get_history_by_user(user['id'])
# print(historys)
result=[]
for history in historys:
r={}
r['id'] = int(history['id'])
r['project'] = history['project']
r['time'] = history['time'].strftime("%Y-%m-%d %H:%M:%S")
r['score'] = int(history['score'])
r['date'] =history['time'].strftime("%Y-%m-%d")
result.append(r)
# print(result)
data={
'data':result
}
return success_response(data,200)
except Exception as ex:
return error_response(str(ex), 500)
@history_bp.route('/history/detail/<id>', methods=['GET'])
@jwt_required()
def history_detail(id):
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
history = system.history_manager.get_history_records_by_id(int(id))
data={
'project': history['project'],
'time': history['time'].strftime("%Y-%m-%d %H:%M:%S"),
'score': int(history['score']),
'evaluation': history['content'],
}
return success_response(data, 200)
except Exception as ex:
print(ex)
print(system.history_manager.get_history_records_by_id(int(id)))
return error_response(str(ex), 500)
4.1.5 訓練計劃模塊
這個模塊負責所有訓練計劃相關的內容,增刪改查等。
- training-plan:寫入用户的訓練計劃到數據庫。
- training-plan/list:獲取用户所有訓練計劃。
- training-plan/<id>(PUT):修改某條訓練計劃的內容。
- training-plan/<id>(DELETE):刪除某條訓練計劃。
train_plan.py
import datetime
import hashlib
import json
import os
import threading
import time
import uuid
import requests
from flask import Flask, request, jsonify, Blueprint, Response
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from werkzeug.utils import secure_filename
from . import config
from .api.tools import error_response, success_response, hash_password
train_bp = Blueprint('train-plan', __name__, url_prefix='/api')
from .database import get_db
system=get_db(config.DataBase_Name)
@train_bp.route('/training-plan', methods=['POST'])
@jwt_required()
def train_plan():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
data=request.json
id=system.training_plan_manager.add_training_plan(user['id'],data['date'],data['project'],data['target'],data['note'])
result={
"id": id,
"date": data['date'],
"project": data['project'],
"target": data['target'],
"note": data['note'],
"completed": False,
"actualCount": 0
}
return success_response(message='創建成功',data=result)
except Exception as e:
return error_response(str(e), 500)
@train_bp.route('/training-plan/list',methods=['GET'])
@jwt_required()
def train_plan_list():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
result=system.training_plan_manager.get_user_plans(user['id'])
data={
'list':result,
'total':len(result)
}
return success_response(message='獲取成功',data=data)
except Exception as e:
print(str(e))
return error_response(str(e), 500)
@train_bp.route('/training-plan/<id>',methods=['PUT'])
@jwt_required()
def train_plan_update(id):
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
data=request.json
target=data.get('target',None)
note=data.get('note',None)
actualCount=data.get('actualCount',None)
completed=data.get('completed',None)
bl=system.training_plan_manager.update_training_plan(id,user['id'],target,note,completed,actualCount)
if bl:
result=system.training_plan_manager.get_plan_by_id(id)
return success_response(message='更新成功',data=result)
else:
return error_response('更新失敗', 201)
except Exception as e:
print(str(e))
return error_response(str(e), 500)
@train_bp.route('/training-plan/<id>',methods=['DELETE'])
@jwt_required()
def train_plan_delete(id):
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if system.training_plan_manager.delete_training_plan(id,user['id']):
return success_response(message='刪除成功')
else:
return error_response('刪除失敗',201)
except Exception as e:
print(str(e))
return error_response(str(e), 500)
@train_bp.route('/training-plan/trained-dates',methods=['GET'])
@jwt_required()
def train_plan_trained_dates():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
year=request.args.get('year',None)
month=request.args.get('month',None)
print(year,month)
result=system.training_plan_manager.get_trained_date(user['id'],year,month)
result['date']=result['date']
return success_response(message='獲取成功',data=result)
except Exception as e:
print(str(e))
return error_response(str(e), 500)
4.1.6 上傳模塊
核心業務在這個模塊實現,路由複用/api,實現測評視頻的上傳,任務生成與大模型調用。
- clear:清除當前會話,用於開啓新一輪對話。
- upload:負責視頻接收,生成任務發送往redis,後續任務實現由celery負責。
- evaluate/result/<task_id>:負責任務狀態與結果查詢。
- chat:負責與大模型進行對話,返回流式信息。
- save:負責測評記錄的保存。
這個模塊起初沒有使用redis管理短期數據,後面發現在多線程環境下出現任務丟失的情況,故使用redis專門管理數據。chat功能調用我們在雲上託管的dify聊天機器,管理用户的會話id,上下文由平台自己管理省去了一部分工作,因為我們使用的deepseek api自身不支持多輪對話,需要自行管理上下文,所以使用dify作為我們定製的api。
upload.py
import datetime
import hashlib
import json
import os
import threading
import time
import uuid
from .api import process_side
from .api import process_front
import requests
from flask import Flask, request, jsonify, Blueprint, Response
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from werkzeug.utils import secure_filename
from .api.celery_tasks import process_video_task
from . import config
from .redis_manager import redis_manager
from .api.tools import error_response, success_response, hash_password
upload_bp = Blueprint('upload', __name__, url_prefix='/api')
from .database import get_db
system=get_db(config.DataBase_Name)
os.makedirs(config.UPLOAD_FOLDER, exist_ok=True)
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in config.ALLOWED_EXTENSIONS
# conversation={}
# tasks = {}
#
# user_task={}
# def process_video(task_id,front_video_path, side_video_path):
# try:
# side=process_side.process(side_video_path)
# front,num=process_front.process(front_video_path)
# if side is None or front is None:
# raise Exception('視頻處理失敗,請檢查視頻清晰度或背景顏色')
# result = {
# 'message': front+side,
# }
# # 更新任務狀態
# tasks[task_id]['status'] = 'completed'
# tasks[task_id]['result'] = result
# tasks[task_id]['project']='引體向上'+str(num)+'個'
#
#
# except Exception as e:
# tasks[task_id]['status'] = 'error'
# tasks[task_id]['message'] = str(e)
@upload_bp.route('/clear',methods=['GET'])
@jwt_required()
def clear():
print('clear')
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
# if user_task.get(user_id):
#
# tasks[user_task[user_id]]=None
# user_task[user_id] = None
# if conversation.get(user_id):
#
# conversation[user_id]=''
redis_manager.delete_task(redis_manager.get_user_task(user_id))
redis_manager.clear_conversation(user_id)
return success_response()
@upload_bp.route('/upload', methods=['POST'])
@jwt_required()
def upload():
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if 'front_video' not in request.files or 'side_video' not in request.files:
return jsonify({'success': False, 'message': '請上傳正面和側面兩個視頻'})
front_video = request.files['front_video']
side_video = request.files['side_video']
# 檢查文件是否選擇
if front_video.filename == '' or side_video.filename == '':
return jsonify({'success': False, 'message': '請選擇視頻文件'})
# 檢查文件格式
if not (allowed_file(front_video.filename) and allowed_file(side_video.filename)):
return jsonify({'success': False, 'message': '不支持的文件格式'})
# 生成任務ID
task_id = str(uuid.uuid4())
# 保存文件
front_filename = secure_filename(front_video.filename)
side_filename = secure_filename(side_video.filename)
front_path = os.path.join(config.UPLOAD_FOLDER, f"{task_id}_front_{front_filename}")
side_path = os.path.join(config.UPLOAD_FOLDER, f"{task_id}_side_{side_filename}")
front_video.save(front_path)
side_video.save(side_path)
redis_manager.create_task(task_id,{
'status': 'processing',
'result': None
})
# 初始化任務狀態
# tasks[task_id] = {
# 'status': 'processing',
# 'result': None
# }
# 在後台處理視頻評估
# thread = threading.Thread(
# target=process_video,
# args=(task_id, front_path, side_path)
# )
# thread.daemon = True
# thread.start()
async_task = process_video_task.delay(task_id, front_path, side_path, user_id)
# 存儲Celery任務ID
redis_manager.update_task(task_id, {
'celery_task_id':async_task.id,
'user_id': user_id,
})
redis_manager.set_user_task(user_id, task_id)
# tasks[task_id]['celery_task_id'] = async_task.id
# tasks[task_id]['user_id'] = user_id
# print('taskid',task_id)
# print('celery_id',tasks[task_id]['celery_task_id'])
print(redis_manager.get_task(task_id))
return jsonify({
'success': True,
'task_id': task_id,
'message': '視頻上傳成功,正在分析中...'
})
@upload_bp.route('/evaluate/result/<task_id>', methods=['GET'])
@jwt_required()
def get_evaluation_result(task_id):
"""
獲取評估結果
"""
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
# if task_id not in tasks:
# return jsonify({'success': False, 'message': '任務不存在'})
# if task_id!=user_task[user_id]:
# return jsonify({'success': False, 'message': '權限不足'})
# task = tasks[task_id]
#
# if task['status'] == 'completed':
# print(task['result'])
# return jsonify({
# 'success': True,
# 'status': 'completed',
# 'result': task['result']
# })
# elif task['status'] == 'processing':
# return jsonify({
# 'success': True,
# 'status': 'processing',
# 'message': '視頻分析中...'
# })
# else:
# return jsonify({
# 'success': False,
# 'status': 'error',
# 'message': '分析過程中出現錯誤'
# })
# 檢查任務是否存在
# if task_id not in tasks:
# return jsonify({'success': False, 'message': '任務不存在'})
#
# # 檢查任務權限
# if tasks[task_id]['user_id'] != user_id:
# return jsonify({'success': False, 'message': '權限不足'})
if redis_manager.get_task(task_id) is None:
return jsonify({'success': False, 'message': '任務不存在'})
# print(redis_manager.get_task(task_id),user_id)
if redis_manager.get_task(task_id)['user_id'] != int(user_id):
return jsonify({'success': False, 'message': '權限不足'})
# task = tasks[task_id]
task=redis_manager.get_task(task_id)
# print(task)
# 如果有Celery任務ID,查詢任務狀態
if 'celery_task_id' in task:
celery_task = process_video_task.AsyncResult(task['celery_task_id'])
if celery_task.ready():
if celery_task.successful():
result = celery_task.result
task['status'] = 'completed'
task['result'] = result.get('result')
task['project'] = result.get('project')
redis_manager.update_task(task_id, task)
else:
task['status'] = 'error'
task['error'] = str(celery_task.result)
if task['status'] == 'completed':
return jsonify({
'success': True,
'status': 'completed',
'result': task['result'],
'project': task.get('project', '')
})
elif task['status'] == 'processing':
return jsonify({
'success': True,
'status': 'processing',
'message': '視頻分析中...'
})
elif task['status'] == 'error':
return jsonify({
'success': False,
'status': 'error',
'message': task.get('error', '分析過程中出現錯誤')
})
else:
return jsonify({
'success': False,
'status': 'unknown',
'message': '未知的任務狀態'
})
@upload_bp.route('/chat', methods=['POST'])
@jwt_required()
def chat_stream():
"""處理聊天請求並返回流式響應"""
data = request.json
user_input = data.get('message')
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
conversation_id=redis_manager.get_conversation(user_id)
# conversation_id=conversation.get(user_id,'')
print(conversation_id)
def generate():
# 調用 Dify API
url = config.DIFY_API_URL
headers = {
"Authorization": f"Bearer {config.DIFY_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"inputs": {},
"query": user_input,
"response_mode": "streaming",
"user": user_id,
"conversation_id": conversation_id
}
# print(conversation.get(user_id,''))
# print(payload)
response = requests.post(url, json=payload, headers=headers, stream=True)
if response.status_code == 200:
for line in response.iter_lines(decode_unicode=True):
# print(line)
if line and line.startswith('data:'):
data = line[5:] # 移除 'data:' 前綴
try:
data_dict = json.loads(data)
# if conversation.get(user_id, '') == '':
# conversation[user_id] = data_dict.get('conversation_id', '')
if redis_manager.get_conversation(user_id) == '':
redis_manager.set_conversation(user_id, data_dict.get('conversation_id',''))
if data_dict['event'] == 'message_end':
# yield f"data: {json.dumps({'event': 'end'})}\n\n"
break
# 提取關鍵信息
event_data = {
'content': data_dict.get('answer', ''),
}
yield f"data: {json.dumps(event_data)}\n\n"
except json.JSONDecodeError:
continue
else:
error_data = {'error': f'請求失敗: {response.status_code}'}
yield f"data: {json.dumps(error_data)}\n\n"
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type'
}
)
@upload_bp.route('/save', methods=['POST'])
@jwt_required()
def save():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
msg=request.json['message'].replace('*','').replace('#','').replace('-','')
line=msg.split('\n')
message=''
score=None
for line in line:
line = line.strip()
if line.startswith('評分'):
try :
score=int(line[3:].strip())
except ValueError:
continue
elif line !='':
message=message+line+'\n'
# print(score)
# print(message)
project=redis_manager.get_task(redis_manager.get_user_task(user_id))['project']
rid=system.rating_manager.add_rating(score,message)
system.history_manager.add_history_record(user['id'],rid,project)
return success_response('保存記錄成功')
except Exception as e:
return error_response('服務錯誤', 500)
4.1.7 管理員模塊
這個模塊負責處理管理員用户的一些相關操作:包括用户流量分析,用户反饋處理,精選視頻上傳。路由使用/api/admin
- media/upload:負責精選視頻的上傳,生成獨特的文件名與訪問鏈接生成。
- media/videos/<id>(DELETE):負責刪除精選視頻。
- media/videos:獲取所有精選視頻,加了管理員權限驗證。
- dashboard/stats:管理員頁的狀態欄信息獲取,包括單日登錄量、註冊量、待處理反饋與媒體數目。
- dashboard/chart-data:從數據庫獲取用户一週的登錄量與註冊量。
- feedback/pending:獲取所有待處理的用户反饋數量。
- feedback_all:獲取所有待處理的用户反饋內容。
- feedback/<int:id>/process:處理用户反饋。
- feedback/<int:id>/ignore:忽略用户反饋。
這個模塊是後期新增的,所以按部就班的按照前端提供的接口信息編寫的。
admin.py
import datetime
import hashlib
import json
import os
import threading
import time
import uuid
import numpy as np
import requests
from PIL import Image
from flask import Flask, request, jsonify, Blueprint, Response
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from werkzeug.utils import secure_filename
from . import config
from .api.tools import error_response, success_response
from moviepy import VideoFileClip
admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin')
from .database import get_db
system=get_db(config.DataBase_Name)
def allowed_file(filename):
"""檢查文件擴展名是否允許"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in config.ALLOWED_EXTENSIONS
def get_video_duration(file_path):
"""獲取視頻時長(需要安裝moviepy或opencv)"""
try:
# 方法1: 使用moviepy(推薦)
with VideoFileClip(file_path) as video:
return video.duration
except Exception as e:
return 0
def extract_first_frame_moviepy(video_path, thumbnail_path=None, size=(320, 180)):
"""
使用moviepy提取視頻第一幀
Args:
video_path: 視頻文件路徑
thumbnail_path: 縮略圖保存路徑
size: 縮略圖尺寸 (寬, 高)
Returns:
str: 縮略圖保存路徑
"""
try:
# 打開視頻文件
with VideoFileClip(video_path) as video:
# 獲取第一幀
frame = video.get_frame(2) # 0表示第一秒的第一幀
# 轉換為PIL Image
pil_image = Image.fromarray((frame).astype(np.uint8))
# print(pil_image.size)
# 調整大小
pil_image.thumbnail(size, Image.Resampling.LANCZOS)
if thumbnail_path:
pil_image.save(thumbnail_path, quality=100)
return thumbnail_path
else:
return pil_image
except Exception as e:
print(e)
return ''
@admin_bp.route('/media/upload', methods=['POST'])
@jwt_required()
def upload():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
if 'multipart/form-data' not in request.content_type:
return error_response('請求類型必須是 multipart/form-data',400)
name = request.form.get('name')
annotation = request.form.get('annotation', '')
file = request.files.get('file')
if not name:
return error_response('視頻名稱不能為空',400)
if not file:
return error_response('未提供視頻文件',400)
if not allowed_file(file.filename):
return error_response(f'不支持的文件類型。允許的類型: {", ".join(config.ALLOWED_EXTENSIONS)}',400)
original_filename = secure_filename(file.filename)
file_extension = original_filename.rsplit('.', 1)[1].lower()
# 生成唯一文件名
unique_filename = f"{uuid.uuid4().hex}"
# 7. 保存文件
save_path = os.path.join(config.VIDEO_FOLDER, unique_filename+f'.{file_extension}')
thumbnail_path = os.path.join(config.THUMBNAIL_FOLDER, unique_filename+f'.jpeg')
file.save(save_path)
# print(save_path)
# 獲取文件大小
file_size = os.path.getsize(save_path)
# 8. 獲取視頻信息(如時長)
duration = get_video_duration(save_path)
extract_first_frame_moviepy(save_path,thumbnail_path)
# 9. 生成文件URL(根據你的實際部署環境調整)
file_url = f"/api/video/{unique_filename}.{file_extension}"
thumbnail_url= f"/api/thumbnail/{unique_filename}.jpeg"
id=system.video_manager.add_video(name,annotation,file_size,file_url,duration, thumbnail_url)
if id==-1:
os.remove(save_path)
os.remove(thumbnail_path)
response_data = {
'code': 200,
'message': '上傳成功',
'data': {
'id': id, # 生成簡單ID,實際應該用數據庫ID
'name': name,
'annotation': annotation,
'size': file_size,
'url': file_url,
'duration': duration
}
}
return success_response(data=response_data)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/media/videos/<id>', methods=['DELETE'])
@jwt_required()
def delete_video(id):
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
try:
d=system.video_manager.get_video_records_by_id(id)
url=d['url']
thumbnail_url=d['thumbnail']
file_name=config.VIDEO_FOLDER+'/'+url.split('/')[-1]
thumbnail_name=config.THUMBNAIL_FOLDER+'/'+thumbnail_url.split('/')[-1]
if os.path.exists(thumbnail_name):
os.remove(thumbnail_name)
if os.path.exists(file_name):
os.remove(file_name)
res=system.video_manager.delete_video_record(id)
if res:
return success_response(data=res,message='刪除成功')
return error_response(message='刪除失敗')
except Exception as e:
return error_response(str(e), 500)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/media/videos', methods=['GET'])
@jwt_required()
def get_media_videos():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
videos = system.video_manager.get_all_video_records()
return success_response(data=videos,message='獲取成功')
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/dashboard/stats', methods=['GET'])
@jwt_required()
def dashboard_stats():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
loginCount=system.login_manager.get_login_count_today()['logincount']
registerCount=system.user_manager.get_register_count_today()['registercount']
pendingFeedback=len(system.feedback_manager.get_pending_feedback())
mediaFiles=len(system.video_manager.get_all_video_records())
return success_response(data={
'loginCount': loginCount,
'registerCount': registerCount,
'pendingFeedback': pendingFeedback,
'mediaFiles': mediaFiles,
},
message='獲取成功'
)
except Exception as e:
print(e)
print(system.login_manager.get_login_count_today())
return error_response(str(e), 500)
@admin_bp.route('/dashboard/chart-data', methods=['GET'])
@jwt_required()
def dashboard_chart_data():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
loginCounts=system.login_manager.get_login_count_week()
loginData=[]
dates=[]
for c in loginCounts:
loginData.append(c['logincount'])
dates.append(c['login_date'].strftime('%m-%d'))
registerCounts=system.user_manager.get_register_count_week()
registerData=[]
for c in registerCounts:
registerData.append(c['registercount'])
return success_response(data={
'loginData': loginData,
'registerData': registerData,
'dates': dates,
'totalLogin':sum(loginData),
'totalRegister':sum(registerData)
},message='獲取成功')
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/feedback/pending', methods=['GET'])
@jwt_required()
def feedback_pending():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
feedbacks = system.feedback_manager.get_pending_feedback()
return success_response(data={
'list': feedbacks,
'totalCount': len(feedbacks),
},message='獲取成功')
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/feedback_all', methods=['GET'])
@jwt_required()
def feedback_all():
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
feedbacks = system.feedback_manager.get_all_feedbacks()
print(feedbacks)
return success_response(
data={
'id': user['id'],
'feedbacks': feedbacks
}
)
except Exception as e:
return error_response(str(e), 400)
@admin_bp.route('/feedback/<int:id>/process', methods=['PUT'])
@jwt_required()
def feedback_process(id):
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
res=system.feedback_manager.upload_feedback(id,"已處理")
if res:
return success_response(data=True,message='處理成功')
else:
return error_response(message='錯誤')
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/feedback/<int:id>/ignore', methods=['PUT'])
@jwt_required()
def feedback_ignore(id):
try:
user_id = get_jwt_identity()
# 通過ID獲取用户信息
# 通過ID獲取用户信息
user = system.user_manager.get_user_by_id(user_id)
if user is None:
return error_response('用户不存在', 404)
if user['role'] != 'admin':
return error_response('權限不足', 405)
res=system.feedback_manager.delete_feedback_by_id(id)
if res:
return success_response(data=True,message='忽略成功')
else:
return error_response(message='錯誤')
except Exception as e:
return error_response(str(e), 500)
4.1.8 作業與數據管理
這裏使用celery+redis的方式來實現在生產環境下任務與一些數據管理,包括用户會話id管理,用户任務管理,celery作業管理。
- app/redis_manager.py:這裏負責編寫用户會話id管理,用户任務管理的實現類與用户id_作業id映射表。
- app/celery_app.py:負責初始化celery作業管理器,配置連接信息,作業狀態字段等。
- app/api/celery_task.py:負責任務具體內容實現,包括正側面視頻的處理,狀態反饋等。
這裏只使用了單個redis節點,沒有使用連接池可能會出現神秘問題,不過暫時沒有發現。
redis_manager.py
# app/redis_manager.py (新增部分)
import json
import redis
from datetime import timedelta
from . import config
class RedisManager:
def __init__(self):
self.redis_client = redis.Redis(
host=config.REDIS_HOST,
port=config.REDIS_PORT,
db=0,
decode_responses=True # 注意:存儲JSON時需特殊處理
)
# ============== 任務字典管理 (核心新增部分) ==============
def create_task(self, task_id, initial_data=None):
"""
創建新任務記錄
initial_data: 初始數據,如 {'status': 'processing', 'user_id': 123}
"""
key = f"task:{task_id}"
data = initial_data or {}
data.setdefault('created_at', self._current_time())
data.setdefault('status', 'pending')
# 使用哈希表存儲,方便更新單個字段
mapping = {k: json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list)) else str(v)
for k, v in data.items()}
# 設置24小時過期
pipeline = self.redis_client.pipeline()
pipeline.hset(key, mapping=mapping)
pipeline.expire(key, timedelta(hours=24))
pipeline.execute()
return True
def update_task(self, task_id, updates):
"""
更新任務信息(部分字段)
updates: 要更新的字段字典,如 {'status': 'processing', 'progress': 50}
"""
key = f"task:{task_id}"
if not self.redis_client.exists(key):
return False
# 準備更新的字段
update_data = {}
for field, value in updates.items():
if isinstance(value, (dict, list)):
update_data[field] = json.dumps(value, ensure_ascii=False)
else:
update_data[field] = str(value)
if update_data:
self.redis_client.hset(key, mapping=update_data)
return True
def get_task(self, task_id):
"""
獲取完整任務信息
"""
key = f"task:{task_id}"
data = self.redis_client.hgetall(key)
if not data:
return None
# 反序列化JSON字段
result = {}
for field, value in data.items():
# 嘗試解析JSON
if value.startswith('{') or value.startswith('['):
try:
result[field] = json.loads(value)
except json.JSONDecodeError:
result[field] = value
elif value.lower() in ('true', 'false'):
result[field] = value.lower() == 'true'
elif value.isdigit():
result[field] = int(value)
else:
result[field] = value
return result
def delete_task(self, task_id):
"""刪除任務記錄"""
key = f"task:{task_id}"
return self.redis_client.delete(key)
def task_exists(self, task_id):
"""檢查任務是否存在"""
key = f"task:{task_id}"
return self.redis_client.exists(key) > 0
def set_task_result(self, task_id, result, project=None):
"""
專門設置任務結果(完成時調用)
"""
updates = {
'status': 'completed',
'result': result,
'completed_at': self._current_time()
}
if project:
updates['project'] = project
return self.update_task(task_id, updates)
def set_task_error(self, task_id, error_message):
"""
設置任務錯誤狀態
"""
return self.update_task(task_id, {
'status': 'error',
'error': error_message,
'failed_at': self._current_time()
})
# ============== 輔助方法 ==============
def _current_time(self):
"""獲取當前時間字符串"""
from datetime import datetime
return datetime.now().isoformat()
# ============== 之前的用户任務和對話管理方法保持不變 ==============
def set_user_task(self, user_id, task_id, expire_hours=24):
"""關聯用户和其最新的任務ID"""
key = f"user_task:{user_id}"
self.redis_client.setex(key, timedelta(hours=expire_hours), task_id)
return True
def get_user_task(self, user_id):
"""獲取用户最新的任務ID"""
key = f"user_task:{user_id}"
return self.redis_client.get(key)
# ... 其他已有方法保持不變 ...
def set_conversation(self, user_id, conversation_id, expire_hours=24):
"""設置用户的對話ID"""
key = f"conversation:{user_id}"
self.redis_client.setex(key, timedelta(hours=expire_hours), conversation_id)
return True
def get_conversation(self, user_id):
"""獲取用户的對話ID"""
key = f"conversation:{user_id}"
result=self.redis_client.get(key)
return result if result is not None else ''
def clear_conversation(self, user_id):
"""清除用户的對話ID"""
key = f"conversation:{user_id}"
return self.redis_client.delete(key)
# 創建全局實例
redis_manager = RedisManager()
celery_app.py
# app/celery_app.py
from celery import Celery
import os
from . import config
def make_celery(app_name=__name__):
# 從配置文件讀取Redis配置,如果沒有則使用默認值
redis_host = getattr(config, 'REDIS_HOST', 'localhost')
redis_port = getattr(config, 'REDIS_PORT', 6379)
redis_url = f"redis://{redis_host}:{redis_port}/0"
# 注意:include路徑要正確指向任務模塊
celery = Celery(
app_name,
broker=redis_url,
backend=redis_url,
include=['app.api.celery_tasks'] # 修改為絕對路徑
)
# 配置
celery.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=True,
task_track_started=True,
task_time_limit=300, # 5分鐘超時
task_soft_time_limit=240, # 4分鐘軟超時
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=100,
broker_connection_retry_on_startup=True,
# 添加更多配置
task_acks_late=True, # 任務完成後才確認
task_reject_on_worker_lost=True, # worker丟失時重新排隊
)
return celery
# 創建全局Celery實例
celery_app = make_celery()
celery_task.py
# app/api/celery_tasks.py
import os
from app.celery_app import celery_app
from app.api import process_side, process_front
@celery_app.task(bind=True, name='process_video_task')
def process_video_task(self, task_id, front_video_path, side_video_path, user_id):
"""
Celery任務:處理視頻分析
"""
try:
files_to_delete = []
if front_video_path and os.path.exists(front_video_path):
files_to_delete.append(front_video_path)
if side_video_path and os.path.exists(side_video_path):
files_to_delete.append(side_video_path)
# 更新任務進度
self.update_state(
state='PROCESSING',
meta={
'status': 'processing',
'progress': 10,
'message': '開始處理側面視頻...'
}
)
# 處理側面視頻
side = process_side.process(side_video_path)
# 更新進度
self.update_state(
state='PROCESSING',
meta={
'status': 'processing',
'progress': 50,
'message': '側面視頻處理完成,開始處理正面視頻...'
}
)
# 處理正面視頻
front, num = process_front.process(front_video_path)
if side is None or front is None:
raise Exception('視頻處理失敗,請檢查視頻清晰度或背景顏色')
# 構建結果
result = {
'message': front + side,
}
# 更新完成進度
self.update_state(
state='PROCESSING',
meta={
'status': 'processing',
'progress': 90,
'message': '視頻處理完成,正在生成報告...'
}
)
# 這裏可以添加保存結果到數據庫的邏輯
# save_result_to_db(user_id, task_id, result, f'引體向上{num}個')
for file_path in files_to_delete:
try:
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
pass
return {
'status': 'completed',
'result': result,
'project': f'引體向上{num}個',
'progress': 100
}
except Exception as e:
try:
for file_path in [front_video_path, side_video_path]:
if file_path and os.path.exists(file_path):
os.remove(file_path)
except Exception as cleanup_error:
pass
# 記錄錯誤
self.update_state(
state='FAILURE',
meta={
'status': 'error',
'progress': 0,
'error': str(e)
}
)
raise e
4.1.9 數據庫管理
這一部分主要不是我做的,我只稍加修改了驅動部分。我們早期使用了mysql與opengauss做為我們的數據庫,最後我們使用了oceanbase數據庫,由於oceanbase基本適配mysql的驅動與代碼所以二者用起來暫時沒有區別。所以此處只介紹連接驅動的配置。
- mysql使用pymysql作為基本連接驅動,我做了連接池的擴展,連接池工具使用dbutils。
class DatabaseConfig:
"""數據庫配置類"""
def __init__(self, host, user, password, database,port=3306):
self.host = host
self.user = user
self.password = password
self.database = database
self.port = int(port)
class DatabaseConnection:
"""數據庫連接管理"""
def __init__(self, config: DatabaseConfig):
self.config = config
self.connection = None
def get_connection(self):
pool = PersistentDB(
creator=pymysql,
maxusage=1000, # 單個連接最大使用次數
setsession=[], # 可選的會話命令列表
ping=0, # 檢查連接是否可用(0=從不, 1=默認, 2=創建遊標時, 4=執行查詢時, 7=總是)
closeable=False,
threadlocal=None, # 線程局部變量
host=self.config.host,
port=self.config.port,
user=self.config.user,
password=self.config.password,
database=self.config.database,
charset='utf8mb4',
cursorclass=DictCursor
)
return pool.connection()
- opengauss使用psycopg2進行連接,連接池使用dbutils。與pymysql不同,PersistentDB不支持將cursor_factory作為遊標參數傳給psycopg2,所以定製一個creator來適配psycopg2。
class DatabaseConnection:
"""數據庫連接管理"""
def __init__(self, config: DatabaseConfig):
self.config = config
self.connection = None
def get_connection(self):
def create_connection():
conn = psycopg2.connect(
host=self.config.host,
port=self.config.port,
user=self.config.user,
password=self.config.password,
database=self.config.database,
cursor_factory=RealDictCursor
)
return conn
pool = PersistentDB(
creator=create_connection,
maxusage=1000, # 單個連接最大使用次數
setsession=[], # 可選的會話命令列表
ping=0, # 檢查連接是否可用(0=從不, 1=默認, 2=創建遊標時, 4=執行查詢時, 7=總是)
closeable=False,
threadlocal=None, # 線程局部變量
)
return pool.connection()
4.2 整合與調試
這一塊我負責調試後端服務與前端通信,api調試,前端路由管理,安全認證等功能。完成代碼詳見前端部分。
4.2.1 路由管理與api調用
這裏管理前端的路由,與前後端通信時信息的獲取。
- /:這裏設置了重定向到/login
- /login:這裏裝載了login組件頁
- /main:這裏裝載了main組件頁
- /profile:這裏裝載了profile組件頁
- /admin:這裏裝載了admin組件頁
- api.ts:負責常用auth相關的函數
api.ts
import type { LoginData, RegisterData, AuthResponse, User, refreshTokenResponse ,SimpleProfileForm,FeedbackData} from '@/types/auth';
const API_BASE_URL = import.meta.env.VITE_API_BASE_URL;
export const authAPI = {
async login(loginData: LoginData): Promise<AuthResponse> {
const response = await fetch(`${API_BASE_URL}/auth/login`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(loginData),
});
if (response.status !== 200 && response.status !== 401 && response.status !== 400) {
throw new Error('登錄請求失敗');
}
return await response.json();
},
async register(registerData: Omit<RegisterData, 'confirmPassword'>): Promise<AuthResponse> {
const response = await fetch(`${API_BASE_URL}/auth/register`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(registerData),
});
if (!response.ok) {
throw new Error('註冊請求失敗');
}
return await response.json();
},
async verifyToken(token: string): Promise<User> {
const response = await fetch(`${API_BASE_URL}/auth/profile`, {
method: 'GET',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
});
if (!response.ok) {
throw new Error('Token驗證失敗');
}
const data = await response.json();
return data.data;
},
async refreshToken(Token: string): Promise<refreshTokenResponse> {
const response = await fetch(`${API_BASE_URL}/auth/refresh`, {
method: 'GET',
headers: {
'Authorization': `Bearer ${Token}`,
'Content-Type': 'application/json',
},
});
if (!response.ok) {
throw new Error('刷新Token失敗');
}
const data = await response.json();
return data;
},
async update_simple_profile(token: string, userData: SimpleProfileForm): Promise<string> {
const response = await fetch(`${API_BASE_URL}/auth/update_simple_profile`, {
method: 'PUT',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(userData),
});
if (!response.ok) {
throw new Error('更新用户資料失敗');
}
return 'success';
},
async changePassword(token: string, newPassword: string): Promise<string> {
const response = await fetch(`${API_BASE_URL}/auth/change_password`, {
method: 'PUT',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ password: newPassword }),
});
if (!response.ok) {
throw new Error('修改密碼失敗');
}
return 'success';
},
async feedback(token: string, content: FeedbackData): Promise<string> {
const response = await fetch(`${API_BASE_URL}/api/feedback`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(content),
});
if (!response.ok) {
throw new Error('提交反饋失敗');
}
return 'success';
}
};
router/index.ts
// src/router/index.js
import { createRouter, createWebHistory } from 'vue-router'
import Login from '../views/LoginPage.vue'
import Main from '../views/MainPage.vue'
import Admin from '../views/AdminPage.vue'
import profile from '../views/ProfilePage.vue'
import { authAPI } from '@/utils/api'
const routes = [
{
path: '/',
redirect: '/login'
},
{
path: '/login',
name: 'Login',
component: Login
},
{
path: '/main',
name: 'Main',
component: Main,
meta: {
KeepAlive: true
}
},
{
path:'/profile',
name:'Profile',
component: profile,
},
{
path:'/admin',
name:'admin',
component:Admin,
}
]
const router = createRouter({
history: createWebHistory(),
routes
})
const checkAuth = async (token: string) => {
try {
await authAPI.verifyToken(token);
}
catch {
return false
}
return true
}
const token = localStorage.getItem('token')||'';
const valid = await checkAuth(token);
// 添加路由守衞
router.beforeEach((to, from, next) => {
// 檢查是否需要認證
if (to.meta.requiresAuth) {
if (token==='') {
next('/login')
} else {
if (valid) {
next()
} else {
next('/login')
}
}
} else {
next()
}
})
export default router
剩下一些函數基本一個組件就用一次,就沒有編寫成ts文件複用了。調試這一塊沒有什麼特別的內容,前端頁面寫好了測一測功能,看看服務端的log再進行調整。路由管理這一塊我主要負責自動登錄與自動跳轉的設計,大概邏輯就是通過token驗證之後自動登錄到主頁,各個頁面驗證通不過就自動跳轉到登錄頁,避免一些簡單的安全問題。
4.2.2 安全認證
這一塊負責一些權限認證,主要是服務端那邊就做好了,在這裏提一嘴邏輯。還是通過前端發送請求時帶着token進行信息認證,通過後返回相關的信息給前端。否則返回錯誤信息。
4.3 服務部署
這裏使用docker進行各個服務的部署,包括前後端,數據庫,redis等。這裏選擇編寫一個docker compose文件進行編排。同時我們的整個項目結構為:
docker-compose.yml
#docker-compose模板
#默認三個數據庫都會啓動,不需要可以刪除相關的服務,同時在backend和celery-worker的啓動依賴中刪除相關的服務
services:
base:
build:
context: ./server
dockerfile: docker/Dockerfile
image: software-base:latest # 指定鏡像名稱和標籤
front:
build:
context: ./web # 構建上下文為當前目錄[citation:1]
dockerfile: docker/Dockerfile # 指定 Dockerfile 路徑[citation:6]
image: vue-app:1.0 # 支持環境變量設置標籤[citation:4]
container_name: software-web
ports:
- "80:80" # 主機端口:容器端口[citation:1]
networks:
- vue-network
restart: unless-stopped # 自動重啓策略[citation:7]
environment:
- NODE_ENV=production
volumes:
# 掛載日誌文件(可選)
- vue-logs:/var/log/nginx
depends_on:
- backend # 定義服務依賴關係[citation:6]
# 如果需要後端 API,可以在這裏添加
backend:
build:
context: ./server
dockerfile: docker/server/Dockerfile
cache_from:
- software-base:latest
image: server:1.0
container_name: software-server
environment:
- FLASK_ENV=production
- MYSQL_HOST=${MYSQL_HOST}
- MYSQL_USER=root
- MYSQL_PASSWORD=${MYSQL_PASSWORD}
- MYSQL_DATABASE=app_db
- MYSQL_PORT=${MYSQL_PORT}
- GS_PORT=5432
- GS_PASSWORD=${GS_PASSWORD}
- GS_USERNAME=gaussdb
- GS_DATABASE=postgres
- GS_HOST=opengauss_db
- DATABASE=${DATABASE}
- DIFY_API_URL=${DIFY_API_URL}
- DIFY_API_KEY=${DIFY_API_KEY}
- REDIS_HOST=redis
- REDIS_PORT=6379
networks:
- vue-network
restart: unless-stopped # 自動重啓策略[citation:7]
volumes:
- uploads_volume:/app/uploads
depends_on:
- oceanbase
- opengauss_db
- mysql
oceanbase:
image: oceanbase/oceanbase-ce:4.5.0.0-100000012025112711
container_name: software_oceanbase
environment:
- OB_SYS_PASSWORD=${OB_SYS_PASSWORD}
- MINI_MODE=true
ulimits:
nofile:
soft: 20000
hard: 20000
volumes:
- oceanbase_data:/root/obdata
networks:
vue-network:
ipv4_address: 172.18.0.100
entrypoint: ["/bin/bash", "-c"]
command: >
'
# 1. 等待OceanBase自動啓動
/usr/sbin/sshd
/root/boot/start.sh &
echo "等待OceanBase啓動..."
for i in {1..30}; do
if obclient -h127.1 -uroot -P2881 -p"$OB_SYS_PASSWORD" -e "SELECT 1" 2>/dev/null; then
echo "✅ OceanBase已就緒"
break
fi
echo "等待中..."
sleep 5
done
echo "________________________________"
# 2. 執行初始化SQL - EOF必須單獨一行
obclient -h127.1 -uroot -P2881 -p"$OB_SYS_PASSWORD" << "EOF"
SET GLOBAL time_zone = "+08:00";
CREATE DATABASE IF NOT EXISTS app_db;
SELECT "init completed" as status;
EOF
wait
'
mysql:
image: mysql:8.0.17
container_name: software-mysql
restart: always
environment:
MYSQL_ROOT_PASSWORD: lyxsb666
MYSQL_DATABASE: app_db
MYSQL_USER: root
TZ: Asia/Shanghai
networks:
- vue-network
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-uroot", "-ppassword"]
timeout: 10s
retries: 10
start_period: 30s
interval: 10s
volumes:
- mysql_data:/var/lib/mysql
opengauss_db:
image: opengauss/opengauss:7.0.0-RC2.B015-openEuler22.03
container_name: software-opengauss
restart: always
environment:
- GS_PASSWORD=${GS_PASSWORD}
- GS_PORT=5432
- TZ=Asia/Shanghai
- GAUSSDATA=/var/lib/opengauss/data # 數據目錄
- GAUSSLOG=/var/lib/opengauss/log # 日誌目錄
volumes:
- opengauss_data:/var/lib/opengauss
privileged: true
# 為容器分配足夠的內存
shm_size: '512MB'
networks:
- vue-network
redis:
image: redis:7-alpine
container_name: software_redis
# ports:
# - "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
networks:
- vue-network
celery-worker:
build:
context: ./server
dockerfile: docker/celery/Dockerfile
cache_from:
- software-base:latest
container_name: pullup_celery_worker
environment:
- FLASK_ENV=production
- REDIS_HOST=redis
- REDIS_PORT=6379
depends_on:
- redis
- opengauss_db
restart: always
volumes:
- uploads_volume:/app/uploads # 與Web服務共享上傳目錄
- logs_volume:/app/logs
networks:
- vue-network
networks:
# create a network between sandbox, api and ssrf_proxy, and can not access outside.
vue-network:
driver: bridge # 使用橋接網絡[citation:6]
ipam:
config:
- subnet: 172.18.0.0/16 # 使用橋接網絡[citation:6]
volumes:
vue-logs:
mysql_data:
opengauss_data:
uploads_volume:
redis_data:
logs_volume:
oceanbase_data:
4.3.1 前端服務
前端服務首先使用node鏡像進行編譯構建,生成傳統的靜態頁面。再使用nginx鏡像進行生產部署。使用nginx服務器進行一些代理。
- Dockerfile:兩階段構建的指令文件
- nginx.config:nginx配置文件
Dockerfile
# 構建階段
FROM node:24-alpine AS builder
WORKDIR /app
# 複製包文件並安裝依賴
COPY web/package*.json ./
RUN npm install
# 複製源代碼並構建
COPY web/ .
RUN npm run build
# 生產階段
FROM nginx:alpine
# 複製構建好的靜態文件到 Nginx
COPY --from=builder app/dist /usr/share/nginx/html
# 複製自定義 Nginx 配置
COPY docker/nginx.conf /etc/nginx/conf.d/default.conf
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
nginx.conf
server {
listen 80;
server_name localhost;
root /usr/share/nginx/html;
index index.html;
# 開啓 gzip 壓縮
gzip on;
gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
resolver 127.0.0.1 valid=30s;
resolver_timeout 5s;
# 處理 Vue Router 的 History 模式
location / {
try_files $uri $uri/ /index.html;
}
location /server/ {
proxy_pass "http://backend:5000/";
# # 設置代理頭信息
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /server/api/upload {
# 重寫路徑:去掉/server前綴
rewrite ^/server(/.*)$ $1 break;
proxy_pass http://backend:5000;
client_max_body_size 100M;
# 相同的代理頭
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /server/api/admin/media/upload {
# 重寫路徑:去掉/server前綴
rewrite ^/server(/.*)$ $1 break;
proxy_pass http://backend:5000;
client_max_body_size 100M;
# 相同的代理頭
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /server/api/chat {
rewrite ^/server(/.*)$ $1 break;
# 繼承父級配置
proxy_pass http://backend:5000;
# 只添加chat特有的配置,不要破壞代理設置
proxy_buffering off; # 流式傳輸需要關閉緩衝
proxy_read_timeout 300s; # 增加超時時間
# 保持原有的代理頭
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
Dockerfile編寫還是比較簡單的,把相關操作寫進去即可。nginx還是比較難調的,在調試階段出現大文件攔截,流式傳輸卡頓,靜態資源路由失效等情況,通過設置相關路由的代理配置解決。
- 反向代理:我們通過設置反向代理使瀏覽器發送的請求能順利到達後端服務器,同時也能避免開發階段瀏覽器出現的跨域攔截情況。
- 大文件攔截:這裏通過增大/server/api/upload與/server/api/admin/media/upload的body大小限制,即可順利解決ngnix攔截大文件的情況。
- 流式傳輸卡頓:ngnix服務器默認會將數據包自動緩存然後一次發送,所以通過減小緩衝區大小就能解決流式傳輸卡頓的問題。
- 靜態路由失效:雖然服務端設置了靜態資源的訪問路由,但是請求會先經過nginx,所以如果nginx配置了靜態路由,那麼請求就不會發送到服務端,所以配置好靜態文件路由使請求正確發送帶服務端就行了。
4.3.2 基礎鏡像構建
我們的後端服務代碼與celery作業代碼是通用的,所以先進行基礎鏡像的構建,提高資源利用率。我們將基礎鏡像命名為software-base:latest方便後續使用。
Dockerfile
FROM python:3.11-slim
# 設置工作目錄
WORKDIR /app
# 安裝系統依賴
# 先備份並替換默認軟件源
RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list.d/debian.sources && \
sed -i 's|security.debian.org/debian-security|mirrors.tuna.tsinghua.edu.cn/debian-security|g' /etc/apt/sources.list.d/debian.sources
# 然後更新並安裝軟件包
RUN apt-get update && apt-get install -y gcc libgl1 libglib2.0-0 && rm -rf /var/lib/apt/lists/*
# 複製依賴文件
COPY ./app/requirements.txt .
# 安裝 Python 依賴
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 複製應用代碼
RUN echo "----${date}" > /tmp/build.txt
COPY . .
4.3.3 後端與作業服務
- 服務端
這裏使用剛剛構建的基礎鏡像,編寫Dockerfile。
# Dockerfile
FROM software-base:latest
# 暴露端口
EXPOSE 5000
# 啓動命令
CMD ["gunicorn", "-c", "gunicorn.conf.py", "run:app"]
- 作業管理
# Dockerfile
FROM software-base:latest
# 暴露端口
EXPOSE 5000
# 啓動命令
CMD ["celery", "-A", "app.celery_app", "worker", "--loglevel=info", "--concurrency=4"]
backend的環境變量説明:
- FLASK_ENV=production 這個用來指定是生產環境還是開發環境
- MYSQL_HOST=${MYSQL_HOST} 這個用來指定mysql驅動連接的目標host(mysql|oceanbase)
- MYSQL_USER=root 這個用來指定mysql連接的用户
- MYSQL_PASSWORD=${MYSQL_PASSWORD} 這個是mysql連接的密碼
- MYSQL_DATABASE=app_db 這個是連接的數據庫
- MYSQL_PORT=${MYSQL_PORT} 這個是mysql連接的端口號(mysql:3306,oceanbase:2881)
- GS_PORT=5432 這是opengauss的端口號
- GS_PASSWORD=${GS_PASSWORD} opengauss的密碼,需要強密碼
- GS_USERNAME=gaussdb opengauss官方鏡像的默認用户
- GS_DATABASE=postgres opengauss官方鏡像的默認數據庫
- GS_HOST=opengauss_db 這個用來指定opengauss驅動連接的目標host
- DATABASE=${DATABASE} 這個用來指定連接的數據庫驅動類型(mysql|opengauss)
- DIFY_API_URL=${DIFY_API_URL} 這個是dify的api鏈接
- DIFY_API_KEY=${DIFY_API_KEY} 這是dify的連接密鑰
- REDIS_HOST=redis
- REDIS_PORT=6379
4.3.4 redis容器配置
這部分配置內容在docker-compose文件裏寫了,沒有特殊要求。
4.3.5 數據庫配置
這裏做了mysql,opengauss,oceanbase三種數據庫的適配,oceanbase經過禮貌問價後發現財力不足,就也用docker部署了一個單機版。詳細內容在docker-compose文件裏都有。列舉一些遇到的問題;
- opengauss出現密碼強度問題。需要設置強密碼,大小寫,數字與特殊符號。
- opengauss官方鏡像找不到gsql路徑:進入容器後執行
export GAUSSHOME=/usr/local/opengauss
export PATH=$GAUSSHOME/bin:/scws/bin:$PATH
export LD_LIBRARY_PATH=$GAUSSHOME/lib:/scws/lib:$LD_LIBRARY_PATH
export DATAVEC_PQ_LIB_PATH=/usr/local/sra_recall/lib
- oceanbase在重啓後連不上,由於首次啓動後偽集羣會記錄當時的網絡ip,重啓後docker網絡重新分配了一個ip就啓動不了了,所以通過給容器設置靜態ip就可以解決了。
- oceanbase無法執行自定義的初始化命令,只調用鏡像默認的初始化命令,可能是鏡像的原因導致的,不過可以通過修改entrypoint與command實現
4.3.6 華為雲部署
4.3.6.1 系統運行環境配置:
我們通過配置docker容器來簡化環境配置,提高部署效率。
- 數據庫基礎鏡像:opengauss/opengauss:7.0.0-RC2.B015-openEuler22.03
- 後端基礎鏡像:python:3.11-slim 構建software-base
- flask服務端:software-base
- celery作業處理: software-base
- vue前端:構建使用node:24-alpine 生產使用nginx:alpine
- redis:redis:7-alpine
4.3.6.2安裝操作説明
Docker配置:
- 首先我們需要安裝docker,window環境下推薦使用docker desktop(linux安裝docker-engine或docker desktop)。詳見:https://docs.docker.com/engine/install/
- 其次需要拉取鏡像,鏡像見上。使用docker pull ****進行拉取,受限於網絡環境,我們通過上傳離線鏡像的操作進行鏡像拉取:使用docker save -o **.tar nginx:latest打包成壓縮包,使用docker load -i **.tar進行解壓鏡像。
- 代碼拉取:
我們的代碼已經上傳到github上zilv2333/SoftWare: 軟工/數據採集課設 首先確保git的安裝,其次建一個文件夾來存儲代碼。我們在文件夾執行git clone https://github.com/zilv2333/SoftWare 獲取代碼。 - 文件配置:
完善相關文件配置 - 在項目目錄下首先使用docker compose build base構建後端基礎鏡像,然後使用docker compose up --build –d啓動程序們就可以在本機的80端口看到我們的網頁了。
4.3.6.3 展示
這是雲上的鏈接,能正常訪問。
五、心得
通過參與本次項目的開發,我在技術實踐、團隊協作和項目管理方面都獲得了寶貴的經驗。作為後端開發的主要負責人之一,我從最初的系統設計到最終的雲端部署,全程參與了項目的各個關鍵環節,對軟件工程的全流程有了更深刻的理解。
5.1技術層面的收穫
5.1.1微服務架構的實踐
項目中我們採用了前後端分離的微服務架構,前端使用Vue3,後端使用Flask框架。這種架構模式讓我深刻體會到:
- 解耦優勢:前後端獨立開發部署,提高了開發效率,減少了團隊間的依賴。
- 技術棧靈活性:可以根據不同模塊的特點選擇最適合的技術方案。
- 維護便利性:單個服務的更新不會影響整個系統的運行。
5.1.2異步任務處理
面對視頻處理這類計算密集型任務,我引入了Celery + Redis的異步任務隊列方案:
- 用户體驗優化:用户無需長時間等待,後台處理後通過輪詢獲取結果。
- 系統穩定性:即使某個任務失敗,也不會影響整個服務。
- 可擴展性:可以輕鬆增加worker節點來處理更多併發任務。
5.1.3數據庫設計與優化
項目中同時使用MySQL/OpenGauss兩種數據庫,讓我對數據庫設計有了更全面的認識:
- 連接池管理:通過DBUtils實現高效的連接複用,顯著提升了數據庫訪問性能。
- 跨數據庫兼容:編寫了統一的數據庫訪問接口,降低了系統對特定數據庫的依賴。
5.1.4 Docker容器化部署
通過Docker Compose實現了一鍵部署,這個過程中我學到了:
- 環境一致性:確保了開發、測試、生產環境的高度一致。
- 資源隔離:每個服務運行在獨立的容器中,互不干擾。
- 自動化部署:通過編排文件簡化了複雜的多服務部署流程。
5.2 團隊協作經驗
5.2.1 接口設計與協調
作為後端負責人,我需要與前端同學密切配合:
- API文檔先行:在開發前就定義好接口規範,減少了後期的溝通成本。
- 及時溝通:通過定期會議和即時通訊工具,確保前後端進度同步。
- 版本管理:使用Git進行代碼管理,建立分支策略,確保代碼質量。
5.2.2 功能整合與調試
整合各個模塊時,我體會到了:
- 模塊化開發的重要性:清晰的模塊邊界讓整合過程更加順利。
- 自動化測試的必要性:編寫單元測試和集成測試大大提高了代碼質量。
- 問題定位能力:通過日誌分析和調試工具快速定位和解決問題。
5.4 個人成長與反思
5.4.1 技術能力的提升
通過這個項目,我不僅鞏固了Python Web開發的基礎,還學習到了:
- 現代Web開發流程:從需求分析到部署上線的完整流程
- 系統設計能力:如何設計可擴展、易維護的系統架構
- 問題解決能力:面對複雜問題時如何分析、定位和解決
5.4.2 項目管理的體會
雖然主要負責技術實現,但我也對項目管理有了初步認識:
- 時間規劃的重要性:合理的進度安排是項目成功的關鍵
- 需求變更的處理:如何平衡需求變更與項目進度
5.4.3 不足之處與改進方向
回顧整個項目,我也認識到一些需要改進的地方:
- 文檔完善度:項目文檔可以更加詳細和規範
- 測試覆蓋率:需要增加更多的自動化測試
- 性能監控:可以添加更完善的系統監控和告警機制