redis提供了一種 HyperLogLog 類型,主要用於大數據(極限 2 ^ 64)但準確度要求不是很苛刻的基數統計或去重判斷(標準誤差 0.81%),處理速度超快(每秒數萬次),並且最多只佔用 12K + 16 字節內存空間(12K 字節寄存器 + 16 字節頭部)。
官方文檔:
https://redis.io/docs/latest/develop/data-types/probabilistic/hyperloglogs/
HyperLogLog 基本原理(編碼作為字符串,可以按字符串讀寫)。
源代碼位置:src/hyperloglog.c
分成兩部分(每字節8位)
- hash換算無符號整數8字節(64位,14位用於對應16K寄存器,50位用來計數)。
- 存儲12K字節的寄存器(寄存器是6位長度,即按6位算是16K個寄存器)。
注:14位即 2^14 = 16K = 16*1024 。
注:12K字節 = 12 * 1024 * 8 位 = 16K * 6 位 = 16 * 1024 * 6 位(即 98304 位)
概率數據結構
- 通過hash換算長度為 2^64 位以內的字符串為一個64位無符號整數,低14位是寄存器編號,高50位統計其二進制數尾部連續0個數。
- 通過寄存器編號提取6位的寄存器,將連續0的個數按條件寫入(當寄存器中的值 >= 新值則認為已經存在並跳過,反之認為不存在並寫入)。
- 把寄存器中的值(6 位,取值範圍 0 ~ 2^6 - 1)作為索引填充到 int[64] 數組中,命中的索引值對應的數組值 +1
- 隨機概率估算得出基數
redis處理 HyperLogLog 分兩種模式
- 稀疏結構,用於寄存器數據佔用字節數 < hll-sparse-max-bytes(redis.conf 中配置,默認: 3000字節,大概3K空間)時的基數估算。稀疏結構高效,佔用空間動態擴大,但建議 hll-sparse-max-bytes 限制在3000字節以內。
- 稠密結構,固定大小 12K 字節空間。
核心思路
數據具有隨機性,通過固定特徵(hash值尾部0的個數最大值)分佈到 16K 寄存器中(減少特徵稀釋),再通過隨機概率算法彙總估算即可得到相近值。
Nginx
nginx日誌格式需要調整,打開nginx.conf配置文件。
增加以下配置。
# Change the log format; the main addition is $bytes_sent, which records the traffic in bytes.
# The statistics script parses lines with this exact format string, so keep both in sync.
log_format main '$remote_addr - $remote_user [$time_local] "$request" "$host" $status $request_length $bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';
# Add the following inside the server block (one access log file per $host).
access_log logs/$host-access.log main;
PHP
先安裝redis客户端擴展
下載擴展源碼包 https://pecl.php.net/package/redis
# Example installation of the phpredis extension from source.
# PHP installation prefix
INSTALL_PATH=/usr/local/php
# Build prerequisites: phpize needs autoconf, the extension itself needs a compiler and make.
yum install -y autoconf gcc make
# Save the tarball to disk (writing it to stdout with "-O -" leaves nothing to extract).
wget -q --no-check-certificate -O redis-6.3.0.tgz "https://pecl.php.net/get/redis-6.3.0.tgz"
# A .tgz is a gzip-compressed tar archive, so it must be extracted with tar, not unzip.
tar -xzf redis-6.3.0.tgz
cd redis-6.3.0
# Generate the configure script for this PHP build
"$INSTALL_PATH/bin/phpize"
./configure --with-php-config="$INSTALL_PATH/bin/php-config"
# Compile and install redis.so into the PHP extension directory
make && make install
# Enable the extension
echo "extension=redis.so" >> "$INSTALL_PATH/lib/php.ini"
php統計代碼
保存位置: /www/www-resource/nginx-log.php
<?php
/**
 * Nginx access-log statistics collector.
 *
 * Repeatedly scans the access logs matching a glob pattern, parses every new line with a
 * regular expression derived from the nginx `log_format` string, aggregates per-date and
 * per-domain counters (requests, page views, 4xx/5xx, spider hits, traffic) and asks a
 * Redis HyperLogLog how many previously unseen IPs / visitors each batch contains.
 * The byte offset reached in every log file is stored in Redis so that a restarted
 * process resumes where the previous one stopped.
 */
class NgnixLog
{
    /**
     * Prefix of the Redis keys that store the per-file read offset.
     */
    private const CACHE_PREFIX = 'nginx-log-';

    /**
     * Redis connection, created by newRedis().
     * @var \Redis
     */
    protected \Redis $redis;

