pyserial-как прочитать последнюю строку, отправленную с последовательного устройства
У меня есть Arduino, подключенный к моему компьютеру, выполняющему цикл, посылающий значение через последовательный порт обратно в компьютер каждые 100 мс.
Я хочу сделать скрипт Python, который будет считываться с последовательного порта только каждые несколько секунд, поэтому я хочу, чтобы он просто видел последнюю вещь, отправленную с Arduino.
Как вы делаете это в Pyserial?
Вот код, который я попробовал, но он не работает. Он читает строки последовательно.
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
10 ответов:
Возможно, я неправильно понял ваш вопрос, но поскольку это последовательная линия, вам придется читать все, что отправляется из Arduino последовательно - это будет буферизовано в Arduino, пока вы не прочтете его.
Если вы хотите иметь дисплей состояния, который показывает последнюю отправленную вещь-используйте поток, который включает код в ваш вопрос (минус спящий режим), и держите последнюю полную строку прочитанной как последняя строка из Arduino.
Обновление:
mtasic
's пример кода вполне хорошо, но если Arduino отправил частичную строку при вызовеinWaiting()
, вы получите усеченную строку. Вместо этого вы хотите поместить последнюю полную строку вlast_received
и сохранить частичную строку вbuffer
, чтобы ее можно было добавить к следующему циклу цикла. Что-то вроде этого:def receiving(ser): global last_received buffer_string = '' while True: buffer_string = buffer_string + ser.read(ser.inWaiting()) if '\n' in buffer_string: lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries last_received = lines[-2] #If the Arduino sends lots of empty lines, you'll lose the #last filled line, so you could make the above statement conditional #like so: if lines[-2]: last_received = lines[-2] buffer_string = lines[-1]
Относительно использования
readline()
: Вот что должна сказать документация Pyserial (слегка отредактированная для ясности и с упоминанием readlines ()):Что кажется мне вполне разумным!Будьте осторожны, когда использование "readline". Делать укажите тайм-аут при открытии последовательный порт, в противном случае он может заблокировать навсегда, если нет символа новой строки полученный. Также обратите внимание, что "readlines()" работает только с таймаутом. Оно зависит от наличия тайм-аута и интерпретирует это как EOF (конец файла).
from serial import * from threading import Thread last_received = '' def receiving(ser): global last_received buffer = '' while True: # last_received = ser.readline() buffer += ser.read(ser.inWaiting()) if '\n' in buffer: last_received, buffer = buffer.split('\n')[-2:] if __name__ == '__main__': ser = Serial( port=None, baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=0.1, xonxoff=0, rtscts=0, interCharTimeout=None ) Thread(target=receiving, args=(ser,)).start()
Эти решения будут забивать процессор во время ожидания символов.
Вы должны сделать хотя бы один блокирующий вызов для чтения (1)
while True: if '\n' in buffer: pass # skip if a line already in buffer else: buffer += ser.read(1) # this will block until one more char or timeout buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
...и делай то же самое, что и раньше.
Можно использовать
ser.flushInput()
для удаления всех последовательных данных, находящихся в данный момент в буфере.Очистив старые данные, вы можете использовать ser.readline() для получения последних данных с последовательного устройства.
Я думаю, что это немного проще, чем другие предлагаемые здесь решения. Работал на меня, надеюсь, это подходит для вас.
Этот метод позволяет отдельно управлять таймаутом для сбора всех данных для каждой строки и другим таймаутом для ожидания на дополнительных строках.
# get the last line from serial port lines = serial_com() lines[-1] def serial_com(): '''Serial communications: get a response''' # open serial port try: serial_port = serial.Serial(com_port, baudrate=115200, timeout=1) except serial.SerialException as e: print("could not open serial port '{}': {}".format(com_port, e)) # read response from serial port lines = [] while True: line = serial_port.readline() lines.append(line.decode('utf-8').rstrip()) # wait for new data after each line timeout = time.time() + 0.1 while not serial_port.inWaiting() and timeout > time.time(): pass if not serial_port.inWaiting(): break #close the serial port serial_port.close() return lines
Вам понадобится цикл, чтобы прочитать все отправленное, с блокировкой последнего вызова readline() до истечения таймаута. Итак:
def readLastLine(ser): last_data='' while True: data=ser.readline() if data!='': last_data=data else: return last_data
Небольшая модификация кода mtasic & Vinay Sajip:
Хотя я нашел этот код весьма полезным для меня для аналогичного приложения, мне нужны были все строки, возвращающиеся от последовательного устройства, которое будет периодически отправлять информацию.
Я решил вытащить первый элемент сверху, записать его, а затем присоединиться к остальным элементам в качестве нового буфера и продолжить оттуда.
Я понимаю, что это не то, о чем просил Грег, но я думал, что это было стоит поделиться в качестве примечания.
def receiving(ser): global last_received buffer = '' while True: buffer = buffer + ser.read(ser.inWaiting()) if '\n' in buffer: lines = buffer.split('\n') last_received = lines.pop(0) buffer = '\n'.join(lines)
Использование
.inWaiting()
внутри бесконечного цикла может быть проблематичным. В зависимости от реализации он может захватить весь процессор . Вместо этого я бы рекомендовал использовать определенный размер данных для чтения. Поэтому в этом случае следует сделать следующее, например:ser.read(1024)
Слишком много осложнений
Какова причина разбиения объекта bytes с помощью новой строки или других манипуляций с массивом? Я пишу самый простой метод, который решит вашу проблему:
import serial s = serial.Serial(31) s.write(bytes("ATI\r\n", "utf-8")); while True: last = '' for byte in s.read(s.inWaiting()): last += chr(byte) if len(last) > 0: # Do whatever you want with last print (bytes(last, "utf-8")) last = ''
Вот пример использования оболочки, которая позволяет читать самую последнюю строку без 100% CPU
class ReadLine: """ pyserial object wrapper for reading line source: https://github.com/pyserial/pyserial/issues/216 """ def __init__(self, s): self.buf = bytearray() self.s = s def readline(self): i = self.buf.find(b"\n") if i >= 0: r = self.buf[:i + 1] self.buf = self.buf[i + 1:] return r while True: i = max(1, min(2048, self.s.in_waiting)) data = self.s.read(i) i = data.find(b"\n") if i >= 0: r = self.buf + data[:i + 1] self.buf[0:] = data[i + 1:] return r else: self.buf.extend(data) s = serial.Serial('/dev/ttyS0') device = ReadLine(s) while True: print(device.readline())