Skip to content

Commit

Permalink
bpo-36819: Fix crashes in built-in encoders with weird error handlers (
Browse files Browse the repository at this point in the history
…GH-28593)

If the error handler returns a position less than or equal to the starting
position of the non-encodable characters, most of the built-in encoders didn't
properly resize the output buffer. This led to out-of-bounds writes
and segfaults.
  • Loading branch information
serhiy-storchaka authored May 2, 2022
1 parent 614420d commit 18b07d7
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 32 deletions.
177 changes: 168 additions & 9 deletions Lib/test/test_codeccallbacks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import codecs
import html.entities
import itertools
import sys
import unicodedata
import unittest
Expand All @@ -22,6 +23,18 @@ def handle(self, exc):
self.pos = len(exc.object)
return ("<?>", oldpos)

class RepeatedPosReturn:
    """Codec error handler that can rewind to a fixed position several times.

    While ``count`` is positive, each call to ``handle`` decrements it and
    returns ``(repl, pos)``; once it is exhausted, ``handle`` returns
    ``(repl, exc.end)`` so that processing resumes after the reported error.
    """

    def __init__(self, repl="<?>"):
        # Replacement returned on every call.
        self.repl = repl
        # Resume position returned while rewinds remain.
        self.pos = 0
        # Number of rewinds still to perform; callers set this before use.
        self.count = 0

    def handle(self, exc):
        """Return the ``(replacement, resume_position)`` pair for *exc*."""
        if self.count <= 0:
            # No rewinds left: continue after the offending range.
            return self.repl, exc.end
        self.count -= 1
        return self.repl, self.pos

# A UnicodeEncodeError object with a bad start attribute
class BadStartUnicodeEncodeError(UnicodeEncodeError):
def __init__(self):
Expand Down Expand Up @@ -783,20 +796,104 @@ def test_lookup(self):
codecs.lookup_error("namereplace")
)

def test_unencodablereplacement(self):
def test_encode_nonascii_replacement(self):
# Check how encoders treat a non-ASCII *str* replacement returned by a
# custom error handler.
def handle(exc):
# Substitute the current table row's ``repl`` (looked up when the handler
# runs) and resume right after the failing range.
if isinstance(exc, UnicodeEncodeError):
return (repl, exc.end)
raise TypeError("don't know how to handle %r" % exc)
codecs.register_error("test.replacing", handle)

# 8-bit codecs: each replacement below is expected to be encodable in the
# target codec, so it should appear in place of the bad character.
for enc, input, repl in (
("ascii", "[¤]", "abc"),
("iso-8859-1", "[€]", "½¾"),
("iso-8859-15", "[¤]", "œŸ"),
):
res = input.encode(enc, "test.replacing")
self.assertEqual(res, ("[" + repl + "]").encode(enc))

# UTF codecs: the input holds a lone surrogate and the handler answers with
# a non-ASCII replacement; the test expects UnicodeEncodeError reported for
# the surrogate at index 1 (start=1, end=2) of the unchanged input.
for enc, input, repl in (
("utf-8", "[\udc80]", "\U0001f40d"),
("utf-16", "[\udc80]", "\U0001f40d"),
("utf-32", "[\udc80]", "\U0001f40d"),
):
with self.subTest(encoding=enc):
with self.assertRaises(UnicodeEncodeError) as cm:
input.encode(enc, "test.replacing")
exc = cm.exception
self.assertEqual(exc.start, 1)
self.assertEqual(exc.end, 2)
self.assertEqual(exc.object, input)

def test_encode_unencodable_replacement(self):
# Check that a replacement string which the codec itself cannot encode
# results in UnicodeEncodeError rather than being written out.
def unencrepl(exc):
if isinstance(exc, UnicodeEncodeError):
# NOTE(review): two consecutive returns; the first looks like the
# pre-change line of the diff and the second its replacement — confirm
# which one belongs in the final file.
return ("\u4242", exc.end)
return (repl, exc.end)
else:
raise TypeError("don't know how to handle %r" % exc)
codecs.register_error("test.unencreplhandler", unencrepl)
# NOTE(review): the loop below appears to be the pre-change version of the
# check (it encodes "\u4242" directly) — confirm whether it was removed.
for enc in ("ascii", "iso-8859-1", "iso-8859-15"):
self.assertRaises(
UnicodeEncodeError,
"\u4242".encode,
enc,
"test.unencreplhandler"
)

