# -*- coding: utf-8 -*-
"""
1) Ambiguity / num names
2) independence of annotations
3) continuous
4) exponential case
5) specific examples of our problem
6) human in loop
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import six  # NOQA
import utool as ut
import numpy as np
from six.moves import zip
from ibeis.algo.hots import pgm_ext
from ibeis.algo.hots import pgm_viz

print, rrr, profile = ut.inject2(__name__, '[bayes]')

#SPECIAL_BASIS_POOL = ['fred', 'sue', 'tom']
SPECIAL_BASIS_POOL = []

# Quickly change names to be consistent with papers.
# Sorry, person reading this code: the naming will be confusing and inconsistent.
NAME_TTYPE = 'name'
MATCH_TTYPE = 'same'
SCORE_TTYPE = 'evidence_match'


def test_model(num_annots, num_names, score_evidence=[], name_evidence=[],
               other_evidence={}, noquery=False, verbose=None, **kwargs):
    if verbose is None:
        verbose = ut.VERBOSE

    method = kwargs.pop('method', None)
    model = make_name_model(num_annots, num_names, verbose=verbose, **kwargs)

    if verbose:
        model.print_priors(ignore_ttypes=[MATCH_TTYPE, SCORE_TTYPE])

    model, evidence, soft_evidence = update_model_evidence(
        model, name_evidence, score_evidence, other_evidence)

    if verbose and len(soft_evidence) != 0:
        model.print_priors(ignore_ttypes=[MATCH_TTYPE, SCORE_TTYPE],
                           title='Soft Evidence', color='green')

    #if verbose:
    #    ut.colorprint('\n --- Soft Evidence ---', 'white')
    #    for ttype, cpds in model.ttype2_cpds.items():
    #        if ttype != MATCH_TTYPE:
    #            for fs_ in ut.ichunks(cpds, 4):
    #                ut.colorprint(ut.hz_str([f._cpdstr('psql') for f in fs_]),
    #                              'green')

    if verbose:
        ut.colorprint('\n --- Inference ---', 'red')

    if (len(evidence) > 0 or len(soft_evidence) > 0) and not noquery:
        evidence = model._ensure_internal_evidence(evidence)
        query_vars = []
        query_vars += ut.list_getattr(model.ttype2_cpds[NAME_TTYPE], 'variable')
        #query_vars += ut.list_getattr(model.ttype2_cpds[MATCH_TTYPE], 'variable')
        query_vars = ut.setdiff(query_vars, evidence.keys())
        #query_vars = ut.setdiff(query_vars, soft_evidence.keys())
        query_results = cluster_query(model, query_vars, evidence,
                                      soft_evidence, method)
    else:
        query_results = {}

    # Use .get to avoid a KeyError when inference was skipped (noquery)
    factor_list = query_results.get('factor_list', [])

    if verbose:
        print('+--------')
        semtypes = [model.var2_cpd[f.variables[0]].ttype
                    for f in factor_list]
        for type_, factors in ut.group_items(factor_list, semtypes).items():
            print('Result Factors (%r)' % (type_,))
            factors = ut.sortedby(factors, [f.variables[0] for f in factors])
            for fs_ in ut.ichunks(factors, 4):
                ut.colorprint(ut.hz_str([f._str('phi', 'psql') for f in fs_]),
                              'yellow')
        print('MAP assignments')
        top_assignments = query_results.get('top_assignments', [])
        tmp = []
        for lbl, val in top_assignments:
            tmp.append('%s : %.4f' % (ut.repr2(lbl), val))
        print(ut.align('\n'.join(tmp), ' :'))
        print('L_____\n')

    showkw = dict(evidence=evidence, soft_evidence=soft_evidence,
                  **query_results)
    pgm_viz.show_model(model, **showkw)
    return (model, evidence, query_results)
    # pgm_ext.print_ascii_graph(model)
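

# Illustrative usage sketch (added; the argument values are assumptions
# chosen for illustration, not from the original module). With two
# annotations and one observed high match score, the posterior should favor
# assigning both annotations the same name.
def _demo_test_model():
    model, evidence, results = test_model(
        num_annots=2, num_names=2,
        score_evidence=[1],       # the single pairwise score is 'high'
        name_evidence=[0, None],  # annot 'a' is known to be name n0
        verbose=True)
    print(results.get('top_assignments'))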


def make_name_model(num_annots, num_names=None, verbose=True, mode=1,
                    num_scores=2, p_score_given_same=None,
                    hack_score_only=False, score_basis=None,
                    special_names=None):
    r"""
    CommandLine:
        python -m ibeis.algo.hots.bayes --exec-make_name_model --no-cnn
        python -m ibeis.algo.hots.bayes --exec-make_name_model --show --no-cnn
        python -m ibeis.algo.hots.bayes --exec-make_name_model --num-annots=3

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.algo.hots.bayes import *  # NOQA
        >>> defaults = dict(num_annots=2, num_names=2, verbose=True)
        >>> modeltype = ut.get_argval('--modeltype', default='bayes')
        >>> kw = ut.argparse_funckw(make_name_model, defaults)
        >>> model = make_name_model(**kw)
        >>> ut.quit_if_noshow()
        >>> model.show_model(show_prior=False, show_title=False, modeltype=modeltype)
        >>> ut.show_if_requested()
    """
    if special_names is None:
        special_names = SPECIAL_BASIS_POOL

    assert mode == 1, 'only can do mode 1'
    base = ut.get_argval('--base', type_=str, default='a')
    annots = ut.chr_range(num_annots, base=base)
    # The indexes of match CPDs will not change if another annotation is added
    upper_diag_idxs = ut.colwise_diag_idxs(num_annots, 2)
    if hack_score_only:
        upper_diag_idxs = upper_diag_idxs[-hack_score_only:]

    if num_names is None:
        num_names = num_annots

    # +--- Define CPD Templates and Instantiation ---
    cpd_list = []

    # Name Factor
    name_cpd_t = pgm_ext.TemplateCPD(
        NAME_TTYPE, ('n', num_names),
        special_basis_pool=special_names)
    name_cpds = [name_cpd_t.new_cpd(parents=aid) for aid in annots]
    #name_cpds = [name_cpd_t.new_cpd(parents=aid, constrain_state=count)
    #             for count, aid in enumerate(annots, start=1)]
    cpd_list.extend(name_cpds)

    # Match Factor
    def match_pmf(match_type, n1, n2):
        return {
            True: {'same': 1.0, 'diff': 0.0},
            False: {'same': 0.0, 'diff': 1.0},
        }[n1 == n2][match_type]
    match_states = ['diff', 'same']
    match_cpd_t = pgm_ext.TemplateCPD(
        MATCH_TTYPE, match_states,
        evidence_ttypes=[name_cpd_t, name_cpd_t],
        pmf_func=match_pmf)
    #match_cpd_t.varpref = 'S'
    namepair_cpds = ut.list_unflat_take(name_cpds, upper_diag_idxs)
    match_cpds = [match_cpd_t.new_cpd(parents=cpds)
                  for cpds in namepair_cpds]
    cpd_list.extend(match_cpds)

    # Score Factor
    score_states = list(range(num_scores))
    if score_basis is not None:
        score_states = ['%.2f' % (s,) for s in score_basis]
    if p_score_given_same is None:
        tmp = np.arange(num_scores + 1)[1:]
        tmp = np.cumsum(tmp)
        tmp = (tmp / tmp.sum())
        p_score_given_same = tmp
    def score_pmf(score_type, match_type):
        if isinstance(score_type, six.string_types):
            score_type = score_states.index(score_type)
        if match_type == 'same':
            return p_score_given_same[score_type]
        else:
            return p_score_given_same[-(score_type + 1)]
    score_cpd_t = pgm_ext.TemplateCPD(
        SCORE_TTYPE, score_states,
        evidence_ttypes=[match_cpd_t],
        pmf_func=score_pmf)
    #score_cpd_t.varpref = 'P'
    score_cpds = [score_cpd_t.new_cpd(parents=cpds)
                  for cpds in zip(match_cpds)]
    cpd_list.extend(score_cpds)

    with_humans = False
    if with_humans:
        human_states = ['diff', 'same']
        human_cpd_t = pgm_ext.TemplateCPD(
            'human', human_states,
            evidence_ttypes=[match_cpd_t],
            pmf_func=[[.9, .1], [.1, .9]])
        human_cpds = [human_cpd_t.new_cpd(parents=cpds)
                      for cpds in zip(match_cpds)]
        cpd_list.extend(human_cpds)

    with_rank = False  # Rank depends on dependent scores
    if with_rank:
        rank_states = ['0', '1', '2', '3']
        rank_cpd_t = pgm_ext.TemplateCPD(
            'rank', rank_states,
            evidence_ttypes=[match_cpd_t],
            pmf_func=None)
        rank_cpds = [rank_cpd_t.new_cpd(parents=cpds)
                     for cpds in zip(match_cpds)]
        cpd_list.extend(rank_cpds)
    # L___ End CPD Definitions ___

    print('score_cpds = %r' % (ut.list_getattr(score_cpds, 'variable'),))

    # Make Model
    model = pgm_ext.define_model(cpd_list)
    model.num_names = num_names

    if verbose:
        model.print_templates(ignore_ttypes=[MATCH_TTYPE])
    return model
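

# Worked example of the default score PMF above (an illustrative sketch,
# not called anywhere in the module). With num_scores=2 the unnormalized
# weights are [1, 2], their cumulative sums are [1, 3], and normalizing by
# the total gives p_score_given_same = [0.25, 0.75]. The 'diff' case indexes
# the same vector from the end, i.e. P(score | diff) = [0.75, 0.25].
def _demo_default_score_pmf(num_scores=2):
    tmp = np.cumsum(np.arange(num_scores + 1)[1:])
    p_score_given_same = tmp / tmp.sum()
    p_score_given_diff = p_score_given_same[::-1]
    print('P(score | same) = %r' % (p_score_given_same.tolist(),))
    print('P(score | diff) = %r' % (p_score_given_diff.tolist(),))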


def update_model_evidence(model, name_evidence, score_evidence, other_evidence):
    r"""
    CommandLine:
        python -m ibeis.algo.hots.bayes --exec-update_model_evidence

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.algo.hots.bayes import *  # NOQA
        >>> verbose = True
        >>> other_evidence = {}
        >>> name_evidence = [0, 0, 1, 1, None]
        >>> score_evidence = ['high', 'low', 'low', 'low', 'low', 'high']
        >>> model = make_name_model(num_annots=5, num_names=3, verbose=True,
        >>>                         mode=1)
        >>> update_model_evidence(model, name_evidence, score_evidence,
        >>>                       other_evidence)
    """
    name_cpds = model.ttype2_cpds[NAME_TTYPE]
    score_cpds = model.ttype2_cpds[SCORE_TTYPE]

    evidence = {}
    evidence.update(other_evidence)
    soft_evidence = {}

    def apply_hard_soft_evidence(cpd_list, evidence_list):
        for cpd, ev in zip(cpd_list, evidence_list):
            if isinstance(ev, int):
                # hard internal evidence
                evidence[cpd.variable] = ev
            if isinstance(ev, six.string_types):
                # hard external evidence
                evidence[cpd.variable] = cpd._internal_varindex(
                    cpd.variable, ev)
            if isinstance(ev, dict):
                # soft external evidence
                # HACK THAT MODIFIES CPD IN PLACE
                def rectify_evidence_val(_v, card=cpd.variable_card):
                    # rectify hacky string structures
                    tmp = (1 / (2 * card ** 2))
                    return (1 + tmp) / (card + tmp) if _v == '+eps' else _v
                ev_ = ut.map_dict_vals(rectify_evidence_val, ev)
                fill = (1.0 - sum(ev_.values())) / (cpd.variable_card - len(ev_))
                # HACK fix for float problems
                if len(ev_) == cpd.variable_card - 1:
                    fill = 0
                assert fill > -1E7, 'fill=%r' % (fill,)
                row_labels = list(ut.iprod(*cpd.statenames))
                for i, lbl in enumerate(row_labels):
                    if lbl in ev_:
                        # external case1
                        cpd.values[i] = ev_[lbl]
                    elif len(lbl) == 1 and lbl[0] in ev_:
                        # external case2
                        cpd.values[i] = ev_[lbl[0]]
                    elif i in ev_:
                        # internal case
                        cpd.values[i] = ev_[i]
                    else:
                        cpd.values[i] = fill
                cpd.normalize()
                soft_evidence[cpd.variable] = True

    apply_hard_soft_evidence(name_cpds, name_evidence)
    apply_hard_soft_evidence(score_cpds, score_evidence)
    return model, evidence, soft_evidence
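

# Worked example of the '+eps' soft-evidence rectification above (sketch;
# card=3 is an assumed example). '+eps' becomes (1 + tmp) / (card + tmp)
# with tmp = 1 / (2 * card**2) = 1/18, about 0.3455 -- just above the
# uniform prior of 1/3 -- and the leftover mass fills the other states.
def _demo_eps_rectification(card=3):
    tmp = 1 / (2 * card ** 2)
    eps_val = (1 + tmp) / (card + tmp)
    fill = (1.0 - eps_val) / (card - 1)
    print('+eps -> %.4f (uniform would be %.4f)' % (eps_val, 1 / card))
    print('each remaining state gets %.4f' % (fill,))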


def reduce_marginalize(phi, query_variables=None, evidence={}, inplace=False):
    """
    Hack for reduction followed by marginalization

    Example:
        >>> reduced_joint = joint.observe(
        >>>     query_variables, evidence, inplace=False)
        >>> new_rows = reduced_joint._row_labels()
        >>> new_vals = reduced_joint.values.ravel()
        >>> map_vals = new_rows[new_vals.argmax()]
        >>> map_assign = dict(zip(reduced_joint.variables, map_vals))
    """
    reduced_joint = phi if inplace else phi.copy()
    if query_variables is None:
        query_variables = reduced_joint.variables
    reduced_joint.reduce(evidence)
    reduced_joint.normalize()
    # Marginalize over non-query, non-evidence
    irrelevant_vars = (
        set(reduced_joint.variables) -
        (set(evidence.keys()) | set(query_variables))
    )
    reduced_joint.marginalize(irrelevant_vars)
    reduced_joint.normalize()
    if not inplace:
        return reduced_joint


def make_temp_state(state):
    # Map each distinct state index to a unique negative "temp" index,
    # preserving first-seen order.
    mapping = {}
    for state_idx in state:
        if state_idx not in mapping:
            mapping[state_idx] = -(len(mapping) + 1)
    temp_state = [mapping[state_idx] for state_idx in state]
    return temp_state
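

# Quick illustration of make_temp_state (an added sketch): any two labelings
# that differ only by a permutation of label values map to the same sequence
# of negative temp indexes, which is what lets them be collapsed together.
def _demo_make_temp_state():
    assert make_temp_state([0, 0, 1, 2]) == [-1, -1, -2, -3]
    assert make_temp_state([2, 2, 0, 1]) == [-1, -1, -2, -3]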


def collapse_labels(model, evidence, reduced_variables, reduced_row_idxs,
                    reduced_values):
    import vtool as vt
    #assert np.all(reduced_joint.values.ravel() == reduced_joint.values.flatten())
    reduced_ttypes = [model.var2_cpd[var].ttype for var in reduced_variables]

    evidence_vars = list(evidence.keys())
    evidence_state_idxs = ut.dict_take(evidence, evidence_vars)
    evidence_ttypes = [model.var2_cpd[var].ttype for var in evidence_vars]

    ttype2_ev_indices = dict(zip(*ut.group_indices(evidence_ttypes)))
    ttype2_re_indices = dict(zip(*ut.group_indices(reduced_ttypes)))
    # ttype2_ev_indices = ut.group_items(range(len(evidence_vars)), evidence_ttypes)
    # ttype2_re_indices = ut.group_items(range(len(reduced_variables)), reduced_ttypes)

    # Allow specific types of labels to change
    # everything is the same, only the names have changed.
    # TODO: allow for multiple different label_ttypes
    # for label_ttype in label_ttypes
    if NAME_TTYPE not in model.ttype2_template:
        return reduced_row_idxs, reduced_values
    label_ttypes = [NAME_TTYPE]
    for label_ttype in label_ttypes:
        ev_colxs = ttype2_ev_indices[label_ttype]
        re_colxs = ttype2_re_indices[label_ttype]

        ev_state_idxs = ut.take(evidence_state_idxs, ev_colxs)
        ev_state_idxs_tile = np.tile(ev_state_idxs,
                                     (len(reduced_values), 1)).astype(np.int)
        num_ev_ = len(ev_colxs)

        aug_colxs = list(range(num_ev_)) + (np.array(re_colxs) + num_ev_).tolist()
        aug_state_idxs = np.hstack([ev_state_idxs_tile, reduced_row_idxs])

        # Relabel rows based on the knowledge that
        # everything is the same, only the names have changed.

        num_cols = len(aug_state_idxs.T)
        mask = vt.index_to_boolmask(aug_colxs, num_cols)
        other_colxs, = np.where(~mask)
        relbl_states = aug_state_idxs.compress(mask, axis=1)
        other_states = aug_state_idxs.compress(~mask, axis=1)
        tmp_relbl_states = np.array(list(map(make_temp_state, relbl_states)))

        max_tmp_state = -1
        min_tmp_state = tmp_relbl_states.min()

        # rebuild original state structure with temp state idxs
        tmp_state_cols = [None] * num_cols
        for count, colx in enumerate(aug_colxs):
            tmp_state_cols[colx] = tmp_relbl_states[:, count:count + 1]
        for count, colx in enumerate(other_colxs):
            tmp_state_cols[colx] = other_states[:, count:count + 1]
        tmp_state_idxs = np.hstack(tmp_state_cols)

        data_ids = np.array(
            vt.compute_unique_data_ids_(list(map(tuple, tmp_state_idxs))))
        unique_ids, groupxs = vt.group_indices(data_ids)
        print('Collapsed %r states into %r states' % (
            len(data_ids), len(unique_ids),))
        # Sum the values in the cpd to marginalize the duplicate probs
        new_values = np.array([
            g.sum() for g in vt.apply_grouping(reduced_values, groupxs)
        ])
        # Take only the unique rows under this induced labeling
        unique_tmp_groupxs = np.array(ut.get_list_column(groupxs, 0))
        new_aug_state_idxs = tmp_state_idxs.take(unique_tmp_groupxs, axis=0)

        tmp_idx_set = set((-np.arange(-max_tmp_state,
                                      (-min_tmp_state) + 1)).tolist())
        true_idx_set = set(range(len(model.ttype2_template[label_ttype].basis)))

        # Relabel the rows one more time to agree with initial constraints
        for colx, true_idx in enumerate(ev_state_idxs):
            tmp_idx = np.unique(new_aug_state_idxs.T[colx])
            assert len(tmp_idx) == 1
            tmp_idx_set -= {tmp_idx[0]}
            true_idx_set -= {true_idx}
            new_aug_state_idxs[new_aug_state_idxs == tmp_idx] = true_idx

        # Relabel the remaining idxs
        remain_tmp_idxs = sorted(list(tmp_idx_set))[::-1]
        remain_true_idxs = sorted(list(true_idx_set))
        for tmp_idx, true_idx in zip(remain_tmp_idxs, remain_true_idxs):
            new_aug_state_idxs[new_aug_state_idxs == tmp_idx] = true_idx

        # Remove evidence based augmented labels
        new_state_idxs = new_aug_state_idxs.T[num_ev_:].T
        return new_state_idxs, new_values
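

# Tiny demonstration of the collapsing idea (an added sketch, independent of
# the model machinery above; the rows and probabilities are made up): two
# joint name states that differ only by a relabeling of names get the same
# temp state, so their probabilities are summed into one partition.
def _demo_collapse_two_rows():
    rows = np.array([[0, 1], [1, 0], [0, 0]])  # name states for 2 annots
    vals = np.array([0.3, 0.3, 0.4])           # assumed probabilities
    collapsed = {}
    for row, val in zip(rows, vals):
        key = tuple(make_temp_state(row))
        collapsed[key] = collapsed.get(key, 0) + val
    # {(-1, -2): 0.6, (-1, -1): 0.4} -- both 'different names' rows merge
    print(collapsed)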


def collapse_factor_labels(model, reduced_joint, evidence):
    reduced_variables = reduced_joint.variables
    reduced_row_idxs = np.array(reduced_joint._row_labels(asindex=True))
    reduced_values = reduced_joint.values.ravel()
    new_state_idxs, new_values = collapse_labels(
        model, evidence, reduced_variables, reduced_row_idxs, reduced_values)

    if isinstance(reduced_joint, pgm_ext.ApproximateFactor):
        new_reduced_joint = pgm_ext.ApproximateFactor(
            new_state_idxs, new_values, reduced_variables,
            statename_dict=reduced_joint.statename_dict)
    else:
        # hack into a new joint factor
        # (that is the same size as the reduced_joint)
        new_reduced_joint = reduced_joint.copy()
        assert new_reduced_joint.values is not reduced_joint.values, (
            'copy did not work')
        new_reduced_joint.values[:] = 0
        flat_idxs = np.ravel_multi_index(new_state_idxs.T,
                                         new_reduced_joint.values.shape)

        old_values = new_reduced_joint.values.ravel()
        old_values[flat_idxs] = new_values
        new_reduced_joint.values = old_values.reshape(
            reduced_joint.cardinality)
        # print(new_reduced_joint._str(maxrows=4, sort=-1))
    # return new_reduced_joint, new_state_idxs, new_values
    return new_reduced_joint


def report_partitioning_statistics(new_reduced_joint):
    # compute partitioning statistics
    import vtool as vt
    vals, idxs = vt.group_indices(new_reduced_joint.values.ravel())
    #groupsize = list(map(len, idxs))
    #groupassigns = ut.unflat_vecmap(new_reduced_joint.assignment, idxs)
    all_states = new_reduced_joint._row_labels(asindex=True)
    clusterstats = [tuple(sorted(list(ut.dict_hist(a).values())))
                    for a in all_states]
    grouped_vals = ut.group_items(new_reduced_joint.values.ravel(),
                                  clusterstats)

    #probs_assigned_to_clustertype = [(
    #    sorted(np.unique(np.array(b).round(decimals=5)).tolist())[::-1], a)
    #    for a, b in grouped_vals.items()]
    probs_assigned_to_clustertype = [(
        ut.dict_hist(np.array(b).round(decimals=5)), a)
        for a, b in grouped_vals.items()]
    sortx = ut.argsort([max(c[0].keys())
                        for c in probs_assigned_to_clustertype])
    probs_assigned_to_clustertype = ut.take(probs_assigned_to_clustertype,
                                            sortx)

    # This is a list of 2-tuples: the first item maps each unique probability
    # assigned to a cluster type to the number of times it was assigned; the
    # second item is the cluster type itself. Each number in a cluster type
    # counts how many annotations were assigned to a specific label, so its
    # length is the total number of labels. For all-low scores you will see
    # something like [[{somenum: 1}, {0: 800}], [1, 1, 1, ... 1]], indicating
    # that the everyone-gets-a-different-label assignment happened once with
    # probability somenum, and 800 times with probability 0.
    #print(sorted([(b, a) for a, b in ut.map_dict_vals(sum, x)]).items())
    #z = sorted([(b, a) for a, b in ut.map_dict_vals(sum, grouped_vals).items()])
    print(ut.repr2(probs_assigned_to_clustertype, nl=2, precision=2,
                   sorted_=True))

    #group_numperlbl = [
    #    [sorted(list(ut.dict_hist(ut.get_list_column(a, 1)).values()))
    #     for a in assigns]
    #    for assigns in groupassigns]


def _test_compute_reduced_joint(model, query_vars, evidence, method):
    import pgmpy
    operation = 'maximize'
    variables = query_vars

    infr_ve = pgmpy.inference.VariableElimination(model)
    joint_ve = infr_ve.compute_joint(variables, operation, evidence)
    joint_ve.normalize()
    joint_ve.reorder()

    infr_bp = pgmpy.inference.BeliefPropagation(model)
    joint_bp = infr_bp.compute_joint(variables, operation, evidence)
    joint_bp.normalize()
    joint_bp.reorder()

    assert np.allclose(joint_ve.values, joint_bp.values)
    print('VE and BP are the same')

    joint_bf = model.joint_distribution()
    reduce_marginalize(joint_bf, query_vars, evidence, inplace=True)

    assert np.allclose(joint_bf.values, joint_bp.values)
    print('BF and BP are the same')


def compute_reduced_joint(model, query_vars, evidence, method,
                          operation='maximize'):
    import pgmpy
    if method == 'approx':
        # TODO: incorporate operation?
        query_states = model.get_number_of_states(query_vars)
        print('model.number_of_states = %r' % (
            model.get_number_of_states(),))
        print('query_states = %r' % (query_states,))
        # Try to approximately sample the map inference
        infr = pgmpy.inference.Sampling.BayesianModelSampling(model)

        # The markov blanket of a name node in our network
        # can be quite large. It includes all other names.
        # infr = pgmpy.inference.Sampling.GibbsSampling(model)
        # import utool
        # utool.embed()

        #infr = pgmpy.inference.Sampling.GibbsSampling()
        #infr._get_kernel_from_bayesian_model(model)

        evidence_ = [pgmpy.inference.Sampling.State(*item)
                     for item in evidence.items()]

        # TODO: apply Hoeffding and Chernoff bounds
        delta = .1  # desired probability of error
        eps = .2  # desired error bound
        u = 1 / (2 ** len(evidence))  # upper bound on cpd entries of evidence
        k = len(evidence)
        gamma = (4 * (1 + eps) / (eps ** 2)) * np.log(2 / delta)
        thresh = gamma * (u ** k)
        # We are observing the leaves of this network, which means
        # we are effectively sampling from the prior distribution
        # when using forward sampling.
        Py = 1 / query_states
        Py_heuristic = 1 / (4 ** len(query_vars))
        M_hoeffding = (np.log(2 / delta) / (2 * eps ** 2))
        M_chernoff = 3 * (np.log(2 / delta) / (Py * eps ** 2))
        M_chernoff_heuristic = 3 * (np.log(2 / delta) /
                                    (Py_heuristic * eps ** 2))
        heuristic_size = 2 ** (len(query_vars) + 2)
        size = min(100000, max(heuristic_size, 128))
        print('\n-----')
        print('u = %r' % (u,))
        print('thresh = %r' % (thresh,))
        print('k = %r' % (k,))
        print('gamma = %r' % (gamma,))
        print('M_chernoff_heuristic = %r' % (M_chernoff_heuristic,))
        print('heuristic_size = %r' % (heuristic_size,))
        print('M_hoeffding = %r' % (M_hoeffding,))
        print('M_chernoff = %r' % (M_chernoff,))
        print('size = %r' % (size,))
        #np.log(2 / .1) / (2 * (.2 ** 2))
        sampled = infr.likelihood_weighted_sample(evidence=evidence_,
                                                  size=size)
        reduced_joint = pgm_ext.ApproximateFactor.from_sampled(
            sampled, query_vars, statename_dict=model.statename_dict)
        #self = reduced_joint  # NOQA
        #arr = self.state_idxs  # NOQA
        #import utool
        #utool.embed()
        num_raw_states = len(reduced_joint.state_idxs)
        reduced_joint.consolidate()
        num_unique_states = len(reduced_joint.state_idxs)
        print('[pgm] %r / %r initially sampled states are unique' % (
            num_unique_states, num_raw_states,))
        reduced_joint.normalize()
        reduced_joint.reorder()
    elif method == 'varelim':
        infr = pgmpy.inference.VariableElimination(model)
        reduced_joint = infr.compute_joint(query_vars, operation, evidence)
        reduced_joint.normalize()
        reduced_joint.reorder()
    elif method in ['bp', 'beliefprop']:
        # Dont brute force anymore
        infr = pgmpy.inference.BeliefPropagation(model)
        reduced_joint = infr.compute_joint(query_vars, operation, evidence)
        reduced_joint.normalize()
        reduced_joint.reorder()
    elif method in ['bf', 'brute', 'bruteforce']:
        # TODO: incorporate operation?
        full_joint = model.joint_distribution()
        reduced_joint = reduce_marginalize(full_joint, query_vars,
                                           evidence, inplace=False)
        del full_joint
    else:
        raise NotImplementedError('method=%r' % (method,))
    return reduced_joint
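

# Worked numbers for the sample-size bounds above (an added sketch for
# intuition; delta and eps mirror the hard-coded values, and query_states=16
# is an assumed example). Hoeffding gives ~37 samples for an additive
# eps=0.2 guarantee at confidence 1 - delta = 0.9; the Chernoff-style bound
# scales with 1 / Py, so it quickly dominates as the query space grows.
def _demo_sample_bounds(delta=.1, eps=.2, query_states=16):
    Py = 1 / query_states
    M_hoeffding = np.log(2 / delta) / (2 * eps ** 2)
    M_chernoff = 3 * (np.log(2 / delta) / (Py * eps ** 2))
    print('M_hoeffding = %.1f' % (M_hoeffding,))  # ~37.4
    print('M_chernoff = %.1f' % (M_chernoff,))    # ~3594.9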


def cluster_query(model, query_vars=None, evidence=None, soft_evidence=None,
                  method=None, operation='maximize'):
    """
    CommandLine:
        python -m ibeis.algo.hots.bayes --exec-cluster_query --show

    GridParams:
        >>> param_grid = dict(
        >>>     #method=['approx', 'bf', 'bp'],
        >>>     method=['approx', 'bp'],
        >>> )
        >>> combos = ut.all_dict_combinations(param_grid)
        >>> index = 0
        >>> keys = 'method'.split(', ')
        >>> method, = ut.dict_take(combos[index], keys)

    GridSetup:
        >>> from ibeis.algo.hots.bayes import *  # NOQA
        >>> verbose = True
        >>> other_evidence = {}
        >>> name_evidence = [1, None, None, 0]
        >>> score_evidence = [2, 0, 2]
        >>> special_names = ['fred', 'sue', 'tom', 'paul']
        >>> model = make_name_model(
        >>>     num_annots=4, num_names=4, num_scores=3, verbose=True, mode=1,
        >>>     special_names=special_names)
        >>> method = None
        >>> model, evidence, soft_evidence = update_model_evidence(
        >>>     model, name_evidence, score_evidence, other_evidence)
        >>> evidence = model._ensure_internal_evidence(evidence)
        >>> query_vars = ut.list_getattr(model.ttype2_cpds[NAME_TTYPE], 'variable')

    GridExample:
        >>> # DISABLE_DOCTEST
        >>> query_results = cluster_query(model, query_vars, evidence,
        >>>                               method=method)
        >>> print(ut.repr2(query_results['top_assignments'], nl=1))
        >>> ut.quit_if_noshow()
        >>> pgm_viz.show_model(model, evidence=evidence, **query_results)
        >>> ut.show_if_requested()
    """
    evidence = model._ensure_internal_evidence(evidence)
    if query_vars is None:
        query_vars = model.nodes()
    orig_query_vars = query_vars  # NOQA
    query_vars = ut.setdiff(query_vars, list(evidence.keys()))

    if method is None:
        method = ut.get_argval('--method', type_=str, default='bp')
    reduced_joint = compute_reduced_joint(model, query_vars, evidence,
                                          method, operation)

    new_reduced_joint = collapse_factor_labels(model, reduced_joint, evidence)

    if False:
        report_partitioning_statistics(new_reduced_joint)

    # FIXME: are these max marginals?
    max_marginals = {}
    for i, var in enumerate(query_vars):
        one_out = query_vars[:i] + query_vars[i + 1:]
        max_marginals[var] = new_reduced_joint.marginalize(one_out,
                                                           inplace=False)
        # max_marginals[var] = joint2.maximize(one_out, inplace=False)
    factor_list = max_marginals.values()

    # Now find the most likely state
    reduced_variables = new_reduced_joint.variables
    new_state_idxs = np.array(new_reduced_joint._row_labels(asindex=True))
    new_values = new_reduced_joint.values.ravel()
    sortx = new_values.argsort()[::-1]
    sort_new_state_idxs = new_state_idxs.take(sortx, axis=0)
    sort_new_values = new_values.take(sortx)
    sort_new_states = list(zip(*[
        ut.dict_take(model.statename_dict[var], idx)
        for var, idx in zip(reduced_variables, sort_new_state_idxs.T)]))

    # Better map assignment based on knowledge of labels
    map_assign = dict(zip(reduced_variables, sort_new_states[0]))

    sort_reduced_rowstr_lbls = [
        ut.repr2(dict(zip(reduced_variables, lbls)), explicit=True,
                 nobraces=True, strvals=True)
        for lbls in sort_new_states
    ]

    top_assignments = list(zip(sort_reduced_rowstr_lbls[:4],
                               sort_new_values))
    if len(sort_new_values) > 3:
        top_assignments += [('other', 1 - sum(sort_new_values[:4]))]
    query_results = {
        'factor_list': factor_list,
        'top_assignments': top_assignments,
        'map_assign': map_assign,
        'method': method,
    }
    print('query_results = %s' % (ut.repr3(query_results, nl=2),))
    return query_results


def draw_tree_model(model, **kwargs):
    import plottool as pt
    import networkx as netx
    if not ut.get_argval('--hackjunc'):
        fnum = pt.ensure_fnum(None)
        fig = pt.figure(fnum=fnum, doclf=True)  # NOQA
        ax = pt.gca()
        #name_nodes = sorted(ut.list_getattr(model.ttype2_cpds[NAME_TTYPE], 'variable'))
        netx_graph = model.to_markov_model()
        #pos = netx.pygraphviz_layout(netx_graph)
        #pos = netx.graphviz_layout(netx_graph)
        #pos = get_hacked_pos(netx_graph, name_nodes, prog='neato')
        pos = netx.nx_pydot.pydot_layout(netx_graph)
        node_color = [pt.WHITE] * len(pos)
        drawkw = dict(pos=pos, ax=ax, with_labels=True,
                      node_color=node_color, node_size=1100)
        netx.draw(netx_graph, **drawkw)
        if kwargs.get('show_title', True):
            pt.set_figtitle('Markov Model')

    if not ut.get_argval('--hackmarkov'):
        fnum = pt.ensure_fnum(None)
        fig = pt.figure(fnum=fnum, doclf=True)  # NOQA
        ax = pt.gca()
        netx_graph = model.to_junction_tree()

        # prettify nodes
        def fixtupkeys(dict_):
            return {
                ', '.join(k) if isinstance(k, tuple) else k: fixtupkeys(v)
                for k, v in dict_.items()
            }
        n = fixtupkeys(netx_graph.node)
        e = fixtupkeys(netx_graph.edge)
        a = fixtupkeys(netx_graph.adj)
        netx_graph.node = n
        netx_graph.edge = e
        netx_graph.adj = a
        #netx_graph = model.to_markov_model()
        #pos = netx.pygraphviz_layout(netx_graph)
        #pos = netx.graphviz_layout(netx_graph)
        pos = netx.nx_pydot.pydot_layout(netx_graph)
        node_color = [pt.WHITE] * len(pos)
        drawkw = dict(pos=pos, ax=ax, with_labels=True,
                      node_color=node_color, node_size=2000)
        netx.draw(netx_graph, **drawkw)
        if kwargs.get('show_title', True):
            pt.set_figtitle('Junction/Clique Tree / Cluster Graph')


def get_hacked_pos(netx_graph, name_nodes=None, prog='dot'):
    import pygraphviz
    import networkx as netx
    # Add "invisible" edges to induce an ordering
    # Hack for layout (ordering of top level nodes)
    netx_graph2 = netx_graph.copy()
    if getattr(netx_graph, 'ttype2_cpds', None) is not None:
        grouped_nodes = []
        for ttype in netx_graph.ttype2_cpds.keys():
            ttype_cpds = netx_graph.ttype2_cpds[ttype]
            # use defined ordering
            ttype_nodes = ut.list_getattr(ttype_cpds, 'variable')
            # ttype_nodes = sorted(ttype_nodes)
            invis_edges = list(ut.itertwo(ttype_nodes))
            netx_graph2.add_edges_from(invis_edges)
            grouped_nodes.append(ttype_nodes)

        A = netx.to_agraph(netx_graph2)
        for nodes in grouped_nodes:
            A.add_subgraph(nodes, rank='same')
    else:
        A = netx.to_agraph(netx_graph2)

    #if name_nodes is not None:
    #    #netx.set_node_attributes(netx_graph, 'label', {n: {'label': n} for n in all_nodes})
    #    invis_edges = list(ut.itertwo(name_nodes))
    #    netx_graph2.add_edges_from(invis_edges)
    #    A.add_subgraph(name_nodes, rank='same')
    #else:
    #    A = netx.to_agraph(netx_graph2)

    args = ''
    G = netx_graph
    A.layout(prog=prog, args=args)
    #A.draw('example.png', prog='dot')
    node_pos = {}
    for n in G:
        node_ = pygraphviz.Node(A, n)
        try:
            xx, yy = node_.attr["pos"].split(',')
            node_pos[n] = (float(xx), float(yy))
        except Exception:
            print("no position for node", n)
            node_pos[n] = (0.0, 0.0)
    return node_pos


def show_model(model, evidence={}, soft_evidence={}, **kwargs):
    """
    References:
        http://stackoverflow.com/questions/22207802/pygraphviz-networkx-set-node-level-or-layer

    Ignore:
        pkg-config --libs-only-L libcgraph
        sudo apt-get install libgraphviz-dev -y
        sudo apt-get install libgraphviz4 -y

        # sudo apt-get install pkg-config
        sudo apt-get install libgraphviz-dev
        # pip install git+git://github.com/pygraphviz/pygraphviz.git
        pip install pygraphviz
        python -c "import pygraphviz; print(pygraphviz.__file__)"

        sudo pip3 install pygraphviz --install-option="--include-path=/usr/include/graphviz" --install-option="--library-path=/usr/lib/graphviz/"
        python3 -c "import pygraphviz; print(pygraphviz.__file__)"

    CommandLine:
        python -m ibeis.algo.hots.bayes --exec-show_model --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.algo.hots.bayes import *  # NOQA
        >>> model = '?'
        >>> evidence = {}
        >>> soft_evidence = {}
        >>> result = show_model(model, evidence, soft_evidence)
        >>> print(result)
        >>> ut.quit_if_noshow()
        >>> import plottool as pt
        >>> ut.show_if_requested()
    """
    if ut.get_argval('--hackmarkov') or ut.get_argval('--hackjunc'):
        draw_tree_model(model, **kwargs)
        return

    import plottool as pt
    import networkx as netx
    fnum = pt.ensure_fnum(None)
    netx_graph = (model)
    #netx_graph.graph.setdefault('graph', {})['size'] = '"10,5"'
    #netx_graph.graph.setdefault('graph', {})['rankdir'] = 'LR'

    pos_dict = get_hacked_pos(netx_graph)
    #pos_dict = netx.nx_agraph.pygraphviz_layout(netx_graph)
    #pos = netx.nx_agraph.nx_pydot.pydot_layout(netx_graph, prog='dot')
    #pos_dict = netx.nx_agraph.graphviz_layout(netx_graph)

    textprops = {
        'family': 'monospace',
        'horizontalalignment': 'left',
        #'horizontalalignment': 'center',
        #'size': 12,
        'size': 8,
    }

    netx_nodes = model.nodes(data=True)
    node_key_list = ut.get_list_column(netx_nodes, 0)
    pos_list = ut.dict_take(pos_dict, node_key_list)

    var2_post = {f.variables[0]: f for f in kwargs.get('factor_list', [])}

    prior_text = None
    post_text = None
    evidence_tas = []
    post_tas = []
    prior_tas = []
    node_color = []

    has_inferred = evidence or var2_post
    if has_inferred:
        ignore_prior_with_ttype = [SCORE_TTYPE, MATCH_TTYPE]
        show_prior = False
    else:
        ignore_prior_with_ttype = []
        #show_prior = True
        show_prior = False

    dpy = 5
    dbx, dby = (20, 20)
    takw1 = {'bbox_align': (.5, 0), 'pos_offset': [0, dpy],
             'bbox_offset': [dbx, dby]}
    takw2 = {'bbox_align': (.5, 1), 'pos_offset': [0, -dpy],
             'bbox_offset': [-dbx, -dby]}

    name_colors = pt.distinct_colors(max(model.num_names, 10))
    name_colors = name_colors[:model.num_names]

    #cmap_ = 'hot'
    #mx = 0.65
    #mn = 0.15
    cmap_, mn, mx = 'plasma', 0.15, 1.0
    _cmap = pt.plt.get_cmap(cmap_)
    def cmap(x):
        return _cmap((x * mx) + mn)

    for node, pos in zip(netx_nodes, pos_list):
        variable = node[0]
        cpd = model.var2_cpd[variable]
        prior_marg = (cpd if cpd.evidence is None else
                      cpd.marginalize(cpd.evidence, inplace=False))

        show_evidence = variable in evidence
        show_prior = cpd.ttype not in ignore_prior_with_ttype
        show_post = variable in var2_post

        post_marg = None
        if show_post:
            post_marg = var2_post[variable]

        def get_name_color(phi):
            order = phi.values.argsort()[::-1]
            if len(order) < 2:
                dist_next = phi.values[order[0]]
            else:
                dist_next = phi.values[order[0]] - phi.values[order[1]]
            dist_total = (phi.values[order[0]])
            confidence = (dist_total * dist_next) ** (2.5 / 4)
            #print('confidence = %r' % (confidence,))
            color = name_colors[order[0]]
            color = pt.color_funcs.desaturate_rgb(color, 1 - confidence)
            color = np.array(color)
            return color

        if variable in evidence:
            if cpd.ttype == SCORE_TTYPE:
                cmap_index = evidence[variable] / (cpd.variable_card - 1)
                color = cmap(cmap_index)
                color = pt.lighten_rgb(color, .4)
                color = np.array(color)
                node_color.append(color)
            elif cpd.ttype == NAME_TTYPE:
                color = name_colors[evidence[variable]]
                color = np.array(color)
                node_color.append(color)
            else:
                color = pt.FALSE_RED
                node_color.append(color)
        #elif variable in soft_evidence:
        #    color = pt.LIGHT_PINK
        #    show_prior = True
        #    color = get_name_color(prior_marg)
        #    node_color.append(color)
        else:
            if cpd.ttype == NAME_TTYPE and post_marg is not None:
                color = get_name_color(post_marg)
                node_color.append(color)
            elif cpd.ttype == MATCH_TTYPE and post_marg is not None:
                color = cmap(post_marg.values[1])
                color = pt.lighten_rgb(color, .4)
                color = np.array(color)
                node_color.append(color)
            else:
                #color = pt.WHITE
                color = pt.NEUTRAL
                node_color.append(color)

        if show_prior:
            if variable in soft_evidence:
                prior_color = pt.LIGHT_PINK
            else:
                prior_color = None
            prior_text = pgm_ext.make_factor_text(prior_marg, 'prior')
            prior_tas.append(dict(text=prior_text, pos=pos,
                                  color=prior_color, **takw2))
        if show_evidence:
            _takw1 = takw1
            if cpd.ttype == SCORE_TTYPE:
                _takw1 = takw2
            evidence_text = cpd.variable_statenames[evidence[variable]]
            if isinstance(evidence_text, int):
                evidence_text = '%d/%d' % (evidence_text + 1,
                                           cpd.variable_card)
            evidence_tas.append(dict(text=evidence_text, pos=pos,
                                     color=color, **_takw1))
        if show_post:
            _takw1 = takw1
            if cpd.ttype == MATCH_TTYPE:
                _takw1 = takw2
            post_text = pgm_ext.make_factor_text(post_marg, 'post')
            post_tas.append(dict(text=post_text, pos=pos, color=None,
                                 **_takw1))

    def trnps_(dict_list):
        """ transpose dict list """
        list_dict = ut.ddict(list)
        for dict_ in dict_list:
            for key, val in dict_.items():
                list_dict[key + '_list'].append(val)
        return list_dict

    takw1_ = trnps_(post_tas + evidence_tas)
    takw2_ = trnps_(prior_tas)

    # Draw graph
    if has_inferred:
        pnum1 = (3, 1, (slice(0, 2), 0))
    else:
        pnum1 = None

    fig = pt.figure(fnum=fnum, pnum=pnum1, doclf=True)  # NOQA
    ax = pt.gca()
    #print('node_color = %s' % (ut.repr3(node_color),))
    drawkw = dict(pos=pos_dict, ax=ax, with_labels=True, node_size=1500,
                  node_color=node_color)
    netx.draw(netx_graph, **drawkw)

    hacks = []
    if len(post_tas + evidence_tas):
        hacks.append(pt.draw_text_annotations(textprops=textprops, **takw1_))
    if prior_tas:
        hacks.append(pt.draw_text_annotations(textprops=textprops, **takw2_))

    xmin, ymin = np.array(pos_list).min(axis=0)
    xmax, ymax = np.array(pos_list).max(axis=0)
    num_annots = len(model.ttype2_cpds[NAME_TTYPE])
    if num_annots > 4:
        ax.set_xlim((xmin - 40, xmax + 40))
        ax.set_ylim((ymin - 50, ymax + 50))
        fig.set_size_inches(30, 7)
    else:
        ax.set_xlim((xmin - 42, xmax + 42))
        ax.set_ylim((ymin - 50, ymax + 50))
        fig.set_size_inches(23, 7)
    fig = pt.gcf()

    title = 'num_names=%r, num_annots=%r' % (model.num_names, num_annots,)
    map_assign = kwargs.get('map_assign', None)

    top_assignments = kwargs.get('top_assignments', None)
    if top_assignments is not None:
        map_assign, map_prob = top_assignments[0]
        if map_assign is not None:
            def word_insert(text):
                return '' if len(text) == 0 else text + ' '
            title += '\n%sMAP: ' % (word_insert(kwargs.get('method', '')))
            title += map_assign + ' @' + '%.2f%%' % (100 * map_prob,)
    if kwargs.get('show_title', True):
        pt.set_figtitle(title, size=14)

    for hack in hacks:
        hack()

    # Hack in colorbars
    if has_inferred:
        pt.colorbar(np.linspace(0, 1, len(name_colors)), name_colors,
                    lbl=NAME_TTYPE,
                    ticklabels=model.ttype2_template[NAME_TTYPE].basis,
                    ticklocation='left')

        basis = model.ttype2_template[SCORE_TTYPE].basis
        scalars = np.linspace(0, 1, len(basis))
        scalars = np.linspace(0, 1, 100)
        colors = pt.scores_to_color(scalars, cmap_=cmap_, reverse_cmap=False,
                                    cmap_range=(mn, mx))
        colors = [pt.lighten_rgb(c, .4) for c in colors]

        if ut.list_type(basis) is int:
            pt.colorbar(scalars, colors, lbl=SCORE_TTYPE,
                        ticklabels=np.array(basis) + 1)
        else:
            pt.colorbar(scalars, colors, lbl=SCORE_TTYPE, ticklabels=basis)
        #print('basis = %r' % (basis,))

    # Draw probability hist
    if has_inferred and top_assignments is not None:
        bin_labels = ut.get_list_column(top_assignments, 0)
        bin_vals = ut.get_list_column(top_assignments, 1)

        # bin_labels = ['\n'.join(ut.textwrap.wrap(_lbl, width=30))
        #               for _lbl in bin_labels]

        pt.draw_histogram(bin_labels, bin_vals, fnum=fnum,
                          pnum=(3, 8, (2, slice(4, None))),
                          transpose=True, use_darkbackground=False,
                          #xtick_rotation=-10,
                          ylabel='Prob', xlabel='assignment')
        pt.set_title('Assignment probabilities')
    #fpath = ('name_model_' + suff + '.png')
    #pt.plt.savefig(fpath)
    #return fpath


if __name__ == '__main__':
    r"""
    CommandLine:
        python -m ibeis.algo.hots.bayes
        python -m ibeis.algo.hots.bayes --allexamples
    """
    if ut.VERBOSE:
        print('[hs] bayes')
    import multiprocessing
    multiprocessing.freeze_support()  # for win32
    import utool as ut  # NOQA
    ut.doctest_funcs()