帶環形緩衝區
import serial
import time
from datetime import datetime # 用於獲取精確到秒的系統時間
class RingBuffer:
"""自定義15字符長度的環形緩衝區"""
def __init__(self, capacity=8):
self.capacity = capacity # 緩衝區容量固定為15
self.buffer = [] # 存儲字符的列表
self.last_valid_time = None # 記錄上一次有效接收(U-Boot)的時間戳
def add_char(self, char):
"""添加字符到緩衝區,超出容量則移除最舊字符"""
if len(self.buffer) >= self.capacity:
self.buffer.pop(0) # 移除頭部最舊字符
self.buffer.append(char) # 新增字符到尾部
def get_buffer_str(self):
"""返回緩衝區拼接後的字符串"""
return ''.join(self.buffer)
def reset(self):
"""清空緩衝區"""
self.buffer = []
def serial_monitor_demo(
port="COM3",
bps=38400,
timeout=5, # 串口單次讀取超時(與wait_time保持一致)
wait_time=100000, # 每次接收的最大等待時間
target_str="U-Boot", # 指定接收的目標ASCII字符串
):
ser = None
i = 0 # 成功接收目標字符串計數
j = 0 # 超時計數
k = 0 # 接收非目標字符串計數
total_tests = 0 # 總測試數(i+j+k)
# 初始化15字符長度的環形緩衝區
ring_buffer = RingBuffer(capacity=15)
try:
# 1. 初始化並打開串口(僅打開一次,持續保持連接)
ser = serial.Serial(
port=port,
baudrate=bps,
timeout=wait_time,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
xonxoff=False, # 關閉軟件流控
rtscts=False # 關閉硬件流控
)
if ser.isOpen():
open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"RMC_AC壓測_測試復位引腳台階_程序\n[{open_time}] 串口 {port} 打開成功(波特率:{bps}),持續等待接收目標ASCII字符串:{target_str}(單次超時時間設定為:{wait_time}s)\n")
# 2. 持續等待接收數據(串口保持打開,循環接收)
while True:
received_data = b"" # 存儲單次接收的字節數據
start_time = time.time() # 記錄本次接收開始時間
has_received_data = False # 標記是否接收到任何數據
# 單次接收等待邏輯(超時時間=wait_time)
while (time.time() - start_time) < wait_time:
if ser.in_waiting > 0:
received_data += ser.read(ser.in_waiting)
has_received_data = True # 標記已接收到數據
# ASCII解碼(忽略非ASCII無效字符)
received_str = received_data.decode("ascii", errors="ignore")
# 將接收字符逐個加入環形緩衝區
for char in received_str:
ring_buffer.add_char(char)
# 檢查緩衝區中是否包含目標字符串
buffer_str = ring_buffer.get_buffer_str()
if target_str in buffer_str:
i += 1
total_tests = i + j + k
current_time = datetime.now()
success_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
# 計算與上一次有效接收的時間間隔
time_interval = 0
if ring_buffer.last_valid_time is not None:
time_interval = (current_time - ring_buffer.last_valid_time).total_seconds()
# 更新上一次有效接收時間
ring_buffer.last_valid_time = current_time
# 打印有效接收信息(含時間間隔)
print(f"[{success_time}] RMC成功拉起,拉起數:{i} 總測試數:{total_tests} | 與上一次有效接收間隔:{time_interval:.1f}秒")
ring_buffer.reset() # 清空環形緩衝區
ser.flushInput() # 清空串口緩衝區
#time.sleep(0.3) # 隔離尾段字符
break # 退出本次等待,進入下一次接收循環
# 3. 判斷是否接收到非目標字符串
if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):
k += 1
total_tests = i + j + k
non_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{non_target_time}] ---非目標字符串---,非目標數:{k} 超時數:{j} 總測試數:{total_tests}")
# 4. 判斷本次接收是否超時(僅無任何數據時計數超時)
elapsed_time = time.time() - start_time
if elapsed_time >= wait_time - 0.01 and not has_received_data:
j += 1
total_tests = i + j + k
timeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timeout_time}] ---超時警告---:本次等待{wait_time}s未接收到任何數據,超時數:{j} 非目標字符串數:{k} 總測試數:{total_tests}")
#time.sleep(0.1) # 降低CPU佔用
except KeyboardInterrupt:
# 手動終止程序(Ctrl+C)
stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{stop_time}] ---程序手動終止---")
print(f"最終統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j},總測試數:{total_tests}")
# 新增:計算各場景佔比(便於分析)
if total_tests > 0:
print(f"成功率:{i/total_tests:.2%},非目標占比:{k/total_tests:.2%},超時佔比:{j/total_tests:.2%}")
except Exception as e:
# 串口操作異常
exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{exception_time}] ---串口操作異常---:{str(e)}")
print(f"異常時統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j} 總測試數:{total_tests}")
finally:
# 程序終止時必關串口(釋放資源)
if ser and ser.isOpen():
ser.close()
final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{final_close_time}] 串口 {port} 已關閉")
# 執行函數
if __name__ == "__main__":
serial_monitor_demo()
延時等待:
import serial
import time
from datetime import datetime # 用於獲取精確到秒的系統時間
def serial_monitor_demo(
port="COM3",
bps=38400,
timeout=5, # 串口單次讀取超時(與wait_time保持一致)
wait_time=0.3, # 每次接收的最大等待時間(1秒)
target_str="U-Boot", # 指定接收的目標ASCII字符串(可修改)
#write_data="" # 初始寫入數據(不需要可註釋)
):
ser = None
i = 0 # 成功接收目標字符串計數
j = 0 # 超時計數
k = 0 # 接收非目標字符串計數(新增)
total_tests = 0 # 總測試數(i+j+k)
try:
# 1. 初始化並打開串口(僅打開一次,持續保持連接)
ser = serial.Serial(
port=port,
baudrate=bps,
timeout=wait_time,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
xonxoff=False, # 關閉軟件流控
rtscts=False # 關閉硬件流控
)
if ser.isOpen():
open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"RMC_AC壓測_測試復位引腳台階_程序\n[{open_time}] 串口 {port} 打開成功(波特率:{bps}),持續等待接收目標ASCII字符串:{target_str}(單次超時時間設定為:{wait_time}s)\n")
# 可選:僅發送一次初始數據(不需要可註釋)
# write_bytes = write_data.encode("gbk") # 中文用gbk,ASCII用encode("ascii")
# ser.write(write_bytes)
# send_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# print(f"[{send_time}] 已發送初始數據:{write_data}\n")
# 2. 持續等待接收數據(串口保持打開,循環接收)
while True:
received_data = b"" # 存儲單次接收的字節數據
start_time = time.time() # 記錄本次接收開始時間
has_received_data = False # 標記是否接收到任何數據(新增)
# 單次接收等待邏輯(超時時間=wait_time)
while (time.time() - start_time) < wait_time:
if ser.in_waiting > 0:
received_data += ser.read(ser.in_waiting)
has_received_data = True # 標記已接收到數據
# ASCII解碼(忽略非ASCII無效字符)
received_str = received_data.decode("ascii", errors="ignore")
# 檢查目標字符串
if target_str in received_str:
i += 1
total_tests = i + j + k
success_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{success_time}] RMC成功拉起,拉起數:{i} 非目標字符串數:{k} 超時數:{j} 總測試數:{total_tests}")
ser.flushInput() # 清空緩衝區,準備下一次接收
time.sleep(0.3) # 隔離尾段字符
break # 退出本次等待,進入下一次接收循環
# 3. 新增:判斷是否接收到非目標字符串
if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):
k += 1
total_tests = i + j + k
non_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 打印非目標字符串詳情(含實際接收內容,便於調試)
received_str = received_data.decode("ascii", errors="ignore")
#print(f"[{non_target_time}] ---非目標字符串---:接收內容:{received_str.strip()},非目標數:{k} 超時數:{j} 總測試數:{total_tests}")
print(f"[{non_target_time}] ---非目標字符串---,非目標數:{k} 超時數:{j} 總測試數:{total_tests}")
# 4. 判斷本次接收是否超時(僅無任何數據時計數超時)
elapsed_time = time.time() - start_time
if elapsed_time >= wait_time - 0.01 and not has_received_data: # 新增:僅無數據時算超時
j += 1
total_tests = i + j + k
timeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timeout_time}] ---超時警告---:本次等待{wait_time}s未接收到任何數據,超時數:{j} 非目標字符串數:{k} 總測試數:{total_tests}")
time.sleep(0.1) # 降低CPU佔用,不影響接收響應
except KeyboardInterrupt:
# 手動終止程序(Ctrl+C)
stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{stop_time}] ---程序手動終止---")
print(f"最終統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j},總測試數:{total_tests}")
# 新增:計算各場景佔比(便於分析)
if total_tests > 0:
print(f"成功率:{i/total_tests:.2%},非目標占比:{k/total_tests:.2%},超時佔比:{j/total_tests:.2%}")
except Exception as e:
# 串口操作異常
exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{exception_time}] ---串口操作異常---:{str(e)}")
print(f"異常時統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j},總測試數:{total_tests}")
finally:
# 程序終止時必關串口(釋放資源)
if ser and ser.isOpen():
ser.close()
final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{final_close_time}] 串口 {port} 已關閉")
# 執行函數
if __name__ == "__main__":
serial_monitor_demo()
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。