Source code for ibeis.other.optimize_k

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
from six.moves import reduce
import fractions
import ibeis
import numpy as np
#import plottool as pt
import random
import scipy as sp
import utool as ut
from six.moves import builtins
import six
# Rebind the module-level names `print`, `rrr` and `profile` to the three
# objects returned by utool's `inject2` for this module.
# NOTE(review): the '[optimze_k]' tag looks like a typo for 'optimize_k'; it is
# a runtime string, so confirm how utool uses it before changing it.
(print, rrr, profile) = ut.inject2(__name__, '[optimze_k]', DEBUG=False)


def collect_ibeis_training_annotations(ibs, nDaids_basis, verbose=True):
    """
    Picks a query set and builds one database annotation set per target size.

    Args:
        ibs: controller object providing the annotation / name accessors
            called below
        nDaids_basis (list): target database sizes (number of annotations)
        verbose (bool): accepted for interface symmetry; not used here

    Returns:
        tuple: (qaids, daids_list) where ``qaids`` is every other annotation
            of the filtered sample and ``daids_list`` holds one sorted list
            of database annotation ids for each target size that could be
            met exactly. Sizes that cannot be met exactly are skipped.
    """
    # Seed both RNGs so repeated runs draw the same samples
    np.random.seed(0)
    random.seed(0)

    sampled_aids = ibs.get_annot_rowid_sample(per_name=1, min_ngt=2, distinguish_unknowns=True)
    qaids_all = ibs.filter_junk_annotations(sampled_aids)
    qaids = qaids_all[::2]
    print('nQaids = %r' % len(qaids))

    gt_aids_list = ibs.get_annot_groundtruth_sample(qaids, isexemplar=None)
    daids_gt_sample = ut.flatten(gt_aids_list)

    # Groundfalse pool: annotations of every name that is not a query name,
    # taken relative to the entire set of valid names
    all_nids = ibs.get_valid_nids()
    query_nids = ibs.get_annot_nids(qaids)
    other_nids = list(set(all_nids) - set(query_nids))
    daids_gf_all = ut.flatten(ibs.get_name_aids(other_nids))

    ut.assert_eq(len(daids_gt_sample), len(qaids), 'missing gt')

    num_gt = len(daids_gt_sample)
    daids_list = []
    for target_nDaids in ut.ProgressIter(nDaids_basis, lbl='testing dbsize'):
        print('---------------------------')
        # One groundtruth match per query, padded with groundfalse samples
        num_padding = max(0, target_nDaids - num_gt)
        daids_gf_sample = ut.random_sample(daids_gf_all, num_padding)
        daids = sorted(daids_gt_sample + daids_gf_sample)
        if len(daids) == target_nDaids:
            daids_list.append(daids)
    return qaids, daids_list
def evaluate_training_data(ibs, qaids, daids_list, varydict, nDaids_basis, verbose=True):
    """
    Runs every configuration against every database set and counts errors.

    A query counts as an error when it has no groundtruth ranks at all or
    when its best groundtruth rank is not 0.

    Args:
        ibs: controller object used to build and execute the query requests
        qaids (list): query annotation ids
        daids_list (list): one list of database annotation ids per db size
        varydict (dict): maps config keys to value lists; must contain 'K'
        nDaids_basis (list): accepted for interface symmetry; not used here
        verbose (bool): forwarded to the query request and query execution

    Returns:
        tuple: (nDaids_list, K_list, nError_list) as parallel ndarrays with
            one entry per (database set, configuration) run.
    """
    cfgdict_list = ut.all_dict_combinations(varydict)
    error_counts = []
    dbsizes = []
    used_cfgdicts = []
    for daids in ut.ProgressIter(daids_list, lbl='testing dbsize'):
        nDaids = len(daids)
        print('\n---------------------------')
        with ut.Indenter('[nDaids=%r]' % (nDaids)):
            print('nDaids = %r' % nDaids)
            for cfgdict in ut.ProgressIter(cfgdict_list, lbl='testing cfgdict'):
                qreq_ = ibs.new_query_request(qaids, daids, cfgdict=cfgdict, verbose=verbose)
                qres_list = ibs.query_chips(qreq_=qreq_, verbose=verbose)
                ranks_per_query = [qres.get_gt_ranks(ibs=ibs) for qres in qres_list]
                num_wrong = sum(
                    len(ranks) == 0 or min(ranks) != 0
                    for ranks in ranks_per_query
                )
                error_counts.append(num_wrong)
                dbsizes.append(nDaids)
                used_cfgdicts.append(cfgdict.copy())
    K_list = np.array([cfg['K'] for cfg in used_cfgdicts])
    return np.array(dbsizes), K_list, np.array(error_counts)
