Introduction

Anyone who writes crawlers has run into this: the program is humming along just fine, then suddenly every request comes back with a 403 error or the connection simply times out. Nine times out of ten, the site has banned your IP.

Anti-crawling measures keep getting stricter; request a little too often and you get blacklisted. Here are several practical workarounds, all of which I have used in real projects.

Solution 1: A Proxy IP Pool

This is the most direct approach: put on a different disguise and keep working.

Basic Implementation

import requests
import random
import time

class ProxyPool:
    def __init__(self):
        # Put your proxy list here
        self.proxies = [
            'http://user:pass@proxy1.com:8080',
            'http://user:pass@proxy2.com:8080',
            'http://user:pass@proxy3.com:8080',
        ]
        self.failed_proxies = set()
    
    def get_proxy(self):
        available = [p for p in self.proxies if p not in self.failed_proxies]
        if not available:
            return None
        # Use the same proxy for both schemes so a failure can be traced back to it
        proxy_url = random.choice(available)
        return {'http': proxy_url, 'https': proxy_url}
    
    def mark_failed(self, proxy_url):
        self.failed_proxies.add(proxy_url)

def crawl_with_proxy(url):
    pool = ProxyPool()
    
    for attempt in range(3):  # retry at most 3 times
        proxy = pool.get_proxy()
        if not proxy:
            break
            
        try:
            response = requests.get(url, proxies=proxy, timeout=10)
            if response.status_code == 200:
                return response
        except requests.RequestException:
            pool.mark_failed(proxy['http'])
            time.sleep(1)
    
    return None

Pros and cons:

  • Pros: takes effect immediately and resolves an IP ban quickly
  • Cons: good proxies cost money, and free ones are unreliable (a quick health check, sketched just below, helps weed out dead ones)
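
If you do rely on cheaper or free proxies, it helps to validate them before they enter the pool. A minimal health-check sketch, assuming a test endpoint such as https://httpbin.org/ip that simply echoes the caller's IP:

import requests

def check_proxy(proxy_url, test_url='https://httpbin.org/ip', timeout=5):
    """Return True if the proxy can complete a simple request in time."""
    proxies = {'http': proxy_url, 'https': proxy_url}
    try:
        response = requests.get(test_url, proxies=proxies, timeout=timeout)
        return response.status_code == 200
    except requests.RequestException:
        return False

# Filter a raw list down to working proxies before building the pool
raw_proxies = [
    'http://user:pass@proxy1.com:8080',
    'http://user:pass@proxy2.com:8080',
]
working = [p for p in raw_proxies if check_proxy(p)]
print(f"{len(working)}/{len(raw_proxies)} proxies are usable")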

Solution 2: Throttle Your Request Rate

Don't be impatient; take it slow. Sites ban you mainly because you request too frequently.

Adaptive Delays

import time
import random
import requests
from datetime import datetime

class SmartDelay:
    def __init__(self):
        self.fail_count = 0
        self.success_count = 0
        self.last_request_time = None
    
    def wait(self):
        # Base delay of 1-3 seconds
        base_delay = random.uniform(1, 3)
        
        # Back off further when the failure rate is high
        if self.fail_count > 0:
            fail_rate = self.fail_count / (self.fail_count + self.success_count)
            if fail_rate > 0.3:  # failure rate above 30%
                base_delay *= 2
        
        print(f"Waiting {base_delay:.1f} s...")
        time.sleep(base_delay)
        self.last_request_time = datetime.now()
    
    def record_result(self, success):
        if success:
            self.success_count += 1
            # After a success, ease off the penalty a little
            if self.fail_count > 0:
                self.fail_count -= 1
        else:
            self.fail_count += 1

# Usage example
def crawl_slowly(urls):
    delay = SmartDelay()
    
    for i, url in enumerate(urls):
        if i > 0:  # no need to wait before the first request
            delay.wait()
        
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                delay.record_result(True)
                print(f"✓ {url}")
            else:
                delay.record_result(False)
                print(f"✗ {url} - {response.status_code}")
        except Exception as e:
            delay.record_result(False)
            print(f"✗ {url} - {e}")

Solution 3: Rotate the User-Agent

A fixed User-Agent is like writing "I am a crawler" on your forehead.

import random
import requests

class UARotator:
    def __init__(self):
        self.user_agents = [
            # Windows Chrome
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            # Mac Chrome  
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            # Windows Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
            # Mac Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
            # Mobile browser (iPhone Safari)
            'Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
        ]
    
    def get_headers(self):
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

# Usage
ua = UARotator()
for url in urls:
    headers = ua.get_headers()
    response = requests.get(url, headers=headers)

Solution 4: Reuse Sessions

Don't open a fresh connection for every request; use a Session to keep connections and state alive.

import time
import random
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class SmartSession:
    def __init__(self):
        self.session = requests.Session()
        self.setup_session()
    
    def setup_session(self):
        # Configure the retry policy
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        
        # Default headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })
    
    def get(self, url, **kwargs):
        return self.session.get(url, timeout=15, **kwargs)
    
    def close(self):
        self.session.close()

# Usage
session = SmartSession()
try:
    for url in urls:
        response = session.get(url)
        print(f"狀態碼: {response.status_code}")
        time.sleep(random.uniform(1, 3))
finally:
    session.close()

Solution 5: Distributed Crawling

When a single machine can't cope, move to a cluster and spread the load across several machines.

Redis Task Queue

import redis
import json
import socket
import time
import random
import requests
from datetime import datetime

class DistributedCrawler:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.worker_id = socket.gethostname() + str(datetime.now().timestamp())[:10]
        self.task_queue = 'crawler:tasks'
        self.result_queue = 'crawler:results'
    
    def add_tasks(self, urls):
        """Called on the master node to enqueue tasks."""
        for url in urls:
            task = {'url': url, 'retries': 0}
            self.redis_client.lpush(self.task_queue, json.dumps(task))
        print(f"Enqueued {len(urls)} tasks")
    
    def get_task(self):
        """Called on a worker node to fetch one task."""
        task_data = self.redis_client.brpop(self.task_queue, timeout=10)
        return json.loads(task_data[1]) if task_data else None
    
    def save_result(self, result):
        """Persist a crawl result."""
        result['worker'] = self.worker_id
        result['time'] = datetime.now().isoformat()
        self.redis_client.lpush(self.result_queue, json.dumps(result))
    
    def start_worker(self):
        """Run the worker loop."""
        print(f"Worker {self.worker_id} started...")
        
        while True:
            task = self.get_task()
            if not task:
                continue
            
            url = task['url']
            print(f"處理: {url}")
            
            try:
                response = requests.get(url, timeout=10)
                result = {
                    'url': url,
                    'status': 'success',
                    'status_code': response.status_code,
                    'size': len(response.content)
                }
                self.save_result(result)
                print(f"✓ 完成: {url}")
                
            except Exception as e:
                # Retry logic: requeue up to 3 times before giving up
                if task['retries'] < 3:
                    task['retries'] += 1
                    self.redis_client.lpush(self.task_queue, json.dumps(task))
                    print(f"↻ Requeued: {url}")
                else:
                    result = {'url': url, 'status': 'failed', 'error': str(e)}
                    self.save_result(result)
                    print(f"✗ Failed: {url}")
            
            time.sleep(random.uniform(1, 3))

# How to use:
# 1. On the master node:
# crawler = DistributedCrawler()
# crawler.add_tasks(['http://example.com', ...])

# 2. On each worker node:
# crawler = DistributedCrawler()  
# crawler.start_worker()
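
The workers only push results onto crawler:results; the master still has to read them back. Here is a minimal sketch (a hypothetical helper, not part of the class above) that drains the result queue on the master node, reusing the same Redis connection:

def collect_results(crawler, limit=100):
    """Pop up to `limit` results from the result queue (hypothetical helper)."""
    results = []
    for _ in range(limit):
        data = crawler.redis_client.rpop(crawler.result_queue)
        if data is None:  # queue is empty
            break
        results.append(json.loads(data))
    return results

# 3. Back on the master node:
# results = collect_results(DistributedCrawler())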

Advanced Techniques

Handling JavaScript Rendering

Some sites load their content dynamically with JavaScript, so plain requests never sees it.

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def get_js_content(url):
    options = Options()
    options.add_argument('--headless')  # headless mode
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        # get() already blocks until the page loads; the implicit wait gives
        # subsequent element lookups extra time for JS-rendered content
        driver.implicitly_wait(10)
        return driver.page_source
    finally:
        driver.quit()

# Alternatively, use the lighter-weight pyppeteer
import asyncio
from pyppeteer import launch

async def get_js_content_async(url):
    browser = await launch(headless=True)
    page = await browser.newPage()
    await page.goto(url)
    content = await page.content()
    await browser.close()
    return content

# Usage
# content = asyncio.run(get_js_content_async('https://example.com'))

A Simple Retry Decorator

import functools

def retry(times=3, delay=1):
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for i in range(times):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if i == times - 1:  # last attempt, give up
                        raise
                    print(f"Attempt {i+1} failed: {e}")
                    time.sleep(delay * (i + 1))  # back off a bit longer each time
            return None
        return wrapper
    return decorator

# Usage
@retry(times=3, delay=2)
def fetch_url(url):
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response

Practical Tips

1. Check robots.txt

Be a well-behaved crawler:

import urllib.robotparser

def can_crawl(url, user_agent='*'):
    try:
        from urllib.parse import urljoin, urlparse
        base_url = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
        robots_url = urljoin(base_url, '/robots.txt')
        
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robots_url)
        rp.read()
        
        return rp.can_fetch(user_agent, url)
    except Exception:
        return True  # default to allowed if robots.txt cannot be read
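
A quick gate in front of each request might look like this (assuming requests is imported; the URL and user agent name are placeholders):

url = 'https://example.com/some/page'
if can_crawl(url, user_agent='MyCrawlerBot'):
    response = requests.get(url, timeout=10)
else:
    print(f"robots.txt disallows {url}, skipping")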

2. Monitor Crawler Health

A simple stats tracker:

class CrawlerStats:
    def __init__(self):
        self.total = 0
        self.success = 0
        self.failed = 0
        self.start_time = time.time()
    
    def record(self, success):
        self.total += 1
        if success:
            self.success += 1
        else:
            self.failed += 1
    
    def print_stats(self):
        runtime = time.time() - self.start_time
        success_rate = (self.success / self.total * 100) if self.total > 0 else 0
        speed = self.total / runtime * 60 if runtime > 0 else 0
        
        print(f"\n統計信息:")
        print(f"運行時間: {runtime:.1f}秒")
        print(f"總請求: {self.total}")
        print(f"成功: {self.success} ({success_rate:.1f}%)")
        print(f"失敗: {self.failed}")
        print(f"速度: {speed:.1f} 請求/分鐘")

# Usage
stats = CrawlerStats()
for url in urls:
    try:
        response = requests.get(url)
        stats.record(response.status_code == 200)
    except requests.RequestException:
        stats.record(False)
    
    if stats.total % 10 == 0:  # print stats every 10 requests
        stats.print_stats()

Summary

Each of these five approaches has its own use case:

  • Proxy IPs: fastest to take effect, good for emergencies
  • Rate limiting: the most basic measure; use it in every project
  • UA rotation: lowest cost, reasonably effective
  • Session reuse: improves efficiency and cuts resource usage
  • Distributed crawling: the heavy-duty option for large-scale crawls

In real projects, combine them. For example: basic rate limiting + UA rotation + a reusable Session, then add a proxy pool once IPs start getting banned. A rough sketch of how the pieces fit together follows below.
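
As a rough illustration, here is a minimal sketch that combines SmartDelay, UARotator, and SmartSession as defined above (the URL list is a placeholder; a proxy pool can be layered on top the same way):

def crawl_combined(urls):
    delay = SmartDelay()      # Solution 2: adaptive throttling
    ua = UARotator()          # Solution 3: rotating User-Agent
    session = SmartSession()  # Solution 4: session reuse + retries

    try:
        for i, url in enumerate(urls):
            if i > 0:
                delay.wait()
            try:
                response = session.get(url, headers=ua.get_headers())
                delay.record_result(response.status_code == 200)
            except requests.RequestException:
                delay.record_result(False)
    finally:
        session.close()

# crawl_combined(['https://example.com/page1', 'https://example.com/page2'])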

One last reminder: these techniques are for solving problems, not for causing damage. Crawl in moderation and don't put excessive load on other people's servers. Everyone has to make a living; a little mutual consideration goes a long way.