# Each row pairs an input containing one unencodable character at index 1
# with a replacement that is presumably also unencodable in that codec.
for enc, input, repl in (
("ascii", "[¤]", "½"),
("iso-8859-1", "[€]", "œ"),
("iso-8859-15", "[¤]", "½"),
("utf-8", "[\udc80]", "\udcff"),
("utf-16", "[\udc80]", "\udcff"),
("utf-32", "[\udc80]", "\udcff"),
):
with self.subTest(encoding=enc):
with self.assertRaises(UnicodeEncodeError) as cm:
input.encode(enc, "test.unencreplhandler")
exc = cm.exception
# The error is expected to describe the original bad character in the
# original input.
self.assertEqual(exc.start, 1)
self.assertEqual(exc.end, 2)
self.assertEqual(exc.object, input)

def test_encode_bytes_replacement(self):
# Check that a *bytes* replacement returned by an error handler is copied
# into the encoded output as-is.
def handle(exc):
# Substitute the current table row's ``repl`` and resume right after the
# failing range.
if isinstance(exc, UnicodeEncodeError):
return (repl, exc.end)
raise TypeError("don't know how to handle %r" % exc)
codecs.register_error("test.replacing", handle)

# It works even if the bytes sequence is not decodable.
# The replacements are 2 bytes long, except for the UTF-32 rows, which use
# 4 bytes.
for enc, input, repl in (
("ascii", "[¤]", b"\xbd\xbe"),
("iso-8859-1", "[€]", b"\xbd\xbe"),
("iso-8859-15", "[¤]", b"\xbd\xbe"),
("utf-8", "[\udc80]", b"\xbd\xbe"),
("utf-16le", "[\udc80]", b"\xbd\xbe"),
("utf-16be", "[\udc80]", b"\xbd\xbe"),
("utf-32le", "[\udc80]", b"\xbc\xbd\xbe\xbf"),
("utf-32be", "[\udc80]", b"\xbc\xbd\xbe\xbf"),
):
with self.subTest(encoding=enc):
res = input.encode(enc, "test.replacing")
# Expected output: encoded "[", the raw replacement bytes, encoded "]".
self.assertEqual(res, "[".encode(enc) + repl + "]".encode(enc))

def test_encode_odd_bytes_replacement(self):
# Check that UTF-16/UTF-32 encoders reject a bytes replacement whose length
# is not a multiple of the code unit size.
def handle(exc):
# Substitute the current table row's ``repl`` and resume right after the
# failing range.
if isinstance(exc, UnicodeEncodeError):
return (repl, exc.end)
raise TypeError("don't know how to handle %r" % exc)
codecs.register_error("test.replacing", handle)

input = "[\udc80]"
# Tests in which the replacement byte string does not contain a whole
# number of code units: lengths 1 and 3 for UTF-16, and lengths 1, 2, 3
# and 5 for UTF-32.
for enc, repl in (
*itertools.product(("utf-16le", "utf-16be"),
[b"a", b"abc"]),
*itertools.product(("utf-32le", "utf-32be"),
[b"a", b"ab", b"abc", b"abcde"]),
):
with self.subTest(encoding=enc, repl=repl):
with self.assertRaises(UnicodeEncodeError) as cm:
input.encode(enc, "test.replacing")
exc = cm.exception
# The error is expected to describe the lone surrogate at index 1 of
# the original input, with the reason "surrogates not allowed".
self.assertEqual(exc.start, 1)
self.assertEqual(exc.end, 2)
self.assertEqual(exc.object, input)
self.assertEqual(exc.reason, "surrogates not allowed")

def test_badregistercall(self):
# enhance coverage of:
Expand Down Expand Up @@ -940,6 +1037,68 @@ def __getitem__(self, key):
self.assertRaises(ValueError, codecs.charmap_encode, "\xff", err, D())
self.assertRaises(TypeError, codecs.charmap_encode, "\xff", err, {0xff: 300})

def test_decodehelper_bug36819(self):
# Decode with an error handler that repeatedly sends the decoder back to
# position 0 before finally letting it continue past the bad bytes.
handler = RepeatedPosReturn("x")
codecs.register_error("test.bug36819", handler.handle)