    /**
     * Redis host.
     * @var string
     */
    protected string $redisHost;

    /**
     * Redis port.
     * @var int
     */
    protected int $redisPort;

    /**
     * Regex alternation of file extensions that count as a page view.
     * @var string
     */
    protected string $pageExt;

    /**
     * nginx log_format string the log lines are expected to follow.
     * @var string
     */
    protected string $format = '$remote_addr - $remote_user [$time_local] "$request" "$host" $status $request_length $bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';

    /**
     * Read offset per log file, keyed by the file's base name.
     * @var array
     */
    protected array $logpos = [];

    /**
     * Regular expression generated from $format.
     * @var string
     */
    protected string $formatReg;

    /**
     * Variable names of $format, in capture-group order.
     * @var array
     */
    protected array $formatKeys;

    /**
     * Path of the nginx executable.
     * @var string
     */
    protected string $nginxBin = 'nginx';

    /**
     * date() formats; each one produces its own statistics bucket.
     * @var array
     */
    protected array $dateFormats = ['Y-m-d'];

    /**
     * Number of log lines aggregated into one batch.
     * @var int
     */
    protected int $eachLogLine = 500;

    /**
     * Stores the settings and compiles the log format into a regular expression.
     *
     * Terminates the script (die) when the redis extension is missing or the format
     * contains no `$variable` placeholder.
     *
     * @param string|null $format nginx log_format string; the built-in default is used when empty
     * @param string $redisHost
     * @param int $redisPort
     * @param string $pageExt regex alternation of page extensions, e.g. "html|htm|php"
     */
    public function __construct(?string $format = null, string $redisHost = '127.0.0.1', int $redisPort = 6379, string $pageExt = 'html|htm|php')
    {
        if ($format) {
            $this->format = $format;
        }
        if (!class_exists(\Redis::class)) {
            die('請安裝redis擴展!');
        }
        $this->redisHost = $redisHost;
        $this->redisPort = $redisPort;
        $this->pageExt = $pageExt;

        // Literal text around the variables: always one element more than there are variables.
        $literals = preg_split('#\$\w+#', $this->format);
        if (count($literals) < 2) {
            die('日誌格式沒有變量佔位符:' . $this->format);
        }
        preg_match_all('#\$(\w+)#', $this->format, $matches);
        $keys = $matches[1];

        $rule = '';
        foreach ($keys as $index => $_) {
            $rule .= preg_quote($literals[$index], '#') . '(.*?)';
        }
        // The literal that follows the last variable (e.g. the closing quote) must be part of
        // the pattern too, otherwise the last capture group swallows it.
        $rule .= preg_quote($literals[count($keys)], '#');

        $this->formatReg = '#^' . $rule . '$#s';
        $this->formatKeys = $keys;
    }

    /**
     * Sets the nginx executable; terminates the script when it is not an executable file.
     * @param string $bin
     */
    public function setNginxBin(string $bin)
    {
        if (file_exists($bin) && is_executable($bin)) {
            $this->nginxBin = $bin;
        } else {
            die('請指定可執行的nginx運行路徑');
        }
    }

    /**
     * Sets the date() formats used as statistics buckets.
     * @param string ...$formats
     */
    public function setDateFormats(string ...$formats)
    {
        $this->dateFormats = $formats;
    }

    /**
     * Sets the batch size in log lines (at least 1).
     * @param int $size
     */
    public function setEachLogLine(int $size)
    {
        $this->eachLogLine = max($size, 1);
    }

    /**
     * Processes the logs in an endless loop, pausing 20 seconds between passes.
     *
     * @param string $logDir directory containing the logs; becomes the working directory
     * @param string $pattern glob pattern, relative to $logDir
     * @param int $maxSize archive a log once its read offset (bytes) reaches this value; 0 disables archiving
     * @throws Exception
     */
    public function run(string $logDir, string $pattern = './*-access.log', int $maxSize = 10 ** 7)
    {
        $this->newRedis();
        if (!is_dir($logDir)) {
            die('日誌目錄不存在!');
        }
        chdir($logDir);
        while (true) {
            foreach ($this->eachFile($pattern) as $file) {
                echo '提取日誌文件:' . $file . PHP_EOL;
                foreach ($this->eachFlow($file) as $data) {
                    $this->sync($data);
                }
                // The offset is a byte position, so this is a size threshold.
                if ($maxSize > 0 && ($this->logpos[basename($file)] ?? 0) >= $maxSize) {
                    $this->compress($file);
                }
            }
            sleep(20);
        }
    }

