# -*- coding: utf-8 -*-
"""
Interface into SQL for the IBEIS Controller
"""
#from __future__ import absolute_import, division, print_function
from __future__ import absolute_import, division, print_function, unicode_literals
import six
import parse
import utool as ut
import collections
from six.moves import map, zip, cStringIO
from os.path import join, exists
from ibeis import constants as const
from ibeis.control._sql_helpers import (
_unpacker, sanitize_sql, SQLExecutionContext, VERBOSE_SQL, NOT_QUIET)
from ibeis.control import __SQLITE3__ as lite
print, rrr, profile = ut.inject2(__name__, '[sql]')
VERBOSE = ut.VERBOSE
VERYVERBOSE = ut.VERYVERBOSE
COPY_TO_MEMORY = ut.get_argflag(('--copy-db-to-memory'))
#AUTODUMP = ut.get_argflag('--auto-dump')
SQLColumnRichInfo = collections.namedtuple(
'SQLColumnRichInfo', ('column_id', 'name', 'type_', 'notnull', 'dflt_value', 'pk'))
__STR__ = const.__STR__
def default_decor(func):
#return profile(func)
return func
class SQLAtomicContext(object):
def __init__(context, db, verbose=VERBOSE_SQL):
context.db = db # Reference to sqldb
context.cur = context.db.connection.cursor() # Get new cursor
context.verbose = verbose
def __enter__(context):
""" Sets the database into a state of an atomic transaction """
context.cur.execute('BEGIN EXCLUSIVE TRANSACTION')
return context
def __exit__(context, type_, value, trace):
""" Finalization of an SQLAtomicContext """
if trace is not None:
# An SQLError is a serious offence.
print('[sql] FATAL ERROR IN ATOMIC CONTEXT')
context.db.dump() # Dump on error
print('[sql] Error in atomic context manager!: ' + str(value))
return False # return a falsey value on error
else:
context.db.connection.commit()
context.cur.close()
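# Usage sketch (not from the original source; statement and values are
# hypothetical): SQLAtomicContext is a context manager, so several writes can
# share one exclusive transaction.
#
#   with SQLAtomicContext(db) as context:
#       context.cur.execute('INSERT INTO metadata(metadata_key, metadata_value) '
#                           'VALUES (?, ?)', ('example_key', 'example_value'))
#
# __exit__ commits on success; on an exception it dumps the database and lets
# the exception propagate.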
def dev_test_new_schema_version(dbname, sqldb_dpath, sqldb_fname,
version_current, version_next=None):
"""
hacky function to ensure that only developer sees the development schema
and only on test databases
"""
TESTING_NEW_SQL_VERSION = version_current != version_next
if TESTING_NEW_SQL_VERSION:
print('[sql] ATTEMPTING TO TEST NEW SQLDB VERSION')
devdb_list = ['PZ_MTEST', 'testdb1', 'testdb0', 'testdb2',
'testdb_dst2', 'emptydatabase']
testing_newschema = ut.is_developer() and dbname in devdb_list
#testing_newschema = False
#ut.is_developer() and ibs.get_dbname() in ['PZ_MTEST', 'testdb1']
if testing_newschema:
# Set to true until the schema module is good then continue tests
# with this set to false
testing_force_fresh = True or ut.get_argflag('--force-fresh')
# Work on a fresh schema copy when developing
dev_sqldb_fname = ut.augpath(sqldb_fname, '_develop_schema')
sqldb_fpath = join(sqldb_dpath, sqldb_fname)
dev_sqldb_fpath = join(sqldb_dpath, dev_sqldb_fname)
ut.copy(sqldb_fpath, dev_sqldb_fpath, overwrite=testing_force_fresh)
# Set testing schema version
#ibs.db_version_expected = '1.3.6'
print('[sql] TESTING NEW SQLDB VERSION: %r' % (version_next,))
#print('[sql] ... pass --force-fresh to reload any changes')
return version_next, dev_sqldb_fname
else:
print('[ibs] NOT TESTING')
return version_current, sqldb_fname
class SQLDatabaseController(object):
"""
SQLDatabaseController is an efficientish interface into SQL
"""
def __init__(db, sqldb_dpath='.', sqldb_fname='database.sqlite3',
text_factory=__STR__, inmemory=None):
""" Creates db and opens connection """
# standard metadata table keys for each docstr
# TODO: generalize the places that use this, so that to add a new canonical
# metadata field it is only necessary to append to this list.
db.table_metadata_keys = [
#'constraint',
'dependson',
'docstr',
'relates',
'shortname',
'superkeys',
'extern_tables',
'dependsmap',
'primary_superkey',
]
# Get SQL file path
db.dir_ = sqldb_dpath
db.fname = sqldb_fname
assert exists(db.dir_), '[sql] db.dir_=%r does not exist!' % db.dir_
db.fpath = join(db.dir_, db.fname)
db.text_factory = text_factory
if not exists(db.fpath):
print('[sql] Initializing new database')
# Open the SQL database connection with support for custom types
#lite.enable_callback_tracebacks(True)
#db.fpath = ':memory:'
db.connection = lite.connect2(db.fpath)
db.connection.text_factory = db.text_factory
#db.connection.isolation_level = None # turns sqlite3 autocommit off
#db.connection.isolation_level = lite.IMMEDIATE # turns sqlite3 autocommit off
if inmemory is True or (inmemory is None and COPY_TO_MEMORY):
db.squeeze()
db._copy_to_memory()
db.connection.text_factory = text_factory
# Get a cursor which will perform sql commands / queries / executions
db.cur = db.connection.cursor()
# Optimize the database (if anything is set)
db.optimize()
db._ensure_metadata_table()
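# Construction sketch (hedged; the filename is hypothetical): opens the file
# if it exists, otherwise initializes a new database.
#   db = SQLDatabaseController(sqldb_dpath='.', sqldb_fname='example.sqlite3')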
def get_fpath(db):
return db.fpath
def close(db):
db.cur = None
db.connection.close()
def _ensure_metadata_table(db):
"""
Creates the metadata table if it does not exist
We need this to be done every time so that the update code works
correctly.
"""
db.add_table(const.METADATA_TABLE, (
('metadata_rowid', 'INTEGER PRIMARY KEY'),
('metadata_key', 'TEXT'),
('metadata_value', 'TEXT'),
),
superkeys=[('metadata_key',)],
# IMPORTANT: Yes, we want this line to be tabbed over for the
# schema auto-generation
docstr='''
The table that stores permanently all of the metadata about the
database (tables, etc)''')
# Ensure that a version number exists
db.get_db_version(ensure=True)
[docs] def get_db_version(db, ensure=True):
version = db.get_metadata_val('database_version', default=None)
if version is None and ensure:
version = const.BASE_DATABASE_VERSION
colnames = ['metadata_key', 'metadata_value']
params_iter = zip(['database_version'], [version])
# We don't care to find any, because we know there is no version
def get_rowid_from_superkey(x):
return [None] * len(x)
db.add_cleanly(const.METADATA_TABLE, colnames, params_iter, get_rowid_from_superkey)
return version
def _copy_to_memory(db):
"""
References:
http://stackoverflow.com/questions/3850022/python-sqlite3-load-existing-db-file-to-memory
"""
if NOT_QUIET:
print('[sql] Copying database into RAM')
tempfile = cStringIO()
for line in db.connection.iterdump():
tempfile.write('%s\n' % line)
db.connection.close()
tempfile.seek(0)
# Create a database in memory and import from tempfile
db.connection = lite.connect2(":memory:")
db.connection.cursor().executescript(tempfile.read())
db.connection.commit()
db.connection.row_factory = lite.Row
#@ut.memprof
def reboot(db):
print('[sql] reboot')
db.cur.close()
del db.cur
db.connection.close()
del db.connection
db.connection = lite.connect2(db.fpath)
db.connection.text_factory = db.text_factory
db.cur = db.connection.cursor()
def optimize(db):
# http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html#pragma-cache_size
# http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
if VERBOSE_SQL:
print('[sql] running sql pragma optimizations')
#db.cur.execute('PRAGMA cache_size = 0;')
#db.cur.execute('PRAGMA cache_size = 1024;')
#db.cur.execute('PRAGMA page_size = 1024;')
#print('[sql] running sql pragma optimizations')
db.cur.execute('PRAGMA cache_size = 10000;') # Default: 2000
db.cur.execute('PRAGMA temp_store = MEMORY;')
db.cur.execute('PRAGMA synchronous = OFF;')
#db.cur.execute('PRAGMA synchronous = NORMAL;')
#db.cur.execute('PRAGMA synchronous = FULL;') # Default
#db.cur.execute('PRAGMA parser_trace = OFF;')
#db.cur.execute('PRAGMA busy_timeout = 1;')
#db.cur.execute('PRAGMA default_cache_size = 0;')
def shrink_memory(db):
print('[sql] shrink_memory')
db.connection.commit()
db.cur.execute('PRAGMA shrink_memory;')
db.connection.commit()
def vacuum(db):
print('[sql] vacuum')
db.connection.commit()
db.cur.execute('VACUUM;')
db.connection.commit()
def squeeze(db):
print('[sql] squeeze')
db.shrink_memory()
db.vacuum()
#==============
# API INTERFACE
#==============
@default_decor
def get_all_rowids(db, tblname, **kwargs):
""" returns a list of all rowids from a table in ascending order """
fmtdict = {'tblname': tblname, }
operation_fmt = '''
SELECT rowid
FROM {tblname}
ORDER BY rowid ASC
'''
return db._executeone_operation_fmt(operation_fmt, fmtdict, **kwargs)
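# Sketch (hedged; the table name and result are hypothetical):
#   rowid_list = db.get_all_rowids('names')  # -> [1, 2, 3, ...]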
@default_decor
def get_all_col_rows(db, tblname, colname):
""" returns a list of all values in colname from a table in ascending rowid order """
fmtdict = {'colname': colname, 'tblname': tblname, }
operation_fmt = '''
SELECT {colname}
FROM {tblname}
ORDER BY rowid ASC
'''
return db._executeone_operation_fmt(operation_fmt, fmtdict)
@default_decor
def get_all_rowids_where(db, tblname, where_clause, params, **kwargs):
"""
returns a list of rowids from a table in ascending order satisfying a
condition
"""
fmtdict = {'tblname': tblname, 'where_clause': where_clause, }
operation_fmt = '''
SELECT rowid
FROM {tblname}
WHERE {where_clause}
ORDER BY rowid ASC
'''
return db._executeone_operation_fmt(operation_fmt, fmtdict, params, **kwargs)
@default_decor
def check_rowid_exists(db, tablename, rowid_iter, eager=True, **kwargs):
rowid_list1 = db.get(tablename, ('rowid',), rowid_iter)
exists_list = [rowid is not None for rowid in rowid_list1]
return exists_list
@default_decor
def _add(db, tblname, colnames, params_iter, **kwargs):
""" ADDER NOTE: use add_cleanly """
fmtdict = {'tblname' : tblname,
'erotemes' : ', '.join(['?'] * len(colnames)),
'params' : ',\n'.join(colnames), }
operation_fmt = '''
INSERT INTO {tblname}(
rowid,
{params}
) VALUES (NULL, {erotemes})
'''
rowid_list = db._executemany_operation_fmt(operation_fmt, fmtdict,
params_iter=params_iter, **kwargs)
return rowid_list
@default_decor
def add_cleanly(db, tblname, colnames, params_iter,
get_rowid_from_superkey, superkey_paramx=(0,)):
"""
ADDER Extra input:
the first item of params_iter must be a superkey (like a uuid),
Does not add None values. Does not add duplicate values.
For each None input returns None output.
For each duplicate input returns existing rowid
Args:
tblname (str): table name to add into
colnames (tuple of strs): columns whose values are specified in params_iter
params_iter (iterable): an iterable of tuples where each tuple corresponds to a row
get_rowid_from_superkey (func): function that tests if a row needs
to be added. It should return None for any new rows to be inserted.
It should return the existing rowid if one exists
superkey_paramx (tuple of ints): indices of tuples in params_iter which
correspond to superkeys. defaults to (0,)
Returns:
iterable: rowid_list_ -- list of newly added or previously added rowids
Example:
>>> # DISABLE_DOCTEST
>>> # minimal sketch (hedged); the key/value pair is hypothetical and the
>>> # helper resolves rowids through the metadata table's superkey
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> db = ibeis.opendb('testdb1').db
>>> tblname = const.METADATA_TABLE
>>> colnames = ('metadata_key', 'metadata_value')
>>> params_iter = [('example_key', 'example_value')]
>>> def get_rowid_from_superkey(keys):
...     return db.get_rowid_from_superkey(tblname, list(zip(keys)),
...                                       superkey_colnames=('metadata_key',))
>>> superkey_paramx = (0,)
>>> rowid_list_ = db.add_cleanly(tblname, colnames, params_iter,
...                              get_rowid_from_superkey, superkey_paramx)
>>> print(rowid_list_)
"""
# ADD_CLEANLY_1: PREPROCESS INPUT
params_list = list(params_iter) # eagerly evaluate for superkeys
# Extract superkeys from the params list (requires eager eval)
superkey_lists = [[None if params is None else params[x]
for params in params_list]
for x in superkey_paramx]
# ADD_CLEANLY_2: PERFORM INPUT CHECKS
# check which parameters are valid
#and not any(ut.flag_None_items(params))
isvalid_list = [params is not None for params in params_list]
# Check for duplicate inputs
isunique_list = ut.flag_unique_items(list(zip(*superkey_lists)))
# Check to see if this already exists in the database
#superkey_params_iter = list(zip(*superkey_lists))
# get_rowid_from_superkey functions take each list separately here
rowid_list_ = get_rowid_from_superkey(*superkey_lists)
isnew_list = [rowid is None for rowid in rowid_list_]
if VERBOSE_SQL and not all(isunique_list):
print('[WARNING]: duplicate inputs to db.add_cleanly')
# Flag each item that needs to be added to the database
needsadd_list = list(map(all, zip(isvalid_list, isunique_list, isnew_list)))
# ADD_CLEANLY_3.1: EXIT IF CLEAN
if not any(needsadd_list):
return rowid_list_ # There is nothing to add. Return the rowids
# ADD_CLEANLY_3.2: PERFORM DIRTY ADDITIONS
dirty_params = ut.filter_items(params_list, needsadd_list)
if ut.VERBOSE:
print('[sql] adding %r/%r new %s' % (len(dirty_params), len(params_list), tblname))
# Add any unadded parameters to the database
try:
db._add(tblname, colnames, dirty_params)
except Exception as ex:
nInput = len(params_list) # NOQA
ut.printex(ex, key_list=[
'dirty_params',
'needsadd_list',
'superkey_lists',
'nInput',
'rowid_list_'])
raise
# TODO: We should only have to perform a subset of adds here
# (at the positions where rowid_list was None in the getter check)
rowid_list = get_rowid_from_superkey(*superkey_lists)
# ADD_CLEANLY_4: SANITY CHECK AND RETURN
assert len(rowid_list) == len(params_list), 'failed sanity check'
return rowid_list
@default_decor
def get_where2(db, tblname, colnames, params_iter, andwhere_colnames,
orwhere_colnames=[],
unpack_scalars=True, eager=True, **kwargs):
""" hacked in function for nicer templates """
andwhere_clauses = [colname + '=?' for colname in andwhere_colnames]
where_clause = ' AND '.join(andwhere_clauses)
return db.get_where(tblname, colnames, params_iter, where_clause,
unpack_scalars=unpack_scalars, eager=eager,
**kwargs)
@default_decor
def get_where3(db, tblname, colnames, params_iter, where_colnames,
unpack_scalars=True, eager=True, logicop='AND', **kwargs):
""" hacked in function for nicer templates
unpack_scalars = True
kwargs = {}
Kwargs:
verbose:
"""
andwhere_clauses = [colname + '=?' for colname in where_colnames]
logicop_ = ' %s ' % (logicop,)
where_clause = logicop_.join(andwhere_clauses)
return db.get_where(tblname, colnames, params_iter, where_clause,
unpack_scalars=unpack_scalars, eager=eager, **kwargs)
@default_decor
def get_where(db, tblname, colnames, params_iter, where_clause,
unpack_scalars=True, eager=True,
**kwargs):
"""
"""
assert isinstance(colnames, tuple), 'colnames must be a tuple'
#if isinstance(colnames, six.string_types):
# colnames = (colnames,)
if where_clause is None:
fmtdict = {'tblname' : tblname,
'colnames' : ', '.join(colnames), }
operation_fmt = '''
SELECT {colnames}
FROM {tblname}
'''
val_list = db._executeone_operation_fmt(operation_fmt, fmtdict, **kwargs)
else:
fmtdict = { 'tblname' : tblname,
'colnames' : ', '.join(colnames),
'where_clauses' : where_clause, }
operation_fmt = '''
SELECT {colnames}
FROM {tblname}
WHERE {where_clauses}
'''
val_list = db._executemany_operation_fmt(
operation_fmt, fmtdict, params_iter=params_iter,
unpack_scalars=unpack_scalars, eager=eager, **kwargs)
return val_list
@default_decor
def get_rowid_from_superkey(db, tblname, params_iter=None,
superkey_colnames=None, **kwargs):
""" getter which uses the constrained superkeys instead of rowids """
where_clause = ' AND '.join([colname + '=?' for colname in superkey_colnames])
return db.get_where(tblname, ('rowid',), params_iter, where_clause, **kwargs)
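# Sketch (hedged): resolve rowids through a superkey column instead of the
# rowid column, using the metadata table whose superkey is ('metadata_key',):
#   rowids = db.get_rowid_from_superkey(const.METADATA_TABLE,
#                                       params_iter=[('database_version',)],
#                                       superkey_colnames=('metadata_key',))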
@default_decor
def get(db, tblname, colnames, id_iter=None, id_colname='rowid', eager=True, **kwargs):
""" getter
Args:
tblname (str): table name to get from
colnames (tuple of str): column names to grab from
id_iter (iterable): iterable of search keys
id_colname (str): column to be used as the search key (default: rowid)
eager (bool): use eager evaluation
unpack_scalars (bool): default True
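Example:
>>> # DISABLE_DOCTEST
>>> # minimal sketch (hedged); the rowids are hypothetical
>>> import ibeis
>>> db = ibeis.opendb('testdb1').db
>>> name_texts = db.get(ibeis.const.NAME_TABLE, ('name_text',), [1, 2])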
"""
if VERBOSE_SQL:
print('[sql]' + ut.get_caller_name(list(range(1, 4))) + ' db.get(%r, %r, ...)' %
(tblname, colnames,))
assert isinstance(colnames, tuple), 'must specify column names to get from'
#if isinstance(colnames, six.string_types):
# colnames = (colnames,)
if id_iter is None:
where_clause = None
params_iter = []
else:
where_clause = (id_colname + '=?')
params_iter = ((_rowid,) for _rowid in id_iter)
return db.get_where(tblname, colnames, params_iter, where_clause, eager=eager, **kwargs)
@default_decor
def set(db, tblname, colnames, val_iter, id_iter, id_colname='rowid',
duplicate_behavior='error', **kwargs):
""" setter
TODO: Test
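Example:
>>> # DISABLE_DOCTEST
>>> # minimal sketch (hedged); values and rowids are hypothetical.
>>> # val_iter holds one tuple of column values per rowid in id_iter.
>>> import ibeis
>>> db = ibeis.opendb('testdb1').db
>>> db.set(ibeis.const.NAME_TABLE, ('name_note',), [('example note',)], [1])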
"""
assert isinstance(colnames, tuple)
#if isinstance(colnames, six.string_types):
# colnames = (colnames,)
val_list = list(val_iter) # eager evaluation
id_list = list(id_iter) # eager evaluation
if VERBOSE_SQL or (NOT_QUIET and VERYVERBOSE):
print('[sql] SETTER: ' + ut.get_caller_name())
print('[sql] * tblname=%r' % (tblname,))
print('[sql] * val_list=%r' % (val_list,))
print('[sql] * id_list=%r' % (id_list,))
print('[sql] * id_colname=%r' % (id_colname,))
if duplicate_behavior == 'error':
try:
assert not ut.duplicates_exist(id_list), "Passing a non-unique list of ids"
except Exception as ex:
ut.debug_duplicate_items(id_list)
ut.printex(ex, 'len(id_list) = %r, len(set(id_list)) = %r' %
(len(id_list), len(set(id_list))))
raise
elif duplicate_behavior == 'filter':
# Keep only the first setting of every row
isunique_list = ut.flag_unique_items(id_list)
id_list = ut.filter_items(id_list, isunique_list)
val_list = ut.filter_items(val_list, isunique_list)
else:
raise AssertionError(
('unknown duplicate_behavior=%r. '
'known behaviors are: error and filter')
% (duplicate_behavior,))
try:
num_val = len(val_list)
num_id = len(id_list)
assert num_val == num_id, 'list inputs have different lengths'
except AssertionError as ex:
ut.printex(ex, key_list=['num_val', 'num_id'])
raise
fmtdict = {
'tblname_str' : tblname,
'assign_str' : ',\n'.join(['%s=?' % name for name in colnames]),
'where_clause' : (id_colname + '=?'),
}
operation_fmt = '''
UPDATE {tblname_str}
SET {assign_str}
WHERE {where_clause}
'''
# TODO: The flattenize can be removed if we pass in val_lists instead
params_iter = ut.flattenize(list(zip(val_list, id_list)))
#params_iter = list(zip(val_list, id_list))
return db._executemany_operation_fmt(operation_fmt, fmtdict,
params_iter=params_iter, **kwargs)
@default_decor
def delete(db, tblname, id_list, id_colname='rowid', **kwargs):
""" deleter. USE delete_rowids instead """
fmtdict = {
'tblname' : tblname,
'rowid_str' : (id_colname + '=?'),
}
operation_fmt = '''
DELETE
FROM {tblname}
WHERE {rowid_str}
'''
params_iter = ((_rowid,) for _rowid in id_list)
return db._executemany_operation_fmt(operation_fmt, fmtdict,
params_iter=params_iter,
**kwargs)
@default_decor
def delete_rowids(db, tblname, rowid_list, **kwargs):
""" deletes the rows in rowid_list """
fmtdict = {
'tblname' : tblname,
'rowid_str' : ('rowid=?'),
}
operation_fmt = '''
DELETE
FROM {tblname}
WHERE {rowid_str}
'''
params_iter = ((_rowid,) for _rowid in rowid_list)
return db._executemany_operation_fmt(operation_fmt, fmtdict,
params_iter=params_iter,
**kwargs)
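# Deletion sketch (hedged; table, column, and id values are hypothetical):
#   db.delete_rowids('names', [1, 2])                        # delete by rowid
#   db.delete('names', [name_uuid], id_colname='name_uuid')  # delete by another key column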
#==============
# CORE WRAPPERS
#==============
@default_decor
def _executeone_operation_fmt(db, operation_fmt, fmtdict, params=None, eager=True, **kwargs):
if params is None:
params = []
operation = operation_fmt.format(**fmtdict)
return db.executeone(operation, params, auto_commit=True, eager=eager, **kwargs)
@default_decor
def _executemany_operation_fmt(db, operation_fmt, fmtdict, params_iter,
unpack_scalars=True, eager=True, **kwargs):
operation = operation_fmt.format(**fmtdict)
return db.executemany(operation, params_iter, unpack_scalars=unpack_scalars,
auto_commit=True, eager=eager, **kwargs)
#=========
# SQLDB CORE
#=========
@default_decor
def executeone(db, operation, params=(), auto_commit=True, eager=True,
verbose=VERBOSE_SQL):
contextkw = dict(
nInput=1, verbose=verbose
)
with SQLExecutionContext(db, operation, **contextkw) as context:
try:
result_iter = context.execute_and_generate_results(params)
result_list = list(result_iter)
except Exception as ex:
ut.printex(ex, key_list=[(str, 'operation'), 'params'])
# ut.sys.exit(1)
raise
return result_list
#@ut.memprof
@default_decor
def executemany(db, operation, params_iter, auto_commit=True,
verbose=VERBOSE_SQL, unpack_scalars=True, nInput=None,
eager=True):
# --- ARGS PREPROC ---
# Aggressively compute iterator if nInput is not given
if nInput is None:
if isinstance(params_iter, (list, tuple)):
nInput = len(params_iter)
else:
if VERBOSE_SQL:
print('[sql!] WARNING: aggressive eval of params_iter because nInput=None')
params_iter = list(params_iter)
nInput = len(params_iter)
else:
if VERBOSE_SQL:
print('[sql] Taking params_iter as iterator')
# Do not compute executemany without params
if nInput == 0:
if VERBOSE_SQL:
print('[sql!] WARNING: dont use executemany '
'with no params, use executeone instead.')
return []
# --- SQL EXECUTION ---
contextkw = {
'nInput': nInput,
'start_transaction': True,
'verbose': verbose,
}
with SQLExecutionContext(db, operation, **contextkw) as context:
#try:
if eager:
#if ut.DEBUG2:
# print('--------------')
# print('+++ eager eval')
# print(operation)
# print('+++ eager eval')
#results_iter = list(
#map(
# list,
# (context.execute_and_generate_results(params) for params in params_iter)
#)
#) # list of iterators
results_iter = [list(context.execute_and_generate_results(params))
for params in params_iter]
if unpack_scalars:
results_iter = list(map(_unpacker, results_iter)) # list of iterators
#try:
# results_iter = [_unpacker(results_) for resultx,
# results_ in enumerate(results_iter)]
#except AssertionError:
# print('resultx = %r' % (resultx,))
# if isinstance(params_iter, list):
# print('params = %r' % (params_iter[resultx]))
# raise
results_list = list(results_iter) # Eager evaluation
else:
#if ut.DEBUG2:
# print('--------------')
# print(' +++ lazy eval')
# print(operation)
# print(' +++ lazy eval')
def tmpgen(context):
# Temporary hack to turn off eager_evaluation
for params in params_iter:
# Eval results per query, yield per iter
results = list(context.execute_and_generate_results(params))
if unpack_scalars:
yield _unpacker(results)
else:
yield results
results_list = tmpgen(context)
#except Exception as ex:
# ut.printex(ex)
# raise
return results_list
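# Behavior sketch (hedged; query and values are hypothetical): with
# unpack_scalars=True a single-column SELECT executed per parameter tuple
# returns a flat list of scalars, e.g.
#   db.executemany('SELECT name_text FROM names WHERE rowid=?', [(1,), (2,)])
#   # -> ['name1', 'name2'] rather than [['name1'], ['name2']]
# With eager=False a generator is returned instead, and each parameter tuple
# is executed as the generator is consumed.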
#def commit(db):
# db.connection.commit()
@default_decor
def dump(db, file_=None, **kwargs):
if file_ is None or isinstance(file_, six.string_types):
dump_fpath = file_
if dump_fpath is None:
# Default filepath
version_str = 'v' + db.get_db_version()
if kwargs.get('schema_only', False):
version_str += '.schema_only'
dump_fname = db.fname + '.' + version_str + '.dump.txt'
dump_fpath = join(db.dir_, dump_fname)
with open(dump_fpath, 'w') as file_:
db.dump_to_file(file_, **kwargs)
else:
db.dump_to_file(file_, **kwargs)
@default_decor
def dump_to_string(db, **kwargs):
string_file = cStringIO()
db.dump_to_file(string_file, **kwargs)
retstr = string_file.getvalue()
return retstr
@default_decor
def dump_to_file(db, file_, auto_commit=True, schema_only=False, include_metadata=True):
VERBOSE_SQL = True
if VERBOSE_SQL:
print('[sql.dump_to_file] file_=%r' % (file_,))
if auto_commit:
db.connection.commit()
#db.commit(verbose=False)
for line in db.connection.iterdump():
if schema_only and line.startswith('INSERT'):
if not include_metadata or 'metadata' not in line:
continue
file_.write('%s\n' % line)
def dump_to_stdout(db, **kwargs):
import sys
file_ = sys.stdout
kwargs['schema_only'] = kwargs.get('schema_only', True)
db.dump(file_, **kwargs)
def print_dbg_schema(db):
print('\n\nCREATE'.join(db.dump_to_string(schema_only=True).split('CREATE')))
#=========
# SQLDB METADATA
#=========
#def get_metadata_val(db, key, eval_=False, default=ut.NoParam):
@default_decor
def add_column(db, tablename, colname, coltype):
if VERBOSE_SQL:
print('[sql] add column=%r of type=%r to tablename=%r' % (colname, coltype, tablename))
fmtkw = {
'tablename': tablename,
'colname': colname,
'coltype': coltype,
}
op_fmtstr = 'ALTER TABLE {tablename} ADD COLUMN {colname} {coltype}'
operation = op_fmtstr.format(**fmtkw)
db.executeone(operation, [], verbose=False)
@default_decor
def add_table(db, tablename=None, coldef_list=None, **metadata_keyval):
"""
add_table
Args:
tablename (str):
coldef_list (list):
constraint (list or None):
docstr (str):
superkeys (list or None): list of tuples of column names which
uniquely identifies a rowid
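Example:
>>> # DISABLE_DOCTEST
>>> # minimal sketch (hedged) mirroring _ensure_metadata_table;
>>> # all names here are hypothetical
>>> db.add_table('example_table', (
>>>     ('example_rowid', 'INTEGER PRIMARY KEY'),
>>>     ('example_text', 'TEXT'),
>>> ), superkeys=[('example_text',)], docstr='an example table')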
"""
bad_kwargs = set(metadata_keyval.keys()) - set(db.table_metadata_keys)
assert len(bad_kwargs) == 0, 'keyword args specified that are not metadata keys=%r' % (
bad_kwargs,)
assert tablename is not None, 'tablename must be given'
assert coldef_list is not None, 'coldef_list must be given'
if ut.DEBUG2:
print('[sql] schema ensuring tablename=%r' % tablename)
if ut.VERBOSE:
print('')
_args = [tablename, coldef_list]
print(ut.func_str(db.add_table, _args, metadata_keyval))
print('')
# Technically insecure call, but all entries are statically inputted by
# the database's owner, who could delete or alter the entire database
# anyway.
constraint_list = []
superkeys = metadata_keyval.get('superkeys', None)
try:
has_superkeys = (superkeys is not None and
len(superkeys) > 0)
if has_superkeys:
# Add in superkeys to constraints
#SELECT image_rowid
#FROM encounter_image_relationship
#WHERE image_rowid=? AND encounter_rowid=?
constraint_fmtstr = 'CONSTRAINT superkey UNIQUE ({colnames_str})'
assert isinstance(superkeys, list), (
'must be a list, got %r, superkeys=%r' % (type(superkeys), superkeys))
for superkey_colnames in superkeys:
assert isinstance(superkey_colnames, tuple), (
'must be a list of tuples, got list of %r' % (type(superkey_colnames),))
colnames_str = ','.join(superkey_colnames)
unique_constraint = constraint_fmtstr.format(colnames_str=colnames_str)
constraint_list.append(unique_constraint)
constraint_list = ut.unique_keep_order(constraint_list)
except Exception as ex:
ut.printex(ex, keys=locals().keys())
raise
# ASSERT VALID TYPES
for name, type_ in coldef_list:
assert isinstance(name, six.string_types) and len(name) > 0, (
'cannot have empty name. name=%r, type_=%r' % (name, type_))
assert isinstance(type_, six.string_types) and len(type_) > 0, (
'cannot have empty type. name=%r, type_=%r' % (name, type_))
body_list = ['%s %s' % (name, type_) for (name, type_) in coldef_list]
table_body = ', '.join(body_list + constraint_list)
fmtkw = {
'table_body': table_body,
'tablename': tablename,
}
op_fmtstr = 'CREATE TABLE IF NOT EXISTS {tablename} ({table_body})'
operation = op_fmtstr.format(**fmtkw)
db.executeone(operation, [], verbose=False)
# Handle table metadata
for suffix in db.table_metadata_keys:
if suffix in metadata_keyval and metadata_keyval[suffix] is not None:
val = metadata_keyval[suffix]
if suffix in ['docstr']:
db.set_metadata_val(tablename + '_' + suffix, val)
else:
db.set_metadata_val(tablename + '_' + suffix, repr(val))
@default_decor
def modify_table(db, tablename=None, colmap_list=None, tablename_new=None,
#constraint=None, docstr=None, superkeys=None,
**metadata_keyval):
"""
function to modify the schema - only columns that are being added,
removed or changed need to be enumerated
Args:
tablename (str): tablename
colmap_list (list): of tuples (orig_colname, new_colname, new_coltype, convert_func)
orig_colname - the original name of the column, None to append, int for index
new_colname - the new column name ('' for same, None to delete)
new_coltype - New Column Type. None to use data unmodified
convert_func - Function to convert data from old to new
constraint (str):
superkeys (list)
docstr (str)
tablename_new (?)
Example:
>>> def loc_zip_map(x):
... return x
>>> db.modify_table(const.CONTRIBUTOR_TABLE, (
>>> # orig_colname, new_colname, new_coltype, convert_func
>>> # a non-needed, but correct mapping (identity function)
>>> ('contrib_rowid', '', '', None),
>>> # for new columns, function is ignored (TYPE CANNOT BE EMPTY IF ADDING)
>>> (None, 'contrib_loc_address', 'TEXT', None),
>>> # adding a new column at index 4 (if index is invalid, None is used)
>>> (4, 'contrib_loc_address', 'TEXT', None),
>>> # for deleted columns, type and function are ignored
>>> ('contrib_loc_city', None, '', None),
>>> # for renamed columns, type and function are ignored
>>> ('contrib_loc_city', 'contrib_loc_town', '', None),
>>> ('contrib_loc_zip', 'contrib_loc_zip', 'TEXT', loc_zip_map),
>>> # type not changing, only NOT NULL provision
>>> ('contrib_loc_country', '', 'TEXT NOT NULL', None),
>>> ),
>>> superkeys=[('contributor_rowid',)],
>>> constraint=[],
>>> docstr='Used to store the contributors to the project'
>>> )
"""
#assert colmap_list is not None, 'must specify colmaplist'
assert tablename is not None, 'tablename must be given'
if VERBOSE_SQL or ut.VERBOSE:
print('[sql] schema modifying tablename=%r' % tablename)
print('[sql] * colmap_list = ' + ('None' if colmap_list is None else
ut.list_str(colmap_list)))
if colmap_list is None:
colmap_list = []
colname_list = db.get_column_names(tablename)
colname_original_list = colname_list[:]
coltype_list = db.get_column_types(tablename)
colname_dict = {colname: colname for colname in colname_list}
colmap_dict = {}
insert = False
for (src, dst, type_, map_) in colmap_list:
#is_newcol = dst not in colname_list
#if dst == 'contributor_rowid':
# ut.embed()
if (src is None or isinstance(src, int)):
# Add column
assert dst is not None and len(dst) > 0, (
"New column's name must be valid")
assert type_ is not None and len(type_) > 0, (
"New column's type must be specified")
if isinstance(src, int) and (src < 0 or len(colname_list) <= src):
src = None
if src is None:
colname_list.append(dst)
coltype_list.append(type_)
else:
if insert:
print('[sql] WARNING: multiple index-inserted add columns'
', may cause alignment issues')
colname_list.insert(src, dst)
coltype_list.insert(src, type_)
insert = True
else:
try:
assert src in colname_list, (
'Unknown source colname=%s in tablename=%s' % (
src, tablename))
except AssertionError as ex:
ut.printex(ex, keys=['colname_list'])
raise
index = colname_list.index(src)
if dst is None:
# Drop column
assert src is not None and len(src) > 0, (
"Deleted column's name must be valid")
del colname_list[index]
del coltype_list[index]
del colname_dict[src]
elif (len(src) > 0 and len(dst) > 0 and src != dst):
# Rename column
colname_list[index] = dst
colname_dict[src] = dst
# Check if type should change as well
if (type_ is not None and len(type_) > 0 and type_ != coltype_list[index]):
coltype_list[index] = type_
elif len(type_) > 0 and type_ != coltype_list[index]:
# Change column type
if len(dst) == 0:
dst = src
coltype_list[index] = type_
elif map_ is not None:
# Simply map function across table's data
if len(dst) == 0:
dst = src
if len(type_) == 0:
type_ = coltype_list[index]
else:
# Identity; this can be omitted as it is done automatically
if len(dst) == 0:
dst = src
if len(type_) == 0:
type_ = coltype_list[index]
if map_ is not None:
colmap_dict[src] = map_
coldef_list = list(zip(colname_list, coltype_list))
tablename_orig = tablename
tablename_temp = tablename_orig + '_temp' + ut.random_nonce(length=8)
metadata_keyval2 = metadata_keyval.copy()
for suffix in db.table_metadata_keys:
if suffix not in metadata_keyval2 or metadata_keyval2[suffix] is None:
val = db.get_metadata_val(tablename_orig + '_' + suffix, eval_=True)
metadata_keyval2[suffix] = val
db.add_table(tablename_temp, coldef_list, **metadata_keyval2)
# Copy data
src_list = []
dst_list = []
for name in colname_original_list:
if name in colname_dict.keys():
src_list.append(name)
dst_list.append(colname_dict[name])
if len(src_list) > 0:
data_list_ = db.get(tablename, tuple(src_list))
else:
data_list_ = []
# Run functions across all data for the specified columns
data_list = [
tuple([
colmap_dict[src_](d) if src_ in colmap_dict.keys() else d
for d, src_ in zip(data, src_list)
])
for data in data_list_
]
# Add the data to the database
def get_rowid_from_superkey(x):
return [None] * len(x)
db.add_cleanly(tablename_temp, dst_list, data_list, get_rowid_from_superkey)
if tablename_new is None:
# Drop original table
db.drop_table(tablename)
# Rename temp table to original table name
db.rename_table(tablename_temp, tablename)
else:
# Rename new table to new name
db.rename_table(tablename_temp, tablename_new)
@default_decor
def reorder_columns(db, tablename, order_list):
raise NotImplementedError('needs update')
if ut.VERBOSE:
print('[sql] schema column reordering for tablename=%r' % tablename)
# Get current tables
colname_list = db.get_column_names(tablename)
coltype_list = db.get_column_types(tablename)
assert len(colname_list) == len(coltype_list) and len(colname_list) == len(order_list)
assert all([ i in order_list for i in range(len(colname_list)) ]), (
'Order index list invalid')
# Reorder column definitions
combined = sorted(list(zip(order_list, colname_list, coltype_list)))
coldef_list = [ (name, type_) for i, name, type_ in combined ]
tablename_temp = tablename + '_temp' + ut.random_nonce(length=8)
docstr = db.get_table_docstr(tablename)
#constraint = db.get_table_constraints(tablename)
db.add_table(tablename_temp, coldef_list,
#constraint=constraint,
docstr=docstr,)
# Copy data
data_list = db.get(tablename, tuple(colname_list))
# Add the data to the database
get_rowid_from_superkey = (lambda x: [None] * len(x))
db.add_cleanly(tablename_temp, colname_list, data_list, get_rowid_from_superkey)
# Drop original table
db.drop_table(tablename)
# Rename temp table to original table name
db.rename_table(tablename_temp, tablename)
@default_decor
def duplicate_table(db, tablename, tablename_duplicate):
if VERBOSE_SQL:
print('[sql] schema duplicating tablename=%r into tablename=%r' %
(tablename, tablename_duplicate))
db.modify_table(tablename, [], tablename_new=tablename_duplicate)
@default_decor
def duplicate_column(db, tablename, colname, colname_duplicate):
if ut.VERBOSE:
print('[sql] schema duplicating tablename.colname=%r.%r into tablename.colname=%r.%r' %
(tablename, colname, tablename, colname_duplicate))
# Modify table to add a blank column with the appropriate tablename and NO data
column_names = db.get_column_names(tablename)
column_types = db.get_column_types(tablename)
assert len(column_names) == len(column_types)
try:
index = column_names.index(colname)
except Exception:
if VERBOSE_SQL:
print('[!sql] could not find colname=%r to duplicate' % colname)
return
# Add column to the table
# DATABASE TABLE CACHES ARE UPDATED WITH add_column
db.add_column(tablename, colname_duplicate, column_types[index])
# Copy the data from the original column to the new duplicate column
fmtkw = {
'tablename': tablename,
'colname_duplicate': colname_duplicate,
'colname': colname,
}
op_fmtstr = 'UPDATE {tablename} SET {colname_duplicate} = {colname}'
operation = op_fmtstr.format(**fmtkw)
db.executeone(operation, [], verbose=False)
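# Sketch (hedged; names are hypothetical): keep a backup copy of a column
# before a risky modify_table call:
#   db.duplicate_column('names', 'name_text', 'name_text_backup')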
@default_decor
def rename_table(db, tablename_old, tablename_new):
if ut.VERBOSE:
print('[sql] schema renaming tablename=%r -> %r' % (tablename_old, tablename_new))
# Technically insecure call, but all entries are statically inputted by
# the database's owner, who could delete or alter the entire database
# anyway.
fmtkw = {
'tablename_old': tablename_old,
'tablename_new': tablename_new,
}
op_fmtstr = 'ALTER TABLE {tablename_old} RENAME TO {tablename_new}'
operation = op_fmtstr.format(**fmtkw)
db.executeone(operation, [], verbose=False)
# Rename table's metadata
key_old_list = [tablename_old + '_' + suffix for suffix in db.table_metadata_keys]
key_new_list = [tablename_new + '_' + suffix for suffix in db.table_metadata_keys]
id_iter = [(key,) for key in key_old_list]
val_iter = [(key,) for key in key_new_list]
colnames = ('metadata_key',)
#print('Setting metadata_key from %s to %s' % (ut.list_str(id_iter), ut.list_str(val_iter)))
#ut.embed()
db.set(const.METADATA_TABLE, colnames, val_iter, id_iter, id_colname='metadata_key')
@default_decor
def rename_column(db, tablename, colname_old, colname_new):
# DATABASE TABLE CACHES ARE UPDATED WITH modify_table
db.modify_table(tablename, (
(colname_old, colname_new, '', None),
))
@default_decor
def drop_table(db, tablename):
if VERBOSE_SQL:
print('[sql] schema dropping tablename=%r' % tablename)
# Technically insecure call, but all entries are statically inputted by
# the database's owner, who could delete or alter the entire database
# anyway.
fmtkw = {
'tablename': tablename,
}
op_fmtstr = 'DROP TABLE IF EXISTS {tablename}'
operation = op_fmtstr.format(**fmtkw)
db.executeone(operation, [], verbose=False)
# Delete table's metadata
key_list = [tablename + '_' + suffix for suffix in db.table_metadata_keys]
db.delete(const.METADATA_TABLE, key_list, id_colname='metadata_key')
@default_decor
def drop_column(db, tablename, colname):
""" # DATABASE TABLE CACHES ARE UPDATED WITH modify_table """
db.modify_table(tablename, (
(colname, None, '', None),
))
#==============
# CONVENIENCE
#==============
@default_decor
def dump_tables_to_csv(db):
""" Convenience: Dumps all database tables to csv files on disk """
dump_dir = join(db.dir_, 'CSV_DUMP')
ut.ensuredir(dump_dir)
for tablename in db.get_table_names():
table_fname = tablename + '.csv'
table_csv = db.get_table_csv(tablename)
with open(join(dump_dir, table_fname), 'w') as file_:
file_.write(table_csv)
@default_decor
def get_schema_current_autogeneration_str(db, autogen_cmd):
""" Convenience: Autogenerates the most up-to-date database schema
CommandLine:
python -m ibeis.control.SQLDatabaseControl --exec-get_schema_current_autogeneration_str
Example:
>>> # ENABLE_DOCTEST
>>> import ibeis
>>> ibs = ibeis.opendb('testdb1')
>>> result = ibs.db.get_schema_current_autogeneration_str('')
>>> print(result)
"""
db_version_current = db.get_db_version()
# Define what tab space we want to save
tab1 = ' ' * 4
tab2 = ' ' * 8
line_list = []
#autogen_cmd = 'python -m ibeis.control.DB_SCHEMA --test-test_dbschema
#--force-incremental-db-update --dump-autogen-schema'
# File Header
line_list.append(ut.TRIPLE_DOUBLE_QUOTE)
line_list.append('AUTOGENERATED ON ' + ut.timestamp('printable'))
line_list.append('AutogenCommandLine:')
# TODO: Fix autogen command
line_list.append(ut.indent(autogen_cmd, tab1))
line_list.append(ut.TRIPLE_DOUBLE_QUOTE)
line_list.append('# -*- coding: utf-8 -*-')
#line_list.append('from __future__ import absolute_import, division, print_function')
line_list.append('from __future__ import absolute_import, division, print_function, unicode_literals')
line_list.append('from ibeis import constants as const')
line_list.append('\n')
line_list.append('# =======================')
line_list.append('# Schema Version Current')
line_list.append('# =======================')
line_list.append('\n')
line_list.append('VERSION_CURRENT = %s' % ut.repr2(db_version_current))
line_list.append('\n')
line_list.append('def update_current(db, ibs=None):')
# Function content
first = True
for tablename in sorted(db.get_table_names()):
if first:
first = False
else:
line_list.append('%s' % '')
# Hack to find the name of the constant variable
constant_name = None
for variable, value in six.iteritems(const.__dict__):
if value == tablename:
constant_name = variable
break
# assert constant_name is not None, "Table name does not exist in constants"
if constant_name is not None:
line_list.append(tab1 + 'db.add_table(const.%s, [' % (constant_name, ))
else:
line_list.append(tab1 + 'db.add_table(%s, [' % (ut.repr2(tablename),))
column_list = db.get_columns(tablename)
for column in column_list:
col_name = ('%s,' % ut.repr2(__STR__(column[1]))).ljust(32)
col_type = str(column[2])
if column[5] == 1: # Check if PRIMARY KEY
col_type += ' PRIMARY KEY'
elif column[3] == 1: # Check if NOT NULL
col_type += ' NOT NULL'
elif column[4] is not None:
col_type += ' DEFAULT ' + __STR__(column[4]) # Specify default value
line_list.append(tab2 + '(%s%s),' % (col_name, ut.repr2(col_type), ))
line_list.append(tab1 + '],')
superkeys = db.get_table_superkey_colnames(tablename)
docstr = db.get_table_docstr(tablename)
# Append metadata values
specially_handled_table_metakeys = ['docstr', 'superkeys',
#'constraint',
'dependsmap']
def quote_docstr(docstr):
import textwrap
wraped_docstr = '\n'.join(textwrap.wrap(ut.textblock(docstr)))
indented_docstr = ut.indent(wraped_docstr.strip(), tab2)
_TSQ = ut.TRIPLE_SINGLE_QUOTE
quoted_docstr = _TSQ + '\n' + indented_docstr + '\n' + tab2 + _TSQ
return quoted_docstr
line_list.append(tab2 + 'docstr=' + quote_docstr(docstr) + ',')
line_list.append(tab2 + 'superkeys=%s,' % (ut.repr2(superkeys), ))
#line_list.append(tab2 + 'constraint=%r,' %
#(db.get_metadata_val(tablename + '_constraint'),))
# Hack out docstr and superkeys for now
for suffix in db.table_metadata_keys:
if suffix in specially_handled_table_metakeys:
continue
key = tablename + '_' + suffix
val = db.get_metadata_val(key, eval_=True, default=None)
if val is not None:
#ut.embed()
line_list.append(tab2 + '%s=%s,' % (suffix, ut.repr2(val)))
dependsmap = db.get_metadata_val(tablename + '_dependsmap', eval_=True, default=None)
if dependsmap is not None:
_dictstr = ut.indent(ut.repr2(dependsmap, nl=1), tab2)
depends_map_dictstr = ut.align(_dictstr.lstrip(' '), ':')
# hack for formatting
depends_map_dictstr = depends_map_dictstr.replace(tab1 + '}', '}')
line_list.append(tab2 + 'dependsmap=%s,' % (depends_map_dictstr,))
line_list.append(tab1 + ')')
line_list.append('')
return '\n'.join(line_list)
@default_decor
def dump_schema(db):
"""
Convenience: Dumps the database schema to a text file on disk
NOTE: This function is semi-obsolete because of the auto-generated
current schema file. Use dump_schema_current_autogeneration instead for
all purposes except for parsing out the database schema or for concise visual
representation.
"""
app_resource_dir = ut.get_app_resource_dir('ibeis')
dump_fpath = join(app_resource_dir, 'schema.txt')
with open(dump_fpath, 'w') as file_:
for tablename in sorted(db.get_table_names()):
file_.write(tablename + '\n')
column_list = db.get_columns(tablename)
for column in column_list:
col_name = str(column[1]).ljust(30)
col_type = str(column[2]).ljust(10)
col_null = str(('ALLOW NULL' if column[3] == 1 else 'NOT NULL')).ljust(12)
col_default = str(column[4]).ljust(10)
col_key = str(('KEY' if column[5] == 1 else ''))
col = (col_name, col_type, col_null, col_default, col_key)
file_.write('\t%s%s%s%s%s\n' % col)
ut.view_directory(app_resource_dir)
@default_decor
def get_table_names(db):
""" Convenience: Returns the names of all tables in the database """
db.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
tablename_list = db.cur.fetchall()
return [str(tablename[0]) for tablename in tablename_list]
@default_decor
def get_table_constraints(db, tablename):
constraint = db.get_metadata_val(tablename + '_constraint', default=None)
#where_clause = 'metadata_key=?'
#colnames = ('metadata_value',)
#data = [(tablename + '_constraint',)]
#constraint = db.get_where(const.METADATA_TABLE, colnames, data, where_clause)
#constraint = constraint[0]
if constraint is None:
return None
else:
return constraint.split(';')
@default_decor
def get_table_superkey_colnames(db, tablename):
"""
get_table_superkey_colnames
Actually returns a list of tuples. Need to change the name to
get_table_superkey_colnames_list
Args:
tablename (str):
Returns:
list: superkeys
CommandLine:
python -m ibeis.control.SQLDatabaseControl --test-get_table_superkey_colnames
python -m ibeis --tf get_table_superkey_colnames --tablename=contributors
python -m ibeis --tf get_table_superkey_colnames --db PZ_Master0 --tablename=annotations
python -m ibeis --tf get_table_superkey_colnames --db PZ_Master0 --tablename=contributors # NOQA
Example0:
>>> # ENABLE_DOCTEST
>>> import ibeis
>>> ibs = ibeis.opendb(defaultdb='testdb1')
>>> tablename = ut.get_argval('--tablename', type_=str, default='lblimage')
>>> result = ut.list_str(ibs.db.get_table_superkey_colnames(tablename), nl=False)
>>> print(result)
[('lbltype_rowid', 'lblimage_value')]
"""
assert tablename in db.get_table_names(), (
'tablename=%r is not a part of this database' % (tablename,))
superkey_colnames_list_repr = db.get_metadata_val(tablename + '_superkeys', default=None)
# These asserts might not be valid, but in that case this function needs
# to be rewritten under a different name
#assert len(superkeys) == 1, 'INVALID DEVELOPER ASSUMPTION IN
#SQLCONTROLLER. MORE THAN 1 SUPERKEY'
if superkey_colnames_list_repr is None:
superkeys = []
else:
#superkey_colnames = superkey_colnames_str.split(';')
#with ut.EmbedOnException():
if superkey_colnames_list_repr.find(';') > -1:
# SHOULD NOT HAPPEN
# hack for old metadata superkey_val format
superkeys = [tuple(map(str, superkey_colnames_list_repr.split(';')))]
else:
# new evalable format
superkeys = eval(superkey_colnames_list_repr)
#superkeys = [
# None if superkey_colname is None else str(superkey_colname)
# for superkey_colname in superkey_colnames
#]
superkeys = list(map(tuple, superkeys))
return superkeys
@default_decor
def get_table_primarykey_colnames(db, tablename):
columns = db.get_columns(tablename)
primarykey_colnames = tuple([
name
for (column_id, name, type_, notnull, dflt_value, pk,) in columns
if pk
])
return primarykey_colnames
@default_decor
def get_table_otherkey_colnames(db, tablename):
flat_superkey_colnames_list = ut.flatten(db.get_table_superkey_colnames(tablename))
#superkeys = set((lambda x: x if x is not None else
#[])(db.get_table_superkey_colnames(tablename)))
columns = db.get_columns(tablename)
otherkey_colnames = [name
for (column_id, name, type_, notnull, dflt_value, pk,) in columns
if (not pk and # not primary key
name not in flat_superkey_colnames_list)]
return otherkey_colnames
@default_decor
def get_table_docstr(db, tablename):
r"""
CommandLine:
python -m ibeis.control.SQLDatabaseControl --exec-get_table_docstr
Example0:
>>> # ENABLE_DOCTEST
>>> import ibeis
>>> ibs = ibeis.opendb(defaultdb='testdb1')
>>> tablename = ut.get_argval('--tablename', type_=str, default='contributors')
>>> docstr = ibs.db.get_table_docstr(tablename)
>>> print(docstr)
"""
docstr = db.get_metadata_val(tablename + '_docstr')
#where_clause = 'metadata_key=?'
#colnames = ('metadata_value',)
#data = [(tablename + '_docstr',)]
#docstr = db.get_where(const.METADATA_TABLE, colnames, data, where_clause)[0]
return docstr
@default_decor
def get_columns(db, tablename):
"""
get_columns
Args:
tablename (str): table name
Returns:
column_list : list of tuples with format:
(
column_id : id of the column
name : the name of the column
type_ : the type of the column
notnull : 0 or 1 if the column can contain null values
dflt_value : the default value
pk : 0 or 1 if the column participates in the primary key
)
References:
http://stackoverflow.com/questions/17717829/how-to-get-column-names-from-a-table-in-sqlite-via-pragma-net-c
http://stackoverflow.com/questions/1601151/how-do-i-check-in-sqlite-whether-a-table-exists
CommandLine:
python -m ibeis.control.SQLDatabaseControl --exec-get_columns
python -m ibeis.control.SQLDatabaseControl --exec-get_columns --tablename=contributors
python -m ibeis.control.SQLDatabaseControl --exec-get_columns --tablename=nonexist
Example:
>>> # ENABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> db = ibeis.opendb(defaultdb='testdb1').db
>>> tablename = ut.get_argval('--tablename', type_=str, default=ibeis.const.NAME_TABLE)
>>> colrichinfo_list = db.get_columns(tablename)
>>> result = ('colrichinfo_list = %s' % (ut.list_str(colrichinfo_list),))
>>> print(result)
colrichinfo_list = [
(0, 'name_rowid', 'INTEGER', 0, None, 1),
(1, 'name_uuid', 'UUID', 1, None, 0),
(2, 'name_text', 'TEXT', 1, None, 0),
(3, 'name_note', 'TEXT', 0, None, 0),
(4, 'name_temp_flag', 'INTEGER', 0, '0', 0),
(5, 'name_alias_text', 'TEXT', 0, None, 0),
(6, 'name_sex', 'INTEGER', 0, '-1', 0),
]
"""
# check if the table exists first. Throws an error if it does not exist.
db.cur.execute('SELECT 1 FROM ' + tablename + ' LIMIT 1')
db.cur.execute("PRAGMA TABLE_INFO('" + tablename + "')")
colinfo_list = db.cur.fetchall()
colrichinfo_list = [SQLColumnRichInfo(*colinfo) for colinfo in colinfo_list]
return colrichinfo_list
@default_decor
def get_column_names(db, tablename):
""" Convenience: Returns the column names of the given sql table """
column_list = db.get_columns(tablename)
column_names = [str(column[1]) for column in column_list]
return column_names
@default_decor
def get_column_types(db, tablename):
""" Convenience: Returns the column types (with constraints) of the given sql table """
def _format(type_, null, default, key):
if key == 1:
return type_ + " PRIMARY KEY"
elif null == 1:
return type_ + " NOT NULL"
elif default is not None:
return type_ + " DEFAULT " + default
else:
return type_
column_list = db.get_columns(tablename)
column_types = [ column[2] for column in column_list]
column_null = [ column[3] for column in column_list]
column_default = [ column[4] for column in column_list]
column_key = [ column[5] for column in column_list]
column_types_ = [
_format(type_, null, default, key)
for type_, null, default, key
in zip(column_types, column_null, column_default, column_key)
]
return column_types_
@default_decor
def get_column(db, tablename, name):
""" Convenience: Returns all values of a column ordered by rowid """
_table, (_column,) = sanitize_sql(db, tablename, (name,))
column_vals = db.executeone(
operation='''
SELECT %s
FROM %s
ORDER BY rowid ASC
''' % (_column, _table))
return column_vals
@default_decor
def get_table_column_data(db, tablename, exclude_columns=[]):
"""
CommandLine:
python -m ibeis.control.SQLDatabaseControl --test-get_table_column_data
Example:
>>> # ENABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> # build test data
>>> import ibeis
>>> ibs = ibeis.opendb('testdb1')
>>> db = ibs.db
>>> tablename = ibeis.const.ANNOTATION_TABLE
>>> exclude_columns = []
>>> # execute function
>>> column_list, column_names = db.get_table_column_data(tablename)
>>> # verify results
"""
all_column_names = db.get_column_names(tablename)
isvalid_list = [name not in exclude_columns for name in all_column_names]
column_names = ut.filter_items(all_column_names, isvalid_list)
column_list = [db.get_column(tablename, name)
for name in column_names if name not in exclude_columns]
return column_list, column_names
def make_json_table_definition(db, tablename):
r"""
VERY HACKY FUNC RIGHT NOW. NEED TO FIX LATER
Args:
tablename (?):
Returns:
?: new_transferdata
CommandLine:
python -m ibeis --tf SQLDatabaseControl.make_json_table_definition
CommandLine:
python -m utool --tf iter_module_doctestable --modname=ibeis.control.SQLDatabaseControl
--include_inherited=True
python -m ibeis.control.SQLDatabaseControl --exec-make_json_table_definition
Example:
>>> # DISABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> ibs = ibeis.opendb('testdb1')
>>> db = ibs.db
>>> # TODO: define hierarchy
>>> tablename = ibs.const.ANNOTATION_TABLE
>>> annot_table_def = db.make_json_table_definition(tablename)
>>> tablename = ibs.const.IMAGE_TABLE
>>> image_table_def = db.make_json_table_definition(tablename)
>>> print('annot_attrs = %s' % (ut.repr2(annot_table_def, nl=True),))
>>> print('image_attrs = %s' % (ut.repr2(image_table_def, nl=True),))
Ignore:
annot_table_def = {
'annot_rowid': 'INTEGER',
'annot_uuid': 'UUID',
'annot_xtl': 'INTEGER',
'annot_ytl': 'INTEGER',
'annot_width': 'INTEGER',
'annot_height': 'INTEGER',
'annot_theta': 'REAL',
'annot_num_verts': 'INTEGER',
'annot_verts': 'TEXT',
'annot_yaw': 'REAL',
'annot_detect_confidence': 'REAL',
'annot_exemplar_flag': 'INTEGER',
'annot_note': 'TEXT',
'annot_visual_uuid': 'UUID',
'annot_semantic_uuid': 'UUID',
'annot_quality': 'INTEGER',
'annot_age_months_est_min': 'INTEGER',
'annot_age_months_est_max': 'INTEGER',
'annot_tags': 'TEXT',
'species_text': 'DATA',
'name_text': 'DATA',
'image_uuid': 'DATA',
}
image_table_def = {
'image_rowid': 'INTEGER',
'image_uuid': 'UUID',
'image_uri': 'TEXT',
'image_ext': 'TEXT',
'image_original_name': 'TEXT',
'image_width': 'INTEGER',
'image_height': 'INTEGER',
'image_time_posix': 'INTEGER',
'image_gps_lat': 'REAL',
'image_gps_lon': 'REAL',
'image_toggle_enabled': 'INTEGER',
'image_toggle_reviewed': 'INTEGER',
'image_note': 'TEXT',
'image_timedelta_posix': 'INTEGER',
'image_original_path': 'TEXT',
'image_location_code': 'TEXT',
'contributor_tag': 'DATA',
'party_tag': 'DATA',
}
"""
new_transferdata = db.get_table_new_transferdata(tablename)
(column_list, column_names, extern_colx_list,
extern_superkey_colname_list, extern_superkey_colval_list,
extern_tablename_list, extern_primarycolnames_list) = new_transferdata
dependsmap = db.get_metadata_val(tablename + '_dependsmap', eval_=True, default=None)
#dependson = db.get_metadata_val(tablename + '_dependson', eval_=True, default=None)
richcolinfo_list = db.get_columns(tablename)
table_dict_def = ut.odict([(r.name, r.type_) for r in richcolinfo_list])
if dependsmap is not None:
for key, val in dependsmap.items():
if val[0] == tablename:
del table_dict_def[key]
elif val[1] is None:
del table_dict_def[key]
else:
# replace with superkey
del table_dict_def[key]
_deptablecols = db.get_columns(val[0])
superkey = val[2]
assert len(superkey) == 1, 'unhandled'
colinfo = {_.name: _ for _ in _deptablecols}[superkey[0]]
table_dict_def[superkey[0]] = colinfo.type_
#json_def_str = ut.dict_str(table_dict_def, aligned=True)
return table_dict_def
#table_obj_def =
@default_decor
def get_table_new_transferdata(db, tablename, exclude_columns=[]):
"""
CommandLine:
python -m ibeis.control.SQLDatabaseControl --test-get_table_column_data
python -m ibeis.control.SQLDatabaseControl --test-get_table_new_transferdata
python -m ibeis.control.SQLDatabaseControl --test-get_table_new_transferdata:1
Example:
>>> # ENABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> ibs = ibeis.opendb('testdb1')
>>> db = ibs.db
>>> exclude_columns = []
>>> tablename_list = ibs.db.get_table_names()
>>> for tablename in tablename_list:
... new_transferdata = db.get_table_new_transferdata(tablename)
... column_list, column_names, extern_colx_list, extern_superkey_colname_list, extern_superkey_colval_list, extern_tablename_list, extern_primarycolnames_list = new_transferdata
... print('tablename = %r' % (tablename,))
... print('colnames = ' + ut.list_str(column_names))
... print('extern_colx_list = ' + ut.list_str(extern_colx_list))
... print('extern_superkey_colname_list = ' + ut.list_str(extern_superkey_colname_list))
... print('L___')
Example:
>>> # ENABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> ibs = ibeis.opendb('testdb1')
>>> db = ibs.db
>>> exclude_columns = []
>>> tablename = ibs.const.IMAGE_TABLE
>>> new_transferdata = db.get_table_new_transferdata(tablename)
>>> column_list, column_names, extern_colx_list, extern_superkey_colname_list, extern_superkey_colval_list, extern_tablename_list, extern_primarycolnames_list = new_transferdata
>>> dependsmap = db.get_metadata_val(tablename + '_dependsmap', eval_=True, default=None)
>>> print('tablename = %r' % (tablename,))
>>> print('colnames = ' + ut.list_str(column_names))
>>> print('extern_colx_list = ' + ut.list_str(extern_colx_list))
>>> print('extern_superkey_colname_list = ' + ut.list_str(extern_superkey_colname_list))
>>> print('dependsmap = %s' % (ut.repr2(dependsmap, nl=True),))
>>> print('L___')
>>> tablename = ibs.const.ANNOTATION_TABLE
>>> new_transferdata = db.get_table_new_transferdata(tablename)
>>> column_list, column_names, extern_colx_list, extern_superkey_colname_list, extern_superkey_colval_list, extern_tablename_list, extern_primarycolnames_list = new_transferdata
>>> dependsmap = db.get_metadata_val(tablename + '_dependsmap', eval_=True, default=None)
>>> print('tablename = %r' % (tablename,))
>>> print('colnames = ' + ut.list_str(column_names))
>>> print('extern_colx_list = ' + ut.list_str(extern_colx_list))
>>> print('extern_superkey_colname_list = ' + ut.list_str(extern_superkey_colname_list))
>>> print('dependsmap = %s' % (ut.repr2(dependsmap, nl=True),))
>>> print('L___')
"""
all_column_names = db.get_column_names(tablename)
isvalid_list = [name not in exclude_columns for name in all_column_names]
column_names = ut.filter_items(all_column_names, isvalid_list)
column_list = [db.get_column(tablename, name)
for name in column_names if name not in exclude_columns]
extern_colx_list = []
extern_tablename_list = []
extern_superkey_colname_list = []
extern_superkey_colval_list = []
extern_primarycolnames_list = []
dependsmap = db.get_metadata_val(tablename + '_dependsmap', eval_=True, default=None)
if dependsmap is not None:
for colname, dependtup in six.iteritems(dependsmap):
assert len(dependtup) == 3, 'must be 3 for now'
(extern_tablename, extern_primary_colnames, extern_superkey_colnames) = dependtup
if extern_primary_colnames is None:
#ut.embed()
# INFER PRIMARY COLNAMES
extern_primary_colnames = db.get_table_primarykey_colnames(extern_tablename)
if extern_superkey_colnames is None:
def get_standard_superkey_colnames(tablename_):
try:
# FIXME: Rectify duplicate code
superkeys = db.get_table_superkey_colnames(tablename_)
if len(superkeys) > 1:
#primary_superkey =
#db.get_metadata_val(tablename_ +
# '_primary_superkey',
# eval_=True)
primary_superkey = db.get_metadata_val(
tablename_ + '_primary_superkey', eval_=True)
db.get_table_superkey_colnames('contributors')
if primary_superkey is None:
raise AssertionError(
('tablename_=%r has multiple superkeys=%r, '
'but no primary superkey.'
' A primary superkey is required') % (
tablename_, superkeys))
else:
index = superkeys.index(primary_superkey)
superkey_colnames = superkeys[index]
elif len(superkeys) == 1:
superkey_colnames = superkeys[0]
else:
print(db.get_table_csv_header(tablename_))
db.print_table_csv('metadata', exclude_columns=['metadata_value'])
# Hack to repair legacy contributors tables: recover the superkey
# from the stored constraint string and write it back to metadata.
if tablename_ == 'contributors':
constraint_str = db.get_metadata_val(tablename_ + '_constraint')
parse_result = parse.parse(
'CONSTRAINT superkey UNIQUE ({superkey})',
constraint_str)
superkey = parse_result['superkey']
assert superkey == 'contributor_tag', 'hack failed1'
assert db.get_metadata_val('contributors_superkey') is None, (
'hack failed2')
db.set_metadata_val('contributors_superkeys',
"[('" + superkey + "',)]")
raise NotImplementedError(
'Cannot handle len(superkeys) == 0. '
'Probably a degenerate case')
except Exception as ex:
ut.printex(ex, 'Error Getting superkey colnames',
keys=['tablename_', 'superkeys'])
raise
return superkey_colnames
try:
extern_superkey_colnames = get_standard_superkey_colnames(extern_tablename)
except Exception as ex:
ut.printex(ex, 'Error Building Transferdata',
keys=['extern_tablename', 'dependtup'])
raise
# Look up the superkey values corresponding to the external rowids
colx = ut.listfind(column_names, colname)
extern_rowids = column_list[colx]
superkey_column = db.get(extern_tablename, extern_superkey_colnames, extern_rowids)
extern_colx_list.append(colx)
extern_superkey_colname_list.append(extern_superkey_colnames)
extern_superkey_colval_list.append(superkey_column)
extern_tablename_list.append(extern_tablename)
extern_primarycolnames_list.append(extern_primary_colnames)
new_transferdata = (column_list, column_names, extern_colx_list,
extern_superkey_colname_list,
extern_superkey_colval_list, extern_tablename_list,
extern_primarycolnames_list)
return new_transferdata
#def import_table_new_transferdata(tablename, new_transferdata):
# pass
@default_decor
def merge_databases_new(db, db_src, ignore_tables=None, rowid_subsets=None):
r"""
Copies all non-rowid properties from db_src into this database,
resolving annotated dependencies between tables.
Does not handle external files.
Tables are merged in dependency-depth order, so a table is merged
only after the tables it depends on.
Args:
db_src (SQLDatabaseController): merge data from db_src into db
CommandLine:
python -m ibeis.control.SQLDatabaseControl --test-merge_databases_new:0
python -m ibeis.control.SQLDatabaseControl --test-merge_databases_new:2
Example0:
>>> # DISABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> #ibs_dst = ibeis.opendb(dbdir='testdb_dst')
>>> ibs_src = ibeis.opendb(db='testdb1')
>>> # OPEN A CLEAN DATABASE
>>> ibs_dst = ibeis.opendb(dbdir='test_sql_merge_dst1', allow_newdir=True, delete_ibsdir=True)
>>> ibs_src.ensure_contributor_rowids()
>>> # build test data
>>> db = ibs_dst.db
>>> db_src = ibs_src.db
>>> rowid_subsets = None
>>> # execute function
>>> db.merge_databases_new(db_src)
Example1:
>>> # DISABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> ibs_src = ibeis.opendb(db='testdb2')
>>> # OPEN A CLEAN DATABASE
>>> ibs_dst = ibeis.opendb(dbdir='test_sql_merge_dst2', allow_newdir=True, delete_ibsdir=True)
>>> ibs_src.ensure_contributor_rowids()
>>> # build test data
>>> db = ibs_dst.db
>>> db_src = ibs_src.db
>>> ignore_tables = ['lblannot', 'lblimage', 'image_lblimage_relationship', 'annotation_lblannot_relationship', 'keys']
>>> rowid_subsets = None
>>> # execute function
>>> db.merge_databases_new(db_src, ignore_tables=ignore_tables)
Example2:
>>> # DISABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> ibs_src = ibeis.opendb(db='testdb2')
>>> # OPEN A CLEAN DATABASE
>>> ibs_src.fix_invalid_annotmatches()
>>> ibs_dst = ibeis.opendb(dbdir='test_sql_subexport_dst2', allow_newdir=True, delete_ibsdir=True)
>>> ibs_src.ensure_contributor_rowids()
>>> #ibs_src.delete_all_encounters()
>>> # build test data
>>> db = ibs_dst.db
>>> db_src = ibs_src.db
>>> ignore_tables = ['lblannot', 'lblimage', 'image_lblimage_relationship', 'annotation_lblannot_relationship', 'keys']
>>> # execute function
>>> aid_subset = [1, 2, 3]
>>> rowid_subsets = {const.ANNOTATION_TABLE: aid_subset,
... const.NAME_TABLE: ibs_src.get_annot_nids(aid_subset),
... const.IMAGE_TABLE: ibs_src.get_annot_gids(aid_subset),
... const.ANNOTMATCH_TABLE: [],
... const.EG_RELATION_TABLE: [],
... }
>>> db.merge_databases_new(db_src, ignore_tables=ignore_tables, rowid_subsets=rowid_subsets)
"""
verbose = True
veryverbose = True
# Check version consistency
version_dst = db.get_metadata_val('database_version')
version_src = db_src.get_metadata_val('database_version')
assert version_src == version_dst, 'cannot merge databases that have different versions'
# Get merge tablenames
all_tablename_list = db.get_table_names()
# always ignore the metadata table.
ignore_tables_ = ['metadata']
if ignore_tables is None:
ignore_tables = []
ignore_tables_ += ignore_tables
tablename_list = [tablename for tablename in all_tablename_list
if tablename not in ignore_tables_]
# Reorder tablenames based on dependencies.
# the tables with dependencies are merged after the tables they depend on
dependsmap_list = [
db.get_metadata_val(tablename + '_dependsmap', eval_=True,
default=None)
for tablename in tablename_list]
dependency_digraph = {
tablename: [] if dependsmap is None else ut.get_list_column(dependsmap.values(), 0)
for dependsmap, tablename in zip(dependsmap_list, tablename_list)
}
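# e.g. (hypothetical tables):
# {'annotations': ['images'], 'images': ['contributors'], 'contributors': []}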
#EXTEND_SUBSETS = False
#if EXTEND_SUBSETS:
# inverted_digraph = ut.ddict(list)
# for key, item_list in six.iteritems(dependency_digraph):
# for item in item_list:
# inverted_digraph[item].append(key)
# #dependency_digraph
# inverted_digraph = dict(inverted_digraph)
# rowid_subsets
# seen_ = set([])
# #def expand_subsets(rowid_subsets, seen_=seen_):
# # key_list = list(set(rowid_subsets.keys()) - seen_)
# # for key in key_list:
# # item_list = dependency_digraph[key]
# # for item in item_list:
# # dependsmap = dependsmap_list[tablename_list.index(key)]
# # new_transferdata = db_src.get_table_new_transferdata(tablename)
# # # FIXME: SUPER DUPLICATE CODE
# # (column_list, column_names,
# # extern_colx_list, extern_superkey_colname_list,
# # extern_superkey_colval_list, extern_tablename_list,
# # extern_primarycolnames_list
# # ) = new_transferdata
# # for index, (key2, val2) in enumerate(six.iteritems(dependsmap)):
# # if key2 == 'image_rowid':
# # break
# # column_list[key2]
# # column_list[index]
# # pass
# # pass
# # pass
def find_depth(tablename, dependency_digraph):
"""
Depth-first search for the distance from a table to a root table.
Self-cycles count as depth 0; a true cycle between distinct tables
would recurse forever.
"""
depth_list = [
find_depth(depends_tablename, dependency_digraph)
if depends_tablename != tablename else
0
for depends_tablename in dependency_digraph[tablename]
]
depth = 0 if len(depth_list) == 0 else max(depth_list) + 1
return depth
order_list = [find_depth(tablename, dependency_digraph) for tablename in tablename_list]
sorted_tablename_list = ut.sortedby(tablename_list, order_list)
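# Illustrative sketch with the hypothetical digraph above: find_depth
# gives contributors=0, images=1, annotations=2, so ut.sortedby yields
# ['contributors', 'images', 'annotations'] and contributors merges first.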
# ================================
# Merge each table into new database
# ================================
#tablename_to_rowidmap = {} # TODO
#old_rowids_to_new_rowids
for tablename in sorted_tablename_list:
if verbose:
print('\n[sqlmerge] Merging tablename=%r' % (tablename,))
# Collect the data from the source table that will be merged in
new_transferdata = db_src.get_table_new_transferdata(tablename)
# FIXME: This needs to pass back sparser output
(column_list, column_names,
# These fields are for external data dependencies. We need to find what the
# new rowids will be in the destination database
extern_colx_list, extern_superkey_colname_list,
extern_superkey_colval_list, extern_tablename_list,
extern_primarycolnames_list
) = new_transferdata
# FIXME: extract the primary rowid column a little bit nicer
assert column_names[0].endswith('_rowid'), 'expected the primary rowid column first'
old_rowid_list = column_list[0]
column_names_ = column_names[1:]
column_list_ = column_list[1:]
# +=================================================
# WIP: IF SUBSET REQUESTED, FILTER OUT INVALID ROWIDS
if rowid_subsets is not None and tablename in rowid_subsets:
valid_rowids = set(rowid_subsets[tablename])
isvalid_list = [rowid in valid_rowids for rowid in old_rowid_list]
valid_old_rowid_list = ut.filter_items(old_rowid_list, isvalid_list)
valid_column_list_ = [ut.filter_items(col, isvalid_list) for col in column_list_]
valid_extern_superkey_colval_list = [ut.filter_items(col, isvalid_list)
for col in extern_superkey_colval_list]
print(' * filtered number of rows from %d to %d.' % (
len(old_rowid_list), len(valid_old_rowid_list)))
else:
print(' * no filtering requested')
valid_extern_superkey_colval_list = extern_superkey_colval_list
valid_old_rowid_list = old_rowid_list
valid_column_list_ = column_list_
#if len(valid_old_rowid_list) == 0:
# continue
# L=================================================
# ================================
# Resolve external superkey lookups
# ================================
if len(extern_colx_list) > 0:
if verbose:
print('[sqlmerge] %s has %d externally dependent columns to resolve' % (
tablename, len(extern_colx_list)))
modified_column_list_ = valid_column_list_[:]
new_extern_rowid_list = []
# Find the mappings from the old tables rowids to the new tables rowids
for tup in zip(extern_colx_list, extern_superkey_colname_list,
valid_extern_superkey_colval_list,
extern_tablename_list,
extern_primarycolnames_list):
(colx, extern_superkey_colname, extern_superkey_colval,
extern_tablename, extern_primarycolname) = tup
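# colx indexes into column_names, which still includes the leading
# rowid column; column_names_ has it stripped, hence the -1 offset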
source_colname = column_names_[colx - 1]
if veryverbose or verbose:
if veryverbose:
print('[sqlmerge] +--')
print(('[sqlmerge] * resolving source_colname=%r \n'
' via extern_superkey_colname=%r ...\n'
' -> extern_primarycolname=%r. colx=%r')
% (source_colname, extern_superkey_colname,
extern_primarycolname, colx))
elif verbose:
print('[sqlmerge] * resolving %r via %r -> %r'
% (source_colname, extern_superkey_colname,
extern_primarycolname))
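# zip() over the single value list wraps each superkey value in a
# 1-tuple, the per-row params format get_rowid_from_superkey expects
# (this assumes a single-column superkey here)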
_params_iter = list(zip(extern_superkey_colval))
new_extern_rowids = db.get_rowid_from_superkey(
extern_tablename, _params_iter, superkey_colnames=extern_superkey_colname)
num_Nones = sum(ut.flag_None_items(new_extern_rowids))
if verbose:
print('[sqlmerge] * there were %d unresolved (None) items' % (num_Nones,))
#ut.assert_all_not_None(new_extern_rowids)
new_extern_rowid_list.append(new_extern_rowids)
for colx, new_extern_rowids in zip(extern_colx_list, new_extern_rowid_list):
modified_column_list_[colx - 1] = new_extern_rowids
else:
modified_column_list_ = valid_column_list_
# ================================
# Merge into db with add_cleanly
# ================================
superkey_colnames_list = db.get_table_superkey_colnames(tablename)
try:
superkey_paramxs_list = [
[column_names_.index(str(superkey)) for superkey in superkey_colnames]
for superkey_colnames in superkey_colnames_list
]
except Exception as ex:
ut.printex(ex, keys=['column_names_', 'superkey_colnames_list'])
raise
if len(superkey_colnames_list) > 1:
# FIXME: Rectify duplicate code
primary_superkey = db.get_metadata_val(
tablename + '_primary_superkey', eval_=True, default=None)
if primary_superkey is None:
raise AssertionError(
('tablename=%r has multiple superkey_colnames_list=%r, '
'but no primary superkey. '
'A primary superkey is required') % (tablename, superkey_colnames_list))
else:
superkey_index = superkey_colnames_list.index(primary_superkey)
superkey_paramx = superkey_paramxs_list[superkey_index]
superkey_colnames = superkey_colnames_list[superkey_index]
else:
assert len(superkey_colnames_list) == 1
superkey_paramx = superkey_paramxs_list[0]
superkey_colnames = superkey_colnames_list[0]
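# Transpose the column-major data into per-row parameter tuples,
# e.g. [[1, 2], ['a', 'b']] -> [(1, 'a'), (2, 'b')]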
params_iter = list(zip(*modified_column_list_))
def get_rowid_from_superkey(*superkey_column_list):
superkey_params_iter = zip(*superkey_column_list)
rowid = db.get_rowid_from_superkey(
tablename, superkey_params_iter, superkey_colnames=superkey_colnames)
return rowid
# TODO: allow certain databases to take precedence over another;
# basically allow insert-or-replace
new_rowid_list = db.add_cleanly(tablename, column_names_,
params_iter,
get_rowid_from_superkey=get_rowid_from_superkey,
superkey_paramx=superkey_paramx)
# TODO: Use the mapping generated here for new rowids
old_rowids_to_new_rowids = dict(zip(valid_old_rowid_list, new_rowid_list)) # NOQA
#tablename_to_rowidmap[tablename] = old_rowids_to_new_rowids
@default_decor
def get_table_csv(db, tablename, exclude_columns=[]):
""" Convenience: converts a table into csv format
Args:
tablename (str):
exclude_columns (list):
Returns:
str: csv_table
CommandLine:
python -m ibeis.control.SQLDatabaseControl --test-get_table_csv
python -m ibeis.control.SQLDatabaseControl --exec-get_table_csv --tablename=contributors
Example:
>>> # ENABLE_DOCTEST
>>> from ibeis.control.SQLDatabaseControl import * # NOQA
>>> import ibeis
>>> ibs = ibeis.opendb(defaultdb='testdb1')
>>> tablename = ut.get_argval('--tablename', type_=str, default=ibeis.const.NAME_TABLE)
>>> db = ibs.db
>>> exclude_columns = []
>>> csv_table = db.get_table_csv(tablename, exclude_columns)
>>> # verify results
>>> result = str(csv_table)
>>> print(result)
"""
column_list, column_names = db.get_table_column_data(tablename, exclude_columns)
# remove column prefix for more compact csvs
column_lbls = [name.replace(tablename[:-1] + '_', '') for name in column_names]
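# e.g. for an 'images' table, 'image_uri' becomes 'uri' (this assumes
# the column prefix is the table name minus its trailing 's')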
header = db.get_table_csv_header(tablename)
csv_table = ut.make_csv_table(column_list, column_lbls, header)
return csv_table
@default_decor
def print_table_csv(db, tablename, exclude_columns=[]):
print(db.get_table_csv(tablename, exclude_columns=exclude_columns))
@default_decor
def print_schema(db):
for tablename in db.get_table_names():
print(db.get_table_csv_header(tablename) + '\n')
@default_decor
def view_db_in_external_reader(db):
import os
known_readers = ['sqlitebrowser', 'sqliteman']
sqlite3_reader = known_readers[0]
sqlite3_db_fpath = db.get_fpath()
os.system(sqlite3_reader + ' ' + sqlite3_db_fpath)
#ut.cmd(sqlite3_reader, sqlite3_db_fpath)
@default_decor
def set_db_version(db, version):
# Get the metadata_rowid explicitly so we can assert that exactly one
# database_version key exists
metadata_key_list = ['database_version']
params_iter = ((metadata_key,) for metadata_key in metadata_key_list)
where_clause = 'metadata_key=?'
metadata_rowid_list = db.get_where(const.METADATA_TABLE,
('metadata_rowid',), params_iter,
where_clause, unpack_scalars=True)
assert len(metadata_rowid_list) == 1, 'duplicate database_version keys in database'
id_iter = ((metadata_rowid,) for metadata_rowid in metadata_rowid_list)
val_list = ((_,) for _ in [version])
db.set(const.METADATA_TABLE, ('metadata_value',), val_list, id_iter)
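# Usage sketch (the version string is hypothetical):
# db.set_db_version('1.4.9') # overwrites the single database_version row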
@default_decor
def get_sql_version(db):
""" Convenience """
db.cur.execute('SELECT sqlite_version()')
sql_version = db.cur.fetchone()
print('[sql] SELECT sqlite_version = %r' % (sql_version,))
# The version of the sqlite3 Python module, NOT the SQLite library.
print('[sql] sqlite3.version = %r' % (lite.version,))
# The version of the SQLite library
print('[sql] sqlite3.sqlite_version = %r' % (lite.sqlite_version,))
return sql_version
if __name__ == '__main__':
"""
CommandLine:
python -m ibeis.control.SQLDatabaseControl
python -m ibeis.control.SQLDatabaseControl --allexamples
python -m ibeis.control.SQLDatabaseControl --allexamples --verbtest
python -m ibeis.control.SQLDatabaseControl --allexamples --noface --nosrc
"""
import multiprocessing
multiprocessing.freeze_support() # for win32
import utool as ut # NOQA
ut.doctest_funcs()