def test_training_data(varydict, nDaids_basis):
    """
    Generates synthetic (dummy) training data without touching a database.

    The fake error grows linearly with the database size and is perturbed by
    uniform noise drawn with ``np.random.rand``:
    ``nDaids / max(nDaids) * (min(nDaids_basis) - 2) + 1 + noise``.

    An earlier, disabled variant modelled the error as the distance from each
    (nDaids, K) point to a polynomial curve, see
    http://kitchingroup.cheme.cmu.edu/blog/2013/02/14/Find-the-minimum-distance-from-a-point-to-a-curve/

    Args:
        varydict (dict): maps config keys to value lists; must contain 'K'.
            The dict is not modified.
        nDaids_basis (list): database sizes to generate data for

    Returns:
        tuple: (nDaids_list, K_list, nError_list); the first two are the
            columns extracted from the config combinations, the last is a
            float ndarray of the same length.
    """
    # Work on a copy so the caller's dict does not silently gain an 'nDaids' key
    grid_dict = varydict.copy()
    grid_dict['nDaids'] = nDaids_basis
    cfgdict_list = ut.all_dict_combinations(grid_dict)
    K_list = ut.get_list_column(cfgdict_list, 'K')
    nDaids_list = ut.get_list_column(cfgdict_list, 'nDaids')
    max_error = min(nDaids_basis)
    nError_perterb = np.random.rand(len(K_list))
    # Scale sizes into [0, 1], stretch to the error range, then add noise
    nError_list = (np.array(nDaids_list) * .00001)
    nError_list /= nError_list.max()
    nError_list *= (max_error - 2)
    nError_list += 1 + nError_perterb
    return nDaids_list, K_list, nError_list


# Convert our non-uniform grid into a uniform grid using gcd
def _euclid_gcd(a, b):
    """Euclid's algorithm; drop-in for ``fractions.gcd`` (removed in Python 3.9)."""
    while b:
        a, b = b, a % b
    return a


def compute_interpolation_grid(known_nd_data, pad_steps=0):
    """
    Builds a uniform grid that covers the (possibly non-uniform) known samples.

    The gcd of the unique values in each dimension is used as the step size
    in that dimension.

    Args:
        known_nd_data (ndarray): 2D array with one row per sample and one
            column per dimension
        pad_steps (int): number of extra steps added on both sides of every
            dimension (default 0)

    Returns:
        tuple: (unknown_nd_data, ug_shape) where ``unknown_nd_data`` has one
            row per grid point and ``ug_shape`` is the meshgrid shape that
            each of its columns can be reshaped to.
    """
    ug_steps = [reduce(_euclid_gcd, np.unique(x_).tolist()) for x_ in known_nd_data.T]
    ug_min = known_nd_data.min(axis=0)
    ug_max = known_nd_data.max(axis=0)
    ug_basis = []
    for min_, max_, step_ in zip(ug_min, ug_max, ug_steps):
        if step_ == 0:
            # Every value in this dimension is 0; np.arange cannot take a
            # zero step, so the basis is that single point.
            basis = np.array([min_])
        else:
            basis = np.arange(min_ - (step_ * pad_steps),
                              max_ + (step_ * (pad_steps + 1)),
                              step_)
        ug_basis.append(basis)
    grids = np.meshgrid(*ug_basis)
    # Take the shape from the meshgrid itself rather than reversing the basis
    # sizes, which only matches meshgrid's default layout for up to 2 dims.
    ug_shape = tuple(grids[0].shape)
    unknown_nd_data = np.vstack([_pts.flatten() for _pts in grids]).T
    return unknown_nd_data, ug_shape
