# -*- coding: utf-8 -*-
"""
This module is an all purpose intended for debugging, tracking, auto-documenting and self-introspecting the
package
Made by Davtoh. Powered partially by pycallgraph.
Dependent project: https://github.com/gak/pycallgraph/#python-call-graph
"""
from __future__ import print_function
from __future__ import absolute_import
from builtins import object
import sys
import os
import tempfile
import traceback
import functools
import time
import re
import inspect
from pycallgraph.output import GraphvizOutput
import pycallgraph # PyCallGraph
from pycallgraph.tracer import AsyncronousTracer, SyncronousTracer
from importlib import import_module
__license__ = (pycallgraph.__license__ +
" see https://github.com/gak/pycallgraph/blob/develop/LICENSE")
from . import directory as dr
[docs]def funcData(func):
argspec = inspect.getargspec(func) # get function data
args = argspec.args
defaults = argspec.defaults
if defaults: # get args with their values
defaults = {args[i - len(defaults)]: val for i,
val in enumerate(defaults)}
name = func.__name__
doc = func.__doc__
# get path of source, unlike getfile that even gets compiled
sourcefile = inspect.getsourcefile(func)
lines, line = inspect.getsourcelines(func) # get source
imp_from = inspect.getmodule(func).__name__
toreturn = dict(name=name, args=args, defaults=defaults, doc=doc,
keywords=argspec.keywords, varargs=argspec.varargs,
lines=lines, line=line, sourcefile=sourcefile, imp_from=imp_from)
return toreturn
[docs]def reloadFunc(func):
data = funcData(func)
return load(data["imp_from"], data["name"])
[docs]def load(mod_name, obj_name):
"""
Convert a string version of a class name to the object.
For example, get_class('sympy.core.Basic') will return
class Basic located in module sympy.core
"""
#obj = getattr(__import__(mod_name, {}, {}, ['*']), obj_name)
return getattr(import_module(mod_name), obj_name)
[docs]def tracer(instance, broadcast=True, report=True):
"""
Tracer for decorated functions.
:param instance: Logger instance
:param broadcast:
:param report:
:return:
"""
def decorator(func):
#argnames = func.func_code.co_varnames[:func.func_code.co_argcount]
funcname = func.__name__
@functools.wraps(func)
def wrapper(*args, **kwargs):
instance.func = func
instance.funcname = funcname
instance.stack = inspect.stack() # traceback.extract_stack()
instance.inputs = inspect.getcallargs(func, *args, **kwargs)
instance.trace = inspect.trace()
timeit = time.time()
instance.time = time.localtime(timeit)
e = None
try:
result = func(*args, **kwargs)
if broadcast:
instance.broadcast()
except:
result = None
# http://stackoverflow.com/a/1278740/5288758
exc_type, exc_obj, exc_tb = e = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
instance.exectime = time.time() - timeit
instance.outputs = result
instance.error = e
if report:
instance.report()
instance.throwError()
instance.renew()
return result
return wrapper
return decorator
[docs]class Logger(object):
"""
Logger for decorated functions. Holds important information of an instanced object and
can be used with @trace decorator for traceback purposes.
:param func: object reference.
:param funcname: object name.
:param inputs: inputs pass to the object.
:param outputs: outputs given by the object execution.
:param time: initial time of execution.
:param exectime: time of execution in seconds.
:param writer: writer function where messages are passed.
:param eventHandle: event function where object is
passed when Logger.broadcast() is called.
:param msg_report: message format to use in reports.
:param msg_no_executed: massage format to pass to writer when object
has not been executed and Logger.report() is called.
:param msg_executed: massage format to use when object is
executed and Logger.broadcast() is called.
"""
eventHandle = None
file = sys.stdout
_msg_report = ("\nName: {self.funcname}\n"
"Type: {self.Type_}\n"
"Time of execution: {self.Time_}\n"
"Execution time: {self.exectime} secs\n"
"Inputs: {self.inputs}\n"
"Outputs: {self.outputs}\n")
_msg_no_executed = "No instance {self.funcname} has been executed"
_msg_executed = "{self.funcname} has been executed..."
def __init__(self, **kwargs):
"""
"""
self.renew() # initialize info variables.
self.__dict__.update(kwargs)
[docs] def broadcast(self):
"""
pass a notification message on object execution to the writer
"""
self.writer(self.broadcast, self._msg_executed.format(self=self))
if self.eventHandle:
self.eventHandle(self)
[docs] def report(self):
"""
pass a report of the last executed object to the writer
"""
if self.func is not None:
self.writer(self.report, self._msg_report.format(self=self))
else:
self.writer(self.report, self._msg_no_executed.format(self=self))
[docs] def throwError(self):
"""
throw caught error
:return:
"""
if self.error:
self.writer(self.throwError, self.error)
[docs] def writer(self, sender, *arg):
FILE = self.file
HEADER = (sender.__self__, sender.__func__, self.func)
if sender.__func__.__name__ == "throwError":
if self.throwAtError:
raise arg[0]
else:
print(HEADER, ">>> ", file=FILE)
exc_type, exc_value, exc_traceback = arg[0]
traceback.print_exception(
exc_type, exc_value, exc_traceback, file=FILE)
else:
print(HEADER, ">>> ", *arg, file=FILE)
@property
def tracer(self):
return tracer(self)
@property
def Type_(self):
"""
returns type name (str)
"""
return type(self.func).__name__
@property
def Time_(self):
"""
returns formated time (str)
"""
return time.asctime(self.time)
[docs] def renew(self):
"""
renew Instance
"""
self.func = None
self.funcname = ""
self.inputs = {}
self.outputs = ()
self.stack = []
self.trace = []
self.error = None
self.time = 0
self.exectime = 0
# PYCALLGRAPH modification
# TODO: enhance decorator API to facilitate saving and more ergonomic feel
# TODO: let the user add the path to graphviz binaries from API
[docs]class Syncronous(SyncronousTracer):
[docs] def start(self):
self.old_trace = sys.gettrace()
sys.settrace(self.tracer)
[docs] def stop(self):
sys.settrace(self.old_trace)
[docs]class Asyncronous(Syncronous):
[docs] def start(self):
self.processor.start()
Syncronous.start(self)
[docs] def tracer(self, frame, event, arg):
self.processor.queue(frame, event, arg, self.memory())
return self.tracer
[docs] def done(self):
self.processor.done()
self.processor.join()
[docs]class GraphTraceOutput(GraphvizOutput):
def __init__(self, source=None, saveflag=True, label="", **kwargs):
self.source = source
self.saveflag = saveflag
self.label = label
GraphvizOutput.__init__(self, **kwargs)
self.graph_attributes['graph']['label'] = self.label
[docs] def done(self):
source = self.generate()
self.debug(source)
self.source = source
self.save()
[docs] def saveSource(self, file, source=None):
if not source:
source = self.source
cmd = '{} -T{} -o{} {}'.format(
self.tool, self.output_type, self.output_file, file
)
with open(file, 'w') as f:
f.write("// run as: {}\n\n".format(cmd) + source)
self.cmd = cmd
return cmd
[docs] def save(self, file=None, source=None):
if file: # save custom file
istemp = False
cmd = self.saveSource(file, source)
else: # if no file a temporal file is made
istemp = True
if not source:
source = self.source
fd, file = tempfile.mkstemp()
cmd = '{} -T{} -o{} {}'.format(
self.tool, self.output_type, self.output_file, file
)
with os.fdopen(fd, 'w') as f:
f.write("// run as: {}\n\n".format(cmd) + source)
self.cmd = cmd
self.verbose('Executing: {}'.format(cmd))
try:
ret = os.system(cmd)
if ret:
raise Exception(
'The command "%(cmd)s" failed with error '
'code %(ret)i.' % locals())
finally:
if istemp:
os.remove(file) # remove file if it is temporal
self.verbose('Generated {} with {} nodes.'.format(
self.output_file, len(self.processor.func_count),
))
[docs]class GraphTrace(pycallgraph.PyCallGraph):
def __enter__(self): # modified function
# review for trace problems at
# \Python27\Lib\site-packages\pycallgraph\tracer.py
self.start()
return self
@property
def source(self): # modified function
return [output.source for output in self.output]
[docs] def saveSource(self, file): # added function
name = dr.getData(file)
outputs = self.output
if len(outputs) > 1: # save enumerated outputs
for i, output in enumerate(self.output):
tempname = name[:-1] + ["_", str(i + 1)] + name[-1:]
output.saveSource("".join(tempname))
else: # save single output
outputs[0].saveSource("".join(name))
[docs] def get_tracer_class(self): # modified function
if self.config.threaded:
return Asyncronous
else:
return Syncronous