Source code for RRtoolbox.lib.inspector

# -*- coding: utf-8 -*-
"""
This module is an all-purpose toolkit intended for debugging, tracking, auto-documenting and self-introspecting
the package.

Made by Davtoh. Powered partially by pycallgraph.
Dependent project: https://github.com/gak/pycallgraph/#python-call-graph
"""

from __future__ import print_function
from __future__ import absolute_import
from builtins import object
import sys
import os
import tempfile
import traceback
import functools
import time
import re
import inspect
from pycallgraph.output import GraphvizOutput
import pycallgraph  # PyCallGraph
from pycallgraph.tracer import AsyncronousTracer, SyncronousTracer
from importlib import import_module

__license__ = (pycallgraph.__license__ +
    " see https://github.com/gak/pycallgraph/blob/develop/LICENSE")

from . import directory as dr


def funcData(func):
    """
    Collect introspection data about a function.

    :param func: function to describe; its source must be retrievable
        through :mod:`inspect`.
    :return: dictionary with the keys ``name``, ``args`` (positional argument
        names), ``defaults`` (mapping of argument name to default value, or
        the falsy value reported by inspect when there are no defaults),
        ``doc``, ``keywords`` (name of the ``**`` parameter or None),
        ``varargs`` (name of the ``*`` parameter or None), ``lines`` (source
        lines), ``line`` (line number where the source starts),
        ``sourcefile`` and ``imp_from`` (name of the defining module).
    """
    # inspect.getargspec was removed in Python 3.11 and rejects functions with
    # keyword-only arguments or annotations. Prefer getfullargspec and fall
    # back to getargspec only where it is the sole option (Python 2).
    if hasattr(inspect, "getfullargspec"):
        argspec = inspect.getfullargspec(func)
        keywords = argspec.varkw
    else:
        argspec = inspect.getargspec(func)
        keywords = argspec.keywords

    args = argspec.args
    defaults = argspec.defaults
    if defaults:
        # default values belong to the last len(defaults) positional arguments
        defaults = dict(zip(args[-len(defaults):], defaults))

    # getsourcefile returns the path of the Python source, unlike getfile
    # which may point at a compiled file
    sourcefile = inspect.getsourcefile(func)
    lines, line = inspect.getsourcelines(func)
    imp_from = inspect.getmodule(func).__name__

    return dict(name=func.__name__,
                args=args,
                defaults=defaults,
                doc=func.__doc__,
                keywords=keywords,
                varargs=argspec.varargs,
                lines=lines,
                line=line,
                sourcefile=sourcefile,
                imp_from=imp_from)
def reloadFunc(func):
    """
    Fetch a function again from the module that defines it.

    The defining module name and the function name are taken from
    funcData(func) and the object is looked up again through load().

    :param func: function to look up again.
    :return: the object currently bound to the function's name in its
        defining module.
    """
    info = funcData(func)
    module_name, object_name = info["imp_from"], info["name"]
    return load(module_name, object_name)
def load(mod_name, obj_name):
    """
    Import a module by name and return one of its attributes.

    For example, ``load('sympy.core', 'Basic')`` returns the class ``Basic``
    located in the module ``sympy.core``.

    :param mod_name: dotted name of the module to import.
    :param obj_name: name of the attribute to fetch from that module.
    :return: the object bound to ``obj_name`` in the imported module.
    """
    module = import_module(mod_name)
    return getattr(module, obj_name)
def tracer(instance, broadcast=True, report=True):
    """
    Tracer for decorated functions.

    Every call of the decorated function records on ``instance`` the function
    and its name, the call stack, the bound call arguments, the start time,
    the execution time, the returned value and the captured error.

    :param instance: Logger instance (any object offering ``broadcast()``,
        ``report()``, ``throwError()`` and ``renew()``).
    :param broadcast: if True, ``instance.broadcast()`` is called after the
        decorated function returns.
    :param report: if True, ``instance.report()`` is called after each call.
    :return: decorator. The wrapped function returns the original result, or
        None when the call raised an exception that was captured.
    """
    def decorator(func):
        funcname = func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            instance.func = func
            instance.funcname = funcname
            instance.stack = inspect.stack()
            instance.inputs = inspect.getcallargs(func, *args, **kwargs)
            instance.trace = inspect.trace()
            timeit = time.time()
            instance.time = time.localtime(timeit)
            e = None
            try:
                result = func(*args, **kwargs)
                if broadcast:
                    instance.broadcast()
            except Exception:
                # Only ordinary errors are captured: SystemExit and
                # KeyboardInterrupt must propagate instead of being turned
                # into a logged error and a None result.
                result = None
                # http://stackoverflow.com/a/1278740/5288758
                exc_type, exc_obj, exc_tb = e = sys.exc_info()
                fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print(exc_type, fname, exc_tb.tb_lineno)
            instance.exectime = time.time() - timeit
            instance.outputs = result
            instance.error = e  # exc_info tuple, or None on success
            if report:
                instance.report()
            instance.throwError()
            # clear the recorded call data once it has been reported
            instance.renew()
            return result
        return wrapper
    return decorator