def interpolate_error(known_nd_data, known_targets, unknown_nd_data):
    """
    Linearly interpolates the known error values at new positions.

    Positions for which the interpolation yields NaN (``griddata`` does this
    for points outside the convex hull of the known data) are assigned twice
    the largest known target value.

    Args:
        known_nd_data (ndarray): positions of the known samples, one row each
        known_targets (array-like): target (error) value of each known sample
        unknown_nd_data (ndarray): positions to interpolate at, one row each

    Returns:
        ndarray: interpolated target value for each row of ``unknown_nd_data``

    References:
        http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.interpolate.griddata.html
    """
    # Import the submodule explicitly: a bare ``import scipy as sp`` does not
    # guarantee that ``sp.interpolate`` is loaded on every SciPy version.
    from scipy.interpolate import griddata
    # Accept plain sequences as well as arrays (``.max()`` is needed below)
    known_targets = np.asarray(known_targets)
    method = 'linear'  # {'linear', 'nearest', 'cubic'}
    interpolated_targets = griddata(known_nd_data, known_targets,
                                    unknown_nd_data, method=method)
    is_missing = np.isnan(interpolated_targets)
    interpolated_targets[is_missing] = known_targets.max() * 2
    return interpolated_targets
def compute_K(nDaids, model_params, force_int=True):
    """
    Evaluates the polynomial K model at the given database size(s).

    Args:
        nDaids (int): number of database annotations to compute K for
            (the doctest below also passes an ndarray)
        model_params (list): polynomial coefficients in the order expected
            by ``np.polyval``
        force_int (bool): if True the result is passed through ``np.round``

    Returns:
        the polynomial value(s), rounded when ``force_int`` is True

    CommandLine:
        python -m ibeis.other.optimize_k --test-compute_K --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.other.optimize_k import *  # NOQA
        >>> import plottool as pt
        >>> nDaids_list = np.arange(0, 1000)
        >>> model_params = [.2, .5]
        >>> K_list = compute_K(nDaids_list, model_params)
        >>> pt.plot2(nDaids_list, K_list, x_label='num_names', y_label='K',
        ...          equal_aspect=False, marker='g-', pad=1, dark=True)
        >>> pt.show_if_requested()
    """
    poly_value = np.polyval(model_params, nDaids)
    if not force_int:
        return poly_value
    return np.round(poly_value)
