Существуют ли эквиваленты C++ для буферов протокола, разделенных функциями ввода-вывода в Java?
Я пытаюсь читать / писать сообщения нескольких буферов протокола из файлов, как на C++, так и на Java. Google предлагает писать префиксы длины перед сообщениями, но нет никакого способа сделать это по умолчанию (что я мог видеть).
однако Java API в версии 2.1.0 получил набор "разделенных" функций ввода-вывода, которые, по-видимому, выполняют эту работу:
parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo
существуют ли эквиваленты C++? А если нет, то каков формат провода для префиксов размера, которые прикрепляет Java API, поэтому Я могу разобрать эти сообщения на C++?
10 ответов:
Я немного опоздал на вечеринку здесь, но нижеприведенные реализации включают некоторые оптимизации, отсутствующие в других ответах, и не потерпят неудачу после ввода 64 МБ (хотя он все еще применяет предел 64 МБ на каждое отдельное сообщение, просто не на весь поток).
(Я являюсь автором библиотек C++ и Java protobuf, но я больше не работаю в Google. Извините, что этот код никогда не попадал в официальную lib. Как это будет выглядеть, если он имел.)
bool writeDelimitedTo( const google::protobuf::MessageLite& message, google::protobuf::io::ZeroCopyOutputStream* rawOutput) { // We create a new coded stream for each message. Don't worry, this is fast. google::protobuf::io::CodedOutputStream output(rawOutput); // Write the size. const int size = message.ByteSize(); output.WriteVarint32(size); uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); if (buffer != NULL) { // Optimization: The message fits in one buffer, so use the faster // direct-to-array serialization path. message.SerializeWithCachedSizesToArray(buffer); } else { // Slightly-slower path when the message is multiple buffers. message.SerializeWithCachedSizes(&output); if (output.HadError()) return false; } return true; } bool readDelimitedFrom( google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message) { // We create a new coded stream for each message. Don't worry, this is fast, // and it makes sure the 64MB total size limit is imposed per-message rather // than on the whole stream. (See the CodedInputStream interface for more // info on this limit.) google::protobuf::io::CodedInputStream input(rawInput); // Read the size. uint32_t size; if (!input.ReadVarint32(&size)) return false; // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); // Parse the message. if (!message->MergeFromCodedStream(&input)) return false; if (!input.ConsumedEntireMessage()) return false; // Release the limit. input.PopLimit(limit); return true; }
хорошо, поэтому я не смог найти функции C++ верхнего уровня, реализующие то, что мне нужно, но некоторые spelunking через ссылку API Java оказались следующими, внутри MessageLite интерфейс:
void writeDelimitedTo(OutputStream output) /* Like writeTo(OutputStream), but writes the size of the message as a varint before writing the data. */
таким образом, префикс размера Java-это (буферы протокола) varint!
вооружившись этой информацией, я пошел копаться в API C++ и нашел CodedStream заголовок, который имеет эти:
bool CodedInputStream::ReadVarint32(uint32 * value) void CodedOutputStream::WriteVarint32(uint32 value)
используя их, я должен быть в состоянии свернуть мои собственные функции C++, которые выполняют эту работу.
они действительно должны добавить это в API основного сообщения; это отсутствует функциональность, учитывая, что Java имеет его, а также отличный порт protobuf-net C# Марка Гравелла (через SerializeWithLengthPrefix и DeserializeWithLengthPrefix).
Я решил ту же проблему, используя CodedOutputStream/ArrayOutputStream для записи сообщения (с размером) и CodedInputStream/ArrayInputStream для чтения сообщения (с размером).
например, следующий псевдокод записывает размер сообщения, следующий за сообщением:
const unsigned bufLength = 256; unsigned char buffer[bufLength]; Message protoMessage; google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength); google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput); codedOutput.WriteLittleEndian32(protoMessage.ByteSize()); protoMessage.SerializeToCodedStream(&codedOutput);
при записи вы также должны проверить, что ваш буфер достаточно велик, чтобы соответствовать сообщению (включая размер). И при чтении, вы должны проверить, что ваш буфер содержит все сообщение (включая размер).
Это определенно было бы удобно, если бы они добавили методы удобства в C++ API, аналогичные тем, которые предоставляются Java API.
вот так:
#include <google/protobuf/io/zero_copy_stream_impl.h> #include <google/protobuf/io/coded_stream.h> using namespace google::protobuf::io; class FASWriter { std::ofstream mFs; OstreamOutputStream *_OstreamOutputStream; CodedOutputStream *_CodedOutputStream; public: FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary) { assert(mFs.good()); _OstreamOutputStream = new OstreamOutputStream(&mFs); _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream); } inline void operator()(const ::google::protobuf::Message &msg) { _CodedOutputStream->WriteVarint32(msg.ByteSize()); if ( !msg.SerializeToCodedStream(_CodedOutputStream) ) std::cout << "SerializeToCodedStream error " << std::endl; } ~FASWriter() { delete _CodedOutputStream; delete _OstreamOutputStream; mFs.close(); } }; class FASReader { std::ifstream mFs; IstreamInputStream *_IstreamInputStream; CodedInputStream *_CodedInputStream; public: FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary) { assert(mFs.good()); _IstreamInputStream = new IstreamInputStream(&mFs); _CodedInputStream = new CodedInputStream(_IstreamInputStream); } template<class T> bool ReadNext() { T msg; unsigned __int32 size; bool ret; if ( ret = _CodedInputStream->ReadVarint32(&size) ) { CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size); if ( ret = msg.ParseFromCodedStream(_CodedInputStream) ) { _CodedInputStream->PopLimit(msgLimit); std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl; } } return ret; } ~FASReader() { delete _CodedInputStream; delete _IstreamInputStream; mFs.close(); } };
я столкнулся с той же проблемой как в C++, так и в Python.
для версии C++ я использовал смесь кода Kenton Varda, опубликованного в этом потоке, и кода из запроса pull, который он отправил команде protobuf (потому что версия, опубликованная здесь, не обрабатывает EOF, а тот, который он отправил в github).
#include <google/protobuf/message_lite.h> #include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/io/coded_stream.h> bool writeDelimitedTo(const google::protobuf::MessageLite& message, google::protobuf::io::ZeroCopyOutputStream* rawOutput) { // We create a new coded stream for each message. Don't worry, this is fast. google::protobuf::io::CodedOutputStream output(rawOutput); // Write the size. const int size = message.ByteSize(); output.WriteVarint32(size); uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); if (buffer != NULL) { // Optimization: The message fits in one buffer, so use the faster // direct-to-array serialization path. message.SerializeWithCachedSizesToArray(buffer); } else { // Slightly-slower path when the message is multiple buffers. message.SerializeWithCachedSizes(&output); if (output.HadError()) return false; } return true; } bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof) { // We create a new coded stream for each message. Don't worry, this is fast, // and it makes sure the 64MB total size limit is imposed per-message rather // than on the whole stream. (See the CodedInputStream interface for more // info on this limit.) google::protobuf::io::CodedInputStream input(rawInput); const int start = input.CurrentPosition(); if (clean_eof) *clean_eof = false; // Read the size. uint32_t size; if (!input.ReadVarint32(&size)) { if (clean_eof) *clean_eof = input.CurrentPosition() == start; return false; } // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); // Parse the message. if (!message->MergeFromCodedStream(&input)) return false; if (!input.ConsumedEntireMessage()) return false; // Release the limit. input.PopLimit(limit); return true; }
и вот моя реализация python2:
from google.protobuf.internal import encoder from google.protobuf.internal import decoder #I had to implement this because the tools in google.protobuf.internal.decoder #read from a buffer, not from a file-like objcet def readRawVarint32(stream): mask = 0x80 # (1 << 7) raw_varint32 = [] while 1: b = stream.read(1) #eof if b == "": break raw_varint32.append(b) if not (ord(b) & mask): #we found a byte starting with a 0, which means it's the last byte of this varint break return raw_varint32 def writeDelimitedTo(message, stream): message_str = message.SerializeToString() delimiter = encoder._VarintBytes(len(message_str)) stream.write(delimiter + message_str) def readDelimitedFrom(MessageType, stream): raw_varint32 = readRawVarint32(stream) message = None if raw_varint32: size, _ = decoder._DecodeVarint32(raw_varint32, 0) data = stream.read(size) if len(data) < size: raise Exception("Unexpected end of file") message = MessageType() message.ParseFromString(data) return message #In place version that takes an already built protobuf object #In my tests, this is around 20% faster than the other version #of readDelimitedFrom() def readDelimitedFrom_inplace(message, stream): raw_varint32 = readRawVarint32(stream) if raw_varint32: size, _ = decoder._DecodeVarint32(raw_varint32, 0) data = stream.read(size) if len(data) < size: raise Exception("Unexpected end of file") message.ParseFromString(data) return message else: return None
Это может быть не самый красивый код, и я уверен, что он может быть переработан немного, но в крайней мере, это должно показать вам один способ делать это.
теперь большая проблема: это медленно.
даже при использовании C++ реализации python-protobuf, это на порядок медленнее, чем в чистом C++. У меня есть тест, где я читаю 10M протобуф сообщения ~30 байт каждый из файла. Это занимает ~0.9 s в C++ и 35s в python.
один из способов сделать это немного быстрее - это повторно реализовать декодер varint, чтобы он читал из файла и декодируйте за один раз, вместо того, чтобы читать из файла, а затем декодировать, как это делает этот код в настоящее время. (профилирование показывает, что значительное количество времени тратится в кодере/декодере varint). Но излишне говорить, что одного этого недостаточно, чтобы закрыть разрыв между версией python и версией C++.
любая идея, чтобы сделать это быстрее очень приветствуется :)
IsteamInputStream очень хрупок для доказательств и других ошибок, которые легко возникают при использовании вместе с std::istream. После этого потоки protobuf постоянно повреждаются и все уже используемые буферные данные уничтожаются. В protobuf есть правильная поддержка чтения из традиционных потоков.
реализовать
google::protobuf::io::CopyingInputStream
и используйте это вместе с CopyingInputStreamAdapter. Сделайте то же самое для выходных вариантов.на практике разбор вызова заканчивается в
google::protobuf::io::CopyingInputStream::Read(void* buffer, int size)
где задан буфер. Единственное, что осталось сделать, это как-то прочитать его.вот пример для использования с синхронизированными потоками Asio (SyncReadStream/SyncWriteStream):
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> using namespace google::protobuf::io; template <typename SyncReadStream> class AsioInputStream : public CopyingInputStream { public: AsioInputStream(SyncReadStream& sock); int Read(void* buffer, int size); private: SyncReadStream& m_Socket; }; template <typename SyncReadStream> AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) : m_Socket(sock) {} template <typename SyncReadStream> int AsioInputStream<SyncReadStream>::Read(void* buffer, int size) { std::size_t bytes_read; boost::system::error_code ec; bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec); if(!ec) { return bytes_read; } else if (ec == boost::asio::error::eof) { return 0; } else { return -1; } } template <typename SyncWriteStream> class AsioOutputStream : public CopyingOutputStream { public: AsioOutputStream(SyncWriteStream& sock); bool Write(const void* buffer, int size); private: SyncWriteStream& m_Socket; }; template <typename SyncWriteStream> AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) : m_Socket(sock) {} template <typename SyncWriteStream> bool AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size) { boost::system::error_code ec; m_Socket.write_some(boost::asio::buffer(buffer, size), ec); return !ec; }
использование:
AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket CopyingInputStreamAdaptor cis_adp(&ais); CodedInputStream cis(&cis_adp); Message protoMessage; uint32_t msg_size; /* Read message size */ if(!cis.ReadVarint32(&msg_size)) { // Handle error } /* Make sure not to read beyond limit of message */ CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size); if(!msg.ParseFromCodedStream(&cis)) { // Handle error } /* Remove limit */ cis.PopLimit(msg_limit);
также искал решение для этого. Вот ядро нашего решения, предполагая, что некоторый код java написал много сообщений MyRecord с
writeDelimitedTo
в файл. Откройте файл и цикл, делая:if(someCodedInputStream->ReadVarint32(&bytes)) { CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes); if(myRecord->ParseFromCodedStream(someCodedInputStream)) { //do your stuff with the parsed MyRecord instance } else { //handle parse error } someCodedInputStream->PopLimit(msgLimit); } else { //maybe end of file }надеюсь, что это помогает.
работая с objective-c версией protocol-buffers, я столкнулся с этой точной проблемой. При отправке с клиента iOS на сервер на основе Java, который использует parseDelimitedFrom, который ожидает длину в качестве первого байта, мне нужно было сначала вызвать writeRawByte в CodedOutputStream. Публикация здесь, чтобы надеяться помочь другим, которые сталкиваются с этой проблемой. При работе над этой проблемой можно было бы подумать, что Google proto-bufs будет поставляться с простым флагом, который делает это для вы...
Request* request = [rBuild build]; [self sendMessage:request]; } - (void) sendMessage:(Request *) request { //** get length NSData* n = [request data]; uint8_t len = [n length]; PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream]; //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly [os writeRawByte:len]; [request writeToCodedOutputStream:os]; [os flush]; }
поскольку мне не разрешено писать это как комментарий к ответу Кентона Варды выше; я считаю, что в коде, который он опубликовал (а также в других ответах, которые были предоставлены), есть ошибка. Следующий код:
... google::protobuf::io::CodedInputStream input(rawInput); // Read the size. uint32_t size; if (!input.ReadVarint32(&size)) return false; // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); ...
устанавливает неверный предел, поскольку он не учитывает размер varint32, который уже был считан из входных данных. Это может привести к потере/повреждению данных, поскольку дополнительные байты считываются из потока, который может быть частью следующего сообщения. Этот обычный способ правильной обработки - удалить CodedInputStream, используемый для чтения размера, и создать новый для чтения полезной нагрузки:
... uint32_t size; { google::protobuf::io::CodedInputStream input(rawInput); // Read the size. if (!input.ReadVarint32(&size)) return false; } google::protobuf::io::CodedInputStream input(rawInput); // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); ...