帶環形緩衝區

import serial
import time
from datetime import datetime  # 用於獲取精確到秒的系統時間

class RingBuffer:
    """自定義15字符長度的環形緩衝區"""
    def __init__(self, capacity=8):
        self.capacity = capacity  # 緩衝區容量固定為15
        self.buffer = []  # 存儲字符的列表
        self.last_valid_time = None  # 記錄上一次有效接收(U-Boot)的時間戳

    def add_char(self, char):
        """添加字符到緩衝區,超出容量則移除最舊字符"""
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)  # 移除頭部最舊字符
        self.buffer.append(char)  # 新增字符到尾部

    def get_buffer_str(self):
        """返回緩衝區拼接後的字符串"""
        return ''.join(self.buffer)

    def reset(self):
        """清空緩衝區"""
        self.buffer = []

def serial_monitor_demo(
    port="COM3", 
    bps=38400, 
    timeout=5,  # 串口單次讀取超時(與wait_time保持一致)
    wait_time=100000,  # 每次接收的最大等待時間
    target_str="U-Boot",  # 指定接收的目標ASCII字符串
):
    ser = None
    i = 0  # 成功接收目標字符串計數
    j = 0  # 超時計數
    k = 0  # 接收非目標字符串計數
    total_tests = 0  # 總測試數(i+j+k)
    
    # 初始化15字符長度的環形緩衝區
    ring_buffer = RingBuffer(capacity=15)
    
    try:
        # 1. 初始化並打開串口(僅打開一次,持續保持連接)
        ser = serial.Serial(
            port=port, 
            baudrate=bps, 
            timeout=wait_time,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            xonxoff=False,  # 關閉軟件流控
            rtscts=False    # 關閉硬件流控
        )
        
        if ser.isOpen():
            open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"RMC_AC壓測_測試復位引腳台階_程序\n[{open_time}] 串口 {port} 打開成功(波特率:{bps}),持續等待接收目標ASCII字符串:{target_str}(單次超時時間設定為:{wait_time}s)\n")
            
            # 2. 持續等待接收數據(串口保持打開,循環接收)
            while True:
                received_data = b""  # 存儲單次接收的字節數據
                start_time = time.time()  # 記錄本次接收開始時間
                has_received_data = False  # 標記是否接收到任何數據
                
                # 單次接收等待邏輯(超時時間=wait_time)
                while (time.time() - start_time) < wait_time:
                    if ser.in_waiting > 0:
                        received_data += ser.read(ser.in_waiting)
                        has_received_data = True  # 標記已接收到數據
                        
                        # ASCII解碼(忽略非ASCII無效字符)
                        received_str = received_data.decode("ascii", errors="ignore")
                        
                        # 將接收字符逐個加入環形緩衝區
                        for char in received_str:
                            ring_buffer.add_char(char)
                        
                        # 檢查緩衝區中是否包含目標字符串
                        buffer_str = ring_buffer.get_buffer_str()
                        if target_str in buffer_str:
                            i += 1
                            total_tests = i + j + k
                            current_time = datetime.now()
                            success_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
                            
                            # 計算與上一次有效接收的時間間隔
                            time_interval = 0
                            if ring_buffer.last_valid_time is not None:
                                time_interval = (current_time - ring_buffer.last_valid_time).total_seconds()
                            # 更新上一次有效接收時間
                            ring_buffer.last_valid_time = current_time
                            
                            # 打印有效接收信息(含時間間隔)
                            print(f"[{success_time}] RMC成功拉起,拉起數:{i} 總測試數:{total_tests} | 與上一次有效接收間隔:{time_interval:.1f}秒")
                            
                            ring_buffer.reset()  # 清空環形緩衝區
                            ser.flushInput()  # 清空串口緩衝區
                            #time.sleep(0.3)  # 隔離尾段字符
                            break  # 退出本次等待,進入下一次接收循環
                
                # 3. 判斷是否接收到非目標字符串
                if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):
                    k += 1
                    total_tests = i + j + k
                    non_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    print(f"[{non_target_time}] ---非目標字符串---,非目標數:{k} 超時數:{j} 總測試數:{total_tests}")

                # 4. 判斷本次接收是否超時(僅無任何數據時計數超時)
                elapsed_time = time.time() - start_time
                if elapsed_time >= wait_time - 0.01 and not has_received_data:
                    j += 1
                    total_tests = i + j + k
                    timeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    print(f"[{timeout_time}] ---超時警告---:本次等待{wait_time}s未接收到任何數據,超時數:{j} 非目標字符串數:{k} 總測試數:{total_tests}")
                
                #time.sleep(0.1)  # 降低CPU佔用
        
    except KeyboardInterrupt:
        # 手動終止程序(Ctrl+C)
        stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{stop_time}] ---程序手動終止---")
        print(f"最終統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j},總測試數:{total_tests}")
        # 新增:計算各場景佔比(便於分析)
        if total_tests > 0:
            print(f"成功率:{i/total_tests:.2%},非目標占比:{k/total_tests:.2%},超時佔比:{j/total_tests:.2%}")
    except Exception as e:
        # 串口操作異常
        exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{exception_time}] ---串口操作異常---:{str(e)}")
        print(f"異常時統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j} 總測試數:{total_tests}")
    finally:
        # 程序終止時必關串口(釋放資源)
        if ser and ser.isOpen():
            ser.close()
            final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{final_close_time}] 串口 {port} 已關閉")