    /**
     * Archives a log file: moves it to ./zip-log, reloads nginx, processes the lines written
     * since the last read, resets the stored offset and zips the moved file.
     *
     * @param string $logfile
     * @throws Exception when moving, reloading nginx or zipping fails
     */
    protected function compress(string $logfile)
    {
        echo '開始壓縮處理' . PHP_EOL;
        $zipDir = './zip-log';
        if (!is_dir($zipDir)) {
            mkdir($zipDir);
        }
        $name = basename($logfile);
        $tmpfile = './zip-log/' . $name;
        if (file_exists($tmpfile)) {
            unlink($tmpfile);
        }
        if (!rename($logfile, $tmpfile)) {
            throw new Exception('無法移動日誌文件:' . $logfile);
        }

        // Log file names derive from the request's Host header, and the nginx path is
        // configurable, so every value placed on a command line is shell-escaped.
        $nginx = basename($this->nginxBin);
        if ($nginx !== $this->nginxBin) {
            // The path has a directory part: run the binary from its own directory.
            $dir = escapeshellarg(dirname($this->nginxBin));
            $bin = escapeshellarg('./' . $nginx);
            $command = "cd {$dir} && {$bin} -t && {$bin} -s reload";
        } else {
            // Bare command name: resolved through PATH.
            $bin = escapeshellarg($nginx);
            $command = "{$bin} -t && {$bin} -s reload";
        }
        system($command, $exitCode);
        if ($exitCode != 0) {
            throw new Exception('無法正常重新加載nginx配置數據');
        }

        // Pick up whatever was appended between the last read and the reload.
        foreach ($this->eachFlow($tmpfile) as $data) {
            $this->sync($data);
        }
        // Reset the stored offset: the next file with this name starts from scratch.
        $this->cache($name, 0);
        $this->logpos[$name] = 0;

        $zipFile = './zip-log/' . date('Y-m-d_H-i-s') . '-' . $name . '.zip';
        system('zip ' . escapeshellarg($zipFile) . ' ' . escapeshellarg($tmpfile), $exitCode);
        if ($exitCode != 0) {
            throw new Exception('壓縮日誌文件失敗');
        }
        // The archive was written successfully, the plain copy is no longer needed.
        unlink($tmpfile);
    }

    /**
     * Yields the files matching the glob pattern.
     * @param string $pattern
     */
    protected function eachFile(string $pattern = './*-access.log')
    {
        // glob() returns false on error; treat that as "no files".
        yield from glob($pattern) ?: [];
    }

    /**
     * Yields one associative array (variable name => value) per log line that matches the
     * format, starting at byte offset $pos.
     *
     * $pos is updated after every line read, so whenever the consumer holds a yielded row
     * $pos already points behind that row.
     *
     * @param string $logfile
     * @param int|null $pos in: offset to start from; out: offset reached so far
     * @throws Exception when the file cannot be opened
     */
    protected function eachRead(string $logfile, ?int &$pos = null)
    {
        $handle = fopen($logfile, 'r');
        if ($handle === false) {
            throw new Exception('無法打開日誌文件:' . $logfile);
        }
        try {
            if ($pos) {
                $stat = fstat($handle);
                if ($stat !== false && $pos > $stat['size']) {
                    // The file is shorter than the stored offset, i.e. it was truncated or
                    // replaced; start over instead of waiting forever behind its end.
                    $pos = 0;
                } else {
                    fseek($handle, $pos, SEEK_SET);
                }
            }
            while (($line = fgets($handle, 10240)) !== false) {
                $pos = ftell($handle);
                if (trim($line) === '') {
                    continue;
                }
                if (preg_match($this->formatReg, $line, $matches)) {
                    $row = [];
                    foreach ($this->formatKeys as $index => $key) {
                        $row[$key] = $matches[$index + 1];
                    }
                    yield $row;
                }
            }
            $pos = ftell($handle);
        } finally {
            // Also runs when the generator is destroyed before it reached the end of the file.
            fclose($handle);
        }
    }

    /**
     * Yields the batches of eachCount() with the collected 'ip' / 'uv' lists replaced by the
     * number of values the HyperLogLog had not seen before, and persists the read offset
     * after the consumer has handled each batch.
     *
     * @param string $logfile
     */
    protected function eachFlow(string $logfile)
    {
        $name = basename($logfile);
        foreach ($this->eachCount($logfile) as $pos => $data) {
            foreach ($data as $date => $items) {
                foreach ($items as $domain => $item) {
                    // NOTE(review): IPs and visitor hashes are added to the same HyperLogLog key;
                    // separate keys would keep the two estimates independent - confirm before changing,
                    // because it alters the keys already stored in Redis.
                    $key = "{$date}|{$domain}";
                    $item['ip'] = $this->getUniqueCount([$key, ...array_unique($item['ip'])]);
                    $item['uv'] = $this->getUniqueCount([$key, ...array_unique($item['uv'])]);
                    $data[$date][$domain] = $item;
                }
            }
            yield $data;
            // Only reached once the consumer asks for the next batch, i.e. after it has
            // processed this one.
            $this->cache($name, $pos);
            $this->logpos[$name] = $pos;
        }
    }