class Logger(object):
    """
    Logger for decorated functions. Holds information about the last traced
    call and can be used with the tracer decorator for traceback purposes.

    Any keyword argument given to the constructor is stored as an instance
    attribute and overrides the defaults listed below.

    :param func: object reference.
    :param funcname: object name.
    :param inputs: inputs passed to the object.
    :param outputs: outputs given by the object execution.
    :param time: initial time of execution.
    :param exectime: time of execution in seconds.
    :param error: captured exception as a (type, value, traceback) tuple,
        or None.
    :param file: file-like object where messages are printed.
    :param eventHandle: event function where this Logger is passed when
        Logger.broadcast() is called.
    :param throwAtError: if True, Logger.throwError() re-raises the captured
        exception instead of printing its traceback.
    :param _msg_report: message format to use in reports.
    :param _msg_no_executed: message format to pass to the writer when no
        object has been executed and Logger.report() is called.
    :param _msg_executed: message format to use when an object is executed
        and Logger.broadcast() is called.
    """
    eventHandle = None
    file = sys.stdout
    # Default needed because writer() reads this attribute on every captured
    # error; without it an AttributeError was raised unless the caller passed
    # throwAtError explicitly.
    throwAtError = False
    _msg_report = ("\nName: {self.funcname}\n"
                   "Type: {self.Type_}\n"
                   "Time of execution: {self.Time_}\n"
                   "Execution time: {self.exectime} secs\n"
                   "Inputs: {self.inputs}\n"
                   "Outputs: {self.outputs}\n")
    _msg_no_executed = "No instance {self.funcname} has been executed"
    _msg_executed = "{self.funcname} has been executed..."

    def __init__(self, **kwargs):
        """
        :param kwargs: attributes to set on the instance after the defaults.
        """
        self.renew()  # initialize info variables.
        self.__dict__.update(kwargs)

    def broadcast(self):
        """
        pass a notification message on object execution to the writer
        and, if set, call eventHandle with this Logger.
        """
        self.writer(self.broadcast, self._msg_executed.format(self=self))
        if self.eventHandle:
            self.eventHandle(self)

    def report(self):
        """
        pass a report of the last executed object to the writer
        """
        if self.func is not None:
            self.writer(self.report, self._msg_report.format(self=self))
        else:
            self.writer(self.report, self._msg_no_executed.format(self=self))

    def throwError(self):
        """
        throw caught error: hand the captured error, if any, to the writer.

        :return: None
        """
        if self.error:
            self.writer(self.throwError, self.error)

    def writer(self, sender, *arg):
        """
        Print a message to ``self.file`` or process a captured error.

        :param sender: bound method of this Logger that emits the message.
        :param arg: objects to print. When sender is throwError, arg[0] must
            be a (type, value, traceback) tuple.
        :raises: the captured exception when sender is throwError and
            ``throwAtError`` is true.
        """
        FILE = self.file
        HEADER = (sender.__self__, sender.__func__, self.func)
        if sender.__func__.__name__ == "throwError":
            exc_type, exc_value, exc_traceback = arg[0]
            if self.throwAtError:
                # Raise the exception instance itself: raising the exc_info
                # tuple is a TypeError on Python 3.
                if hasattr(exc_value, "with_traceback"):
                    raise exc_value.with_traceback(exc_traceback)
                raise exc_value
            print(HEADER, ">>> ", file=FILE)
            traceback.print_exception(
                exc_type, exc_value, exc_traceback, file=FILE)
        else:
            print(HEADER, ">>> ", *arg, file=FILE)

    @property
    def tracer(self):
        """
        returns a tracer decorator bound to this Logger
        """
        return tracer(self)

    @property
    def Type_(self):
        """
        returns type name (str)
        """
        return type(self.func).__name__

    @property
    def Time_(self):
        """
        returns formatted time (str)
        """
        return time.asctime(self.time)

    def renew(self):
        """
        renew Instance: reset the data recorded for the last call
        """
        self.func = None
        self.funcname = ""
        self.inputs = {}
        self.outputs = ()
        self.stack = []
        self.trace = []
        self.error = None
        self.time = 0
        self.exectime = 0
# PYCALLGRAPH modification
# TODO: enhance decorator API to facilitate saving and more ergonomic feel
# TODO: let the user add the path to graphviz binaries from API
class Syncronous(SyncronousTracer):
    """
    Synchronous tracer that remembers the trace function that was active
    before tracing started, so it can be put back afterwards.
    """

    def start(self):
        """
        Store the current system trace function and install ``self.tracer``.
        """
        previous = sys.gettrace()
        self.old_trace = previous
        sys.settrace(self.tracer)

    def stop(self):
        """
        Reinstall the trace function stored by start().
        """
        previous = self.old_trace
        sys.settrace(previous)
