效果:

Python批量下載並顯示統計進度_#Python批量下載

Python批量下載並顯示統計進度_sed_02

實現腳本:

downapk_thread_100.py

import requests
import threading
import time
import sys
from queue import Queue
from dataclasses import dataclass
from typing import Dict, List
import json


@dataclass
class ThreadStatus:
    thread_id: int
    current_task: int = 0
    status: str = "空閒"
    progress: float = 0
    speed: float = 0
    downloaded: int = 0
    start_time: float = 0


class SimpleMultiDownloader:
    def __init__(self, url, total_tasks=100, max_threads=10):
        self.url = url
        self.total_tasks = total_tasks
        self.max_threads = max_threads
        self.task_queue = Queue()

        # 初始化線程狀態
        self.threads_status: Dict[int, ThreadStatus] = {}
        for i in range(1, max_threads + 1):
            self.threads_status[i] = ThreadStatus(thread_id=i)

        # 全局統計
        self.global_stats = {
            'completed': 0,
            'failed': 0,
            'total_size': 0,
            'start_time': time.time(),
            'speeds': []
        }
        self.lock = threading.Lock()
        self.display_active = True

    def clear_screen(self):
        """清屏"""
        os.system('cls' if os.name == 'nt' else 'clear')

    def display_status(self):
        """顯示狀態表格"""
        while self.display_active:
            self.clear_screen()

            print("┌" + "─" * 78 + "┐")
            print("│" + "多線程下載監控".center(78) + "│")
            print("├" + "─" * 78 + "┤")

            # 全局統計
            with self.lock:
                completed = self.global_stats['completed']
                failed = self.global_stats['failed']
                total_done = completed + failed
                elapsed = time.time() - self.global_stats['start_time']

                if completed > 0:
                    avg_speed = sum(self.global_stats['speeds']) / len(self.global_stats['speeds'])
                else:
                    avg_speed = 0

            progress = total_done / self.total_tasks * 100
            bar_length = 40
            filled = int(bar_length * progress / 100)
            progress_bar = "█" * filled + "░" * (bar_length - filled)

            print(f"│ 總進度: [{progress_bar}] {progress:6.1f}%")
            print(f"│ 成功: {completed:4d}  失敗: {failed:4d}  總計: {total_done:4d}/{self.total_tasks:4d}")
            print(f"│ 用時: {elapsed:7.1f}s   平均速度: {avg_speed:7.1f} KB/s")
            print("├" + "─" * 78 + "┤")
            print("│ 線程ID │   任務   │       狀態       │   進度   │   速度   │   下載量   │")
            print("├" + "─" * 78 + "┤")

            # 顯示每個線程狀態
            with self.lock:
                for thread_id in sorted(self.threads_status.keys()):
                    status = self.threads_status[thread_id]

                    # 狀態顏色/圖標
                    if status.status == "完成":
                        status_icon = "✅"
                    elif "失敗" in status.status:
                        status_icon = "❌"
                    elif status.status == "下載中":
                        status_icon = "⏬"
                    else:
                        status_icon = "⏳"

                    # 格式化顯示
                    task_str = f"{status.current_task:4d}" if status.current_task > 0 else "等待"
                    status_str = f"{status_icon} {status.status[:12]:12s}"
                    progress_str = f"{status.progress:6.1f}%" if status.progress > 0 else "等待"
                    speed_str = f"{status.speed:7.1f}" if status.speed > 0 else "等待"
                    size_str = f"{status.downloaded / 1024:7.1f}KB" if status.downloaded > 0 else "等待"

                    print(
                        f"│ {thread_id:6d} │ {task_str:8s} │ {status_str:16s} │ {progress_str:8s} │ {speed_str:8s} │ {size_str:10s} │")

            print("└" + "─" * 78 + "┘")
            print("按 Ctrl+C 退出監控 (下載繼續)")

            # 檢查是否完成
            if total_done >= self.total_tasks:
                print("\n✅ 所有任務完成!")
                self.display_active = False
                break

            time.sleep(0.3)

    def download_worker(self, thread_id):
        """工作線程"""
        while True:
            try:
                task_id = self.task_queue.get(timeout=1)

                # 更新線程狀態
                with self.lock:
                    self.threads_status[thread_id].current_task = task_id
                    self.threads_status[thread_id].status = "準備中"
                    self.threads_status[thread_id].start_time = time.time()

                # 執行下載
                success = self.perform_download(thread_id, task_id)

                # 更新全局統計
                with self.lock:
                    if success:
                        self.global_stats['completed'] += 1
                    else:
                        self.global_stats['failed'] += 1

                self.task_queue.task_done()

            except:
                break

    def perform_download(self, thread_id, task_id):
        """執行下載任務"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': '*/*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Referer': 'https://fly.walkera.cn/'
            }

            with self.lock:
                self.threads_status[thread_id].status = "連接中"

            # 獲取文件大小
            response = requests.get(self.url, headers=headers, stream=True, timeout=30)
            response.raise_for_status()

            # 獲取文件大小
            file_size = int(response.headers.get('content-length', 0))

            downloaded = 0
            start_time = time.time()
            last_update = start_time

            with self.lock:
                self.threads_status[thread_id].status = "下載中"

            # 流式下載
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    downloaded += len(chunk)

                    # 更新顯示(限制更新頻率)
                    current_time = time.time()
                    if current_time - last_update > 0.1:  # 每0.1秒更新一次
                        elapsed = current_time - start_time
                        speed = downloaded / elapsed / 1024 if elapsed > 0 else 0
                        progress = (downloaded / file_size * 100) if file_size > 0 else 0

                        with self.lock:
                            self.threads_status[thread_id].progress = progress
                            self.threads_status[thread_id].speed = speed
                            self.threads_status[thread_id].downloaded = downloaded

                        last_update = current_time

            # 下載完成
            end_time = time.time()
            download_time = end_time - start_time
            speed = downloaded / download_time / 1024 if download_time > 0 else 0

            with self.lock:
                self.threads_status[thread_id].status = "完成"
                self.threads_status[thread_id].progress = 100
                self.threads_status[thread_id].speed = speed
                self.global_stats['total_size'] += downloaded
                self.global_stats['speeds'].append(speed)

            return True

        except Exception as e:
            with self.lock:
                self.threads_status[thread_id].status = f"失敗: {str(e)[:15]}"
                self.threads_status[thread_id].progress = -1

            return False

    def run(self):
        """運行下載器"""
        print("開始多線程下載...")
        print(f"URL: {self.url}")
        print(f"任務數: {self.total_tasks}")
        print(f"線程數: {self.max_threads}")

        # 添加任務到隊列
        for i in range(self.total_tasks):
            self.task_queue.put(i + 1)

        # 啓動顯示線程
        display_thread = threading.Thread(target=self.display_status, daemon=True)
        display_thread.start()

        # 啓動工作線程
        threads = []
        for i in range(self.max_threads):
            thread = threading.Thread(target=self.download_worker, args=(i + 1,))
            thread.daemon = True
            thread.start()
            threads.append(thread)

        try:
            # 等待所有任務完成
            self.task_queue.join()

            # 等待顯示線程結束
            display_thread.join(timeout=2)

        except KeyboardInterrupt:
            print("\n用户中斷")
        finally:
            self.display_active = False

        # 顯示最終統計
        self.show_summary()

    def show_summary(self):
        """顯示最終統計"""
        total_time = time.time() - self.global_stats['start_time']

        print("\n" + "=" * 60)
        print("下載完成! 最終統計:")
        print("=" * 60)

        with self.lock:
            completed = self.global_stats['completed']
            failed = self.global_stats['failed']

            print(f"總任務數: {self.total_tasks}")
            print(f"成功: {completed} ({completed / self.total_tasks * 100:.1f}%)")
            print(f"失敗: {failed} ({failed / self.total_tasks * 100:.1f}%)")
            print(f"總用時: {total_time:.2f} 秒")
            print(f"總下載量: {self.global_stats['total_size'] / 1024 / 1024:.2f} MB")

            if completed > 0:
                avg_speed = sum(self.global_stats['speeds']) / len(self.global_stats['speeds'])
                print(f"平均速度: {avg_speed:.1f} KB/s")
                print(f"吞吐量: {self.global_stats['total_size'] / total_time / 1024:.1f} KB/s")


# 使用示例
if __name__ == "__main__":
    import os

    url = "https://fly.walkera.cn/fd/download?name=WKFLY-1.3.72-1018-release.apk"

    downloader = SimpleMultiDownloader(
        url=url,
        total_tasks=50,  # 減少任務數便於觀察
        max_threads=8  # 線程數
    )

    downloader.run()