    /**
     * Aggregates log lines into batches of at most $eachLogLine lines.
     *
     * Yields `offset => [date => [domain => counters]]`, where offset is the byte position
     * behind the last line contained in the batch.
     *
     * @param string $logfile
     */
    protected function eachCount(string $logfile)
    {
        $name = basename($logfile);
        if (!isset($this->logpos[$name])) {
            $this->logpos[$name] = intval($this->cache($name) ?: 0);
        }
        $pos = $this->logpos[$name];
        $data = [];
        $num = 0;
        foreach ($this->eachRead($logfile, $pos) as $item) {
            $ip = $item['remote_addr'] ?? '';
            if ($ip === '127.0.0.1') {
                continue;
            }
            $timestamp = strtotime($item['time_local'] ?? '');
            if ($timestamp === false) {
                // Without a usable timestamp the line cannot be assigned to a date bucket.
                continue;
            }
            $num++;
            // Cast: a non-numeric field must not break the additions below.
            $req_size = (int) ($item['request_length'] ?? 0); // request size
            $res_size = (int) ($item['bytes_sent'] ?? 0); // response size

            // A request counts as a page view when it targets a directory, a path without
            // file extension, or a file whose extension is listed in $pageExt.
            $pv = 0;
            $parts = explode(' ', $item['request'] ?? '');
            if (isset($parts[1])) {
                $path = parse_url($parts[1], PHP_URL_PATH) ?: '/';
                // Anchored at the end so the real extension is inspected ("app.min.js" => "js").
                if (substr($path, -1) === '/' || !preg_match('#\.(\w+)$#D', $path, $ext) || preg_match('/^(' . $this->pageExt . ')$/', $ext[1])) {
                    $pv = 1;
                }
            }

            $agent = $item['http_user_agent'] ?? '';
            $uv = md5($ip . $agent);
            $status = (int) ($item['status'] ?? 0);
            $req_4xx = ($status >= 400 && $status < 500) ? 1 : 0;
            $req_5xx = ($status >= 500 && $status < 600) ? 1 : 0;
            $spider = preg_match('#(Baiduspider|Bytespider|360Spider|Sogou web spider|Sosospider|Googlebot|bingbot|AdsBot-Google|Google-Adwords|YoudaoBot|Yandex|DNSPod-Monitor|YisouSpider|mpcrawler)#', $agent) ? 1 : 0;
            $domain = $item['host'] ?? '';

            foreach ($this->dateFormats as $dateFormat) {
                $date = date($dateFormat, $timestamp);
                if (empty($data[$date][$domain])) {
                    $data[$date][$domain] = [
                        'req' => 0,
                        'pv' => 0,
                        'ip' => [],
                        'uv' => [],
                        'req_4xx' => 0,
                        'req_5xx' => 0,
                        'spider' => 0, // spider requests
                        'fake_spider' => 0, // fake spiders (not filled in by this method)
                        'req_size' => 0,
                        'res_size' => 0,
                    ];
                }
                $bucket = &$data[$date][$domain];
                $bucket['req']++;
                $bucket['req_size'] += $req_size;
                $bucket['res_size'] += $res_size;
                $bucket['pv'] += $pv;
                $bucket['ip'][] = $ip;
                $bucket['uv'][] = $uv;
                $bucket['req_4xx'] += $req_4xx;
                $bucket['req_5xx'] += $req_5xx;
                $bucket['spider'] += $spider;
                unset($bucket);
            }

            // ">=" so that a batch holds exactly $eachLogLine lines, not one more.
            if ($num >= $this->eachLogLine) {
                yield $pos => $data;
                $data = [];
                $num = 0;
            }
        }
        if (count($data)) {
            yield $pos => $data;
        }
    }

