Source code for ibeis.algo.hots.pgm_viz

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import networkx as nx
import six  # NOQA
import utool as ut
import numpy as np
from six.moves import zip
print, rrr, profile = ut.inject2(__name__, '[pgmext]')



def _debug_repr_model(model):
    """
    Debug helper: emit a standalone pgmpy script that rebuilds ``model``.

    The generated text contains one ``TabularCPD`` constructor per CPD in
    ``model.cpds`` (see `_debug_repr_cpd`), followed by code that builds a
    ``BayesianModel`` from the sorted edge list and attaches the CPDs.
    The script is passed to ``ut.print_code`` and
    ``ut.copy_text_to_clipboard``; nothing is returned.

    Args:
        model: object exposing ``cpds``, ``nodes()`` and ``edges()``
    """
    cpd_snippets = []
    for cpd in model.cpds:
        cpd_snippets.append(_debug_repr_cpd(cpd))

    script_template = ut.codeblock(
        '''
        import numpy as np
        import pgmpy
        import pgmpy.inference
        import pgmpy.factors
        import pgmpy.models

        {cpds}

        cpd_list = {nodes}
        input_graph = {edges}
        model = pgmpy.models.BayesianModel(input_graph)
        model.add_cpds(*cpd_list)
        infr = pgmpy.inference.BeliefPropagation(model)
        ''')

    cpds_text = '\n'.join(cpd_snippets)
    # strvals=True is passed so node names are not quoted; in the generated
    # script they then refer to the CPD variables defined above.
    nodes_text = ut.repr2(sorted(model.nodes()), strvals=True)
    edges_text = ut.repr2(sorted(model.edges()), nl=1)

    script = script_template.format(
        cpds=cpds_text, nodes=nodes_text, edges=edges_text)
    ut.print_code(script)
    ut.copy_text_to_clipboard(script)


def _debug_repr_cpd(cpd):
    """
    Debug helper: build source text for a ``pgmpy.factors.TabularCPD(...)``
    call that mirrors ``cpd``.

    The placeholders of the code template drive which properties are read
    from ``cpd``: a placeholder ``{foo_repr}`` is filled with
    ``ut.repr2(cpd.foo)`` (calling ``cpd.foo`` first when it is
    function-like), and a plain ``{foo}`` is filled with ``cpd.foo`` as is.
    A dump of the main CPD properties is printed as a side effect.

    Args:
        cpd: CPD object exposing ``variable``, ``variable_card``, ``values``,
            ``evidence``, ``evidence_card`` and ``get_cpd()``

    Returns:
        str: the formatted constructor code
    """
    import re
    import utool as ut
    template = ut.codeblock(
        '''
        {variable} = pgmpy.factors.TabularCPD(
            variable={variable_repr},
            variable_card={variable_card_repr},
            values={get_cpd_repr},
            evidence={evidence_repr},
            evidence_card={evidence_card_repr},
        )
        ''')

    # Print a summary of the raw properties
    prop_names = ['variable', 'variable_card', 'values', 'evidence', 'evidence_card']
    summary = ut.odict(zip(prop_names, [getattr(cpd, name) for name in prop_names]))
    # HACK: report the table returned by get_cpd() in place of cpd.values
    summary['values'] = cpd.get_cpd()
    print(ut.repr2(summary, explicit=True, nobraces=True, nl=True))

    # Discover which properties the template asks for
    placeholders = re.findall('{(.*?)}', template)
    repr_props = [name[:-5] for name in placeholders if name.endswith('_repr')]
    plain_props = [name for name in placeholders if not name.endswith('_repr')]

    # Fetch every repr-property first, then resolve callables and repr them
    fetched = [(name, getattr(cpd, name)) for name in repr_props]
    fmtdict = {}
    for name, value in fetched:
        if ut.is_funclike(value):
            value = value()
        fmtdict[name + '_repr'] = ut.repr2(value)

    # Plain placeholders receive the raw attribute value
    plain_items = [(name, getattr(cpd, name)) for name in plain_props]
    for name, value in plain_items:
        fmtdict[name] = value

    return template.format(**fmtdict)


