pyserial-как прочитать последнюю строку, отправленную с последовательного устройства


У меня есть Arduino, подключенный к моему компьютеру, выполняющему цикл, посылающий значение через последовательный порт обратно в компьютер каждые 100 мс.

Я хочу сделать скрипт Python, который будет считываться с последовательного порта только каждые несколько секунд, поэтому я хочу, чтобы он просто видел последнюю вещь, отправленную с Arduino.

Как вы делаете это в Pyserial?

Вот код, который я попробовал, но он не работает. Он читает строки последовательно.

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?
10 17

10 ответов:

Возможно, я неправильно понял ваш вопрос, но поскольку это последовательная линия, вам придется читать все, что отправляется из Arduino последовательно - это будет буферизовано в Arduino, пока вы не прочтете его.

Если вы хотите иметь дисплей состояния, который показывает последнюю отправленную вещь-используйте поток, который включает код в ваш вопрос (минус спящий режим), и держите последнюю полную строку прочитанной как последняя строка из Arduino.

Обновление: mtasic's пример кода вполне хорошо, но если Arduino отправил частичную строку при вызове inWaiting(), вы получите усеченную строку. Вместо этого вы хотите поместить последнюю полную строку в last_received и сохранить частичную строку в buffer, чтобы ее можно было добавить к следующему циклу цикла. Что-то вроде этого:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

Относительно использования readline(): Вот что должна сказать документация Pyserial (слегка отредактированная для ясности и с упоминанием readlines ()):

Будьте осторожны, когда использование "readline". Делать укажите тайм-аут при открытии последовательный порт, в противном случае он может заблокировать навсегда, если нет символа новой строки полученный. Также обратите внимание, что "readlines()" работает только с таймаутом. Оно зависит от наличия тайм-аута и интерпретирует это как EOF (конец файла).

Что кажется мне вполне разумным!
from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __name__ ==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()

Эти решения будут забивать процессор во время ожидания символов.

Вы должны сделать хотя бы один блокирующий вызов для чтения (1)

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

...и делай то же самое, что и раньше.

Можно использовать ser.flushInput() для удаления всех последовательных данных, находящихся в данный момент в буфере.

Очистив старые данные, вы можете использовать ser.readline() для получения последних данных с последовательного устройства.

Я думаю, что это немного проще, чем другие предлагаемые здесь решения. Работал на меня, надеюсь, это подходит для вас.

Этот метод позволяет отдельно управлять таймаутом для сбора всех данных для каждой строки и другим таймаутом для ожидания на дополнительных строках.

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines

Вам понадобится цикл, чтобы прочитать все отправленное, с блокировкой последнего вызова readline() до истечения таймаута. Итак:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data

Небольшая модификация кода mtasic & Vinay Sajip:

Хотя я нашел этот код весьма полезным для меня для аналогичного приложения, мне нужны были все строки, возвращающиеся от последовательного устройства, которое будет периодически отправлять информацию.

Я решил вытащить первый элемент сверху, записать его, а затем присоединиться к остальным элементам в качестве нового буфера и продолжить оттуда.

Я понимаю, что это не то, о чем просил Грег, но я думал, что это было стоит поделиться в качестве примечания.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)

Использование .inWaiting() внутри бесконечного цикла может быть проблематичным. В зависимости от реализации он может захватить весь процессор . Вместо этого я бы рекомендовал использовать определенный размер данных для чтения. Поэтому в этом случае следует сделать следующее, например:

ser.read(1024)

Слишком много осложнений

Какова причина разбиения объекта bytes с помощью новой строки или других манипуляций с массивом? Я пишу самый простой метод, который решит вашу проблему:

import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
    last = ''
    for byte in s.read(s.inWaiting()): last += chr(byte)
    if len(last) > 0:
        # Do whatever you want with last
        print (bytes(last, "utf-8"))
        last = ''

Вот пример использования оболочки, которая позволяет читать самую последнюю строку без 100% CPU

class ReadLine:
    """
    pyserial object wrapper for reading line
    source: https://github.com/pyserial/pyserial/issues/216
    """
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s

    def readline(self):
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[:i + 1]
            self.buf = self.buf[i + 1:]
            return r
        while True:
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[:i + 1]
                self.buf[0:] = data[i + 1:]
                return r
            else:
                self.buf.extend(data)

s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
    print(device.readline())