class Asyncronous(Syncronous):
    """
    Tracer that hands every trace event to ``self.processor`` instead of
    handling it inline.
    """

    def start(self):
        """
        Start the processor, then install the trace function.
        """
        worker = self.processor
        worker.start()
        Syncronous.start(self)

    def tracer(self, frame, event, arg):
        """
        Trace function: queue the event on the processor.

        :param frame: frame given by the trace hook.
        :param event: event name given by the trace hook.
        :param arg: event argument given by the trace hook.
        :return: this same trace function, so tracing continues.
        """
        enqueue = self.processor.queue
        enqueue(frame, event, arg, self.memory())
        return self.tracer

    def done(self):
        """
        Tell the processor that tracing is done and wait for it to finish.
        """
        worker = self.processor
        worker.done()
        worker.join()
class GraphTraceOutput(GraphvizOutput):
    """
    Graphviz output that keeps the generated graph source and can save it.

    :param source: graph source text (filled in by done()).
    :param saveflag: flag stored on the instance as ``saveflag``.
    :param label: label assigned to the graph attributes.
    :param kwargs: forwarded to GraphvizOutput.
    """

    def __init__(self, source=None, saveflag=True, label="", **kwargs):
        self.source = source
        self.saveflag = saveflag
        self.label = label
        GraphvizOutput.__init__(self, **kwargs)
        self.graph_attributes['graph']['label'] = self.label

    def _graphviz_command(self, file):
        """
        Build the command line that renders ``file`` into the output file.
        """
        return '{} -T{} -o{} {}'.format(
            self.tool, self.output_type, self.output_file, file
        )

    def done(self):
        """
        Generate the graph source, keep it in ``self.source`` and save it.
        """
        generated = self.generate()
        self.debug(generated)
        self.source = generated
        self.save()

    def saveSource(self, file, source=None):
        """
        Write the graph source to ``file`` preceded by a comment with the
        command that renders it.

        :param file: path of the file to write.
        :param source: source text; ``self.source`` is used when it is empty.
        :return: the rendering command (also stored in ``self.cmd``).
        """
        text = source if source else self.source
        command = self._graphviz_command(file)
        with open(file, 'w') as handle:
            handle.write("// run as: {}\n\n".format(command) + text)
        self.cmd = command
        return command

    def save(self, file=None, source=None):
        """
        Write the graph source and run the rendering command on it.

        :param file: path where the source is kept. When not given, a
            temporary file is used and removed after the command has run.
        :param source: source text; ``self.source`` is used when it is empty.
        :raises Exception: when the command returns a non-zero status.
        """
        istemp = not file
        if istemp:
            # no file given: write the source to a temporary file
            text = source if source else self.source
            fd, file = tempfile.mkstemp()
            command = self._graphviz_command(file)
            with os.fdopen(fd, 'w') as handle:
                handle.write("// run as: {}\n\n".format(command) + text)
            self.cmd = command
        else:
            # custom file: saveSource writes it and records the command
            command = self.saveSource(file, source)

        self.verbose('Executing: {}'.format(command))
        try:
            status = os.system(command)
            if status:
                raise Exception(
                    'The command "%(cmd)s" failed with error '
                    'code %(ret)i.' % {"cmd": command, "ret": status})
        finally:
            if istemp:
                os.remove(file)  # the temporary file is no longer needed

        self.verbose('Generated {} with {} nodes.'.format(
            self.output_file, len(self.processor.func_count),
        ))
class GraphTrace(pycallgraph.PyCallGraph):
    """
    PyCallGraph variant that exposes the source of its outputs and uses the
    tracer classes defined in this module.
    """

    def __enter__(self):
        """
        Start tracing and return this object for use in a ``with`` block.
        """
        # review for trace problems at
        # \Python27\Lib\site-packages\pycallgraph\tracer.py
        self.start()
        return self

    @property
    def source(self):
        """
        returns the ``source`` of every output (list)
        """
        collected = []
        for output in self.output:
            collected.append(output.source)
        return collected

    def saveSource(self, file):
        """
        Save the source of every output.

        With several outputs each one is saved under the given name with
        ``_1``, ``_2``, ... inserted before its last part; a single output is
        saved under the given name unchanged.

        :param file: destination path; it is split with ``dr.getData``
            (assumed to return a list of path parts -- TODO confirm).
        """
        parts = dr.getData(file)
        outputs = self.output
        if len(outputs) <= 1:
            # save single output
            outputs[0].saveSource("".join(parts))
            return
        # save enumerated outputs
        head, tail = parts[:-1], parts[-1:]
        for number, output in enumerate(outputs, 1):
            output.saveSource("".join(head + ["_", str(number)] + tail))

    def get_tracer_class(self):
        """
        returns Asyncronous when ``config.threaded`` is set, else Syncronous
        """
        return Asyncronous if self.config.threaded else Syncronous