def make_factor_text(factor, name):
    """
    Build a short multi-line text summary of a factor for plot annotations.

    When all values are (almost) the same the text is collapsed to a single
    ``uniform(...)`` line. Otherwise one ``p(states)=value`` row is made per
    entry, maxima are marked with ``*``, rows are sorted by descending value
    and, if there are more than four rows, only the top three are kept
    followed by a ``... N more`` line.

    Args:
        factor: object with ``values`` (array supporting ``argsort``) and
            either ``statenames`` or ``_row_labels``
        name (str): label placed at the start of the text

    Returns:
        str: the annotation text
    """
    values = factor.values

    if ut.almost_allsame(values):
        # Reduce uniform text
        return name + ':\nuniform(%.3f)' % (values[0],)

    def _format_rows(labels):
        return ['p(%s)=%.3f' % (','.join(label), val,)
                for label, val in zip(labels, values)]

    try:
        rowstrs = _format_rows(zip(*factor.statenames))
    except Exception:
        # Fall back to the factor's own row labels when statenames are
        # missing or cannot be formatted.
        rowstrs = _format_rows(factor._row_labels(False))

    # Flag the most probable row(s)
    for max_idx in ut.list_argmaxima(values):
        rowstrs[max_idx] += '*'

    max_rows = 4
    descending = factor.values.argsort()[::-1]
    if len(rowstrs) > max_rows:
        rowstrs = ut.take(rowstrs, descending[0:(max_rows - 1)])
        rowstrs += ['... %d more' % ((len(values) - len(rowstrs)),)]
    else:
        rowstrs = ut.take(rowstrs, descending)
    return name + ': \n' + '\n'.join(rowstrs)
def get_bayesnet_layout(model, name_nodes=None, prog='dot'):
    """
    Compute node positions for ``model`` with graphviz.

    Ensures ordering of layers is in order of addition via templates: when
    the model has a ``ttype2_cpds`` mapping, consecutive variables of each
    template type are chained with extra edges on a copy of the graph and
    placed in a ``rank='same'`` subgraph before the layout is run.

    Args:
        model: networkx-compatible graph; may expose ``ttype2_cpds``
        name_nodes: unused, kept for backwards compatibility
        prog (str): graphviz layout program (default ``'dot'``)

    Returns:
        dict: maps each node of ``model`` to an ``(x, y)`` float tuple. Nodes
            whose position cannot be read are reported and put at
            ``(0.0, 0.0)``.
    """
    import pygraphviz
    import networkx as nx
    # Add "invisible" edges to induce an ordering
    # Hack for layout (ordering of top level nodes)
    netx_graph2 = model.copy()
    grouped_nodes = []
    ttype2_cpds = getattr(model, 'ttype2_cpds', None)
    if ttype2_cpds is not None:
        for ttype in ttype2_cpds.keys():
            # use defined ordering
            ttype_nodes = ut.list_getattr(ttype2_cpds[ttype], 'variable')
            invis_edges = list(ut.itertwo(ttype_nodes))
            netx_graph2.add_edges_from(invis_edges)
            grouped_nodes.append(ttype_nodes)

    agraph = nx.nx_agraph.to_agraph(netx_graph2)
    for nodes in grouped_nodes:
        agraph.add_subgraph(nodes, rank='same')

    print(agraph)

    agraph.layout(prog=prog, args='')
    node_pos = {}
    for n in model:
        node_ = pygraphviz.Node(agraph, n)
        try:
            xx, yy = node_.attr['pos'].split(',')
            node_pos[n] = (float(xx), float(yy))
        except Exception:
            # Best effort: a missing or malformed 'pos' attribute must not
            # abort the layout. ``Exception`` (not a bare ``except``) is used
            # so KeyboardInterrupt / SystemExit still propagate.
            print('no position for node', n)
            node_pos[n] = (0.0, 0.0)
    return node_pos
def draw_map_histogram(top_assignments, fnum=None, pnum=(1, 1, 1)):
    """
    Draw a histogram of assignment probabilities.

    Args:
        top_assignments (list): rows whose column 0 is the assignment label
            and column 1 the value to plot
        fnum (int): figure number, resolved through ``pt.ensure_fnum``
        pnum (tuple): subplot specification forwarded to plottool
    """
    import plottool as pt
    labels = ut.get_list_column(top_assignments, 0)
    probs = ut.get_list_column(top_assignments, 1)
    fnum = pt.ensure_fnum(fnum)
    histkw = dict(
        fnum=fnum,
        pnum=pnum,
        transpose=True,
        use_darkbackground=False,
        ylabel='Prob',
        xlabel='assignment',
    )
    pt.draw_histogram(labels, probs, **histkw)
    pt.set_title('Assignment probabilities')
