Commit 2c20c055 authored by Kirill Smelkov's avatar Kirill Smelkov

golang_str: tests: Deep replacer

deepReplace returns object's clone with replacing all internal objects
selected by predicate via provided replacement function. We will use
this functionality in the following patches to organize testing of
bstr/ustr methods: a method would be first invoked on regular str, and
then on bstr/ustr and the result will be compared against each other.
The results are usually different, because e.g. u'a b c'.split() returns
[u'a', u'b', u'c'] while b('a b c').split() should return
[b('a'), b('b'), b('c')]. We want to make sure that the second result is
exactly the first result with all instances of unicode replaced by bstr.
That's where deep replacer will be used.

The deep replacement itself is implemented via pickle reduce/rebuild
protocol: we unassemble and reconstruct objects. And while an object is
unassembled, we try to apply the replacement recursively. Since this is
not so trivial functionality, it itself also comes with a test.
parent 510cf8d1
......@@ -21,7 +21,7 @@
from __future__ import print_function, absolute_import
import golang
from golang import b, u, bstr, ustr, bbyte, uchr
from golang import b, u, bstr, ustr, bbyte, uchr, func, defer, panic
from golang._golang import _udata, _bdata
from golang.gcompat import qq
from golang.strconv_test import byterange
......@@ -31,7 +31,7 @@ import sys
import six
from six import text_type as unicode, unichr
from six.moves import range as xrange
import pickle
import pickle, copy, types
import array
......@@ -717,6 +717,529 @@ def test_qq():
# it is tested e.g. in test_strings_ops2 and test_strings_mod_and_format
# ---- deep replace ----
# deepReplace returns object's clone with replacing all internal objects selected by predicate.
#
# Specifically: for every object x - obj or its internal object - if
# fpred(x) is true, it is replaced by what freplace(x) returns.
def deepReplace(obj, fpred, freplace):
r = _DeepReplacer(fpred, freplace)
return r.replace(obj)
_pickleproto = min(4, pickle.HIGHEST_PROTOCOL) # py2 does not define pickle.DEFAULT_PROTOCOL
_OldClassInstance = None
if six.PY2:
_OldClassInstance = types.InstanceType
# _DeepReplacer serves deepReplace.
#
# It works by recursively going through objects, unassembling them, doing
# replacements in the unassembled parts, and then rebuilding objects back.
#
# The unassemble/rebuild is implemented via using pickle-related machinery
# (__reduce__ and friends).
class _DeepReplacer:
def __init__(r, fpred, freplace):
r.fpred = fpred
r.freplace = freplace
r.memo = {}
r.keepalive = []
r.rlevel = 0 # recursion level
def _debug(self, fmt='', *argv):
if 0:
print(' '*(self.rlevel-1) + (fmt % argv))
@func
def replace(r, obj):
r.rlevel += 1
def _(): r.rlevel -= 1
defer(_)
r._debug()
r._debug('_replace %r @%s', obj, id(obj))
r._debug(' memo:\t%r', r.memo)
if id(obj) in r.memo:
r._debug(' (in memo)')
return r.memo[id(obj)]
obj_ = r._replace(obj)
r._debug('-> %r @%s', obj_, id(obj_))
if id(obj) in r.memo:
assert r.memo[id(obj)] is obj_
else:
r.memo[id(obj)] = obj_
# keep obj alive while we keep its amended version in memo referenced by id(obj).
#
# some objects, that we are processing, might be temporary (i.e. created by ilist/idict)
# and if we don't keep them alive other temporary objects could be
# further created with the same id which will break our memo accounting.
if obj_ is not obj:
r.keepalive.append(obj)
return obj_
def _replace(r, obj):
if r.fpred(obj):
return r.freplace(obj)
cls = type(obj)
if issubclass(cls, type): # a class, e.g. 'tuple'
return obj
# fast path for atomic objects (int, float, bool, bytes, unicode, ... but not e.g. tuple)
if copy._deepcopy_dispatch.get(cls) is copy._deepcopy_atomic:
return obj
# obj is non-atomic - it contains references to other objects
return r._replace_nonatomic(obj)
def _replace_nonatomic(r, obj): # -> obj*
# unassemble and rebuild obj after doing the replacement in its state
cls = type(obj)
# handle tuples specially
# if we don't - we won't get into replacing tuple items, because
# `tup.__getnewargs__() is (tup,)` and that same tup object will be present in newargv.
if cls is tuple: # NOTE plain tuple only, no subclasses here
v = []
for x in obj:
x_ = r.replace(x)
v.append(x_)
# for self-referencing cases, going replace through the state
# might already replace the tuple itself
if id(obj) in r.memo:
return r.memo[id(obj)]
return tuple(v)
if cls is _OldClassInstance: # obj is instance of old-style class
return r._replace_oldstyle(obj)
else:
return r._replace_newstyle(obj)
def _replace_oldstyle(r, obj): # -> obj*
# old-style classes are pickled not via __reduce__ - see copy._copy_inst / _deepcopy_inst
initargv = None
if hasattr(obj, '__getinitargs__'):
initargv = obj.__getinitargs__()
if hasattr(obj, '__getstate__'):
state = obj.__getstate__()
else:
state = obj.__dict__
# initargv is empty - instantiate the class via _EmptyClass and .__class__ patch
# https://github.com/python/cpython/blob/2.7-0-g8d21aa21f2c/Lib/pickle.py#L1057-L1059
if not initargv:
obj_ = copy._EmptyClass()
obj_.__class__ = obj.__class__
assert id(obj) not in r.memo
r.memo[id(obj)] = obj_
# initargv
if initargv is not None:
initargv_ = []
for x in initargv:
x_, n = r.replace(x)
initargv_.append(x_)
initargv = tuple(initargv_)
# state
state = r.replace(state)
if initargv is not None:
obj_ = obj.__class__(*initargv)
else:
obj_ = r.memo[id(obj)]
if hasattr(obj_, '__setstate__'):
obj_.__setstate__(state)
else:
obj_.__dict__.update(state)
return obj_
def _replace_newstyle(r, obj): # -> obj*
# new-style classes are pickled via __reduce__
# see copy and pickle documentation for details
# https://docs.python.org/3/library/pickle.html#pickling-class-instances
state = None
ilist = None
idict = None
setstate = None
# TODO copy_reg.reduce should have priority ?
_ = obj.__reduce_ex__(_pickleproto)
new = _[0]
newargv = _[1]
if len(_) >= 3:
state = _[2]
if len(_) >= 4:
ilist = _[3]
if len(_) >= 5:
idict = _[4]
if len(_) >= 6:
setstate = _[5]
r._debug()
r._debug(' obj:\t%r @%s', obj, id(obj))
r._debug(' new:\t%r', new)
r._debug(' newargv: %r', newargv)
r._debug(' state:\t%r', state)
r._debug(' ilist:\t%r', ilist)
r._debug(' idict:\t%r', idict)
r._debug(' setstate:\t%r', setstate)
# __newobj__ function is treated specially meaning __newobj__(cls) should call cls.__new__()
# https://github.com/python/cpython/blob/v3.11.0a7-248-g4153f2cbcb4/Lib/pickle.py#L652-L689
new_name = getattr(new, "__name__", "")
if new_name == "__newobj__" and len(newargv) == 1:
cls = newargv[0]
if hasattr(cls, "__new__"):
assert id(obj) not in r.memo
r.memo[id(obj)] = cls.__new__(cls)
# newargv
newargv_ = []
for x in newargv:
x_ = r.replace(x)
newargv_.append(x_)
newargv = tuple(newargv_)
# state
if state is not None:
state = r.replace(state)
# ilist
if ilist is not None:
ilist_ = []
for x in ilist:
x_ = r.replace(x)
ilist_.append(x_)
ilist = ilist_ # NOTE unconditionally (we consumed the iterator)
# idict
if idict is not None:
idict_ = []
for x in idict:
x_ = r.replace(x)
idict_.append(x_)
idict = idict_ # NOTE unconditionally (----//----)
# for self-referencing cases, going replace through arguments/state
# might already replace the object itself
if id(obj) in r.memo:
obj_ = r.memo[id(obj)]
else:
obj_ = new(*newargv)
if state is not None:
if setstate is not None:
setstate(obj_, state)
elif hasattr(obj_, '__setstate__'):
obj_.__setstate__(state)
else:
obj_.__dict__.update(state)
if ilist is not None:
for _ in ilist:
obj_.append(_)
if idict is not None:
for k,v in idict:
obj_[k] = v
return obj_
# deepReplaceBytes returns obj's clone with bytes instances replaced with
# unicode via UTF-8 decoding.
def _isxbytes(x):
if not isinstance(x, bytes):
return False
return True
def _bdecode(x):
return _udata(u(x))
def deepReplaceBytes(obj):
return deepReplace(obj, _isxbytes, _bdecode)
def test_deepreplace_bytes():
def f(): pass
g = lambda: None # non-picklable func
with raises((pickle.PicklingError, AttributeError), match="Can't pickle "):
pickle.dumps(g, pickle.HIGHEST_PROTOCOL)
class L(list): pass
class T(tuple): pass
class S(set): pass
class F(frozenset): pass
class D(dict): pass
class Cold: pass
class Cnew(object): pass
# TODO class without reduce, but that can be reduced via copy_reg
# TODO class completely without reduce support
cold = Cold(); cold.x = u"α"
cnew = Cnew(); cnew.x = u"β"
nochangev = [
1001, 123.4, complex(1,2), None, True, False,
u"α",
f, g,
type,
unicode, bytes,
tuple, list, int, dict,
Cold, Cnew,
NotImplementedError,
[], (), {}, set(), frozenset(),
[1, u"α", f], L([1, u"α", f]),
(1, u"α", g), T([1, u"α", g]),
{1, u"α", f}, S({1, u"α", f}),
frozenset({1, u"α", f}), F({1, u"α", f}),
{1:2, u"α":u"β", f:g}, D({1:2, u"α":u"β", f:g}),
#cold, cnew,
[(u"α", {u"β":2, 3:[4, {u"γ"}]})],
]
for x in nochangev:
assert deepReplaceBytes(x) == x
bs = xbytes("мир") ; assert type(bs) is bytes
us = xunicode("мир") ; assert type(us) is unicode
_ = deepReplaceBytes(bs)
assert type(_) is unicode
assert _ == us
x = 123
def R(obj):
obj_ = deepReplaceBytes(obj)
assert type(obj_) is type(obj)
return obj_
# list
for typ in (list, L):
assert R(typ([bs])) == typ([us])
_ = R(typ([x, bs, f]))
assert _ == typ([x, us, f])
assert _[0] is x
assert _[2] is f
_ = R(typ([bs, [bs]])) # verify that last bs is not missed to be converted due to memoization
assert _ == typ([us, [us]])
l = typ(); l += [l] # self-reference, nochange
_ = R(l)
assert len(_) == 1
assert _[0] is _
l = typ([bs]); l += [l, bs] # self-reference
_ = R(l)
assert len(_) == 3
assert _[0] == us
assert _[1] is _
assert _[2] == us
# tuple
for typ in (tuple, T):
assert R(typ((bs,))) == typ((us,))
_ = R(typ((x, bs, f)))
assert _ == typ((x, us, f))
assert _[0] is x
assert _[2] is f
t = typ(([],)); t[0].append(t) # self-reference, nochange
_ = R(t)
assert len(_) == 1
assert len(_[0]) == 1
assert _[0][0] is _
t = typ(([bs], bs)); t[0].append(t) # self-reference
_ = R(t)
assert len(_) == 2
assert len(_[0]) == 2
assert _[0][0] == us
assert _[0][1] is _
assert _[1] == us
# set
for typ in (set, frozenset, S, F):
assert R(typ({bs})) == typ({us})
_ = R(typ({x, bs, f}))
assert _ == typ({x, us, f})
_ = set(_) # e.g. frozenset -> set
_.remove(us)
while _:
obj = _.pop()
if obj == x: assert obj is x
elif obj == f: assert obj is f
else: panic(obj)
l = hlist(); s = typ({l}); l.append(s) # self-reference, nochange
s_ = R(s)
assert len(s_) == 1
l_ = list(s_)[0]
assert type(l_) is hlist
assert len(l_) == 1
assert l_[0] is s_
l = hlist(); s = typ({bs, l}); l.append(s) # self-reference
s_ = R(s)
assert len(s_) == 2
_ = list(s_)
assert us in _
obj = _.pop(_.index(us))
assert type(obj) is unicode
assert obj == us
assert len(_) == 1
assert type(_[0]) is hlist
assert len(_[0]) == 1
assert _[0][0] is s_
# dict
for typ in (dict, D):
_ = R(typ({x:bs, bs:12, f:g}))
assert _ == typ({x:us, us:12, f:g})
l = hlist([x]); d = typ({l:12}); l.append(d) # self-reference(value), nochange
d_ = R(d)
_ = list(d_.items())
assert len(_) == 1
l_, v = _[0]
assert v == 12
assert type(l_) is hlist
assert len(l_) == 2
assert l_[0] == x
assert l_[1] is d_
l = hlist([x]); d = typ({12:l}); l.append(d) # self-reference(value), nochange
d_ = R(d)
_ = list(d_.items())
assert len(_) == 1
k, l_ = _[0]
assert k == 12
assert type(l_) is hlist
assert len(l_) == 2
assert l_[0] == x
assert l_[1] is d_
lk = hlist([x]); lv = hlist([12]); d = typ({lk:lv}) # self-ref(key,value), nochange
lk.append(d); lv.append(d)
d_ = R(d)
_ = list(d_.items())
assert len(_) == 1
lk_, lv_ = _[0]
assert type(lk_) is hlist
assert type(lv_) is hlist
assert len(lk_) == 2
assert len(lv_) == 2
assert lk_[0] == x
assert lv_[0] == 12
assert lk_[1] is d_
assert lv_[1] is d_
lk = hlist([xbytes('key')]); lv = hlist([xbytes('value')]); d = typ({lk:lv}) # self-ref(k,v)
lk.append(d); lv.append(d)
d_ = R(d)
_ = list(d_.items())
assert len(_) == 1
lk_, lv_ = _[0]
assert type(lk_) is hlist
assert type(lv_) is hlist
assert len(lk_) == 2
assert len(lv_) == 2
assert type(lk_[0]) is unicode
assert type(lv_[0]) is unicode
assert lk_[0] == xunicode('key')
assert lv_[0] == xunicode('value')
assert lk_[1] is d_
assert lv_[1] is d_
# class instances
cold = Cold(); cold.x = x; cold.y = bs; cold.me = cold
cnew = Cnew(); cnew.f = f; cnew.y = bs; cnew.me = cnew
_ = R(cold)
assert _ is not cold
assert _.x is x
assert _.y == us
assert _.me is _
_ = R(cnew)
assert _ is not cnew
assert _.f is f
assert _.y == us
assert _.me is _
# combining example
cnew = Cnew()
cnew.a = [cnew, {bs}]
cnew.b = {(bs,f): g}
_ = R(cnew)
assert _ is not cnew
assert type(_.a) is list
assert len(_.a) == 2
assert _.a[0] is _
assert type(_.a[1]) is set
assert _.a[1] == {us}
assert type(_.b) is dict
assert len(_.b) == 1
k, v = list(_.b.items())[0]
assert type(k) is tuple
assert len(k) == 2
assert type(k[0]) is unicode
assert k[0] == us
assert k[1] is f
assert v is g
# deepReplaceStr returns x with all instances of str replaced with bstrmk(·)
#
# except as ad-hoc rule we we don't change ASCII strings to avoid changing
# e.g. __dict__ keys in classes from str to bytes. However a string can be
# forced to be processed as string and changed - even if it is all ASCII - by
# starting it with "*str " prefix.
def _isstr(x):
return (type(x) is str) and (x.startswith("*str ") or not isascii(x))
def deepReplaceStr(x, bstrmk):
return deepReplace(x, _isstr, bstrmk)
def test_deepreplace_str():
# verify deepReplaceStr only lightly because underlying deepReplace
# functionality is verified thoroughly via test_deepreplace_bytes
_ = deepReplaceStr('α', b)
assert type(_) is bstr
assert _ == 'α'
_ = deepReplaceStr('β', u)
assert type(_) is ustr
assert _ == 'β'
def R(x):
x_ = deepReplaceStr(x, b)
assert type(x_) is type(x)
return x_
x = 123
assert R(x) is x
_ = R([1, 'α', 2])
assert _ == [1, 'α', 2]
assert type(_[1]) is bstr
# ----------------------------------------
# verify that what we patched stay unaffected when
......@@ -818,6 +1341,20 @@ def xbytes(x): return x.encode('utf-8') if type(x) is unicode else x
def xunicode(x): return x.decode('utf-8') if type(x) is bytes else x
def xbytearray(x): return bytearray(xbytes(x))
# deepReplaceStr2Bytearray replaces str to bytearray, or hashable-version of
# bytearray, if str objects are detected to be present inside set or dict keys.
class hbytearray(bytearray):
def __hash__(self):
return hash(bytes(self))
def xhbytearray(x): return hbytearray(xbytes(x))
def deepReplaceStr2Bytearray(x):
try:
return deepReplaceStr(x, xbytearray)
except TypeError as e:
if e.args != ("unhashable type: 'bytearray'",):
raise
return deepReplaceStr(x, xhbytearray)
# xstr returns string corresponding to specified type and data.
def xstr(text, typ):
def _():
......@@ -860,3 +1397,22 @@ def tbu(typ):
if typ in (unicode, ustr):
return ustr
raise AssertionError("invalid type %r" % typ)
# isascii returns whether bytes/unicode x consists of only ASCII characters.
def isascii(x):
if isinstance(x, unicode):
x = x.encode('utf-8')
assert isinstance(x, bytes)
# hand-made isascii (there is no bytes.isascii on py2)
try:
bytes.decode(x, 'ascii', 'strict')
except UnicodeDecodeError:
return False # non-ascii
else:
return True # ascii
# hlist is hashable list.
class hlist(list):
def __hash__(self):
return 0 # always hashable
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment