from __future__ import absolute_import, division, print_function, unicode_literals
# from six.moves import zip
import dtool
import utool as ut
import vtool as vt
import six
import pyflann
import numpy as np
# import cv2
from ibeis.control.controller_inject import register_preprocs, register_subprops
# Inject utool's print/reload/profile helpers under this module's tag.
(print, rrr, profile) = ut.inject2(__name__, '[new_annots]')
# Decorators for registering depcache preprocs/subprops on the 'annot' table.
derived_attribute = register_preprocs['annot']
register_subprop = register_subprops['annot']
# dtool.Config.register_func = derived_attribute
@ut.memoize
[docs]def testdata_vocab():
from ibeis.new_annots import * # NOQA
import ibeis
ibs, aid_list = ibeis.testdata_aids('testdb1')
depc = ibs.depc_annot
fid_list = depc.get_rowids('feat', aid_list)
config = VocabConfig()
vocab = compute_vocab(depc, fid_list, config)
return ibs, aid_list, vocab
@six.add_metaclass(ut.ReloadingMetaclass)
class VisualVocab(ut.NiceRepr):
    """
    A flat vocabulary of visual words (descriptor-space centroids) together
    with a FLANN index used to assign descriptors to their nearest words.
    """

    def __init__(self, words=None):
        # wx2_word: array where row wx is the centroid of word wx
        self.wx2_word = words
        self.wordflann = pyflann.FLANN()
        self.flann_params = vt.get_flann_params(random_seed=42)

    def reindex(self, verbose=True):
        """Rebuild the FLANN kdtree over the current words."""
        num_vecs = len(self.wx2_word)
        if verbose:
            print('[nnindex] ...building kdtree over %d points (this may take a sec).' % num_vecs)
            tt = ut.tic(msg='Building vocab index')
        if num_vecs == 0:
            print('WARNING: CANNOT BUILD FLANN INDEX OVER 0 POINTS. THIS MAY BE A SIGN OF A DEEPER ISSUE')
        else:
            self.wordflann.build_index(self.wx2_word, **self.flann_params)
        if verbose:
            ut.toc(tt)

    def nn_index(self, idx2_vec, nAssign):
        """
        Find the nAssign nearest words for each descriptor vector.

        Args:
            idx2_vec (ndarray): descriptors to look up
            nAssign (int): number of neighbor words per descriptor (> 0)

        Returns:
            tuple: (_idx2_wx, _idx2_wdist) word indexes and distances, both 2d

        Example:
            >>> idx2_vec = depc.d.get_feat_vecs(aid_list)[0]
            >>> self = vocab
            >>> nAssign = 1
        """
        # Assign each vector to the nearest visual words
        assert nAssign > 0, 'cannot assign to 0 neighbors'
        try:
            _idx2_wx, _idx2_wdist = self.wordflann.nn_index(idx2_vec, nAssign)
        except pyflann.FLANNException as ex:
            ut.printex(ex, 'probably misread the cached flann_fpath=%r' % (self.wordflann.flann_fpath,))
            raise
        else:
            # Ensure 2d output even when nAssign == 1 (FLANN returns 1d then)
            _idx2_wx = vt.atleast_nd(_idx2_wx, 2)
            _idx2_wdist = vt.atleast_nd(_idx2_wdist, 2)
            return _idx2_wx, _idx2_wdist

    def assign_to_words(self, idx2_vec, nAssign, massign_alpha=1.2,
                        massign_sigma=80.0, massign_equal_weights=False):
        """
        Assigns descriptor-vectors to nearest word.

        Args:
            idx2_vec (ndarray): descriptors to assign
            nAssign (int): number of words to assign each descriptor to
            massign_alpha (float): multiple-assignment ratio threshold
            massign_sigma (float): multiple-assignment gaussian variance
            massign_equal_weights (bool): assign equal weight to all multiassigned words

        Returns:
            tuple: (idx2_wxs, idx2_maws)
                * idx2_wxs - vector index -> assigned word indexes
                * idx2_maws - vector index -> multi-assignment weights

        Example:
            >>> # SLOW_DOCTEST
            >>> idx2_vec = depc.d.get_feat_vecs(aid_list)[0][0::300]
            >>> idx2_vec = np.vstack((idx2_vec, self.wx2_word[0]))
            >>> nAssign = 2
            >>> massign_equal_weights = False
            >>> massign_alpha = 1.2
            >>> massign_sigma = 80.0
            >>> nAssign = 2
            >>> idx2_wxs, idx2_maws = self.assign_to_words(idx2_vec, nAssign)
            >>> print('idx2_maws = %s' % (ut.repr2(idx2_wxs, precision=2),))
            >>> print('idx2_wxs = %s' % (ut.repr2(idx2_maws, precision=2),))
        """
        if ut.VERBOSE:
            print('[smk_index.assign] +--- Start Assign vecs to words.')
            print('[smk_index.assign] * nAssign=%r' % nAssign)
        if not ut.QUIET:
            print('[smk_index.assign] assign_to_words_. len(idx2_vec) = %r' % len(idx2_vec))
        _idx2_wx, _idx2_wdist = self.nn_index(idx2_vec, nAssign)
        if nAssign > 1:
            idx2_wxs, idx2_maws = self._compute_multiassign_weights(
                _idx2_wx, _idx2_wdist, massign_alpha, massign_sigma, massign_equal_weights)
        else:
            idx2_wxs = _idx2_wx.tolist()
            idx2_maws = [[1.0]] * len(idx2_wxs)
        return idx2_wxs, idx2_maws

    def invert_assignment(self, idx2_wxs, idx2_maws, *other_idx2_prop):
        """
        Inverts assignment of vectors to words into words to vectors.

        Any extra per-vector properties passed in *other_idx2_prop are
        inverted the same way and appended to the returned tuple.

        Example:
            >>> idx2_idx = np.arange(len(idx2_wxs))
            >>> other_idx2_prop = (idx2_idx,)
            >>> wx2_idxs, wx2_maws, wx2_idxs_ = self.invert_assignment(idx2_wxs, idx2_maws, idx2_idx)
            >>> assert ut.dict_str(wx2_idxs) == ut.dict_str(wx2_idxs_)
        """
        # Invert mapping -- Group by word indexes
        idx2_nAssign = [len(wxs) for wxs in idx2_wxs]
        # Repeat each vector index once per word it was assigned to
        jagged_idxs = [[idx] * num for idx, num in enumerate(idx2_nAssign)]
        wx_keys, groupxs = vt.jagged_group(idx2_wxs)
        idxs_list = vt.apply_jagged_grouping(jagged_idxs, groupxs)
        wx2_idxs = dict(zip(wx_keys, idxs_list))
        maws_list = vt.apply_jagged_grouping(idx2_maws, groupxs)
        wx2_maws = dict(zip(wx_keys, maws_list))
        other_wx2_prop = []
        for idx2_prop in other_idx2_prop:
            # Props are assumed to be non-jagged, so make them jagged
            jagged_prop = [[prop] * num for prop, num in zip(idx2_prop, idx2_nAssign)]
            prop_list = vt.apply_jagged_grouping(jagged_prop, groupxs)
            wx2_prop = dict(zip(wx_keys, prop_list))
            other_wx2_prop.append(wx2_prop)
        if ut.VERBOSE:
            print('[smk_index.assign] L___ End Assign vecs to words.')
        assignment = (wx2_idxs, wx2_maws) + tuple(other_wx2_prop)
        return assignment

    @staticmethod
    def _compute_multiassign_weights(_idx2_wx, _idx2_wdist, massign_alpha=1.2,
                                     massign_sigma=80.0,
                                     massign_equal_weights=False):
        """
        Multi Assignment Weight Filtering from Improving Bag of Features

        Args:
            massign_equal_weights (bool): Turns off soft weighting. Gives all
                assigned vectors weight 1

        Returns:
            tuple : (idx2_wxs, idx2_maws)

        References:
            (Improving Bag of Features)
            http://lear.inrialpes.fr/pubs/2010/JDS10a/jegou_improvingbof_preprint.pdf
            (Lost in Quantization)
            http://www.robots.ox.ac.uk/~vgg/publications/papers/philbin08.ps.gz
            (A Context Dissimilarity Measure for Accurate and Efficient Image Search)
            https://lear.inrialpes.fr/pubs/2007/JHS07/jegou_cdm.pdf

        Example:
            >>> massign_alpha = 1.2
            >>> massign_sigma = 80.0
            >>> massign_equal_weights = False

        Notes:
            sigma values from \cite{philbin_lost08}
            (70 ** 2) ~= 5000, (80 ** 2) ~= 6250, (86 ** 2) ~= 7500,
        """
        if not ut.QUIET:
            print('[smk_index.assign] compute_multiassign_weights_')
        if _idx2_wx.shape[1] <= 1:
            # Single assignment: every vector gets its one word with weight 1
            idx2_wxs = _idx2_wx.tolist()
            idx2_maws = [[1.0]] * len(idx2_wxs)
        else:
            # Valid word assignments are beyond fraction of distance to the nearest word
            massign_thresh = _idx2_wdist.T[0:1].T.copy()
            # HACK: If the nearest word has distance 0 then this threshold is too hard
            # so we should use the distance to the second nearest word.
            EXACT_MATCH_HACK = True
            if EXACT_MATCH_HACK:
                flag_too_close = (massign_thresh == 0)
                massign_thresh[flag_too_close] = _idx2_wdist.T[1:2].T[flag_too_close]
            # Compute the threshold fraction
            epsilon = .001
            np.add(epsilon, massign_thresh, out=massign_thresh)
            np.multiply(massign_alpha, massign_thresh, out=massign_thresh)
            # Mark assignments as invalid if they are too far away from the nearest assignment
            invalid = np.greater_equal(_idx2_wdist, massign_thresh)
            if ut.VERBOSE:
                nInvalid = (invalid.size - invalid.sum(), invalid.size)
                print('[maw] + massign_alpha = %r' % (massign_alpha,))
                print('[maw] + massign_sigma = %r' % (massign_sigma,))
                print('[maw] + massign_equal_weights = %r' % (massign_equal_weights,))
                print('[maw] * Marked %d/%d assignments as invalid' % nInvalid)
            if massign_equal_weights:
                # Performance hack from jegou paper: just give everyone equal weight
                masked_wxs = np.ma.masked_array(_idx2_wx, mask=invalid)
                idx2_wxs = list(map(ut.filter_Nones, masked_wxs.tolist()))
                if ut.DEBUG2:
                    assert all([isinstance(wxs, list) for wxs in idx2_wxs])
                idx2_maws = [np.ones(len(wxs), dtype=np.float32) for wxs in idx2_wxs]
            else:
                # More natural weighting scheme
                # Weighting as in Lost in Quantization
                gauss_numer = np.negative(_idx2_wdist.astype(np.float64))
                gauss_denom = 2 * (massign_sigma ** 2)
                gauss_exp = np.divide(gauss_numer, gauss_denom)
                unnorm_maw = np.exp(gauss_exp)
                # Mask invalid multiassignment weights
                masked_unorm_maw = np.ma.masked_array(unnorm_maw, mask=invalid)
                # Normalize multiassignment weights from 0 to 1
                masked_norm = masked_unorm_maw.sum(axis=1)[:, np.newaxis]
                masked_maw = np.divide(masked_unorm_maw, masked_norm)
                masked_wxs = np.ma.masked_array(_idx2_wx, mask=invalid)
                # Remove masked weights and word indexes
                idx2_wxs = list(map(ut.filter_Nones, masked_wxs.tolist()))
                idx2_maws = list(map(ut.filter_Nones, masked_maw.tolist()))
                if ut.DEBUG2:
                    # Weights per vector should sum to 1 after normalization
                    checksum = [sum(maws) for maws in idx2_maws]
                    for x in np.where([not ut.almost_eq(val, 1) for val in checksum])[0]:
                        print(checksum[x])
                        print(_idx2_wx[x])
                        print(masked_wxs[x])
                        print(masked_maw[x])
                        print(massign_thresh[x])
                        print(_idx2_wdist[x])
                    assert all([ut.almost_eq(val, 1) for val in checksum]), 'weights did not break evenly'
        return idx2_wxs, idx2_maws

    def __nice__(self):
        return ' nW=%r' % (ut.safelen(self.wx2_word))

    def on_load(nnindexer, depc):
        # Hook called by the depcache after loading; nothing to do yet.
        pass

    def on_save(nnindexer, depc, fpath):
        # Hook called by the depcache before saving; nothing to do yet.
        pass

    def __getstate__(self):
        # TODO: Figure out how to make these play nice with the depcache
        # BUG FIX: copy before deleting -- deleting from self.__dict__
        # directly would strip 'wordflann' from the *live* instance.
        state = self.__dict__.copy()
        del state['wordflann']
        return state

    def __setstate__(self, state_dict):
        self.__dict__.update(state_dict)
        # __getstate__ drops the unpicklable FLANN object; recreate an empty
        # one here.  Callers must reindex() before nearest-neighbor queries.
        if 'wordflann' not in self.__dict__:
            self.wordflann = pyflann.FLANN()
@six.add_metaclass(ut.ReloadingMetaclass)
class InvertedIndex(ut.NiceRepr):
    """Placeholder for an inverted-index structure (not yet implemented)."""
    def __init__(self):
        pass
class VocabConfig(dtool.Config):
    """
    Configuration for visual-vocabulary computation.

    Example:
        >>> from ibeis.core_annots import * # NOQA
        >>> cfg = VocabConfig()
        >>> result = str(cfg)
        >>> print(result)
    """
    _param_info_list = [
        ut.ParamInfo('algorithm', 'kdtree', 'alg'),
        ut.ParamInfo('random_seed', 42, 'seed'),
        # NOTE(review): 'seed' duplicates the short alias of random_seed above
        # -- looks like a copy-paste slip, but changing it would alter config
        # strings/hashes, so confirm the intended alias before fixing.
        ut.ParamInfo('num_words', 1000, 'seed'),
        ut.ParamInfo('version', 1),
        # max iters
        # flann params
        # random seed
    ]
    _sub_config_list = [
    ]
@derived_attribute(
    tablename='vocab', parents=['feat*'],
    colnames=['words'], coltypes=[VisualVocab],
    configclass=VocabConfig,
    chunksize=1, fname='visual_vocab',
    # func_is_single=True,
)
def compute_vocab(depc, fid_list, config):
    r"""
    Train a visual vocabulary (approximate-kmeans centroids) over the stacked
    descriptors of the given feature rowids and return it indexed for lookup.

    Args:
        depc (dtool.DependencyCache): annotation dependency cache
        fid_list (list): feature rowids providing the training descriptors
        config (dtool.Config): VocabConfig ('num_words', 'random_seed', ...)

    Returns:
        VisualVocab: vocabulary with a built FLANN index

    CommandLine:
        python -m ibeis.core_annots --exec-compute_neighbor_index --show
        python -m ibeis.control.IBEISControl --test-show_depc_annot_table_input --show --tablename=neighbor_index

    Example:
        >>> # DISABLE_DOCTEST
        >>> from ibeis.new_annots import * # NOQA
        >>> import ibeis
        >>> ibs, aid_list = ibeis.testdata_aids('testdb1')
        >>> depc = ibs.depc_annot
        >>> fid_list = depc.get_rowids('feat', aid_list)
        >>> config = VocabConfig()
        >>> vocab, train_vecs = ut.exec_func_src(compute_vocab, key_list=['vocab', 'train_vecs'])
        >>> idx2_vec = depc.d.get_feat_vecs(aid_list)[0]
        >>> self = vocab
        >>> ut.quit_if_noshow()
        >>> data = train_vecs
        >>> centroids = vocab.wx2_word
        >>> import plottool as pt
        >>> vt.plot_centroids(data, centroids, num_pca_dims=2)
        >>> ut.show_if_requested()
    """
    print('[IBEIS] COMPUTE_VOCAB:')
    vecs_list = depc.get_native('feat', fid_list, 'vecs')
    train_vecs = np.vstack(vecs_list)
    num_words = config['num_words']
    max_iters = 100
    print('[smk_index] Train Vocab(nWords=%d) using %d annots and %d descriptors' %
          (num_words, len(fid_list), len(train_vecs)))
    # Use the configured seed (default 42) rather than a hard-coded constant,
    # so the 'random_seed' config parameter actually takes effect.
    flann_params = vt.get_flann_params(random_seed=config['random_seed'])
    kwds = dict(
        max_iters=max_iters,
        flann_params=flann_params
    )
    words = vt.akmeans(train_vecs, num_words, **kwds)
    vocab = VisualVocab(words)
    vocab.reindex()
    return vocab
@six.add_metaclass(ut.ReloadingMetaclass)
class StackedFeatures(ut.NiceRepr):
    """
    Stacks the feature vectors of several annotations into one flat array and
    keeps a forward index mapping each stacked row back to its annotation
    index (ax) and feature index (fx).
    """

    def __init__(fstack, ibs, aid_list, config=None):
        ax2_vecs = ibs.depc_annot.d.get_feat_vecs(aid_list, config=config)
        fstack.config = config
        fstack.ibs = ibs
        fstack.ax2_aid = aid_list
        fstack.ax2_nFeat = [len(vecs) for vecs in ax2_vecs]
        # Forward index: stacked row idx -> (feature index, annot index)
        fstack.idx2_fxs = ut.flatten([list(range(num)) for num in fstack.ax2_nFeat])
        fstack.idx2_axs = ut.flatten([[ax] * num for ax, num in enumerate(fstack.ax2_nFeat)])
        fstack.idx2_vec = np.vstack(ax2_vecs)
        fstack.num_feat = sum(fstack.ax2_nFeat)

    def __nice__(fstack):
        return ' nA=%r nF=%r' % (ut.safelen(fstack.ax2_aid), fstack.num_feat)

    def inverted_assignment(fstack, vocab, nAssign=1):
        """Assign stacked vectors to vocab words and invert to per-word groups."""
        idx2_wxs, idx2_maws = vocab.assign_to_words(fstack.idx2_vec, nAssign)
        # Pack into structure
        forward_assign = (idx2_wxs, idx2_maws, fstack.idx2_fxs, fstack.idx2_axs)
        invert_assign = vocab.invert_assignment(*forward_assign)
        (wx2_idxs, wx2_maws, wx2_fxs, wx2_axs) = invert_assign
        invassign = InvertedStackAssignment(fstack, vocab, wx2_idxs, wx2_maws, wx2_fxs, wx2_axs)
        return invassign
@six.add_metaclass(ut.ReloadingMetaclass)
class InvertedStackAssignment(ut.NiceRepr):
    """
    Holds the inverted (word -> vectors) view of a StackedFeatures-to-vocab
    assignment, plus helpers to fetch vectors/patches and compute residuals
    per word.
    """

    def __init__(invassign, fstack, vocab, wx2_idxs, wx2_maws, wx2_fxs, wx2_axs):
        invassign.fstack = fstack
        invassign.vocab = vocab
        invassign.wx2_idxs = wx2_idxs
        invassign.wx2_maws = wx2_maws
        invassign.wx2_fxs = wx2_fxs
        invassign.wx2_axs = wx2_axs
        # Per-word membership counts and summary statistics
        invassign.wx2_num = ut.map_dict_vals(len, invassign.wx2_axs)
        invassign.wx_list = sorted(invassign.wx2_num.keys())
        invassign.num_list = ut.take(invassign.wx2_num, invassign.wx_list)
        invassign.perword_stats = ut.get_stats(invassign.num_list)

    def __nice__(invassign):
        return ' nW=%r mean=%.2f' % (ut.safelen(invassign.wx2_idxs), invassign.perword_stats['mean'])

    def get_vecs(invassign, wx):
        """Return the stacked descriptor vectors assigned to word ``wx``."""
        vecs = invassign.fstack.idx2_vec.take(invassign.wx2_idxs[wx], axis=0)
        return vecs

    def get_patches(invassign, wx):
        """Return warped image patches for each keypoint assigned to word ``wx``."""
        ax_list = invassign.wx2_axs[wx]
        fx_list = invassign.wx2_fxs[wx]
        config = invassign.fstack.config
        ibs = invassign.fstack.ibs
        # Group keypoint indexes by annotation so chips are loaded once each
        unique_axs, groupxs = vt.group_indices(ax_list)
        fxs_groups = vt.apply_grouping(fx_list, groupxs)
        unique_aids = ut.take(invassign.fstack.ax2_aid, unique_axs)
        # NOTE(review): uses ibs.depc here but ibs.depc_annot below --
        # presumably aliases of the same cache; confirm.
        all_kpts_list = ibs.depc.d.get_feat_kpts(unique_aids, config=config)
        sub_kpts_list = vt.ziptake(all_kpts_list, fxs_groups, axis=0)
        chip_list = ibs.depc_annot.d.get_chips_img(unique_aids)
        patch_size = 64
        grouped_patches_list = [
            vt.get_warped_patches(chip, kpts, patch_size=patch_size)[0]
            for chip, kpts in ut.ProgIter(zip(chip_list, sub_kpts_list),
                                          nTotal=len(unique_aids),
                                          lbl='warping patches')
        ]
        # Make it correspond with original fx_list and ax_list
        word_patches = vt.invert_apply_grouping(grouped_patches_list, groupxs)
        return word_patches

    def compute_nonagg_rvecs(invassign, wx, compress=False):
        """
        Driver function for nonagg residual computation

        Args:
            wx (int): word of interest
            compress (bool): quantize residuals to int8 if True

        Returns:
            tuple : (rvecs_list, error_flags) -- normalized residuals and a
                flag per row that is True when the residual is all zeros
        """
        # Pick out corresonding lists of residuals and words
        vecs = invassign.get_vecs(wx)
        word = invassign.vocab.wx2_word[wx]
        # Compute nonaggregated normalized residuals
        arr_float = np.subtract(word.astype(np.float64), vecs.astype(np.float64))
        vt.normalize_rows(arr_float, out=arr_float)
        if compress:
            rvecs_list = np.clip(np.round(arr_float * 255.0), -127, 127).astype(np.int8)
        else:
            rvecs_list = arr_float
        # Extract flags (rvecs_list which are all zeros) and rvecs_list
        error_flags = ~np.any(rvecs_list, axis=1)
        return rvecs_list, error_flags

    def compute_agg_rvecs(invassign, wx):
        """
        Sums and normalizes all rvecs that belong to the same word and the same
        annotation id
        """
        rvecs_list, error_flags = invassign.compute_nonagg_rvecs(wx)
        ax_list = invassign.wx2_axs[wx]
        maw_list = invassign.wx2_maws[wx]
        # group members of each word by aid, we will collapse these groups
        unique_ax, groupxs = vt.group_indices(ax_list)
        # (weighted aggregation with multi-assign-weights)
        grouped_maws = vt.apply_grouping(maw_list, groupxs)
        grouped_rvecs = vt.apply_grouping(rvecs_list, groupxs)
        # Drop all-zero (error) residuals before aggregating
        grouped_flags = vt.apply_grouping(~error_flags, groupxs)
        grouped_rvecs2_ = vt.zipcompress(grouped_rvecs, grouped_flags, axis=0)
        grouped_maws2_ = vt.zipcompress(grouped_maws, grouped_flags)
        is_good = [len(rvecs) > 0 for rvecs in grouped_rvecs2_]
        aggvecs = [aggregate_rvecs(rvecs, maws)[0] for rvecs, maws in zip(grouped_rvecs2_, grouped_maws2_)]
        unique_ax2_ = unique_ax.compress(is_good)
        ax2_aggvec = dict(zip(unique_ax2_, aggvecs))
        # Need to recompute flags for consistency
        # flag is true when aggvec is all zeros
        return ax2_aggvec
def aggregate_rvecs(rvecs, maws, compress=False):
    r"""
    Compute the weighted, renormalized average of a group of residual vectors.

    helper for compute_agg_rvecs

    Args:
        rvecs (ndarray): (num, dim) residual vectors
        maws (ndarray): (num,) multi-assignment weights
        compress (bool): quantize the aggregate to int8 if True

    Returns:
        ndarray: (1, dim) aggregated vector, (0, dim) for empty input, or the
            input itself when there is a single residual
    """
    if rvecs.shape[0] == 0:
        # BUG FIX: this was a separate `if`, so the empty case fell through to
        # the weighted-average branch below (division by a zero total weight
        # producing NaNs that overwrote this result). `elif` makes the three
        # cases mutually exclusive.
        rvecs_agg = np.empty((0, rvecs.shape[1]), dtype=np.float64)
    elif rvecs.shape[0] == 1:
        rvecs_agg = rvecs
    else:
        # Prealloc sum output (do not assign the result of sum)
        arr_float = np.empty((1, rvecs.shape[1]), dtype=np.float64)
        out = arr_float[0]
        # Take weighted average of multi-assigned vectors
        total_weight = maws.sum()
        weighted_sum = (maws[:, np.newaxis] * rvecs.astype(np.float64)).sum(axis=0, out=out)
        np.divide(weighted_sum, total_weight, out=out)
        vt.normalize_rows(arr_float, out=arr_float)
        if compress:
            rvecs_agg = np.clip(np.round(arr_float * 255.0), -127, 127).astype(np.int8)
        else:
            rvecs_agg = arr_float
    return rvecs_agg
