
# -*- coding: utf-8 -*-
"""
GOALS:
    1) vsmany
       * works reasonably for very few and very many
       * starts with small k and then k becomes a percent or log percent
       * distinctiveness from different locations

    2) 1-vs-1
       * uses distinctiveness and foreground when available
       * start with ratio test and ransac

    3) First N decisions are interactive until we learn a good threshold

    4) Always show numbers between 0 and 1. Spatial verification is based on
    the single best exemplar.

       x - build normalizer
       x - test normalizer
       x - monotonicity (both nondecreasing and strictly increasing)
       x - cache normalizer
       x - cache maintenance (deleters and listers)
       o - Incremental learning
       o - Species sensitivity


    * Add ability for user to relearn normalizer from labeled database.


TODO:
    move scorenorm functionality to vtool

"""
from __future__ import absolute_import, division, print_function, unicode_literals
import utool
from os.path import join
import numpy as np
import utool as ut
import vtool as vt
import six  # NOQA
print, rrr, profile = utool.inject2(__name__, '[scorenorm]', DEBUG=False)


# NORMALIZER STORAGE AND CACHING CLASS
USE_NORMALIZER_CACHE = not ut.get_argflag(('--no-normalizer-cache', '--no-normcache'))
MAX_NORMALIZER_CACHE_SIZE = 8
NORMALIZER_CACHE = ut.get_lru_cache(MAX_NORMALIZER_CACHE_SIZE)
#NORMALIZER_CACHE = {}


@six.add_metaclass(ut.ReloadingMetaclass)
class ScoreNormalizer(ut.Cachable):
    r"""
    Args:
        normalizer (?):
        cfgstr (None):
        score_domain (None):
        p_tp_given_score (None):
        tp_support (None):
        tn_support (None):
        tp_labels (None):
        tn_labels (None):
        clip_score (None):

    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-ScoreNormalizer

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> cfgstr = None
        >>> score_domain = None
        >>> p_tp_given_score = None
        >>> tp_support = None
        >>> tn_support = None
        >>> tp_labels = None
        >>> tn_labels = None
        >>> normalizer = ScoreNormalizer(cfgstr, score_domain, p_tp_given_score,
        ...                              tp_support, tn_support, tp_labels,
        ...                              tn_labels)
    """
    prefix2 = '_normalizer_'

    def __init__(normalizer, cfgstr=None, score_domain=None,
                 p_tp_given_score=None, tp_support=None, tn_support=None,
                 tp_labels=None, tn_labels=None, clip_score=None,
                 timestamp=None, prefix=''):
        super(ScoreNormalizer, normalizer).__init__()
        normalizer.cfgstr = cfgstr
        normalizer.prefix1 = prefix
        normalizer.score_domain = score_domain
        normalizer.p_tp_given_score = p_tp_given_score
        normalizer.tp_support = tp_support
        normalizer.tn_support = tn_support
        normalizer.tp_labels = tp_labels
        normalizer.tn_labels = tn_labels
        normalizer.timestamp = timestamp
        normalizer.clip_score = clip_score
        #normalizer.set_values(score_domain, p_tp_given_score, tp_support,
        #                      tn_support, tp_labels, tn_labels)

    def get_prefix(normalizer):
        return normalizer.prefix1 + ScoreNormalizer.prefix2

    def get_cfgstr(normalizer):
        assert normalizer.cfgstr is not None
        return normalizer.cfgstr

    #def load(normalizer, *args, **kwargs):
    #    # Inherited method
    #    super(ScoreNormalizer, normalizer).load(*args, **kwargs)

    #def save(normalizer, *args, **kwargs):
    #    # Inherited method
    #    super(ScoreNormalizer, normalizer).save(*args, **kwargs)

    def normalize_score_(normalizer, score):
        """ for internal use only """
        if normalizer.score_domain is None:
            # an untrained normalizer cannot score a single value; callers
            # should go through normalize_score_list, which handles this case
            raise AssertionError('use normalize_score_list')
        if score < normalizer.score_domain[0]:
            # clip scores at 0
            prob = 0.0
        elif score > normalizer.score_domain[-1]:
            # interpolate between max probability and one
            prob = (normalizer.p_tp_given_score[-1] + 1.0) / 2.0
        else:
            # use normalizer to get scores
            indexes = np.where(normalizer.score_domain <= score)[0]
            index = indexes[-1]
            prob = normalizer.p_tp_given_score[index]
        #if prob >= 1:
        #    ut.embed()
        return prob

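    # Editor-added sketch (comment only): the np.where lookup above is
    # equivalent to a searchsorted bin lookup, e.g.
    #
    #     index = np.searchsorted(normalizer.score_domain, score, side='right') - 1
    #     prob = normalizer.p_tp_given_score[index]
    #
    # i.e. take the probability learned for the rightmost domain value that is
    # still <= score.
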
    def __call__(normalizer, score_list):
        return normalizer.normalize_score_list(score_list)

    def normalize_score_list(normalizer, score_list):
        if normalizer.get_num_training_pairs() < 2:
            #prob_list = normalizer.empty_normalize_score_list_46(score_list)
            prob_list = normalizer.empty_normalize_score_list_None(score_list)
        else:
            prob_list = [normalizer.normalize_score_(score)
                         for score in score_list]
        return prob_list

    def empty_normalize_score_list_None(normalizer, score_list):
        return [None] * len(score_list)

    def empty_normalize_score_list_46(normalizer, score_list):
        """
        HACK: return scores squashed into the range .4 to .6 when we have no
        training support to base an estimate on.
        """
        score_arr = np.array(score_list)
        if len(score_arr) < 2 or score_arr.max() == score_arr.min():
            return np.full(score_arr.shape, .5)
        else:
            prob_list = (ut.norm_zero_one(score_arr) * .2) + .4
            return prob_list

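    # Editor-added worked example (comment only) of the squash above for
    # hypothetical scores [0, 5, 10]:
    #
    #     ut.norm_zero_one([0, 5, 10])  -> [0.0, 0.5, 1.0]
    #     [0.0, 0.5, 1.0] * .2 + .4     -> [0.4, 0.5, 0.6]
    #
    # so the minimum score maps to .4 and the maximum to .6.
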
    def normalizer_score_list2(normalizer, score_list):
        """
        linear combination of probability and original score based on number
        of support cases
        """
        num_train_pairs = normalizer.get_num_training_pairs()
        score_list = np.array(score_list)
        prob_list = normalizer.normalize_score_list(score_list)
        NUM_SUPPORT_THRESH = 200
        alpha = min(1.0, num_train_pairs / float(NUM_SUPPORT_THRESH))
        prob_list2 = (alpha * score_list) + ((1 - alpha) * prob_list)
        return prob_list2

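    # Editor-added sketch (comment only) of the blend above:
    #
    #     alpha = min(1.0, num_train_pairs / 200.0)
    #     prob2 = alpha * score + (1 - alpha) * prob
    #
    # With zero support the learned probability is used alone; the raw score's
    # weight then grows linearly until NUM_SUPPORT_THRESH training pairs.
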
    def get_num_training_pairs(normalizer):
        if normalizer.score_domain is None:
            num_train_pairs = 0
        else:
            num_train_pairs = len(normalizer.tp_support)
        return num_train_pairs

    def get_infostr(normalizer):
        if normalizer.score_domain is None:
            return 'empty normalizer'
        infostr_list = [
            ut.get_stats_str(normalizer.tp_support, lbl='tp_support',
                             exclude_keys=['nMin', 'nMax']),
            ut.get_stats_str(normalizer.tn_support, lbl='tn_support',
                             exclude_keys=['nMin', 'nMax']),
            ut.get_stats_str(normalizer.p_tp_given_score,
                             lbl='p_tp_given_score',
                             exclude_keys=['nMin', 'nMax']),
            ut.get_stats_str(normalizer.score_domain,
                             keys=['max', 'min', 'shape'],
                             lbl='score_domain'),
            'clip_score = %.2f' % normalizer.clip_score,
            'cfgstr = %r' % normalizer.cfgstr,
            'timestamp = %r' % normalizer.timestamp,
        ]
        infostr = '\n'.join(infostr_list)
        return infostr

    def add_support(normalizer, tp_scores, tn_scores, tp_labels, tn_labels):
        """
        CommandLine:
            python -m ibeis.model.hots.score_normalization --test-add_support --show

        Example:
            >>> # DISABLE_DOCTEST
            >>> from ibeis.model.hots.score_normalization import *  # NOQA
            >>> # build test data
            >>> normalizer = ScoreNormalizer('testnorm')
            >>> tp_scores = [100, 100, 70, 60, 60, 60, 100]
            >>> tn_scores = [10, 10, 20, 30, 30, 30, 10]
            >>> tp_labels = list(map(ut.deterministic_uuid, [110, 110, 111, 112, 112, 112, 110]))
            >>> tn_labels = list(map(ut.deterministic_uuid, [10, 10, 11, 12, 12, 12, 10]))
            >>> # call test function
            >>> normalizer.add_support(tp_scores, tn_scores, tp_labels, tn_labels)
            >>> # verify results
            >>> normalizer.retrain()
            >>> if ut.show_was_requested():
            >>>     normalizer.visualize()
            >>> # build test data
            >>> tp_scores = np.random.randint(100, size=100)
            >>> tn_scores = np.random.randint(50, size=100)
            >>> tp_labels = list(map(ut.deterministic_uuid, np.arange(1000, 1100)))
            >>> tn_labels = list(map(ut.deterministic_uuid, np.arange(2000, 2100)))
            >>> normalizer.add_support(tp_scores, tn_scores, tp_labels, tn_labels)
            >>> normalizer.retrain()
            >>> if ut.show_was_requested():
            >>>     import plottool as pt
            >>>     normalizer.visualize()
            >>>     pt.show_if_requested()
        """
        # Initialize support if empty
        if normalizer.tp_support is None:
            normalizer.tp_support = np.array([])
            normalizer.tn_support = np.array([])
            normalizer.tp_labels = np.array([])
            normalizer.tn_labels = np.array([])

        # Ensure that incoming data is unique w.r.t. data that already exists
        def filter_seen_data(seen_labels, input_labels, input_data):
            """
            seen_labels, input_labels, input_data = normalizer.tp_labels, tp_labels, tp_scores
            """
            unique_labels, unique_indices = np.unique(input_labels, return_index=True)
            unique_data = np.array(input_data).take(unique_indices, axis=0)
            isold_flags = np.in1d(unique_labels, seen_labels)
            isnew_flags = np.logical_not(isold_flags, out=isold_flags)
            filtered_labels = unique_labels.compress(isnew_flags)
            filtered_data = unique_data.compress(isnew_flags)
            return filtered_labels, filtered_data

        filtered_tp_labels, filtered_tp_scores = filter_seen_data(
            normalizer.tp_labels, tp_labels, tp_scores)
        filtered_tn_labels, filtered_tn_scores = filter_seen_data(
            normalizer.tn_labels, tn_labels, tn_scores)
        # Ensure input lists have equal lengths
        assert ut.list_allsame(list(map(
            len, (tp_scores, tn_scores, tp_labels, tn_labels)))), 'unequal lengths'
        if len(filtered_tp_scores) == 0:
            return
        normalizer.tp_support = np.append(normalizer.tp_support, filtered_tp_scores)
        normalizer.tn_support = np.append(normalizer.tn_support, filtered_tn_scores)
        normalizer.tp_labels = np.append(normalizer.tp_labels, filtered_tp_labels)
        normalizer.tn_labels = np.append(normalizer.tn_labels, filtered_tn_labels)

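    # Editor-added trace (comment only) of filter_seen_data on hypothetical
    # toy data, where label 'b' is already in the support set:
    #
    #     seen_labels  = np.array(['a', 'b'])
    #     input_labels = np.array(['b', 'c', 'c'])
    #     input_data   = np.array([ 10,  20,  20])
    #
    # np.unique drops the duplicate 'c', np.in1d masks out the seen 'b', and
    # only the pair ('c', 20) is appended to the support arrays.
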
    def retrain(normalizer):
        tp_support = np.array(normalizer.tp_support)
        tn_support = np.array(normalizer.tn_support)
        learnkw = dict()
        learntup = learn_score_normalization(tp_support, tn_support,
                                             return_all=False, **learnkw)
        (score_domain, p_tp_given_score, clip_score) = learntup
        # DONT make a new custom cfgstr
        #cfgstr = ut.hashstr((tp_support, tn_support))
        #normalizer.cfgstr = cfgstr
        normalizer.score_domain = score_domain
        normalizer.p_tp_given_score = p_tp_given_score
        normalizer.clip_score = clip_score

    def visualize(normalizer, update=True, verbose=True, fnum=None):
        """
        CommandLine:
            python -m ibeis.model.hots.score_normalization --test-visualize --index 0 --cmd

        Example:
            >>> # DISABLE_DOCTEST
            >>> import plottool as pt
            >>> from ibeis.model.hots.score_normalization import *  # NOQA
            >>> #import ibeis
            >>> index = ut.get_argval('--index', type_=int, default=0)
            >>> normalizer = load_precomputed_normalizer(index, with_global=False)
            >>> normalizer.visualize()
            >>> six.exec_(pt.present(), globals(), locals())
        """
        import plottool as pt
        if verbose:
            print(normalizer.get_infostr())
        if normalizer.score_domain is None:
            return
        if fnum is None:
            fnum = pt.next_fnum()
        pt.figure(fnum=fnum, pnum=(2, 1, 1), doclf=True, docla=True)
        normalizer.visualize_probs(fnum=fnum, pnum=(2, 1, 1), update=False)
        normalizer.visualize_support(fnum=fnum, pnum=(2, 1, 2), update=False)
        if update:
            pt.update()

    def visualize_support(normalizer, update=True, fnum=None, pnum=(1, 1, 1)):
        plot_support(normalizer.tn_support, normalizer.tp_support,
                     fnum=fnum, pnum=pnum)
        if update:
            import plottool as pt
            pt.update()

    def visualize_probs(normalizer, update=True, fnum=None, pnum=(1, 1, 1)):
        plot_postbayes_pdf(normalizer.score_domain,
                           1 - normalizer.p_tp_given_score,
                           normalizer.p_tp_given_score,
                           cfgstr=normalizer.get_cfgstr(), fnum=fnum, pnum=pnum)
        if update:
            import plottool as pt
            pt.update()


# DEVELOPER FUNCTIONS

def parse_available_normalizers(*args, **kwargs):
    import parse
    normalizers_fpaths = list_available_score_normalizers(*args, **kwargs)
    parsestr = ('{cachedir}/{prefix1}' + ScoreNormalizer.prefix2 +
                '{cfgstr}' + ScoreNormalizer.ext)
    result_list = [parse.parse(parsestr, path) for path in normalizers_fpaths]
    cfgstr_list = [result['cfgstr'] for result in result_list]
    prefix1_list = [result['prefix1'] for result in result_list]
    cachedir_list = [result['cachedir'] for result in result_list]
    return cfgstr_list, cachedir_list, prefix1_list

def load_precomputed_normalizer(index, *args, **kwargs):
    """
    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-load_precomputed_normalizer

    Example:
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> normalizer = load_precomputed_normalizer(None)
        >>> normalizer.visualize()
        >>> import plottool as pt
        >>> six.exec_(pt.present(), globals(), locals())
    """
    cfgstr_list, cachedir_list, prefix1_list = parse_available_normalizers(*args, **kwargs)
    if index is None or index == 'None':
        print('Available indexes:')
        print(ut.indentjoin(map(str, enumerate(cfgstr_list))))
        index = int(input('what index?'))
    cfgstr = cfgstr_list[index]
    cachedir = cachedir_list[index]
    #prefix1 = prefix1_list[index]
    normalizer = ScoreNormalizer(cfgstr=cfgstr)
    normalizer.load(cachedir)
    return normalizer

def testload_myscorenorm():
    r"""
    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-testload_myscorenorm

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> testload_myscorenorm()
        >>> import plottool as pt
        >>> six.exec_(pt.present(), globals(), locals())
    """
    normalizer = ScoreNormalizer(cfgstr='gzbase')
    normalizer.load(utool.truepath('~/Dropbox/IBEIS'))
    normalizer.visualize()

def list_available_score_normalizers(with_global=True, with_local=True):
    r"""
    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-list_available_score_normalizers

    Ignore::
        cp /media/raid/work/_INCTEST_arr((666)7xcu21@fcschv2@m)_GZ_ALL/_ibsdb/_ibeis_cache/scorenorm/zebra_grevys/zebra_grevys_normalizer_bi+i4y&3dl8!xb!+.cPkl
        mkdir ~/Dropbox/IBEIS
        cp '/media/raid/work/_INCTEST_arr((666)7xcu21@fcschv2@m)_GZ_ALL/_ibsdb/_ibeis_cache/scorenorm/zebra_grevys/zebra_grevys_normalizer_bi+i4y&3dl8!xb!+.cPkl' ~/Dropbox/IBEIS/normalizer.cPkl
        mv ~/Dropbox/IBEIS/normalizer.cPkl ~/Dropbox/IBEIS/_normalizer_gzbase.cPkl

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> local_normalizers_fpaths = list_available_score_normalizers(with_global=False)
        >>> global_normalizers_fpaths = list_available_score_normalizers(with_local=False)
        >>> # quote them
        >>> # local_normalizers_fpaths = ['"%s"' % fpath for fpath in local_normalizers_fpaths]
        >>> # global_normalizers_fpaths = ['"%s"' % fpath for fpath in global_normalizers_fpaths]
        >>> print('Available LOCAL normalizers: ' + ut.indentjoin(local_normalizers_fpaths, '\n '))
        >>> print('Available GLOBAL normalizers: ' + ut.indentjoin(global_normalizers_fpaths, '\n '))
        >>> print(list(map(ut.get_file_nBytes_str, local_normalizers_fpaths)))
        >>> print(list(map(ut.get_file_nBytes_str, global_normalizers_fpaths)))
    """
    from ibeis.init import sysres
    from ibeis import constants
    #from os.path import join
    pattern = '*' + ScoreNormalizer.prefix2 + '*' + ScoreNormalizer.ext
    ibeis_resdir = sysres.get_ibeis_resource_dir()
    workdir = sysres.get_workdir()
    normalizer_fpaths = []
    if with_global:
        global_normalizers = ut.glob(ibeis_resdir, pattern, recursive=True)
        normalizer_fpaths += global_normalizers
    if with_local:
        # Just search localdb cachedirs (otherwise it will take forever)
        ibsdbdir_list = sysres.get_ibsdb_list(workdir)
        searchdirs = [join(ibsdbdir, constants.REL_PATHS.cache)
                      for ibsdbdir in ibsdbdir_list]
        local_normalizers_list = [ut.glob(path, pattern, recursive=True)
                                  for path in searchdirs]
        local_normalizers = ut.flatten(local_normalizers_list)
        normalizer_fpaths.extend(local_normalizers)
    return normalizer_fpaths

def delete_all_learned_normalizers():
    r"""
    DELETES ALL CACHED NORMALIZERS IN ALL DATABASES

    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-delete_all_learned_normalizers #-y

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.model.hots import score_normalization
        >>> score_normalization.delete_all_learned_normalizers()
    """
    from ibeis.model.hots import score_normalization
    import utool as ut
    print('DELETE_ALL_LEARNED_NORMALIZERS')
    normalizer_fpath_list = score_normalization.list_available_score_normalizers()
    print('The following normalizers will be deleted: ' +
          ut.indentjoin(normalizer_fpath_list, '\n '))
    if ut.are_you_sure('Deleting all learned normalizers'):
        ut.remove_fpaths(normalizer_fpath_list, verbose=True)


# TRAINING FUNCTIONS

def train_baseline_for_all_dbs():
    r"""
    Runs unnormalized queries to compute normalized queries

    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-train_baseline_for_all_dbs

    Example:
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> train_baseline_for_all_dbs()
    """
    import ibeis
    #from ibeis.model.hots import score_normalization
    dbname_list = [
        'PZ_MTEST',
        #'GZ_ALL',
    ]
    learnkw = dict()
    for dbname in dbname_list:
        ibs = ibeis.opendb(dbname)
        train_baseline_ibeis_normalizer(ibs, use_cache=False, **learnkw)

def train_baseline_ibeis_normalizer(ibs, use_cache=True, **learnkw):
    r"""
    Runs unnormalized queries to compute normalized queries

    Args:
        ibs (IBEISController):

    Returns:
        ScoreNormalizer: normalizer

    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-train_baseline_ibeis_normalizer --cmd
        python -m ibeis.model.hots.score_normalization --test-train_baseline_ibeis_normalizer --noshow

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> from ibeis.all_imports import *  # NOQA
        >>> import plottool as pt
        >>> import ibeis
        >>> from ibeis.model.hots import score_normalization
        >>> #score_normalization.rrr()
        >>> dbname = 'PZ_MTEST'
        >>> ibs = ibeis.opendb(dbname)
        >>> learnkw = dict()
        >>> normalizer = score_normalization.train_baseline_ibeis_normalizer(ibs, use_cache=False, **learnkw)
        >>> normalizer.visualize()
        >>> result = str(normalizer)
        >>> print(result)
        >>> exec(pt.present())
    """
    from ibeis.model.hots import query_request
    # TRAIN BASELINE
    tag = '<TRAINING> '
    print(utool.msgblock(tag, 'Beginning Training'))
    with utool.Timer(tag):
        #with utool.Indenter('TRAIN >>> '):
        qaid_list = ibs.get_valid_aids()
        daid_list = ibs.get_valid_aids()
        #cfgdict = dict(codename='nsum_unnorm')
        codename = 'vsone_unnorm'
        cfgdict = dict(codename=codename)
        qreq_ = query_request.new_ibeis_query_request(ibs, qaid_list,
                                                      daid_list, cfgdict)
        use_qcache = True
        cm_list = ibs.query_chips(qaid_list, daid_list, qreq_=qreq_,
                                  use_cache=use_qcache, return_cm=True)
        normalizer = cached_ibeis_score_normalizer(ibs, cm_list, qreq_,
                                                   use_cache=use_cache,
                                                   **learnkw)
        # Save as baseline for this species
        species_text = '_'.join(qreq_.get_unique_species())  # HACK
        baseline_cfgstr = 'baseline_' + species_text
        cachedir = ibs.get_global_species_scorenorm_cachedir(species_text)
        normalizer.save(cachedir, cfgstr=baseline_cfgstr)
    print('\n' + utool.msgblock(tag, 'Finished Training'))
    return normalizer

def try_download_baseline_ibeis_normalizer(ibs, qreq_):
    """
    Tries to download a baseline normalizer for some species.
    Creates an empty normalizer if it cannot.
    """
    baseline_url_dict = {
        # TODO: Populate
    }
    species_text = '_'.join(qreq_.get_unique_species())  # HACK
    query_cfgstr = qreq_.qparams.query_cfgstr
    cachedir = qreq_.ibs.get_global_species_scorenorm_cachedir(species_text)
    key = species_text + query_cfgstr
    baseline_url = baseline_url_dict.get(key, None)
    if baseline_url is not None:
        try:
            cachedir = qreq_.ibs.get_global_species_scorenorm_cachedir(species_text)
            baseline_cachedir = join(cachedir, 'baseline')
            ut.ensuredir(baseline_cachedir)
            normalizer = ScoreNormalizer(cfgstr=query_cfgstr, prefix=species_text)
            normalizer.load(baseline_cachedir)
        except Exception:
            normalizer = None
    else:
        normalizer = None
    if normalizer is None:
        if False and ut.is_developer(['hyrule']):
            # train a new normalizer. only do this on hyrule
            print('Baseline does not exist and cannot be downloaded. Training baseline')
            normalizer = train_baseline_ibeis_normalizer(qreq_.ibs)
        else:
            # return an empty score normalizer
            normalizer = ScoreNormalizer(cfgstr=query_cfgstr, prefix=species_text)
            print('returning empty normalizer')
            #raise NotImplementedError('return the nodata normalizer with 1/2 default')
    return normalizer

@profile
def request_ibeis_normalizer(qreq_, verbose=True):
    r"""
    Tries to load the best possible normalizer for this query request.

    FIXME: do what is in the docstr.

    Any loaded normalizer must be configured on the query_cfg of the query
    request. This ensures that all of the support data fed to the normalizer
    is consistent.

    First try to load the normalizer from the in-memory cache.
    If that fails try to load a custom normalizer from the local directory.
    If that fails try to load a custom normalizer from the global directory.
    If that fails try to (download and) load the baseline normalizer from the
    global directory.
    If that fails return an empty score normalizer.

    As queries are run the normalizer should be updated and saved under the
    custom normalizer in the local directory.

    If no custom normalizer is found then it tries to load a downloaded
    baseline. If none exists then it starts to compute a custom baseline. The
    baseline probability for an empty normalizer should be 1/2. The
    probability of a baseline normalizer should be regularized to stay close
    to 1/2 when there is little support.

    Returns:
        ScoreNormalizer: cached or prebuilt score normalizer

    Example:
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> from ibeis.model.hots import query_request
        >>> import ibeis
        >>> ibs = ibeis.opendb(db='PZ_MTEST')
        >>> qaid_list = [1]
        >>> daid_list = [1, 2, 3, 4, 5]
        >>> cfgdict = dict(codename='vsone_unnorm')
        >>> qreq_ = query_request.new_ibeis_query_request(ibs, qaid_list, daid_list, cfgdict=cfgdict)
        >>> normalizer = request_ibeis_normalizer(qreq_)
        >>> normalizer.add_support([100], [10], [1], [2])
    """
    global NORMALIZER_CACHE
    if not USE_NORMALIZER_CACHE:
        normalizer = try_download_baseline_ibeis_normalizer(qreq_.ibs, qreq_)
        return normalizer
    species_text = '_'.join(qreq_.get_unique_species())  # HACK
    query_cfgstr = qreq_.get_pipe_cfgstr()
    cfgstr = species_text + query_cfgstr
    if NORMALIZER_CACHE.has_key(cfgstr):  # NOQA
        # use memory cache
        normalizer = NORMALIZER_CACHE[cfgstr]
        if verbose:
            print('[scorenorm] returning memorycache normalizer')
        return normalizer

    def try_custom_local():
        try:
            cachedir = qreq_.ibs.get_local_species_scorenorm_cachedir(species_text)
            normalizer = ScoreNormalizer(cfgstr=query_cfgstr, prefix=species_text)
            normalizer.load(cachedir)
            if verbose:
                print('[scorenorm] returning local custom normalizer')
            return normalizer
        except Exception:
            return None

    def try_custom_global():
        try:
            cachedir = qreq_.ibs.get_global_species_scorenorm_cachedir(species_text)
            normalizer = ScoreNormalizer(cfgstr=query_cfgstr, prefix=species_text)
            normalizer.load(cachedir)
            if verbose:
                print('[scorenorm] returning global custom normalizer')
            return normalizer
        except Exception:
            return None

    normalizer = try_custom_local()
    if normalizer is None:
        normalizer = try_custom_global()
    if normalizer is None:
        normalizer = try_download_baseline_ibeis_normalizer(qreq_.ibs, qreq_)
        if verbose:
            print('[scorenorm] returning baseline normalizer')
    assert normalizer is not None, 'something failed'
    # Save to memory cache
    NORMALIZER_CACHE[cfgstr] = normalizer
    return normalizer

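# Editor-added sketch, not part of the original module: the lookup chain that
# request_ibeis_normalizer implements, written as a generic helper. The loader
# callables are assumed to return None on failure, as try_custom_local and
# try_custom_global do above.
def _sketch_normalizer_fallback_chain(loader_list):
    """ Return the first normalizer a loader yields; assert if all fail. """
    for load_fn in loader_list:
        normalizer = load_fn()
        if normalizer is not None:
            return normalizer
    raise AssertionError('something failed')
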
def cached_ibeis_score_normalizer(ibs, cm_list, qreq_, use_cache=True,
                                  **learnkw):
    r"""
    Builds a normalizer trained on query results for a database

    Args:
        qaid2_qres (int): query annotation id

    Returns:
        ScoreNormalizer: cached or freshly trained score normalizer

    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-cached_ibeis_score_normalizer

    Example:
        >>> # ENABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> import ibeis
        >>> ibeis._init_numpy()
        >>> dbname = 'PZ_MTEST'
        >>> ibs = ibeis.opendb(dbname)
        >>> qaid_list = daid_list = ibs.get_valid_aids()[1:14:2]
        >>> cfgdict = dict(codename='vsone_unnorm', prescore_method='nsum')
        >>> use_cache = True
        >>> cm_list, qreq_ = ibs.query_chips(qaid_list, daid_list, cfgdict, use_cache=True, save_qcache=True, return_request=True, return_cm=True)
        >>> assert cm_list[0].qaid == qaid_list[0], 'inconsistent'
        >>> score_normalizer = cached_ibeis_score_normalizer(ibs, cm_list, qreq_)
        >>> result = score_normalizer.get_fname()
        >>> result += '\n' + score_normalizer.get_cfgstr()
        >>> print(result)
        zebra_plains_normalizer_4rauirubzfudap8h.cPkl
        _vsone_NN(single,K1+1,last,cks704)_NNWeight(ratio_thresh=0.625,fg)_SV(0.01;2.0;1.57minIn=4,nNRR=50,nARR=6,nsum,cdl,)_AGG(nsum)_FLANN(8_kdtrees)_RRVsOne(False)_FEATWEIGHT(ON,uselabel,rf)_FEAT(hesaff+sift_)_CHIP(sz450)
        zebra_plains_normalizer_x@!cxcgfncxz97mo.cPkl
        _vsone_NN(single,K1+1,last,cks704)_FILT(ratio<0.625;1.0,fg;1.0)_SV(0.01;2;1.57minIn=4,nRR=50,nsum,)_AGG(nsum)_FLANN(8_kdtrees)_FEATWEIGHT(ON,uselabel,rf)_FEAT(hesaff+sift_)_CHIP(sz450)
        zebra_plains_normalizer_n%w@df%th@i@seel.cPkl
        _vsone_NN(single,K1+1,last,cks1024)_FILT(ratio<0.625;1.0,fg;1.0)_SV(0.01;2;1.57minIn=4,nRR=50,nsum,)_AGG(nsum)_FLANN(4_kdtrees)_FEATWEIGHT(ON,uselabel,rf)_FEAT(hesaff+sift_)_CHIP(sz450)
        normalizer_5cv1%3s&.cPkl
        PZ_MTEST_DSUUIDS((9)67j%dr%&bl%4oh4+)_QSUUIDS((9)67j%dr%&bl%4oh4+)zebra_plains_vsone_NN(single,K1+1,last,cks1024)_FILT(ratio<0.625;1.0,fg;1.0)_SV(0.01;2;1.57minIn=4,nRR=50,nsum,)_AGG(nsum)_FLANN(4_kdtrees)_FEATWEIGHT(ON,uselabel,rf)_FEAT(hesaff+sift_)_CHIP(sz450)
        normalizer_PZ_MTEST_SUUIDS((9)67j%dr%&bl%4oh4+).cPkl
    """
    # Collect training data
    #cfgstr = ibs.get_dbname() + ibs.get_annot_hashid_semantic_uuid(qaid_list)
    species_text = '_'.join(qreq_.get_unique_species())  # HACK
    #data_hashid = qreq_.get_data_hashid()
    #query_hashid = qreq_.get_query_hashid()
    query_cfgstr = qreq_.get_pipe_cfgstr()
    prefix = species_text
    cfgstr = query_cfgstr
    #cfgstr = ibs.get_dbname() + data_hashid + query_hashid + species_text + query_cfgstr
    cachedir = ibs.get_local_species_scorenorm_cachedir(species_text)
    try:
        if use_cache is False:
            raise Exception('forced normalizer cache miss')
        normalizer = ScoreNormalizer(cfgstr)
        normalizer.load(cachedir)
        print('returning cached normalizer')
    except Exception as ex:
        print('cannot load normalizer so computing one instead')
        ut.printex(ex, iswarning=True)
        #qaid_list = qreq_.get_external_qaids()
        normalizer = learn_ibeis_score_normalizer(ibs, cm_list, cfgstr,
                                                  prefix, **learnkw)
        normalizer.save(cachedir)
    return normalizer


# LEARNING FUNCTIONS

def learn_ibeis_score_normalizer(ibs, cm_list, cfgstr, prefix, **learnkw):
    """
    Takes the result of queries and trains a score normalizer

    Args:
        ibs (IBEISController):
        cm_list (list): object of feature correspondences and scores
        cfgstr (str):

    Returns:
        ScoreNormalizer: freshly trained score normalizer
    """
    print('learning normalizer')
    # Get support
    datatup = get_ibeis_score_training_data(ibs, cm_list)
    (tp_support, tn_support, tp_support_labels, tn_support_labels) = datatup
    if len(tp_support) < 2 or len(tn_support) < 2:
        print('len(tp_support) = %r' % (len(tp_support),))
        print('len(tn_support) = %r' % (len(tn_support),))
        print('Warning: [score_normalization] not enough data')
        import warnings
        warnings.warn('Warning: [score_normalization] not enough data')
    # Train normalizer
    learntup = learn_score_normalization(tp_support, tn_support,
                                         return_all=False, **learnkw)
    (score_domain, p_tp_given_score, clip_score) = learntup
    # Return normalizer structure
    # NOTE: this is the only place the normalizer is constructed with
    # noncache args; keep it that way.
    timestamp = ut.get_printable_timestamp()
    normalizer = ScoreNormalizer(cfgstr, score_domain, p_tp_given_score,
                                 tp_support, tn_support, tp_support_labels,
                                 tn_support_labels, clip_score, timestamp,
                                 prefix)
    return normalizer

def get_ibeis_score_training_data(ibs, cm_list):
    """ Returns "good" training examples """
    good_tp_nscores = []
    good_tn_nscores = []
    good_tp_aidnid_pairs = []
    good_tn_aidnid_pairs = []
    for qx, cm in enumerate(cm_list):
        qaid = cm.qaid
        #if not cm.is_nsum():
        #    raise AssertionError('must be nsum')
        if not ibs.get_annot_has_groundtruth(qaid):
            continue
        qnid = ibs.get_annot_name_rowids(cm.qaid)
        nscoretup = cm.get_nscoretup()
        (sorted_nids, sorted_nscores, sorted_aids, sorted_scores) = nscoretup
        sorted_ndiff = -np.diff(sorted_nscores.tolist())
        sorted_nids = np.array(sorted_nids)
        is_positive = sorted_nids == qnid
        is_negative = np.logical_and(~is_positive, sorted_nids > 0)
        # Only take data from results with positive and negative examples
        if not np.any(is_positive) or not np.any(is_negative):
            continue
        gt_rank = np.nonzero(is_positive)[0][0]
        gf_rank = np.nonzero(is_negative)[0][0]
        # Only take correct groundtruth scores
        if gt_rank == 0 and len(sorted_nscores) > gf_rank:
            if len(sorted_ndiff) > gf_rank:
                good_tp_nscores.append(sorted_nscores[gt_rank])
                good_tn_nscores.append(sorted_nscores[gf_rank])
                good_tp_aidnid_pairs.append((qaid, sorted_nids[gt_rank]))
                good_tn_aidnid_pairs.append((qaid, sorted_nids[gf_rank]))
    tp_support = np.array(good_tp_nscores)
    tn_support = np.array(good_tn_nscores)
    tp_support_labels = good_tp_aidnid_pairs
    tn_support_labels = good_tn_aidnid_pairs
    return (tp_support, tn_support, tp_support_labels, tn_support_labels)

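# Editor-added sketch, not part of the original module: the tp/tn selection
# rule used by get_ibeis_score_training_data, isolated for clarity. The top
# same-name score is accepted as a positive example only when it ranks first
# overall, and the top other-name score becomes the paired negative example.
def _sketch_select_tp_tn_pair(sorted_nids, sorted_nscores, qnid):
    sorted_nids = np.array(sorted_nids)
    is_positive = sorted_nids == qnid
    is_negative = np.logical_and(~is_positive, sorted_nids > 0)
    if not np.any(is_positive) or not np.any(is_negative):
        return None  # need both kinds of examples
    gt_rank = np.nonzero(is_positive)[0][0]
    gf_rank = np.nonzero(is_negative)[0][0]
    if gt_rank != 0:
        return None  # groundtruth must rank first to count as a "good" example
    return sorted_nscores[gt_rank], sorted_nscores[gf_rank]
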
def learn_score_normalization(tp_support, tn_support, gridsize=1024,
                              adjust=8, return_all=False, monotonize=True,
                              clip_factor=(ut.PHI + 1)):
    r"""
    Takes collected data and applies parzen window density estimation and
    Bayes' rule.

    Args:
        tp_support (ndarray):
        tn_support (ndarray):
        gridsize (int): default 1024
        adjust (int): default 8
        return_all (bool): default False
        monotonize (bool): default True
        clip_factor (float): default phi + 1 (== phi ** 2)

    Returns:
        tuple: (score_domain, p_tp_given_score, p_tn_given_score,
                p_score_given_tp, p_score_given_tn, p_score, clip_score)

    Example:
        >>> # ENABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> tp_support = np.linspace(100, 10000, 512)
        >>> tn_support = np.linspace(0, 120, 512)
        >>> (score_domain, p_tp_given_score, clip_score) = learn_score_normalization(tp_support, tn_support)
        >>> result = int(p_tp_given_score.sum())
        >>> print(result)
        92
    """
    # Estimate true positive and true negative densities
    score_tp_pdf = vt.estimate_pdf(tp_support, gridsize=gridsize, adjust=adjust)
    score_tn_pdf = vt.estimate_pdf(tn_support, gridsize=gridsize, adjust=adjust)
    # Find a good maximum score (for the domain, not for learning)
    #clip_score = 2000
    clip_score = find_score_maxclip(tp_support, tn_support, clip_factor)
    score_domain = np.linspace(0, clip_score, 1024)
    # Evaluate both densities on the score domain
    p_score_given_tp = score_tp_pdf.evaluate(score_domain)
    p_score_given_tn = score_tn_pdf.evaluate(score_domain)
    # Average to get probability of any score
    p_score = (np.array(p_score_given_tp) + np.array(p_score_given_tn)) / 2.0
    # Apply Bayes' rule
    p_tp = .5
    p_tp_given_score = ut.bayes_rule(p_score_given_tp, p_tp, p_score)
    if monotonize:
        p_tp_given_score = vt.ensure_monotone_strictly_increasing(
            p_tp_given_score, zerohack=True, onehack=True)
    if return_all:
        p_tn_given_score = 1 - p_tp_given_score
        return (score_domain, p_tp_given_score, p_tn_given_score,
                p_score_given_tp, p_score_given_tn, p_score, clip_score)
    else:
        return (score_domain, p_tp_given_score, clip_score)

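# Editor-added sketch, not part of the original module: the Bayes step of
# learn_score_normalization with the density estimation stripped out. With the
# flat prior p(tp) = .5 used above, ut.bayes_rule's p(s|tp) * p(tp) / p(s)
# reduces to p(s|tp) / (p(s|tp) + p(s|tn)).
def _sketch_bayes_posterior(p_score_given_tp, p_score_given_tn):
    p_score_given_tp = np.array(p_score_given_tp)
    p_score_given_tn = np.array(p_score_given_tn)
    # mixture density with equal class priors
    p_score = (p_score_given_tp + p_score_given_tn) / 2.0
    # posterior p(tp | score) via Bayes' rule
    return (p_score_given_tp * .5) / p_score
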
def find_score_maxclip(tp_support, tn_support, clip_factor=ut.PHI + 1):
    """
    Returns the score to clip true positives past.

    Args:
        tp_support (ndarray):
        tn_support (ndarray):

    Returns:
        float: clip_score

    Example:
        >>> # ENABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> tp_support = np.array([100, 200, 50000])
        >>> tn_support = np.array([10, 30, 110])
        >>> clip_score = find_score_maxclip(tp_support, tn_support)
        >>> result = str(clip_score)
        >>> print(result)
        287.983738762
    """
    max_true_positive_score = tp_support.max()
    max_true_negative_score = tn_support.max()
    if clip_factor is None:
        clip_score = max_true_positive_score
    else:
        overshoot_factor = max_true_positive_score / max_true_negative_score
        if overshoot_factor > clip_factor:
            clip_score = max_true_negative_score * clip_factor
        else:
            clip_score = max_true_positive_score
    return clip_score


# DEBUGGING FUNCTIONS

def test_score_normalization():
    """
    CommandLine:
        python ibeis/model/hots/score_normalization.py --test-test_score_normalization
        python dev.py -t custom --cfg codename:vsone_unnorm --db PZ_MTEST --allgt --vf --va
        python dev.py -t custom --cfg codename:vsone_unnorm --db PZ_MTEST --allgt --vf --va --index 0:8:3 --dindex 0:10 --verbose

    Example:
        >>> # DISABLE_DOCTEST
        >>> #from ibeis.model.hots import score_normalization
        >>> #score_normalization.rrr()
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> locals_ = test_score_normalization()
        >>> execstr = ut.execstr_dict(locals_)
        >>> #print(execstr)
        >>> exec(execstr)
        >>> import plottool as pt
        >>> exec(pt.present())
    """
    import ibeis
    import plottool as pt  # NOQA
    # Load IBEIS database
    dbname = 'PZ_MTEST'
    #dbname = 'GZ_ALL'
    ibs = ibeis.opendb(dbname)
    qaid_list = daid_list = ibs.get_valid_aids()
    # Get unnormalized query results
    #cfgdict = dict(codename='nsum_unnorm')
    cfgdict = dict(codename='vsone_unnorm')
    cm_list = ibs.query_chips(qaid_list, daid_list, cfgdict, return_cm=True)
    # Get a training sample
    datatup = get_ibeis_score_training_data(ibs, cm_list)
    (tp_support, tn_support, tp_support_labels, tn_support_labels) = datatup
    # Print raw score statistics
    ut.print_stats(tp_support, lbl='tp_support')
    ut.print_stats(tn_support, lbl='tn_support')
    normkw_list = ut.util_dict.all_dict_combinations(
        {
            'monotonize': [True],  # [True, False],
            #'adjust': [1, 4, 8],
            'adjust': [4, 8],
            #'adjust': [8],
        }
    )
    if len(normkw_list) > 32:
        raise AssertionError('Too many plots to test!')
    fnum = pt.next_fnum()
    true_color = pt.TRUE_BLUE  # pt.TRUE_GREEN
    false_color = pt.FALSE_RED
    unknown_color = pt.UNKNOWN_PURP
    pt.plots.plot_sorted_scores(
        (tn_support, tp_support),
        ('true negative scores', 'true positive scores'),
        score_colors=(false_color, true_color),
        #logscale=True,
        logscale=False,
        figtitle='sorted nscores',
        fnum=fnum)
    for normkw in normkw_list:
        # Learn the appropriate normalization
        #normkw = {}  # dict(gridsize=1024, adjust=8, clip_factor=ut.PHI + 1, return_all=True)
        (score_domain, p_tp_given_score, p_tn_given_score,
         p_score_given_tp, p_score_given_tn, p_score,
         clip_score) = learn_score_normalization(tp_support, tn_support,
                                                 return_all=True, **normkw)
        assert clip_score > tn_support.max()
        inspect_pdfs(tn_support, tp_support, score_domain,
                     p_tp_given_score, p_tn_given_score, p_score_given_tp,
                     p_score_given_tn, p_score)
        pt.set_figtitle('ScoreNorm ' + ibs.get_dbname() + ' ' + ut.dict_str(normkw))
    locals_ = locals()
    return locals_

def inspect_pdfs(tn_support, tp_support, score_domain, p_tp_given_score,
                 p_tn_given_score, p_score_given_tp, p_score_given_tn,
                 p_score, with_scores=False):
    import functools
    import plottool as pt  # NOQA
    fnum = pt.next_fnum()
    nRows = 2 + with_scores
    pnum_ = pt.get_pnum_func(nRows=nRows, nCols=1)
    #pnum_ = pt.get_pnum_func(nRows=3, nCols=1)

    def generate_pnum():
        for px in range(nRows):
            yield pnum_(px)
    # py2/py3 safe replacement for generate_pnum().next
    _pnumiter = functools.partial(next, generate_pnum())
    pt.figure(fnum=fnum, pnum=pnum_(0))
    if with_scores:
        plot_support(tn_support, tp_support, fnum=fnum, pnum=_pnumiter())
    plot_prebayes_pdf(score_domain, p_score_given_tn, p_score_given_tp,
                      p_score, cfgstr='', fnum=fnum, pnum=_pnumiter())
    plot_postbayes_pdf(score_domain, p_tn_given_score, p_tp_given_score,
                       cfgstr='', fnum=fnum, pnum=_pnumiter())

def plot_support(tn_support, tp_support, fnum=None, pnum=(1, 1, 1)):
    r"""
    Args:
        tn_support (ndarray):
        tp_support (ndarray):
        fnum (int): figure number
        pnum (tuple): plot number

    CommandLine:
        python -m ibeis.model.hots.score_normalization --test-plot_support

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.model.hots.score_normalization import *  # NOQA
        >>> tn_support = '?'
        >>> tp_support = '?'
        >>> fnum = None
        >>> pnum = (1, 1, 1)
        >>> result = plot_support(tn_support, tp_support, fnum, pnum)
        >>> print(result)
    """
    import plottool as pt  # NOQA
    if fnum is None:
        fnum = pt.next_fnum()
    true_color = pt.TRUE_BLUE  # pt.TRUE_GREEN
    false_color = pt.FALSE_RED
    pt.plots.plot_sorted_scores(
        (tn_support, tp_support),
        ('trueneg scores', 'truepos scores'),
        score_colors=(false_color, true_color),
        #logscale=True,
        logscale=False,
        figtitle='sorted nscores',
        fnum=fnum,
        pnum=pnum)

def plot_prebayes_pdf(score_domain, p_score_given_tn, p_score_given_tp,
                      p_score, cfgstr='', fnum=None, pnum=(1, 1, 1)):
    import plottool as pt  # NOQA
    if fnum is None:
        fnum = pt.next_fnum()
    true_color = pt.TRUE_BLUE  # pt.TRUE_GREEN
    false_color = pt.FALSE_RED
    unknown_color = pt.UNKNOWN_PURP
    pt.plots.plot_probabilities(
        (p_score_given_tn, p_score_given_tp, p_score),
        ('p(score | tn)', 'p(score | tp)', 'p(score)'),
        prob_colors=(false_color, true_color, unknown_color),
        figtitle='pre_bayes pdf score',
        xdata=score_domain,
        fnum=fnum,
        pnum=pnum)

def plot_postbayes_pdf(score_domain, p_tn_given_score, p_tp_given_score,
                       cfgstr='', fnum=None, pnum=(1, 1, 1)):
    import plottool as pt  # NOQA
    if fnum is None:
        fnum = pt.next_fnum()
    true_color = pt.TRUE_BLUE  # pt.TRUE_GREEN
    false_color = pt.FALSE_RED
    pt.plots.plot_probabilities(
        (p_tn_given_score, p_tp_given_score),
        ('p(tn | score)', 'p(tp | score)'),
        prob_colors=(false_color, true_color,),
        figtitle='post_bayes pdf score ' + cfgstr,
        xdata=score_domain,
        fnum=fnum,
        pnum=pnum)

def test():
    r"""
    >>> from ibeis.model.hots.score_normalization import *  # NOQA
    """
    #from ibeis.model.hots import query_request
    import ibeis
    ibs = ibeis.opendb(db='PZ_MTEST')
    qaid_list = [1, 2, 3, 4, 5]
    daid_list = [1, 2, 3, 4, 5]
    cfgdict = {'codename': 'nsum'}
    cm_list, qreq_ = ibs.query_chips(qaid_list, daid_list, use_cache=False,
                                     return_cm=True, cfgdict=cfgdict,
                                     return_request=True)
    qreq_.load_score_normalizer(qreq_.ibs)
    normalizer = qreq_.normalizer
    for cm in cm_list:
        aid_list = list(six.iterkeys(cm.aid2_score))
        score_list = list(six.itervalues(cm.aid2_score))
        #normalizer = normalizer
        prob_list = normalizer.normalize_score_list(score_list)
        cm.qaid2_score = dict(zip(aid_list, prob_list))
    for cm in cm_list:
        print(list(six.itervalues(cm.qaid2_score)))
    #aid2_score = {aid: normalizer.no(score) for aid, score in }


# DOCTEST MAIN

if __name__ == '__main__':
    """
    CommandLine:
        python -m ibeis.model.hots.score_normalization
        python -m ibeis.model.hots.score_normalization --allexamples
        python -m ibeis.model.hots.score_normalization --allexamples --noface --nosrc
    """
    import multiprocessing
    multiprocessing.freeze_support()  # for win32
    import utool as ut  # NOQA
    ut.doctest_funcs()