def minimize_compute_K_params(known_nd_data, known_target_points, given_data_dims,
                              poly_degree=1, mode='brute'):
    """
    Searches for the polynomial coefficients (mapping nDaids to K) whose
    summed interpolated error over all unique database sizes is smallest.

    Args:
        known_nd_data (ndarray): one row per training sample
        known_target_points (ndarray): error value of each training sample
        given_data_dims (list): column indices of ``known_nd_data`` holding
            the given variable (the database size)
        poly_degree (int): degree of the polynomial K model (default 1)
        mode (str): 'brute' for a grid search or 'simplex' for ``fmin``
            (default 'brute')

    Returns:
        ndarray: the optimized model parameters

    Raises:
        AssertionError: if ``mode`` is not 'brute' or 'simplex'
        ValueError: if ``poly_degree`` is smaller than 1

    References:
        http://docs.scipy.org/doc/scipy-0.14.0/reference/optimize.html
    """
    # Import the submodule explicitly: a bare ``import scipy as sp`` does not
    # guarantee that ``sp.optimize`` is loaded on every SciPy version.
    from scipy import optimize as sp_optimize

    # Validate up front so a bad mode fails before any optimization work runs
    if mode not in ('brute', 'simplex'):
        raise AssertionError('Unknown mode=%r' % (mode,))
    if poly_degree < 1:
        raise ValueError('poly_degree must be >= 1, got %r' % (poly_degree,))

    if poly_degree == 2:
        initial_model_params = [0, 0.2, 0.5]  # a guess
        ranges = (slice(0, 1, .1), slice(0, 1, .1), slice(0, 1, .1))
    elif poly_degree == 1:
        initial_model_params = [0, 10]
        fidelity = 10
        ranges = (slice(0, 1, .01 * fidelity), slice(0, 10, .1 * fidelity))
    else:
        # A degree-n polynomial has n + 1 coefficients
        num_coeffs = poly_degree + 1
        initial_model_params = [0] * num_coeffs
        ranges = tuple(slice(-2, 2, .1) for _ in range(num_coeffs))

    # Never-ending iterator: the optimizer decides how often the objective is
    # called, so the progress display cannot be tied to a fixed-size iterable.
    # TODO: progress iter for unknown size
    infiter = builtins.iter(int, 1)
    if mode == 'brute':
        brute_force_basis = list(map(np.mgrid.__getitem__, ranges))
        nTotal = np.prod([_basis.size for _basis in brute_force_basis])
    else:
        nTotal = 1
    optprog = ut.ProgressIter(infiter, nTotal=nTotal, lbl='optimizing', freq=1)
    optprogiter = builtins.iter(optprog)

    def objective_func(model_params, *args):
        """Total interpolated error over all unique database sizes."""
        known_nd_data, known_target_points, unique_nDaids = args
        # compute_K goes through np.polyval, so all sizes evaluate at once
        K_list = np.asarray(compute_K(unique_nDaids, model_params, force_int=False))
        # Advance the progress display once per objective evaluation
        six.next(optprogiter)
        if np.any(K_list <= 0):
            # K must be positive; reject these parameters outright
            return np.inf
        unknown_nd_data = np.vstack([unique_nDaids, K_list]).T
        error_list = interpolate_error(known_nd_data, known_target_points, unknown_nd_data)
        return error_list.sum()

    unique_nDaids = np.unique(known_nd_data.take(given_data_dims, axis=1))
    args = (known_nd_data, known_target_points, unique_nDaids)
    if mode == 'simplex':
        _out = sp_optimize.fmin(objective_func, initial_model_params, xtol=.01,
                                args=args, disp=True, full_output=True)
        opt_model_params = _out[0]
    else:
        _out = sp_optimize.brute(objective_func, ranges, args=args, full_output=True)
        opt_model_params = _out[0]

    opt_K_list = [compute_K(_nDaids, opt_model_params) for _nDaids in unique_nDaids]
    print('opt_model_params = %r' % (opt_model_params,))
    print('opt_K_list = %r' % (opt_K_list,))
    return opt_model_params
def plot_search_surface(known_nd_data, known_target_points, given_data_dims, opt_model_params=None):
    """
    Draws the interpolated error surface over (nDaids, K) in figure 2.

    The known samples are scattered on top of the surface. When
    ``opt_model_params`` is given, the learned K curve is drawn as well and
    the axis limits are widened to include it.

    Args:
        known_nd_data (ndarray): one row per training sample
        known_target_points (ndarray): error value of each training sample
        given_data_dims (list): must hold exactly one column index
        opt_model_params (list): optional learned polynomial coefficients

    Returns:
        the axes object returned by ``pt.plot_surface3d``
    """
    import plottool as pt
    pt.figure(2, doclf=True)

    # Interpolate uniform grid positions
    grid_points, grid_shape = compute_interpolation_grid(known_nd_data, 0 * 5)
    grid_error = interpolate_error(known_nd_data, known_target_points, grid_points)
    surface_x = grid_points.T[0].reshape(grid_shape)
    surface_y = grid_points.T[1].reshape(grid_shape)
    surface_z = grid_error.reshape(grid_shape)
    ax = pt.plot_surface3d(
        surface_x, surface_y, surface_z,
        xlabel='nDaids',
        ylabel='K',
        zlabel='error',
        rstride=1, cstride=1,
        cmap=pt.plt.get_cmap('jet'),
        wire=True,
    )
    ax.scatter(known_nd_data.T[0], known_nd_data.T[1], known_target_points,
               s=100, c=pt.YELLOW)

    assert len(given_data_dims) == 1, 'can only plot 1 given data dim'
    xdim = given_data_dims[0]
    # The y axis is the next column, wrapping around
    ydim = (xdim + 1) % (len(known_nd_data.T))
    lower = known_nd_data.min(axis=0)
    upper = known_nd_data.max(axis=0)
    xmin, xmax = lower[xdim], upper[xdim]
    ymin, ymax = lower[ydim], upper[ydim]
    zmin = known_target_points.min()
    zmax = known_target_points.max()

    if opt_model_params is not None:
        # Plot the learned curve if available
        xdata = np.linspace(xmin, xmax)
        ydata = compute_K(xdata, opt_model_params)
        curve_points = np.array((xdata, ydata)).T
        zdata = interpolate_error(known_nd_data, known_target_points, curve_points)
        ax.plot(xdata, ydata, zdata, c=pt.ORANGE)
        # Widen the limits so the curve is fully visible
        ymax = max(ymax, ydata.max())
        ymin = min(ymin, ydata.min())
        zmin = min(zmin, zdata.min())
        zmax = max(zmax, zdata.max())
        ax.scatter(xdata, ydata, zdata, s=100, c=pt.ORANGE)

    ax.set_aspect('auto')
    ax.set_xlim(xmin, xmax)
    ax.set_ylim(ymin, ymax)
    ax.set_zlim(zmin, zmax)
    import matplotlib.ticker as mtick
    ax.zaxis.set_major_formatter(mtick.FormatStrFormatter('%.2f'))
    return ax