# (encoding, byte sequence expected to be undecodable in that encoding)
testcases = [
("ascii", b"\xff"),
("utf-8", b"\xff"),
("utf-16be", b'\xdc\x80'),
("utf-32be", b'\x00\x00\xdc\x80'),
("iso-8859-6", b"\xff"),
]
for enc, bad in testcases:
# Valid prefix "abcd" followed by the bad sequence.
input = "abcd".encode(enc) + bad
with self.subTest(encoding=enc):
# 50 rewinds to position 0 plus the final pass: "abcd" is decoded and
# "x" substituted 51 times in total.
handler.count = 50
decoded = input.decode(enc, "test.bug36819")
self.assertEqual(decoded, 'abcdx' * 51)

def test_encodehelper_bug36819(self):
    # Regression test for bpo-36819: encode with an error handler that
    # repeatedly returns position 0 (not after the unencodable character), so
    # the encoder has to re-encode the prefix and grow its output buffer.
    handler = RepeatedPosReturn()
    codecs.register_error("test.bug36819", handler.handle)

    input = "abcd\udc80"
    encodings = ["ascii", "latin1", "utf-8", "utf-16", "utf-32"]  # built-in
    encodings += ["iso-8859-15"]  # charmap codec
    if sys.platform == 'win32':
        # Extend rather than replace the list, so that the built-in and
        # charmap encoders are still exercised on Windows.
        encodings += ["mbcs", "oem"]  # code page codecs

    # An unencodable replacement must raise an error that describes the
    # original bad character (index 4) in the original input.
    handler.repl = "\udcff"
    for enc in encodings:
        with self.subTest(encoding=enc):
            handler.count = 50
            with self.assertRaises(UnicodeEncodeError) as cm:
                input.encode(enc, "test.bug36819")
            exc = cm.exception
            self.assertEqual(exc.start, 4)
            self.assertEqual(exc.end, 5)
            self.assertEqual(exc.object, input)
    if sys.platform == "win32":
        handler.count = 50
        with self.assertRaises(UnicodeEncodeError) as cm:
            codecs.code_page_encode(437, input, "test.bug36819")
        exc = cm.exception
        self.assertEqual(exc.start, 4)
        self.assertEqual(exc.end, 5)
        self.assertEqual(exc.object, input)

    # An encodable replacement: 50 rewinds to position 0 plus the final pass
    # produce "abcd" + "x" 51 times.
    handler.repl = "x"
    for enc in encodings:
        with self.subTest(encoding=enc):
            # Without the fix, the interpreter would segfault after a
            # handful of attempts.  50 was chosen to try to ensure a
            # segfault without a fix, but not OOM a machine with one.
            handler.count = 50
            encoded = input.encode(enc, "test.bug36819")
            self.assertEqual(encoded.decode(enc), "abcdx" * 51)
    if sys.platform == "win32":
        handler.count = 50
        encoded = codecs.code_page_encode(437, input, "test.bug36819")
        # code_page_encode() returns (encoded bytes, characters consumed).
        self.assertEqual(encoded[0].decode(), "abcdx" * 51)
        self.assertEqual(encoded[1], len(input))

def test_translatehelper(self):
# enhance coverage of:
# Objects/unicodeobject.c::unicode_encode_call_errorhandler()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix crashes in built-in encoders with error handlers that return a position
less than or equal to the starting position of non-encodable characters.
15 changes: 13 additions & 2 deletions Objects/stringlib/codecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,19 @@ STRINGLIB(utf8_encoder)(_PyBytesWriter *writer,
if (!rep)
goto error;

/* subtract preallocated bytes */
writer->min_size -= max_char_size * (newpos - startpos);
if (newpos < startpos) {
writer->overallocate = 1;
p = _PyBytesWriter_Prepare(writer, p,
max_char_size * (startpos - newpos));
if (p == NULL)
goto error;
}
else {
/* subtract preallocated bytes */
writer->min_size -= max_char_size * (newpos - startpos);
/* Only overallocate the buffer if it's not the last write */
writer->overallocate = (newpos < size);
}

if (PyBytes_Check(rep)) {
p = _PyBytesWriter_WriteBytes(writer, p,
Expand Down
Loading

0 comments on commit 18b07d7

Please sign in to comment.