def get_node_viz_attrs(model, evidence, soft_evidence, factor_list,
                       ttype_colors, **kwargs):
    """
    Compute per-node colors, positions and text annotations for drawing.

    Args:
        model: graph with ``nodes(data=True)`` and a ``var2_cpd`` mapping
            whose CPDs carry a ``ttype`` attribute
        evidence (dict): maps observed variables to a state index
        soft_evidence: container of variables whose prior text is highlighted
        factor_list (list): posterior factors, keyed here by their first
            variable
        ttype_colors (dict): maps ``'name'`` / ``'score'`` to color lists
        **kwargs: ``show_prior`` (default True) can disable prior text

    Returns:
        tuple: ``(node_color, pos_list, pos_dict, takws)`` where ``takws`` is
            a two-item list of keyword dicts (posterior + evidence text, then
            prior text), each mapping ``<key>_list`` to per-annotation values.
    """
    import plottool as pt

    var2_post = dict((factor.variables[0], factor) for factor in factor_list)

    pos_dict = get_bayesnet_layout(model)

    netx_nodes = model.nodes(data=True)
    node_key_list = ut.get_list_column(netx_nodes, 0)
    pos_list = ut.dict_take(pos_dict, node_key_list)

    node_color = []
    prior_tas = []
    evidence_tas = []
    post_tas = []

    # Once anything has been inferred, priors of score / match nodes are no
    # longer shown.
    if evidence or var2_post:
        ignore_prior_with_ttype = ['score', 'match']
    else:
        ignore_prior_with_ttype = []
    prior_enabled = kwargs.get('show_prior', True)

    # Two annotation placements with mirrored alignment and offsets
    dpy = 5
    dbx, dby = (20, 20)
    takw1 = dict(bbox_align=(.5, 0), pos_offset=[0, dpy],
                 bbox_offset=[dbx, dby])
    takw2 = dict(bbox_align=(.5, 1), pos_offset=[0, -dpy],
                 bbox_offset=[-dbx, -dby])

    def get_name_color(phi):
        # Color of the top state, desaturated when the top value is small or
        # close to the runner-up.
        ranked = phi.values.argsort()[::-1]
        top_val = phi.values[ranked[0]]
        if len(ranked) < 2:
            margin = top_val
        else:
            margin = top_val - phi.values[ranked[1]]
        confidence = (top_val * margin) ** (2.5 / 4)
        base_color = ttype_colors['name'][ranked[0]]
        return np.array(
            pt.color_funcs.desaturate_rgb(base_color, 1 - confidence))

    for node, pos in zip(netx_nodes, pos_list):
        variable = node[0]
        cpd = model.var2_cpd[variable]
        if cpd.evidence is None:
            prior_marg = cpd
        else:
            prior_marg = cpd.marginalize(cpd.evidence, inplace=False)

        show_evidence = variable in evidence
        show_post = variable in var2_post
        show_prior = (cpd.ttype not in ignore_prior_with_ttype) & prior_enabled

        post_marg = var2_post[variable] if show_post else None

        # Node color
        if show_evidence:
            if cpd.ttype in ('score', 'name'):
                color = np.array(ttype_colors[cpd.ttype][evidence[variable]])
            else:
                color = pt.FALSE_RED
        elif post_marg is not None and cpd.ttype == 'name':
            color = get_name_color(post_marg)
        elif post_marg is not None and cpd.ttype == 'match':
            color = pt.LIGHT_PINK
        else:
            color = pt.NEUTRAL
        node_color.append(color)

        # Text annotations
        if show_prior:
            prior_color = pt.LIGHT_PINK if variable in soft_evidence else None
            prior_text = make_factor_text(prior_marg, 'prior')
            prior_tas.append(
                dict(text=prior_text, pos=pos, color=prior_color, **takw2))
        if show_evidence:
            placement = takw2 if cpd.ttype == 'score' else takw1
            evidence_text = cpd.variable_statenames[evidence[variable]]
            if isinstance(evidence_text, int):
                evidence_text = '%d/%d' % (evidence_text + 1, cpd.variable_card)
            evidence_tas.append(
                dict(text=evidence_text, pos=pos, color=color, **placement))
        if show_post:
            placement = takw2 if cpd.ttype == 'match' else takw1
            post_text = make_factor_text(post_marg, 'post')
            post_tas.append(
                dict(text=post_text, pos=pos, color=None, **placement))

    def _transpose(dict_list):
        """ transpose a list of dicts into a dict of lists """
        columns = ut.ddict(list)
        for row in dict_list:
            for key, val in row.items():
                columns[key + '_list'].append(val)
        return columns

    takws = [_transpose(post_tas + evidence_tas), _transpose(prior_tas)]
    return node_color, pos_list, pos_dict, takws
def make_colorcodes(model):
    """
    Build the color tables used to draw ``'name'`` and ``'score'`` nodes.

    Args:
        model: object with a ``ttype2_template`` mapping whose templates
            expose a ``basis`` sequence

    Returns:
        tuple: ``(ttype_colors, ttype_scalars)``; both are dicts keyed by
            ``'name'`` and / or ``'score'`` (only for template types present
            in the model) holding one color / one scalar in [0, 1] per basis
            element.

    CommandLine:
        python -m ibeis.algo.hots.bayes --exec-make_name_model --show
        python -m ibeis.algo.hots.bayes --exec-cluster_query --show
        python -m ibeis --tf demo_bayesnet --ev :nA=4,nS=2,Na=n0,rand_scores=True --show --verbose
        python -m ibeis --tf demo_bayesnet --ev :nA=4,nS=3,Na=n0,rand_scores=True --show --verbose
    """
    import plottool as pt
    ttype_colors = {}
    ttype_scalars = {}

    # Hacked in custom colors
    if 'name' in model.ttype2_template:
        name_basis = model.ttype2_template['name'].basis
        num_names = len(name_basis)
        # Ask for at least 10 distinct colors, then keep the first num_names
        name_colors = pt.distinct_colors(max(num_names, 10))[:num_names]
        ttype_colors['name'] = name_colors
        ttype_scalars['name'] = np.linspace(0, 1, len(name_basis))

    if 'score' in model.ttype2_template:
        cmap_name, lo, hi = 'plasma', 0.15, 1.0
        # NOTE: the colormap is looked up here as in the original code, but
        # the handle is not otherwise used by this function.
        pt.plt.get_cmap(cmap_name)
        score_basis = model.ttype2_template['score'].basis
        score_scalars = np.linspace(0, 1, len(score_basis))
        raw_colors = pt.scores_to_color(score_scalars, cmap_=cmap_name,
                                        reverse_cmap=False,
                                        cmap_range=(lo, hi))
        score_colors = [pt.lighten_rgb(rgb, .4) for rgb in raw_colors]
        ttype_scalars['score'] = score_scalars
        ttype_colors['score'] = score_colors

    return ttype_colors, ttype_scalars
def draw_bayesian_model(model, evidence={}, soft_evidence={}, fnum=None,
                        pnum=None, **kwargs):
    """
    Draw a Bayesian network with node colors, text annotations, a title and
    (when something was inferred) colorbars.

    Args:
        model: a pgmpy ``BayesianModel`` or an object providing
            ``to_bayesian_model()``; expected to expose ``ttype2_template``
            and ``ttype2_cpds``
        evidence (dict): observed variables mapped to state indices
        soft_evidence (dict): forwarded to `get_node_viz_attrs`
        fnum (int): figure number forwarded to ``pt.show_nx``
        pnum (tuple): subplot specification forwarded to ``pt.show_nx``
        **kwargs: recognised keys are ``factor_list`` (posterior factors),
            ``top_assignments`` (list of ``(assignment, prob)`` rows, first
            row is used for the title), ``method`` (title prefix),
            ``show_title`` (default True) and ``show_prior``.

    Note:
        The ``{}`` defaults are kept for interface compatibility; they are
        only read here, never mutated.
    """
    from pgmpy.models import BayesianModel
    if not isinstance(model, BayesianModel):
        model = model.to_bayesian_model()

    import plottool as pt
    import networkx as nx
    kwargs = kwargs.copy()
    # Remember whether posteriors were supplied BEFORE popping the key;
    # testing ``'factor_list' in kwargs`` after the pop is always False,
    # which used to suppress the colorbars for posterior-only drawings.
    has_factor_list = 'factor_list' in kwargs
    factor_list = kwargs.pop('factor_list', [])

    ttype_colors, ttype_scalars = make_colorcodes(model)

    textprops = {
        'horizontalalignment': 'left', 'family': 'monospace', 'size': 8, }

    # build graph attrs
    tup = get_node_viz_attrs(
        model, evidence, soft_evidence, factor_list, ttype_colors, **kwargs)
    node_color, pos_list, pos_dict, takws = tup

    has_infered = bool(evidence) or has_factor_list

    # draw graph
    # BE VERY CAREFUL: the class of the *copy* is swapped to a plain DiGraph
    graph = model.copy()
    graph.__class__ = nx.DiGraph
    graph.graph['groupattrs'] = ut.ddict(dict)
    if getattr(graph, 'ttype2_cpds', None) is not None:
        # Add invis edges and ttype groups
        for ttype in model.ttype2_cpds.keys():
            ttype_cpds = model.ttype2_cpds[ttype]
            # use defined ordering
            ttype_nodes = ut.list_getattr(ttype_cpds, 'variable')
            invis_edges = list(ut.itertwo(ttype_nodes))
            graph.add_edges_from(invis_edges)
            nx.set_edge_attributes(
                graph, 'style', {edge: 'invis' for edge in invis_edges})
            nx.set_node_attributes(
                graph, 'groupid', {node: ttype for node in ttype_nodes})
            graph.graph['groupattrs'][ttype]['rank'] = 'same'
            graph.graph['groupattrs'][ttype]['cluster'] = False
    pt.show_nx(graph, layout_kw={'prog': 'dot'}, fnum=fnum, pnum=pnum,
               verbose=0)
    pt.zoom_factory()
    fig = pt.gcf()
    ax = pt.gca()

    # Each call returns a callable that is invoked after the axes are sized
    hacks = [pt.draw_text_annotations(textprops=textprops, **takw)
             for takw in takws if takw]

    xmin, ymin = np.array(pos_list).min(axis=0)
    xmax, ymax = np.array(pos_list).max(axis=0)
    if 'name' in model.ttype2_template:
        num_names = len(model.ttype2_template['name'].basis)
        num_annots = len(model.ttype2_cpds['name'])
        if num_annots > 4:
            ax.set_xlim((xmin - 40, xmax + 40))
            ax.set_ylim((ymin - 50, ymax + 50))
            fig.set_size_inches(30, 7)
        else:
            ax.set_xlim((xmin - 42, xmax + 42))
            ax.set_ylim((ymin - 50, ymax + 50))
            fig.set_size_inches(23, 7)
        title = 'num_names=%r, num_annots=%r' % (num_names, num_annots,)
    else:
        title = ''

    top_assignments = kwargs.get('top_assignments', None)
    # Truthiness check: an empty list has no MAP row to report and must not
    # raise IndexError.
    if top_assignments:
        map_assign, map_prob = top_assignments[0]
        if map_assign is not None:
            method = kwargs.get('method', '')
            prefix = '' if len(method) == 0 else method + ' '
            title += '\n%sMAP: ' % (prefix,)
            title += map_assign + ' @' + '%.2f%%' % (100 * map_prob,)
    if kwargs.get('show_title', True):
        pt.set_figtitle(title, size=14)

    for hack in hacks:
        hack()

    if has_infered:
        # Hack in colorbars
        for key, loc in (('name', 'left'), ('score', 'right')):
            if key in ttype_colors:
                basis = model.ttype2_template[key].basis
                colors = ttype_colors[key]
                scalars = ttype_scalars[key]
                pt.colorbar(scalars, colors, lbl=key, ticklabels=basis,
                            ticklocation=loc)