def learn_k():
    r"""
    Collects (or fakes, with --dummy) training data and fits the K model.

    Returns:
        tuple: (known_nd_data, known_target_points, given_data_dims,
            opt_model_params)

    CommandLine:
        python -m ibeis.other.optimize_k --test-learn_k
        python -m ibeis.other.optimize_k --test-learn_k --show
        python -m ibeis.other.optimize_k --test-learn_k --show --dummy

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.other.optimize_k import *  # NOQA
        >>> import plottool as pt
        >>> # build test data
        >>> # execute function
        >>> known_nd_data, known_target_points, given_data_dims, opt_model_params = learn_k()
        >>> # verify results
        >>> ut.quit_if_noshow()
        >>> plot_search_surface(known_nd_data, known_target_points, given_data_dims, opt_model_params)
        >>> pt.all_figures_bring_to_front()
        >>> pt.show_if_requested()
    """
    # Compute Training Data
    varydict = {
        'K': [1, 2, 4, 8, 16],
    }
    nDaids_basis = [20, 30, 50, 75, 100, 200, 250, 300, 325, 350, 400, 500,
                    600, 750, 800, 900, 1000, 1500]

    use_dummy = ut.get_argflag('--dummy')
    if not use_dummy:
        dbname = ut.get_argval('--db', default='PZ_Master0')
        ibs = ibeis.opendb(dbname)
        qaids, daids_list = collect_ibeis_training_annotations(
            ibs, nDaids_basis, verbose=False)
        nDaids_list, K_list, raw_errors = evaluate_training_data(
            ibs, qaids, daids_list, varydict, nDaids_basis, verbose=False)
        # Error rate: fraction of queries that failed
        nError_list = raw_errors.astype(np.float32) / len(qaids)
        print('\nFinished Get Training Data')
        print('len(qaids) = %r' % (len(qaids)))
        print(ut.get_stats_str(nError_list))
    else:
        nDaids_list, K_list, raw_errors = test_training_data(varydict, nDaids_basis)
        # Normalize the synthetic errors by their maximum
        nError_list = raw_errors.astype(np.float32) / raw_errors.max()

    # Alias to general optimization problem
    known_nd_data = np.vstack([nDaids_list, K_list]).T
    known_target_points = nError_list
    # Mark the data we are given vs what we want to learn
    given_data_dims = [0]
    # Minimize K params
    opt_model_params = minimize_compute_K_params(
        known_nd_data, known_target_points, given_data_dims)
    return known_nd_data, known_target_points, given_data_dims, opt_model_params
if __name__ == '__main__':
    # CommandLine:
    #     python -m ibeis.other.optimize_k
    #     python -m ibeis.other.optimize_k --allexamples
    #     python -m ibeis.other.optimize_k --allexamples --noface --nosrc
    import multiprocessing
    multiprocessing.freeze_support()  # for win32
    import utool as ut  # NOQA
    ut.doctest_funcs()