    /**
     * Adds values to a HyperLogLog and returns by how much its estimated cardinality grew.
     *
     * The Lua script runs PFCOUNT, PFADD, EXPIRE (86400 seconds) and PFCOUNT again on the key
     * given as first element and returns the difference of the two counts.
     *
     * @staticvar null $sha
     * @param array $data first element: HyperLogLog key, remaining elements: values to add
     * @return int
     */
    protected function getUniqueCount(array $data)
    {
        static $sha = null;
        $code = "local prevCount = redis.call('pfCount', ARGV[1]);redis.pcall('pfAdd', unpack(ARGV));redis.call('expire', ARGV[1], 86400);return redis.call('pfCount', ARGV[1]) - prevCount;";
        if (empty($sha)) {
            $sha = sha1($code);
            if (!$this->redis->script('exists', $sha)[0]) {
                $this->redis->script('load', $code);
            }
        }
        $count = $this->redis->evalSha($sha, $data);
        if ($count === false) {
            // The script cache may have been flushed (server restart, SCRIPT FLUSH) while this
            // long-running process kept the SHA; EVAL executes the script and caches it again.
            $count = $this->redis->eval($code, $data);
        }
        return (int) $count;
    }

    /**
     * Reads (one argument) or writes (two arguments) a value under the prefixed cache key.
     *
     * @param string $key
     * @param mixed $value
     * @return mixed the stored value when reading, nothing when writing
     */
    protected function cache(string $key, $value = null)
    {
        $cacheKey = self::CACHE_PREFIX . $key;
        if (func_num_args() === 1) {
            return $this->redis->get($cacheKey);
        }
        $this->redis->set($cacheKey, $value);
    }

    /**
     * Creates the Redis connection (5 second connect timeout).
     */
    protected function newRedis()
    {
        $this->redis = new \Redis([
            'host' => $this->redisHost,
            'port' => $this->redisPort,
            'connectTimeout' => 5,
        ]);
    }

    /**
     * Hands one aggregated batch over to persistent storage.
     *
     * Placeholder: each row is prepared (date, domain and total traffic added, the separate
     * size counters removed) but storing it is left to the integrator.
     *
     * @param array $data [date => [domain => counters]]
     */
    protected function sync(array $data)
    {
        foreach ($data as $date => $items) {
            foreach ($items as $domain => $item) {
                $item['date'] = $date;
                $item['domain'] = $domain;
                $item['flow'] = $item['req_size'] + $item['res_size'];
                unset($item['req_size'], $item['res_size']);
                /*
                 * Persist $item here (database insert / update).
                 */
            }
        }
    }
}
// Entry point: locate the nginx installation, then process its logs forever.
// Usage: php nginx-log.php [glob-pattern] [max-log-size-in-bytes]
try {
    $NgnixLog = new NgnixLog();
    // The first sub-directory of /usr/local/nginx is taken as the installation directory.
    $nginxDirs = glob('/usr/local/nginx/*', GLOB_ONLYDIR) ?: [];
    if (!isset($nginxDirs[0])) {
        // Without this check the paths below would silently become "/sbin/nginx" and "/logs/".
        throw new Exception('未找到nginx安裝目錄:/usr/local/nginx/*');
    }
    $nginxDir = $nginxDirs[0];
    $NgnixLog->setNginxBin($nginxDir . '/sbin/nginx');
    $NgnixLog->run($nginxDir . '/logs/', $argv[1] ?? './*-access.log', intval($argv[2] ?? 10 ** 7));
} catch (Exception $err) {
    echo $err->getMessage();
    // Non-zero status so that a supervising script can tell the run failed.
    exit(1);
}
啓動腳本
腳本可以添加到定時器中,保證腳本正常運行。
#!/bin/bash
# Starts one nginx-log.php worker per access log unless a worker for that log is already running.
# Installation paths
PHP_BIN='/usr/local/php/bin/php'
NGINX_LOG_PATH='/usr/local/nginx/logs/'

# run_script <log-pattern>: start a worker for the given log file if none is running yet.
run_script(){
    # -F matches the file name literally (dots in host names are not regex wildcards) and
    # "--" keeps a name starting with "-" from being read as an option.
    if ps aux | grep -F 'nginx-log.php' | grep -qF -- "$1"; then
        echo '已經運行'
    else
        # Append: all workers share this file, truncating it would wipe the others' output.
        nohup "$PHP_BIN" /www/www-resource/nginx-log.php "$1" >> /www/www-resource/nginx-log.log 2>&1 &
    fi
}

# Iterate over all log files; each one is handled by its own process.
while read -r FILE_PATH; do
    # The here-document yields one empty line when no log file exists.
    [ -n "$FILE_PATH" ] || continue
    run_script "$FILE_PATH"
done <<EOF
$(cd "$NGINX_LOG_PATH" && find ./ -maxdepth 1 -name '*-access.log')
EOF