blob: 98e4647222fb48c900c2934c6081a4276cf184b2 [file] [log] [blame]
#! /usr/bin/python
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# http://code.google.com/p/protobuf/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test for google.protobuf.internal.decoder."""
__author__ = 'robinson@google.com (Will Robinson)'
import struct
import unittest
from google.protobuf.internal import decoder
from google.protobuf.internal import encoder
from google.protobuf.internal import input_stream
from google.protobuf.internal import wire_format
from google.protobuf import message
import logging
import mox
class DecoderTest(unittest.TestCase):
def setUp(self):
self.mox = mox.Mox()
self.mock_stream = self.mox.CreateMock(input_stream.InputStream)
self.mock_message = self.mox.CreateMock(message.Message)
def testReadFieldNumberAndWireType(self):
# Test field numbers that will require various varint sizes.
for expected_field_number in (1, 15, 16, 2047, 2048):
for expected_wire_type in range(6): # Highest-numbered wiretype is 5.
e = encoder.Encoder()
e.AppendTag(expected_field_number, expected_wire_type)
s = e.ToString()
d = decoder.Decoder(s)
field_number, wire_type = d.ReadFieldNumberAndWireType()
self.assertEqual(expected_field_number, field_number)
self.assertEqual(expected_wire_type, wire_type)
def ReadScalarTestHelper(self, test_name, decoder_method, expected_result,
expected_stream_method_name,
stream_method_return, *args):
"""Helper for testReadScalars below.
Calls one of the Decoder.Read*() methods and ensures that the results are
as expected.
Args:
test_name: Name of this test, used for logging only.
decoder_method: Unbound decoder.Decoder method to call.
expected_result: Value we expect returned from decoder_method().
expected_stream_method_name: (string) Name of the InputStream
method we expect Decoder to call to actually read the value
on the wire.
stream_method_return: Value our mocked-out stream method should
return to the decoder.
args: Additional arguments that we expect to be passed to the
stream method.
"""
logging.info('Testing %s scalar input.\n'
'Calling %r(), and expecting that to call the '
'stream method %s(%r), which will return %r. Finally, '
'expecting the Decoder method to return %r'% (
test_name, decoder_method,
expected_stream_method_name, args, stream_method_return,
expected_result))
d = decoder.Decoder('')
d._stream = self.mock_stream
if decoder_method in (decoder.Decoder.ReadString,
decoder.Decoder.ReadBytes):
self.mock_stream.ReadVarUInt32().AndReturn(len(stream_method_return))
# We have to use names instead of methods to work around some
# mox weirdness. (ResetAll() is overzealous).
expected_stream_method = getattr(self.mock_stream,
expected_stream_method_name)
expected_stream_method(*args).AndReturn(stream_method_return)
self.mox.ReplayAll()
result = decoder_method(d)
self.assertEqual(expected_result, result)
self.assert_(isinstance(result, type(expected_result)))
self.mox.VerifyAll()
self.mox.ResetAll()
VAL = 1.125 # Perfectly representable as a float (no rounding error).
LITTLE_FLOAT_VAL = '\x00\x00\x90?'
LITTLE_DOUBLE_VAL = '\x00\x00\x00\x00\x00\x00\xf2?'
def testReadScalars(self):
test_string = 'I can feel myself getting sutpider.'
scalar_tests = [
['int32', decoder.Decoder.ReadInt32, 0, 'ReadVarint32', 0],
['int64', decoder.Decoder.ReadInt64, 0, 'ReadVarint64', 0],
['uint32', decoder.Decoder.ReadUInt32, 0, 'ReadVarUInt32', 0],
['uint64', decoder.Decoder.ReadUInt64, 0, 'ReadVarUInt64', 0],
['fixed32', decoder.Decoder.ReadFixed32, 0xffffffff,
'ReadLittleEndian32', 0xffffffff],
['fixed64', decoder.Decoder.ReadFixed64, 0xffffffffffffffff,
'ReadLittleEndian64', 0xffffffffffffffff],
['sfixed32', decoder.Decoder.ReadSFixed32, long(-1),
'ReadLittleEndian32', long(0xffffffff)],
['sfixed64', decoder.Decoder.ReadSFixed64, long(-1),
'ReadLittleEndian64', 0xffffffffffffffff],
['float', decoder.Decoder.ReadFloat, self.VAL,
'ReadBytes', self.LITTLE_FLOAT_VAL, 4],
['double', decoder.Decoder.ReadDouble, self.VAL,
'ReadBytes', self.LITTLE_DOUBLE_VAL, 8],
['bool', decoder.Decoder.ReadBool, True, 'ReadVarUInt32', 1],
['enum', decoder.Decoder.ReadEnum, 23, 'ReadVarUInt32', 23],
['string', decoder.Decoder.ReadString,
unicode(test_string, 'utf-8'), 'ReadBytes', test_string,
len(test_string)],
['utf8-string', decoder.Decoder.ReadString,
unicode(test_string, 'utf-8'), 'ReadBytes', test_string,
len(test_string)],
['bytes', decoder.Decoder.ReadBytes,
test_string, 'ReadBytes', test_string, len(test_string)],
# We test zigzag decoding routines more extensively below.
['sint32', decoder.Decoder.ReadSInt32, -1, 'ReadVarUInt32', 1],
['sint64', decoder.Decoder.ReadSInt64, -1, 'ReadVarUInt64', 1],
]
# Ensure that we're testing different Decoder methods and using
# different test names in all test cases above.
self.assertEqual(len(scalar_tests), len(set(t[0] for t in scalar_tests)))
self.assert_(len(scalar_tests) >= len(set(t[1] for t in scalar_tests)))
for args in scalar_tests:
self.ReadScalarTestHelper(*args)
def testReadMessageInto(self):
length = 23
def Test(simulate_error):
d = decoder.Decoder('')
d._stream = self.mock_stream
self.mock_stream.ReadVarUInt32().AndReturn(length)
sub_buffer = object()
self.mock_stream.GetSubBuffer(length).AndReturn(sub_buffer)
if simulate_error:
self.mock_message.MergeFromString(sub_buffer).AndReturn(length - 1)
self.mox.ReplayAll()
self.assertRaises(
message.DecodeError, d.ReadMessageInto, self.mock_message)
else:
self.mock_message.MergeFromString(sub_buffer).AndReturn(length)
self.mock_stream.SkipBytes(length)
self.mox.ReplayAll()
d.ReadMessageInto(self.mock_message)
self.mox.VerifyAll()
self.mox.ResetAll()
Test(simulate_error=False)
Test(simulate_error=True)
def testReadGroupInto_Success(self):
# Test both the empty and nonempty cases.
for num_bytes in (5, 0):
field_number = expected_field_number = 10
d = decoder.Decoder('')
d._stream = self.mock_stream
sub_buffer = object()
self.mock_stream.GetSubBuffer().AndReturn(sub_buffer)
self.mock_message.MergeFromString(sub_buffer).AndReturn(num_bytes)
self.mock_stream.SkipBytes(num_bytes)
self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag(
field_number, wire_format.WIRETYPE_END_GROUP))
self.mox.ReplayAll()
d.ReadGroupInto(expected_field_number, self.mock_message)
self.mox.VerifyAll()
self.mox.ResetAll()
def ReadGroupInto_FailureTestHelper(self, bytes_read):
d = decoder.Decoder('')
d._stream = self.mock_stream
sub_buffer = object()
self.mock_stream.GetSubBuffer().AndReturn(sub_buffer)
self.mock_message.MergeFromString(sub_buffer).AndReturn(bytes_read)
return d
def testReadGroupInto_NegativeBytesReported(self):
expected_field_number = 10
d = self.ReadGroupInto_FailureTestHelper(bytes_read=-1)
self.mox.ReplayAll()
self.assertRaises(message.DecodeError,
d.ReadGroupInto, expected_field_number,
self.mock_message)
self.mox.VerifyAll()
def testReadGroupInto_NoEndGroupTag(self):
field_number = expected_field_number = 10
num_bytes = 5
d = self.ReadGroupInto_FailureTestHelper(bytes_read=num_bytes)
self.mock_stream.SkipBytes(num_bytes)
# Right field number, wrong wire type.
self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag(
field_number, wire_format.WIRETYPE_LENGTH_DELIMITED))
self.mox.ReplayAll()
self.assertRaises(message.DecodeError,
d.ReadGroupInto, expected_field_number,
self.mock_message)
self.mox.VerifyAll()
def testReadGroupInto_WrongFieldNumberInEndGroupTag(self):
expected_field_number = 10
field_number = expected_field_number + 1
num_bytes = 5
d = self.ReadGroupInto_FailureTestHelper(bytes_read=num_bytes)
self.mock_stream.SkipBytes(num_bytes)
# Wrong field number, right wire type.
self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag(
field_number, wire_format.WIRETYPE_END_GROUP))
self.mox.ReplayAll()
self.assertRaises(message.DecodeError,
d.ReadGroupInto, expected_field_number,
self.mock_message)
self.mox.VerifyAll()
def testSkipBytes(self):
d = decoder.Decoder('')
num_bytes = 1024
self.mock_stream.SkipBytes(num_bytes)
d._stream = self.mock_stream
self.mox.ReplayAll()
d.SkipBytes(num_bytes)
self.mox.VerifyAll()
if __name__ == '__main__':
unittest.main()