原文轉載自「劉悦的技術博客」https://v3u.cn/a_id_218
分治算法是一種很古老但很務實的方法。本意即使將一個較大的整體打碎分成小的局部,這樣每個小的局部都不足以對抗大的整體。戰國時期,秦國破壞合縱的連橫即是一種分而治之的手段;十九世紀,比利時殖民者佔領盧旺達, 將盧旺達的種族分為胡圖族與圖西族,以圖進行分裂控制,莫不如是。
21世紀,人們往往會在Leetcode平台上刷分治算法題,但事實上,從工業角度上來看,算法如果不和實際業務場景相結合,算法就永遠是虛無縹緲的存在,它只會出現在開發者的某一次不經意的面試中,而真實的算法,並不是虛空的,它應該能幫助我們解決實際問題,是的,它應該落地成為實體。
大文件分片上傳就是這樣一個契合分治算法的場景,現而今,視頻文件的體積越來越大,高清視頻體積大概2-4g不等,但4K視頻的分辨率是標準高清的四倍,需要四倍的存儲空間——只需兩到三分鐘的未壓縮4K 電影,或者電影預告片的長度,就可以達到500GB。 8K視頻文件更是大得難以想象,而現在12K正在出現,如此巨大的文件,該怎樣設計一套合理的數據傳輸方案?這裏我們以前後端分離項目為例,前端使用Vue.js3.0配合ui庫Ant-desgin,後端採用併發異步框架Tornado實現大文件的分片無阻塞傳輸與異步IO寫入服務。
前端分片
首先,安裝Vue3.0以上版本:
npm install -g @vue/cli
安裝異步請求庫axios:
npm install axios --save
隨後,安裝Ant-desgin:
npm i --save ant-design-vue@next -S
Ant-desgin雖然因為曾經的聖誕節“彩蛋門”事件而聲名狼藉,但客觀地説,它依然是業界不可多得的優秀UI框架之一。
接着在項目程序入口文件引入使用:
import { createApp } from 'vue'
import App from './App.vue'
import { router } from './router/index'
import axios from 'axios'
import qs from 'qs'
import Antd from 'ant-design-vue';
import 'ant-design-vue/dist/antd.css';
const app = createApp(App)
app.config.globalProperties.axios = axios;
app.config.globalProperties.upload_dir = "https://localhost/static/";
app.config.globalProperties.weburl = "http://localhost:8000";
app.use(router);
app.use(Antd);
app.mount('#app')
隨後,參照Ant-desgin官方文檔:https://antdv.com/components/... 構建上傳控件:
<a-upload
@change="fileupload"
:before-upload="beforeUpload"
>
<a-button>
<upload-outlined></upload-outlined>
上傳文件
</a-button>
</a-upload>
注意這裏需要將綁定的before-upload強制返回false,設置為手動上傳:
beforeUpload:function(file){
return false;
}
接着聲明分片方法:
fileupload:function(file){
var size = file.file.size;//總大小
var shardSize = 200 * 1024; //分片大小
this.shardCount = Math.ceil(size / shardSize); //總片數
console.log(this.shardCount);
for (var i = 0; i < this.shardCount; ++i) {
//計算每一片的起始與結束位置
var start = i * shardSize;
var end = Math.min(size, start + shardSize);
var tinyfile = file.file.slice(start, end);
let data = new FormData();
data.append('file', tinyfile);
data.append('count',i);
data.append('filename',file.file.name);
const axiosInstance = this.axios.create({withCredentials: false});
axiosInstance({
method: 'POST',
url:'http://localhost:8000/upload/', //上傳地址
data:data
}).then(data =>{
this.finished += 1;
console.log(this.finished);
if(this.finished == this.shardCount){
this.mergeupload(file.file.name);
}
}).catch(function(err) {
//上傳失敗
});
}
}
具體分片邏輯是,大文件總體積按照單片體積的大小做除法並向上取整,獲取到文件的分片個數,這裏為了測試方便,將單片體積設置為200kb,可以隨時做修改。
隨後,分片過程中使用Math.min方法計算每一片的起始和結束位置,再通過slice方法進行切片操作,最後將分片的下標、文件名、以及分片本體異步發送到後台。
當所有的分片請求都發送完畢後,封裝分片合併方法,請求後端發起合併分片操作:
mergeupload:function(filename){
this.myaxios(this.weburl+"/upload/","put",{"filename":filename}).then(data =>{
console.log(data);
});
}
至此,前端分片邏輯就完成了。
後端異步IO寫入
為了避免同步寫入引起的阻塞,安裝aiofiles庫:
pip3 install aiofiles
aiofiles用於處理asyncio應用程序中的本地磁盤文件,配合Tornado的異步非阻塞機制,可以有效的提升文件寫入效率:
import aiofiles
# 分片上傳
class SliceUploadHandler(BaseHandler):
async def post(self):
file = self.request.files["file"][0]
filename = self.get_argument("filename")
count = self.get_argument("count")
filename = '%s_%s' % (filename,count) # 構成該分片唯一標識符
contents = file['body'] #異步讀取文件
async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:
await f.write(contents)
return {"filename": file.filename,"errcode":0}
這裏後端獲取到分片實體、文件名、以及分片標識後,將分片文件以文件名\_分片標識的格式異步寫入到系統目錄中,以一張378kb大小的png圖片為例,分片文件應該順序為200kb和178kb,如圖所示:
當分片文件都寫入成功後,觸發分片合併接口:
import aiofiles
# 分片上傳
class SliceUploadHandler(BaseHandler):
async def post(self):
file = self.request.files["file"][0]
filename = self.get_argument("filename")
count = self.get_argument("count")
filename = '%s_%s' % (filename,count) # 構成該分片唯一標識符
contents = file['body'] #異步讀取文件
async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:
await f.write(contents)
return {"filename": file.filename,"errcode":0}
async def put(self):
filename = self.get_argument("filename")
chunk = 0
async with aiofiles.open('./static/uploads/%s' % filename,'ab') as target_file:
while True:
try:
source_file = open('./static/uploads/%s_%s' % (filename,chunk), 'rb')
await target_file.write(source_file.read())
source_file.close()
except Exception as e:
print(str(e))
break
chunk = chunk + 1
self.finish({"msg":"ok","errcode":0})
這裏通過文件名進行尋址,隨後遍歷合併,注意句柄寫入模式為增量字節碼寫入,否則會逐層將分片文件覆蓋,同時也兼具了斷點續寫的功能。有些邏輯會將分片個數傳入後端,讓後端判斷分片合併個數,其實並不需要,因為如果尋址失敗,會自動拋出異常並且跳出循環,從而節約了一個參數的帶寬佔用。
輪詢服務
在真實的超大文件傳輸場景中,由於網絡或者其他因素,很可能導致分片任務中斷,此時就需要通過降級快速響應,返回託底數據,避免用户的長時間等待,這裏我們使用基於Tornado的Apscheduler庫來調度分片任務:
pip install apscheduler
隨後編寫job.py輪詢服務文件:
from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler
scheduler = None
job_ids = []
# 初始化
def init_scheduler():
global scheduler
scheduler = TornadoScheduler()
scheduler.start()
print('[Scheduler Init]APScheduler has been started')
# 要執行的定時任務在這裏
def task1(options):
print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))
class MainHandler(RequestHandler):
def get(self):
self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')
class SchedulerHandler(RequestHandler):
def get(self):
global job_ids
job_id = self.get_query_argument('job_id', None)
action = self.get_query_argument('action', None)
if job_id:
# add
if 'add' == action:
if job_id not in job_ids:
job_ids.append(job_id)
scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
self.write('[TASK ADDED] - {}'.format(job_id))
else:
self.write('[TASK EXISTS] - {}'.format(job_id))
# remove
elif 'remove' == action:
if job_id in job_ids:
scheduler.remove_job(job_id)
job_ids.remove(job_id)
self.write('[TASK REMOVED] - {}'.format(job_id))
else:
self.write('[TASK NOT FOUND] - {}'.format(job_id))
else:
self.write('[INVALID PARAMS] INVALID job_id or action')
if __name__ == "__main__":
routes = [
(r"/", MainHandler),
(r"/scheduler/?", SchedulerHandler),
]
init_scheduler()
app = Application(routes, debug=True)
app.listen(8888)
IOLoop.current().start()
每一次分片接口被調用後,就建立定時任務對分片文件進行監測,如果分片成功就刪除分片文件,同時刪除任務,否則就啓用降級預案。
結語
分治法對超大文件進行分片切割,同時併發異步發送,可以提高傳輸效率,降低傳輸時間,和之前的一篇:聚是一團火散作滿天星,前端Vue.js+elementUI結合後端FastAPI實現大文件分片上傳,邏輯上有異曲同工之妙,但手法上卻略有不同,確是頗有相互借鏡之處,最後代碼開源於Github:https://github.com/zcxey2911/...\_Vuejs3\_Edu,與眾親同饗。
原文轉載自「劉悦的技術博客」 https://v3u.cn/a_id_218