def visualize_vocab_word(ibs, invassign, wx, fnum=None):
    """
    Draw a word's average patch (optionally with its SIFT descriptor rendered
    on top) next to a mosaic of all member patches.

    Args:
        ibs: controller (currently unused directly; kept for interface parity)
        invassign (InvertedStackAssignment): word -> vector assignment
        wx (int): word index to visualize
        fnum (int): figure number (allocated if None)

    Example:
        >>> from ibeis.new_annots import * # NOQA
        >>> import plottool as pt
        >>> pt.qt4ensure()
        >>> ibs, aid_list, vocab = testdata_vocab()
        >>> fstack = StackedFeatures(ibs, aid_list)
        >>> nAssign = 2
        >>> invassign = fstack.inverted_assignment(vocab, nAssign)
        >>> sortx = ut.argsort(invassign.num_list)[::-1]
        >>> wx_list = ut.take(invassign.wx_list, sortx)
        >>> wx = wx_list[0]
    """
    import plottool as pt
    pt.qt4ensure()
    vecs = invassign.get_vecs(wx)
    word = invassign.vocab.wx2_word[wx]
    word_patches = invassign.get_patches(wx)
    average_patch = np.mean(word_patches, axis=0)
    # NOTE(review): the word centroid is drawn rather than vecs.mean(axis=0);
    # the original computed both and kept the word -- preserved here.
    average_vec = word
    with_sift = True
    # BUG FIX: `fnum = 2` previously clobbered the fnum parameter, so callers
    # passing an explicit figure number were ignored.
    fnum = pt.ensure_fnum(fnum)
    if with_sift:
        patch_img = pt.render_sift_on_patch(average_patch, average_vec)
    else:
        patch_img = average_patch
    stacked_patches = vt.stack_square_images(word_patches)
    # Vertical separator bar between the average patch and the mosaic
    solidbar = np.zeros((patch_img.shape[0], int(patch_img.shape[1] * .1), 3), dtype=patch_img.dtype)
    border_color = (100, 10, 10)  # bgr, darkblue
    if ut.is_float(solidbar):
        solidbar[:, :, :] = (np.array(border_color) / 255)[None, None]
    else:
        solidbar[:, :, :] = np.array(border_color)[None, None]
    word_img = vt.stack_image_list([patch_img, solidbar, stacked_patches], vert=False, modifysize=True)
    pt.imshow(word_img, fnum=fnum)
    pt.iup()
def test_visualize_vocab_interact():
    """
    Interactively page through vocabulary words, largest membership first.

    python -m ibeis.new_annots --exec-test_visualize_vocab_interact --show

    Example:
        >>> from ibeis.new_annots import * # NOQA
        >>> test_visualize_vocab_interact()
        >>> ut.show_if_requested()
    """
    import plottool as pt
    pt.qt4ensure()
    ibs, aid_list, vocab = testdata_vocab()
    fstack = StackedFeatures(ibs, aid_list)
    nAssign = 2
    invassign = fstack.inverted_assignment(vocab, nAssign)
    # Visit words in order of decreasing membership
    sortx = ut.argsort(invassign.num_list)[::-1]
    wx_list = ut.take(invassign.wx_list, sortx)
    wx = wx_list[0]
    fnum = 1
    for wx in ut.InteractiveIter(wx_list):
        visualize_vocab_word(ibs, invassign, wx, fnum)
if __name__ == '__main__':
    r"""
    CommandLine:
        python -m ibeis.new_annots
        python -m ibeis.new_annots --allexamples
    """
    import multiprocessing
    multiprocessing.freeze_support()  # for win32
    import utool as ut  # NOQA
    # Run all doctest examples defined in this module
    ut.doctest_funcs()