English 中文(简体)
在C++中是否有与Java中Protocol Buffers的有界I/O函数相等的功能?
原标题:Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?

我正在尝试在C++和Java中从文件读取/写入多个Protocol Buffers消息。Google建议在消息之前写入长度前缀,但默认情况下没有办法实现(我没有看到)。

然而,Java API 2.1.0版本中收到了一组"分隔符" I/O 函数,它们显然完成了该任务:

parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo

是否有C++当量? 如果不是的话,那么,大面积的线式是什么预示着贾瓦·安普森的附则,因此,我可以将这些电文贴在C++中?


Update:

这些现在在v3.3.0中的 google/protobuf/util/delimited_message_util.h 中存在。

最佳回答

我来晚了,但下面的实现包括其他答案中缺少的一些优化,并且不会在 64MB 的输入后失败(虽然仍然对每个单独的消息强制执行64MB 的限制,但不对整个流执行限制)。

我是C++和Java Protobuf库的作者,但我不再在谷歌工作。很抱歉这段代码从未进入官方库。如果进入了,就应该是这个样子。

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don t worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don t worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}
问题回答

好的,所以我无法找到实现我所需功能的顶级C++函数,但是通过查找Java API文档,我找到了以下内容,位于MessageLite接口内:

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

因此Java大小前缀是(协议缓冲区)varint!

凭借这些信息,我开始仔细查看C++ API,并找到了CodedStream标头,其中包含以下内容:

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

使用这些,我应该能够编写自己的C++函数来完成这项工作。

他们应该确实将此功能加入到主要的信息API中;考虑到Java和Marc Gravell的出色的protobuf-net C#端口都有该功能,它现在缺少一些功能。

我使用CodedOutputStream/ArrayOutputStream编写消息(包括大小),并使用CodedInputStream/ArrayInputStream读取消息(包括大小),解决了相同的问题。

例如,以下伪代码会先写入消息大小,然后写入消息:

const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);

在写入时,您还应该检查您的缓冲区是否足够大以适配消息(包括大小)。在读取时,您应该检查您的缓冲区是否包含整个消息(包括大小)。

如果C++ API也提供类似Java API中的便捷方法,那绝对会非常方便。

IsteamInputStream 在与 std::istream 一起使用时非常容易受到 eof 和其他错误的影响,这会导致 protobuf 流永久性受损,任何已经使用的缓冲区数据都将被销毁。 Protobuf 有适当的支持从传统的流中进行读取。

实现google::protobuf::io::CopyingInputStream,并与CopyingInputStreamAdapter一起使用。对于输出变体进行相同的操作。

在实际应用中,解析调用最终会进入google::protobuf::io::CopyingInputStream:: Read(void * buffer,int size),其中提供了一个缓冲区。唯一剩下要做的就是以某种方式读取它。

这是与Asio同步流一起使用的示例(SyncReadStream/SyncWriteStream):

#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

用途:

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
 }

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);

这是给你的:

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>

using namespace google::protobuf::io;

class FASWriter 
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;

    IstreamInputStream *_IstreamInputStream;
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
    }

    template<class T>
    bool ReadNext()
    {
        T msg;
        unsigned __int32 size;

        bool ret;
        if ( ret = _CodedInputStream->ReadVarint32(&size) )
        {   
            CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
            if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
            {
                _CodedInputStream->PopLimit(msgLimit);      
                std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
            }
        }

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};

我在C ++和Python的编程中都遇到了同样的问题。

对于C++版本,我使用了Kenton Varda在此线程中发布的代码和他发送给protobuf团队的拉取请求中的代码的混合(因为此处发布的版本无法处理EOF而他发送到Github上的版本可以)。

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don t worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don t worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}

这是我的Python2实现:

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it s the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

这也许不是最佳的守则,我相信,它能够重新成为一个公正的轨道,但至少应该向你们表明这样做的方法。

现在的大问题是:速度

即使使用C++实现的Python-protobuf,与纯C++相比,速度慢了一个数量级。我有一个基准测试,在该测试中,我从文件中读取了大约30字节的10M个protobuf消息。在C++中大约需要0.9秒,在Python中需要35秒。

一种使速度更快的方法是重新实现varint解码器,使其一次性从文件中读取并解码,而不是像当前代码那样从文件中读取,再进行解码(分析表明,varint编码器/解码器消耗了相当多的时间)。但不用说,仅凭此举还不足以缩小Python版本和C++版本之间的差距。

任何能让它变得更快的想法都非常欢迎 :)