def draw_markov_model(model, fnum=None, **kwargs):
    """
    Draw a Markov model as circles joined by arc-shaped edges.

    Args:
        model: a pgmpy ``MarkovModel`` or an object providing
            ``to_markov_model()``
        fnum (int): figure number, resolved through ``pt.ensure_fnum``
        **kwargs: ``show_title`` (default True) controls the figure title

    Note:
        The circle patch of every node is stored in the graph's node
        attribute dict under ``'patch'``. When ``model`` already is a
        ``MarkovModel`` this modifies the passed-in object.
    """
    import plottool as pt
    from matplotlib.patches import FancyArrowPatch, Circle
    from pgmpy.models import MarkovModel

    fnum = pt.ensure_fnum(fnum)
    pt.figure(fnum=fnum, doclf=True)
    ax = pt.gca()

    if isinstance(model, MarkovModel):
        markovmodel = model
    else:
        markovmodel = model.to_markov_model()

    # References:
    # https://groups.google.com/forum/#!topic/networkx-discuss/FwYk0ixLDuY
    pos = nx.nx_agraph.pygraphviz_layout(markovmodel)

    def draw_network(G, pos, ax):
        # One labelled circle per node
        for n in G:
            c = Circle(pos[n], radius=10, alpha=0.5, color=pt.NEUTRAL_BLUE)
            ax.add_patch(c)
            G.node[n]['patch'] = c
            x, y = pos[n]
            pt.ax_absolute_text(x, y, n, ha='center', va='center')
        # One arc per edge; a repeated (u, v) pair gets a larger, flipped
        # curvature than the previous arc drawn for that pair.
        seen = {}
        for u, v in G.edges():
            n1 = G.node[u]['patch']
            n2 = G.node[v]['patch']
            rad = 0.1
            if (u, v) in seen:
                rad = seen.get((u, v))
                rad = (rad + np.sign(rad) * 0.1) * -1
            e = FancyArrowPatch(n1.center, n2.center, patchA=n1, patchB=n2,
                                arrowstyle='-',
                                connectionstyle='arc3,rad=%s' % rad,
                                mutation_scale=10.0,
                                lw=2,
                                alpha=0.5,
                                color='k')
            seen[(u, v)] = rad
            ax.add_patch(e)
        # Nothing is returned: the old ``return e`` raised
        # UnboundLocalError for graphs without edges and was never used.

    draw_network(markovmodel, pos, ax)
    ax.autoscale()
    pt.plt.axis('equal')
    pt.plt.axis('off')

    if kwargs.get('show_title', True):
        pt.set_figtitle('Markov Model')
def draw_junction_tree(model, fnum=None, **kwargs):
    """
    Draw the junction tree of ``model`` with networkx.

    Tuple node keys are replaced by comma-joined strings so they render as
    readable labels.

    Args:
        model: a pgmpy ``JunctionTree`` or an object providing
            ``to_junction_tree()``
        fnum (int): figure number, resolved through ``pt.ensure_fnum``
        **kwargs: ``show_title`` (default True) controls the figure title

    Note:
        The ``node`` / ``edge`` / ``adj`` dicts of the junction tree are
        replaced in place. When ``model`` already is a ``JunctionTree`` the
        passed-in object is therefore modified.
    """
    import plottool as pt
    from pgmpy.models import JunctionTree

    fnum = pt.ensure_fnum(fnum)
    pt.figure(fnum=fnum)
    ax = pt.gca()

    if not isinstance(model, JunctionTree):
        netx_graph = model.to_junction_tree()
    else:
        netx_graph = model

    # prettify nodes
    def fixtupkeys(dict_):
        # Recurse only into nested dicts. Leaf attribute values (numbers,
        # strings, ...) are passed through unchanged instead of crashing on
        # ``.items()``.
        return {
            (', '.join(k) if isinstance(k, tuple) else k):
                (fixtupkeys(v) if isinstance(v, dict) else v)
            for k, v in dict_.items()
        }

    n = fixtupkeys(netx_graph.node)
    e = fixtupkeys(netx_graph.edge)
    a = fixtupkeys(netx_graph.adj)
    netx_graph.node = n
    netx_graph.edge = e
    netx_graph.adj = a

    pos = nx.pydot_layout(netx_graph)
    node_color = [pt.NEUTRAL] * len(pos)
    drawkw = dict(pos=pos, ax=ax, with_labels=True, node_color=node_color,
                  node_size=2000)
    nx.draw(netx_graph, **drawkw)
    if kwargs.get('show_title', True):
        pt.set_figtitle('Junction / Clique Tree / Cluster Graph')
def show_junction_tree(*args, **kwargs):
    """ Alias: forwards every argument to `draw_junction_tree`. """
    result = draw_junction_tree(*args, **kwargs)
    return result