# 執行函數
if __name__ == "__main__":
    serial_monitor_demo()

 

延時等待:

import serial
import time
from datetime import datetime  # 用於獲取精確到秒的系統時間

def serial_monitor_demo(
    port="COM3", 
    bps=38400, 
    timeout=5,  # 串口單次讀取超時(與wait_time保持一致)
    wait_time=0.3,  # 每次接收的最大等待時間(1秒)
    target_str="U-Boot",  # 指定接收的目標ASCII字符串(可修改)
    #write_data=""  # 初始寫入數據(不需要可註釋)
):
    ser = None
    i = 0  # 成功接收目標字符串計數
    j = 0  # 超時計數
    k = 0  # 接收非目標字符串計數(新增)
    total_tests = 0  # 總測試數(i+j+k)
    
    try:
        # 1. 初始化並打開串口(僅打開一次,持續保持連接)
        ser = serial.Serial(
            port=port, 
            baudrate=bps, 
            timeout=wait_time,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            xonxoff=False,  # 關閉軟件流控
            rtscts=False    # 關閉硬件流控
        )
        
        if ser.isOpen():
            open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"RMC_AC壓測_測試復位引腳台階_程序\n[{open_time}] 串口 {port} 打開成功(波特率:{bps}),持續等待接收目標ASCII字符串:{target_str}(單次超時時間設定為:{wait_time}s)\n")
            
            # 可選:僅發送一次初始數據(不需要可註釋)
            # write_bytes = write_data.encode("gbk")  # 中文用gbk,ASCII用encode("ascii")
            # ser.write(write_bytes)
            # send_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # print(f"[{send_time}] 已發送初始數據:{write_data}\n")
            
            # 2. 持續等待接收數據(串口保持打開,循環接收)
            while True:
                received_data = b""  # 存儲單次接收的字節數據
                start_time = time.time()  # 記錄本次接收開始時間
                has_received_data = False  # 標記是否接收到任何數據(新增)
                
                # 單次接收等待邏輯(超時時間=wait_time)
                while (time.time() - start_time) < wait_time:
                    if ser.in_waiting > 0:
                        received_data += ser.read(ser.in_waiting)
                        has_received_data = True  # 標記已接收到數據
                        
                        # ASCII解碼(忽略非ASCII無效字符)
                        received_str = received_data.decode("ascii", errors="ignore")
                        
                        # 檢查目標字符串
                        if target_str in received_str:
                            i += 1
                            total_tests = i + j + k
                            success_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            print(f"[{success_time}] RMC成功拉起,拉起數:{i} 非目標字符串數:{k} 超時數:{j} 總測試數:{total_tests}")
                            ser.flushInput()  # 清空緩衝區,準備下一次接收
                            time.sleep(0.3)  # 隔離尾段字符
                            break  # 退出本次等待,進入下一次接收循環
                
                # 3. 新增:判斷是否接收到非目標字符串
                if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):
                    k += 1
                    total_tests = i + j + k
                    non_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    # 打印非目標字符串詳情(含實際接收內容,便於調試)
                    received_str = received_data.decode("ascii", errors="ignore")
                    #print(f"[{non_target_time}] ---非目標字符串---:接收內容:{received_str.strip()},非目標數:{k} 超時數:{j} 總測試數:{total_tests}")
                    print(f"[{non_target_time}] ---非目標字符串---,非目標數:{k} 超時數:{j} 總測試數:{total_tests}")

                # 4. 判斷本次接收是否超時(僅無任何數據時計數超時)
                elapsed_time = time.time() - start_time
                if elapsed_time >= wait_time - 0.01 and not has_received_data:  # 新增:僅無數據時算超時
                    j += 1
                    total_tests = i + j + k
                    timeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    print(f"[{timeout_time}] ---超時警告---:本次等待{wait_time}s未接收到任何數據,超時數:{j} 非目標字符串數:{k} 總測試數:{total_tests}")
                
                time.sleep(0.1)  # 降低CPU佔用,不影響接收響應
        
    except KeyboardInterrupt:
        # 手動終止程序(Ctrl+C)
        stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{stop_time}] ---程序手動終止---")
        print(f"最終統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j},總測試數:{total_tests}")
        # 新增:計算各場景佔比(便於分析)
        if total_tests > 0:
            print(f"成功率:{i/total_tests:.2%},非目標占比:{k/total_tests:.2%},超時佔比:{j/total_tests:.2%}")
    except Exception as e:
        # 串口操作異常
        exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{exception_time}] ---串口操作異常---:{str(e)}")
        print(f"異常時統計:成功拉起數:{i},非目標字符串數:{k},超時數:{j},總測試數:{total_tests}")
    finally:
        # 程序終止時必關串口(釋放資源)
        if ser and ser.isOpen():
            ser.close()
            final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{final_close_time}] 串口 {port} 已關閉")

# 執行函數
if __name__ == "__main__":
    serial_monitor_demo()