仅为完整性起见,在此发布一个最新版本,可与protobuf的主版本和Python3一起使用。

对于 C++ 版本,仅需使用 delimited_message_utils.h 中的实用程序即可,这里是一个最小工作示例。

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>

#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

template <typename T>
bool writeManyToFile(std::deque<T> messages, std::string filename) {
    int outfd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC);
    google::protobuf::io::FileOutputStream fout(outfd);

    bool success;
    for (auto msg: messages) {
        success = google::protobuf::util::SerializeDelimitedToZeroCopyStream(
            msg, &fout);
        if (! success) {
            std::cout << "Writing Failed" << std::endl;
            break;
        }
    }
    fout.Close();
    close(outfd);
    return success;
}

template <typename T>
std::deque<T> readManyFromFile(std::string filename) {
    int infd = open(filename.c_str(), O_RDONLY);

    google::protobuf::io::FileInputStream fin(infd);
    bool keep = true;
    bool clean_eof = true;
    std::deque<T> out;

    while (keep) {
        T msg;
        keep = google::protobuf::util::ParseDelimitedFromZeroCopyStream(
            &msg, &fin, nullptr);
        if (keep)
            out.push_back(msg);
    }
    fin.Close();
    close(infd);
    return out;
}

对于Python3版本,在@fireboot的答案基础上,唯一需要修改的是对raw_varint32的解码。

def getSize(raw_varint32):
    result = 0
    shift = 0
    b = six.indexbytes(raw_varint32, 0)
    result |= ((ord(b) & 0x7f) << shift)
    return result

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size = getSize(raw_varint32)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

也在寻找解决方案。以下是我们的解决方案核心,假设一些Java代码将许多MyRecord消息使用writeDelimitedTo写入文件。打开文件并循环执行以下操作:

if(someCodedInputStream->ReadVarint32(&bytes)) {
  CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes);
  if(myRecord->ParseFromCodedStream(someCodedInputStream)) {
    //do your stuff with the parsed MyRecord instance
  } else {
    //handle parse error
  }
  someCodedInputStream->PopLimit(msgLimit);
} else {
  //maybe end of file
}

希望它能帮助。

使用 objective-c 版本的协议缓冲区遇到了这个确切的问题。在从 iOS 客户端发送到使用 parseDelimitedFrom 的基于 Java 的服务器时,服务器希望将长度作为第一个字节接收,因此我需要首先调用 writeRawByte 到 CodedOutputStream。在此发布希望能帮助其他遇到这个问题的人。在解决此问题时,一个人会认为 Google proto-bufs 会附带一个简单的标志来帮助你完成这个...

    Request* request = [rBuild build];

    [self sendMessage:request];
} 


- (void) sendMessage:(Request *) request {

    //** get length
    NSData* n = [request data];
    uint8_t len = [n length];

    PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
    //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
    [os writeRawByte:len];
    [request writeToCodedOutputStream:os];
    [os flush];
}

由于我不被允许在上面的Kenton Varda的回答中以评论形式写此内容;我相信他发布的代码(以及其他提供的答案)存在错误。以下代码:

...
google::protobuf::io::CodedInputStream input(rawInput);

// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

由于未考虑已从输入中读取的varint32的大小,因此设置了错误的限制。这可能会导致数据丢失/损坏,因为从流中读取其他字节可能是下一个消息的一部分。正确处理的通常方法是删除用于读取大小的CodedInputStream并创建一个新的用于读取有效载荷:

...
uint32_t size;
{
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  if (!input.ReadVarint32(&size)) return false;
}

google::protobuf::io::CodedInputStream input(rawInput);

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...




相关问题
Spring Properties File

Hi have this j2ee web application developed using spring framework. I have a problem with rendering mnessages in nihongo characters from the properties file. I tried converting the file to ascii using ...

Logging a global ID in multiple components

I have a system which contains multiple applications connected together using JMS and Spring Integration. Messages get sent along a chain of applications. [App A] -> [App B] -> [App C] We set a ...

Java Library Size

If I m given two Java Libraries in Jar format, 1 having no bells and whistles, and the other having lots of them that will mostly go unused.... my question is: How will the larger, mostly unused ...

How to get the Array Class for a given Class in Java?

I have a Class variable that holds a certain type and I need to get a variable that holds the corresponding array class. The best I could come up with is this: Class arrayOfFooClass = java.lang....

SQLite , Derby vs file system

I m working on a Java desktop application that reads and writes from/to different files. I think a better solution would be to replace the file system by a SQLite database. How hard is it to migrate ...

热门标签