def show_markov_model(*args, **kwargs):
    """ Alias: forwards every argument to `draw_markov_model`. """
    result = draw_markov_model(*args, **kwargs)
    return result
def show_model(model, *args, **kwargs):
    """
    Dispatch to the drawing function that matches ``modeltype``.

    Args:
        model: the model to draw
        *args: forwarded to the selected function
        **kwargs: ``modeltype`` (default ``'bayes'``) selects the drawer and
            is removed before forwarding; accepted values are
            ``'bayesian'`` / ``'bayes'``, ``'markov'`` and
            ``'junc'`` / ``'junction'``.

    Returns:
        whatever the selected ``show_*`` function returns

    Raises:
        NotImplementedError: if ``modeltype`` is not one of the above
    """
    modeltype = kwargs.pop('modeltype', 'bayes')
    if modeltype in ('bayesian', 'bayes'):
        return show_bayesian_model(model, *args, **kwargs)
    if modeltype in ('markov',):
        return show_markov_model(model, *args, **kwargs)
    if modeltype in ('junc', 'junction'):
        return show_junction_tree(model, *args, **kwargs)
    raise NotImplementedError('Unknown modeltype=%r' % (modeltype,))
def show_bayesian_model(model, evidence={}, soft_evidence={}, fnum=None,
                        **kwargs):
    r"""
    Draw a Bayesian model and, when evidence, top assignments and posterior
    factors are all supplied, a histogram of the top assignments below it.

    Args:
        model: forwarded to `draw_bayesian_model`
        evidence (dict): forwarded to `draw_bayesian_model`
        soft_evidence (dict): forwarded to `draw_bayesian_model`
        fnum (int): figure number, resolved through ``pt.ensure_fnum``
        **kwargs: forwarded to `draw_bayesian_model`; ``top_assignments``
            and the presence of ``factor_list`` also select the layout

    References:
        http://stackoverflow.com/questions/22207802/networkx-node-level-or-layer

    Ignore:
        import nx2tikz
        print(nx2tikz.dumps_tikz(model, layout='layered', use_label=True))

    Ignore:
        sudo apt-get install libgraphviz4 libgraphviz-dev -y
        sudo apt-get install libgraphviz-dev
        pip install pygraphviz
        sudo pip3 install pygraphviz \
            --install-option="--include-path=/usr/include/graphviz" \
            --install-option="--library-path=/usr/lib/graphviz/"
        python -c "import pygraphviz; print(pygraphviz.__file__)"
        python3 -c "import pygraphviz; print(pygraphviz.__file__)"

    CommandLine:
        python -m ibeis.algo.hots.pgm_viz --exec-show_model --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.algo.hots.pgm_viz import *  # NOQA
        >>> model = '?'
        >>> evidence = {}
        >>> soft_evidence = {}
        >>> result = show_model(model, evidence, soft_evidence)
        >>> print(result)
        >>> ut.quit_if_noshow()
        >>> import plottool as pt
        >>> ut.show_if_requested()
    """
    import plottool as pt
    fnum = pt.ensure_fnum(fnum)
    top_assignments = kwargs.get('top_assignments', None)
    if evidence and top_assignments and 'factor_list' in kwargs:
        # Network on the top two thirds, histogram in the bottom row
        pnum1 = (3, 1, (slice(0, 2), 0))
        pnum2 = (3, 8, (2, slice(4, None)))
    else:
        pnum1 = (1, 1, 1)
        # No room is reserved for the histogram in the single-panel layout
        pnum2 = None
    draw_bayesian_model(model, evidence, soft_evidence, fnum=fnum, pnum=pnum1,
                        **kwargs)
    # Draw probability hist. Guarding on pnum2 (instead of on
    # ``top_assignments is not None``) avoids the NameError the old code
    # raised when assignments were given without evidence / factor_list.
    if pnum2 is not None:
        draw_map_histogram(top_assignments, fnum=fnum, pnum=pnum2)
if __name__ == '__main__':
    # CommandLine:
    #     python -m ibeis.algo.hots.pgm_viz
    #     python -m ibeis.algo.hots.pgm_viz --allexamples
    import multiprocessing
    import utool as ut  # NOQA
    multiprocessing.freeze_support()  # for win32
    ut.doctest_funcs()