Ian said:
I got a puzzler for y'all. I want to allow the editing of functions
in-place. I won't go into the reason (it's for HTConsole --
http://blog.ianbicking.org/introducing-htconsole.html), except that I
really want to edit it all in-process and in-memory. So I want the
identity of the function to remain the same, even as I edit the body
and hopefully the signature too.
Well, the reason is that I want to edit any function object, without
having to know who has a reference to the function. This way editing a
function or a method or a property.fget can all be done in the same
way.
The func_code attributes of functions is writable, but I don't know how
to create the proper code object. Just compiling a new body isn't good
enough.
The experimental module below updates many of the builtin types in-place,
including functions. Functions in particular are handled by
update_FunctionType. I use this in my own development environment to edit live
code i.e., as a better reload.
Michael
"""Update live code with new source.
Example:
>>> source1 = "def func1(a): return 2*a"
>>> namespace = {}
>>> exec source1 in namespace
>>> func1 = namespace["func1"]
>>> func1(2) 4
>>> source2 = "def func1(a, b): return 2*a+b"
>>> exec_update(namespace, source2, verbosity=2)
Updating: <type 'dict'>,
.func1 Updated
True"""
import types
import gc
class UpdateException(Exception): pass
def exec_update(namespace, source, name="", verbosity = 1):
module_proxy.verbosity = verbosity
if verbosity == 2:
print "Updating: %s, %s" % (type(namespace), name)
if isinstance(namespace, types.ModuleType):
proxy = module_proxy(namespace)
elif isinstance(namespace, (type, types.ClassType)):
proxy = cls_proxy(namespace)
elif isinstance(namespace, dict):
proxy = dict_proxy(namespace, name)
else:
raise UpdateException, "Unrecognized namespace type: %s" % type(namespace)
exec source in proxy.dict, proxy
return True
class module_proxy(object):
DO_NOT_COPY = ()
verbosity = 1
def __init__(self, module, name = ""):
self.dict = module.__dict__
self.namespace = module
self.name = getattr(module,"__name__", name)
def __contains__(self, key):
return key in self.dict
def _setitem(self,key,value):
self.dict[key] = value
def _delitem(self, key):
del self.dict[key]
def __getitem__(self, key):
return self.dict[key]
def __setitem__(self, key, value):
try:
obj = self.dict[key]
except KeyError:
if self.verbosity >= 1:
print "** %s.%s=%s (Binding)" % (self.name, key, repr(value))
self._setitem(key, value)
return True
try:
if update(obj, value):
if self.verbosity >= 1:
print "%s.%s Updated" % (self.name, key)
return True
else:
if self.verbosity >= 2:
print "%s.%s No change" % (self.name, key)
return False
except UpdateException:
if self.verbosity >= 1:
print "** %s.%s=>%s (Rebinding)" % (self.name, key, repr(value))
self._setitem(key, value)
return True
def __delitem__(self, key):
if self.verbosity >= 1:
print "** del %s.%s" % (self.name, key)
self._delitem(key)
def update_ns(self, other_dict, delete_missing=False, skip = ()):
dirty = False
if delete_missing:
for attr in self.dict.keys():
if not((attr in other_dict) or (attr in skip)):
dirty = True
del self[attr]
for to_attr, to_obj in other_dict.iteritems():
if to_attr in skip:
continue
dirty |= self.__setitem__(to_attr, to_obj)
return dirty
class dict_proxy(module_proxy):
def __init__(self, my_dict, name = ""):
self.dict = my_dict
self.namespace = None
self.name = name
class cls_proxy(module_proxy):
def _setitem(self,key,value):
setattr(self.namespace,key,value)
def _delitem(self, key):
delattr(self.namespace, key)
def update_cls(self, other):
DONOTCOPY = set(["__name__","__bases__","__base__","__dict__",
"__doc__","__weakref__","__module__"])
# This will often fail if they are not equal, so find out first!
obj = self.namespace
try:
obj.__bases__ = other.__bases__
except TypeError, err:
raise UpdateException, err
fromdict = obj.__dict__
todict = other.__dict__
obj_slots = set(getattr(obj, "__slots__", ()))
other_slots = set(getattr(other, "__slots__", ()))
if other_slots > obj_slots:
raise UpdateException, "Can't add slots %s" % \
list(other_slots - obj_slots)
return self.update_ns(todict, delete_missing=True, skip = DONOTCOPY |
obj_slots)
##TODO - Instances with __slots__ (see decimal.Decimal)
class instance_proxy(dict_proxy):
def __init__(self, my_obj, name = ""):
self.dict = my_obj.__dict__
self.namespace = my_obj
self.name = name
# Specialized updaters by type
# Each updater must return: True, False or raise UpdateException
# False: no change required - new object is equal to the old
# True: object was successfully updated
# UpdateException - object could not be updated in place. Caller must
# then decide whether to re-bind
def update_classmethod(obj, other):
# Massive hack. classmethod (and staticmethod) do not expose their
# underlying callable to Python. But they *seem* always to have
# one reference - which must be to the callable.
obj_refs = gc.get_referents(obj)
other_refs = gc.get_referents(other)
if len(obj_refs) != len(other_refs) != 1:
raise UpdateException, "Too many references from classmethod"
return update(obj_refs[0], other_refs[0])
update_staticmethod = update_classmethod
def update_classmethod_FunctionType(obj, other):
obj_refs = gc.get_referents(obj)
if len(obj_refs) != 1:
raise UpdateException, "Too many references from classmethod"
return update(obj_refs[0], other)
def update_property(obj, other):
# These are read-only attributes, so unless we can update them
# in-place, update_property will fail
dirty = False
dirty |= update(obj.fget, other.fget)
dirty |= update(obj.fset, other.fset)
dirty |= update(obj.fdel, other.fdel)
dirty |= update(obj.__doc__, other.__doc__)
return dirty
def update_FunctionType(obj, other):
# Note this applies only to Python-defined functions
dirty = False
for attr in ("func_code", "func_defaults", "func_dict", "func_doc",
"func_globals"):
try:
obj_attr, other_attr = getattr(obj, attr), getattr(other,attr)
except AttributeError, err:
# missing attribute means can't update the function
raise UpdateException, err
try:
# If possible, update the function attributes in-place
# this preserves, in particular, func_defaults
dirty |= update(obj_attr, other_attr)
except UpdateException:
# If we can't update the function attribute in place
# go ahead and overwrite it
dirty = True
setattr(obj, attr, other_attr)
return dirty
def update_MethodType(obj, other):
# No need to update the class/object bindings
return update_FunctionType(obj.im_func, other.im_func)
def update_MethodType_FunctionType(obj, other):
return update_FunctionType(obj.im_func, other)
def update_list(obj, other):
if obj == other:
return False
obj[:] = other
return True
def update_dict(obj, other):
if obj == other:
return False
obj.clear()
obj.update(other)
return True
def update_deque(obj, other):
if obj == other:
return False
obj.clear()
obj.extend(other)
return True
def update_array(obj, other):
if obj == other:
return False
if obj.typecode != other.typecode:
raise UpdateException, "Can't change typecode of array"
obj[:] = other
return True
def update_tuple(obj, other):
# We go to some additional lengths with tuples
if obj == other:
return False
if len(obj) == len(other):
dirty = False
for ob1, ob2 in zip(obj, other):
# This will raise UpdateException if any of the members
# are not updatable
dirty |= update(ob1, ob2)
return dirty
raise UpdateException, "Unequal tuples"
update_set = update_dict
def update_type(obj, other):
proxy_cls = cls_proxy(obj)
return proxy_cls.update_cls(other)
update_ClassType = update_type
def update(obj, other):
if obj is other:
return False
type_pair = type(obj), type(other)
#name = getattr(obj,"__name__", "")
try:
update_method = _method_cache[type_pair]
except KeyError:
pass
else:
#print "UpdateType %r" % obj
return update_method(obj, other)
if type_pair[0] == type_pair[1]:
if isinstance(obj, (object, types.InstanceType)):
try:
obj_dict, other_dict = obj.__dict__, other.__dict__
except AttributeError:
pass
else:
obj_dict_proxy = instance_proxy(obj)
return obj_dict_proxy.update_ns(other_dict, True)
if isinstance(obj, IMMUTABLE_TYPES):
if obj == other:
return False
raise UpdateException, "Can't update %s to %s" % type_pair
# Can't update these, but if they compare equal, there's no need to
import datetime
IMMUTABLE_TYPES = (bool, int, long, float, complex,
buffer, basestring, slice,
types.BuiltinFunctionType, types.BuiltinMethodType, types.CodeType,
types.DictProxyType,
datetime.date, datetime.time, datetime.timedelta, datetime.tzinfo)
del datetime
def _get_types():
from collections import deque
from array import array
d= locals()
d.update(types.__dict__)
return d
def _get_method_cache():
cache = {}
type_dict = _get_types()
for name, obj in globals().items():
if name.startswith("update_"):
fromtypename, totypename = name.rsplit("_", 2)[-2:]
if fromtypename == "update":
fromtypename = totypename
fromtype = eval(fromtypename, type_dict)
totype = eval(totypename, type_dict)
cache[(fromtype,totype)] = obj
return cache
_method_cache = _get_method_cache()