Introduction
Anyone who writes crawlers has run into this: the program is humming along fine, then suddenly it starts returning 403 errors or simply times out. Nine times out of ten, the site has banned your IP.
Anti-crawling measures keep getting stricter these days; go even slightly too fast and you get blacklisted. Below are a few practical countermeasures, all of which I have used in real projects.
Solution 1: Proxy IP Pool
This is the most direct fix: put on a different disguise and keep working.
Basic implementation
import requests
import random
import time

class ProxyPool:
    def __init__(self):
        # put your proxy list here
        self.proxies = [
            'http://user:pass@proxy1.com:8080',
            'http://user:pass@proxy2.com:8080',
            'http://user:pass@proxy3.com:8080',
        ]
        self.failed_proxies = set()

    def get_proxy(self):
        available = [p for p in self.proxies if p not in self.failed_proxies]
        if not available:
            return None
        # use the same proxy for both http and https so failures are tracked consistently
        proxy = random.choice(available)
        return {'http': proxy, 'https': proxy}

    def mark_failed(self, proxy_url):
        self.failed_proxies.add(proxy_url)

def crawl_with_proxy(url):
    pool = ProxyPool()
    for attempt in range(3):  # retry at most 3 times
        proxy = pool.get_proxy()
        if not proxy:
            break
        try:
            response = requests.get(url, proxies=proxy, timeout=10)
            if response.status_code == 200:
                return response
        except requests.RequestException:
            pool.mark_failed(proxy['http'])
        time.sleep(1)
    return None
Pros and cons:
- Pros: takes effect immediately; the quickest way to get around an IP ban
- Cons: good proxies cost money, and the free ones are unreliable
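Because free proxies die so often, it can help to health-check each candidate before putting it into the pool. A minimal sketch, assuming httpbin.org/ip as the test endpoint (the test URL, timeout, and check_proxy helper are my own choices, not part of the original code):

import requests

def check_proxy(proxy_url, test_url='https://httpbin.org/ip', timeout=5):
    """Return True if the proxy can complete a simple request."""
    try:
        resp = requests.get(test_url,
                            proxies={'http': proxy_url, 'https': proxy_url},
                            timeout=timeout)
        return resp.status_code == 200
    except requests.RequestException:
        return False

# filter a candidate list before handing it to ProxyPool
candidates = [
    'http://user:pass@proxy1.com:8080',
    'http://user:pass@proxy2.com:8080',
]
working = [p for p in candidates if check_proxy(p)]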
Solution 2: Throttle the Request Rate
Don't be in such a hurry; take it slow. Sites ban you mainly because you hit them too often.
Adaptive delays
import time
import random
import requests
from datetime import datetime

class SmartDelay:
    def __init__(self):
        self.fail_count = 0
        self.success_count = 0
        self.last_request_time = None

    def wait(self):
        # base delay of 1-3 seconds
        base_delay = random.uniform(1, 3)
        # if the failure rate is high, back off further
        if self.fail_count > 0:
            fail_rate = self.fail_count / (self.fail_count + self.success_count)
            if fail_rate > 0.3:  # failure rate above 30%
                base_delay *= 2
        print(f"Waiting {base_delay:.1f} s...")
        time.sleep(base_delay)
        self.last_request_time = datetime.now()

    def record_result(self, success):
        if success:
            self.success_count += 1
            # after a success we can get slightly more aggressive again
            if self.fail_count > 0:
                self.fail_count -= 1
        else:
            self.fail_count += 1

# Usage example
def crawl_slowly(urls):
    delay = SmartDelay()
    for i, url in enumerate(urls):
        if i > 0:  # no need to wait before the first request
            delay.wait()
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                delay.record_result(True)
                print(f"✓ {url}")
            else:
                delay.record_result(False)
                print(f"✗ {url} - {response.status_code}")
        except Exception as e:
            delay.record_result(False)
            print(f"✗ {url} - {e}")
Solution 3: Rotate the User-Agent
A fixed User-Agent is like wearing a sign on your face that says "I am a crawler".
import random
import requests

class UARotator:
    def __init__(self):
        self.user_agents = [
            # Windows Chrome
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            # Mac Chrome
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            # Windows Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
            # Mac Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
            # Mobile browser
            'Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
        ]

    def get_headers(self):
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

# Usage (urls is your list of target URLs)
ua = UARotator()
for url in urls:
    headers = ua.get_headers()
    response = requests.get(url, headers=headers)
Solution 4: Reuse Sessions
Don't rebuild the connection for every request; use a Session to keep state.
import time
import random
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class SmartSession:
    def __init__(self):
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        # retry policy: retry rate-limit and server errors with backoff
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # default headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })

    def get(self, url, **kwargs):
        return self.session.get(url, timeout=15, **kwargs)

    def close(self):
        self.session.close()

# Usage (urls is your list of target URLs)
session = SmartSession()
try:
    for url in urls:
        response = session.get(url)
        print(f"Status code: {response.status_code}")
        time.sleep(random.uniform(1, 3))
finally:
    session.close()
Solution 5: Distributed Crawling
If a single machine can't keep up, use a cluster and spread the load across several machines.
Redis task queue
import json
import random
import socket
import time
from datetime import datetime

import redis
import requests

class DistributedCrawler:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.worker_id = socket.gethostname() + str(datetime.now().timestamp())[:10]
        self.task_queue = 'crawler:tasks'
        self.result_queue = 'crawler:results'

    def add_tasks(self, urls):
        """Master node: enqueue tasks."""
        for url in urls:
            task = {'url': url, 'retries': 0}
            self.redis_client.lpush(self.task_queue, json.dumps(task))
        print(f"Added {len(urls)} tasks")

    def get_task(self):
        """Worker node: fetch one task."""
        task_data = self.redis_client.brpop(self.task_queue, timeout=10)
        return json.loads(task_data[1]) if task_data else None

    def save_result(self, result):
        """Store a result."""
        result['worker'] = self.worker_id
        result['time'] = datetime.now().isoformat()
        self.redis_client.lpush(self.result_queue, json.dumps(result))

    def start_worker(self):
        """Run the worker loop."""
        print(f"Worker {self.worker_id} starting...")
        while True:
            task = self.get_task()
            if not task:
                continue
            url = task['url']
            print(f"Processing: {url}")
            try:
                response = requests.get(url, timeout=10)
                result = {
                    'url': url,
                    'status': 'success',
                    'status_code': response.status_code,
                    'size': len(response.content)
                }
                self.save_result(result)
                print(f"✓ Done: {url}")
            except Exception as e:
                # retry logic
                if task['retries'] < 3:
                    task['retries'] += 1
                    self.redis_client.lpush(self.task_queue, json.dumps(task))
                    print(f"↻ Requeued: {url}")
                else:
                    result = {'url': url, 'status': 'failed', 'error': str(e)}
                    self.save_result(result)
                    print(f"✗ Failed: {url}")
            time.sleep(random.uniform(1, 3))

# How to use:
# 1. On the master node:
#    crawler = DistributedCrawler()
#    crawler.add_tasks(['http://example.com', ...])
# 2. On each worker node:
#    crawler = DistributedCrawler()
#    crawler.start_worker()
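To collect the output, the master (or any other process) can drain the crawler:results list the same way the workers fill it. A minimal sketch reusing the result_queue defined above (the collect_results helper and its limit parameter are my own additions):

def collect_results(crawler, limit=100):
    """Pop up to `limit` results off the result queue and return them as dicts."""
    results = []
    for _ in range(limit):
        raw = crawler.redis_client.rpop(crawler.result_queue)
        if raw is None:  # queue is empty
            break
        results.append(json.loads(raw))
    return results

# e.g. on the master node:
# crawler = DistributedCrawler()
# print(collect_results(crawler))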
Advanced Tricks
Handling JavaScript rendering
Some sites load their content dynamically with JavaScript, so plain requests can't see it.
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def get_js_content(url):
    options = Options()
    options.add_argument('--headless')  # headless mode
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        # wait for the page to finish loading
        driver.implicitly_wait(10)
        return driver.page_source
    finally:
        driver.quit()

# Or use the lighter-weight pyppeteer
import asyncio
from pyppeteer import launch

async def get_js_content_async(url):
    browser = await launch(headless=True)
    page = await browser.newPage()
    await page.goto(url)
    content = await page.content()
    await browser.close()
    return content

# Usage
# content = asyncio.run(get_js_content_async('https://example.com'))
A simple retry decorator
import time
import functools
import requests

def retry(times=3, delay=1):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(times):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if i == times - 1:  # last attempt
                        raise
                    print(f"Attempt {i + 1} failed: {e}")
                    time.sleep(delay * (i + 1))  # increasing delay
            return None
        return wrapper
    return decorator

# Usage
@retry(times=3, delay=2)
def fetch_url(url):
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response
Practical Tips
1. Check robots.txt
Be a well-behaved crawler:
import urllib.robotparser
from urllib.parse import urljoin, urlparse

def can_crawl(url, user_agent='*'):
    try:
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        robots_url = urljoin(base_url, '/robots.txt')
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robots_url)
        rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception:
        return True  # default to allowing if robots.txt can't be read
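A quick usage sketch, gating each request on the result (the URL here is only a placeholder):

import requests

url = 'https://example.com/some/page'
if can_crawl(url):
    response = requests.get(url, timeout=10)
else:
    print(f"robots.txt disallows crawling {url}")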
2. Monitor crawler status
Some simple statistics:
import time
import requests

class CrawlerStats:
    def __init__(self):
        self.total = 0
        self.success = 0
        self.failed = 0
        self.start_time = time.time()

    def record(self, success):
        self.total += 1
        if success:
            self.success += 1
        else:
            self.failed += 1

    def print_stats(self):
        runtime = time.time() - self.start_time
        success_rate = (self.success / self.total * 100) if self.total > 0 else 0
        speed = self.total / runtime * 60 if runtime > 0 else 0
        print("\nStats:")
        print(f"Runtime: {runtime:.1f}s")
        print(f"Total requests: {self.total}")
        print(f"Succeeded: {self.success} ({success_rate:.1f}%)")
        print(f"Failed: {self.failed}")
        print(f"Speed: {speed:.1f} requests/minute")

# Usage (urls is your list of target URLs)
stats = CrawlerStats()
for url in urls:
    try:
        response = requests.get(url)
        stats.record(response.status_code == 200)
    except requests.RequestException:
        stats.record(False)
    if stats.total % 10 == 0:  # print every 10 requests
        stats.print_stats()
Summary
Each of these five approaches has its place:
- Proxy IPs: fastest results, good for emergencies
- Rate control: the most basic measure; use it every time
- UA rotation: lowest cost, decent results
- Session reuse: better efficiency, lower resource usage
- Distributed crawling: the go-to option for large-scale crawls
In real projects, combine them. For example: basic rate control + UA rotation + Session reuse, then add a proxy pool once IPs start getting banned, as in the sketch below.
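A minimal sketch of that combination, wiring together the SmartDelay, UARotator, ProxyPool, and SmartSession classes from above (treating a 403/429 as the signal to switch to proxies is one possible choice, not a rule):

import requests

def crawl(urls):
    delay = SmartDelay()
    ua = UARotator()
    pool = ProxyPool()
    session = SmartSession()
    use_proxy = False  # switch to the proxy pool once we look banned
    try:
        for i, url in enumerate(urls):
            if i > 0:
                delay.wait()
            proxies = pool.get_proxy() if use_proxy else None
            try:
                response = session.get(url, headers=ua.get_headers(), proxies=proxies)
                delay.record_result(response.status_code == 200)
                if response.status_code in (403, 429):  # looks like a ban or rate limit
                    use_proxy = True
            except requests.RequestException:
                delay.record_result(False)
                if proxies:
                    pool.mark_failed(proxies['http'])
    finally:
        session.close()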
One last reminder: these techniques are for solving problems, not for causing damage. Crawl in moderation and don't put excessive load on other people's servers. Everyone has to make a living; show a little mutual consideration.