Lately some idiot has been attacking my site non-stop. I used to just read the logs myself, figure out which IPs were responsible, and block them by hand, but recently there have been too many of them, so I wrote a script to do the filtering and matching for me. Code below:

import re
import gzip
import os
from collections import defaultdict
from datetime import datetime

# Configuration
LOG_FILE = 'easygif.cn_2025_07_07_000000_010000'  # example archive name; all .gz files in the working directory are scanned
THRESHOLD = 100       # minimum request count to flag an IP (0 would flag every IP; tune for your traffic)
CHECK_4XX_5XX = True  # also flag IPs that triggered 4xx/5xx responses
OUTPUT_FILE = 'abnormal_ips.txt'


log_pattern = re.compile(
    r'^\[(.*?)\]\s+'            # [timestamp]
    r'(\S+)\s+'                 # client IP
    r'-\s+'                     # literal "-"
    r'\d+\s+'                   # log sequence number (ignored)
    r'"(.*?)"\s+'               # host, e.g. xxx.com
    r'"(.*?)"\s+'               # request line, e.g. GET /path
    r'(\d{3})\s+\d+\s+\S+\s+'   # status code, response size, etc. (partially ignored)
    r'\S+\s+'                   # cache hit/miss flag
    r'"(.*?)"\s+'               # User-Agent
    r'"(.*?)"\s+'               # Content-Type
    r'(\S+)'                    # backend IP
)

def extract_gz_files(directory):
    """Decompress every .gz file in the given directory."""
    extracted_files = []
    for filename in os.listdir(directory):
        if filename.endswith('.gz'):
            gz_path = os.path.join(directory, filename)
            extracted_path = gz_path[:-3]  # strip the .gz suffix
            with gzip.open(gz_path, 'rt', encoding='utf-8') as gz_file:
                with open(extracted_path, 'w', encoding='utf-8') as extracted_file:
                    extracted_file.write(gz_file.read())
            extracted_files.append(extracted_path)
    return extracted_files

def parse_log(file_paths):
    """Parse log files into per-IP request timestamps and a set of error IPs."""
    ip_requests = defaultdict(list)
    error_ips = set()

    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                match = log_pattern.match(line.strip())
                if not match:
                    continue

                (timestamp_str, client_ip, website, request_line,
                 status, user_agent, content_type, backend_ip) = match.groups()
                timestamp = datetime.strptime(timestamp_str, "%d/%b/%Y:%H:%M:%S %z")

                # Skip invalid or internal IPs
                if client_ip == '-' or client_ip.startswith(('10.', '192.168.')):
                    continue

                # Record each request's timestamp per IP
                ip_requests[client_ip].append(int(timestamp.timestamp()))

                # Track IPs that produced abnormal status codes
                if CHECK_4XX_5XX and status.startswith(('4', '5')):
                    error_ips.add(client_ip)

    return ip_requests, error_ips


def detect_high_frequency(ip_requests):
    """Return IPs whose request count meets THRESHOLD, sorted by count descending."""
    top_ip_cnt = {ip: len(timestamps) for ip, timestamps in ip_requests.items()
                  if len(timestamps) >= THRESHOLD}
    return dict(sorted(top_ip_cnt.items(), key=lambda item: item[1], reverse=True))


def main():
    # Decompress the .gz archives
    extracted_files = extract_gz_files(os.getcwd())

    # Parse the decompressed log files
    ip_requests, error_ips = parse_log(extracted_files)
    high_freq_ips = detect_high_frequency(ip_requests)

    abnormal_ips = dict()
    abnormal_ips.update(high_freq_ips)

    # Merge in IPs that triggered 4xx/5xx responses, keeping their request counts
    for ip in error_ips:
        abnormal_ips.setdefault(ip, len(ip_requests[ip]))

    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        for ip, count in abnormal_ips.items():
            f.write(f"{ip} - {count}\n")
            print(f"Abnormal IP detected: {ip}")

    print(f"\n✅ Total abnormal IPs found: {len(abnormal_ips)}")
    print(f"Saved to: {OUTPUT_FILE}")


if __name__ == '__main__':
    main()
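
Since the whole point is to actually block these IPs, here's a minimal sketch of how the output file could be turned into an nginx deny list. This is an assumption about your setup: the blocklist path and the idea of blocking via nginx's deny directive are my own additions, not part of the script above, so adjust to whatever you use for blocking.

# Minimal sketch: convert abnormal_ips.txt ("IP - count" per line) into
# nginx "deny" directives. The output path is an assumption; point it at a
# file your nginx config actually includes, e.g. /etc/nginx/conf.d/blocklist.conf.
INPUT_FILE = 'abnormal_ips.txt'
BLOCKLIST_FILE = 'blocklist.conf'  # hypothetical path, adjust for your setup

with open(INPUT_FILE, 'r', encoding='utf-8') as f:
    ips = {line.split(' - ')[0].strip() for line in f if line.strip()}

with open(BLOCKLIST_FILE, 'w', encoding='utf-8') as f:
    for ip in sorted(ips):
        f.write(f"deny {ip};\n")

print(f"Wrote {len(ips)} deny rules to {BLOCKLIST_FILE}")

After regenerating the file, reload nginx (nginx -s reload) so the new deny rules take effect.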

As long as the log format doesn't change, this should keep working. Leaving it here as a note to myself.