Source code for ibeis.tests.test_sql_modify

#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import utool
from ibeis.control import SQLDatabaseControl
from os.path import join
from functools import partial
print, print_, printDBG, rrr, profile = utool.inject(__name__, '[TEST_SQL_CONTROL]')
import six
import random

if six.PY2:
    __STR__ = unicode
else:
    __STR__ = str

###########################


[docs]def converter(val): return str(val) + '_str'
[docs]def get_rowid_from_text(db, text_list): param_iter = ((text,) for text in text_list) return db.get_rowid_from_superkey('test', param_iter, superkey_colnames=('test_text',))
[docs]def add_text(db, text_list): param_iter = ((text,) for text in text_list) func = partial(get_rowid_from_text, db) return db.add_cleanly('test', ('test_text',), param_iter, func)
[docs]def get_text(db, tablename, rowid_list): return db.get(tablename, ('test_text',), rowid_list)
[docs]def set_integers(db, tablename, integer_list, rowid_list): param_iter = ((integer,) for integer in integer_list) return db.set(tablename, ('test_integer',), param_iter, rowid_list)
[docs]def get_integers(db, tablename, rowid_list): return db.get(tablename, ('test_integer',), rowid_list)
[docs]def get_integers2(db, tablename, rowid_list): return db.get(tablename, ('test_integer2',), rowid_list) ###########################
[docs]def add_table(db): db.add_table('test', ( ('test_rowid', 'INTEGER PRIMARY KEY'), ('test_text', 'TEXT NOT NULL'),), ['CONSTRAINT superkey UNIQUE (test_text)'] )
[docs]def add_column(db): db.add_column('test', 'test_integer', 'INTEGER')
[docs]def modify_table(db): db.modify_table('test', ( ('test_integer', '', 'TEXT', converter), ))
[docs]def reorder_columns(db, order_list): db.reorder_columns('test', order_list)
[docs]def duplicate_table(db): db.duplicate_table('test', 'test2')
[docs]def duplicate_column(db): db.duplicate_column('test2', 'test_integer', 'test_integer2')
[docs]def rename_table(db): db.rename_table('test2', 'test3')
[docs]def rename_column(db): db.rename_column('test3', 'test_integer2', 'test_integer3')
[docs]def drop_table(db): db.drop_table('test3')
[docs]def drop_column(db): db.drop_column('test3', 'test_integer3')
def _make_empty_controller(): print('make_empty_controller') sqldb_fname = 'temp_test_sql_control.sqlite3' sqldb_dpath = utool.util_cplat.get_app_resource_dir('ibeis', 'testfiles') utool.ensuredir(sqldb_dpath) utool.remove_file(join(sqldb_dpath, sqldb_fname), dryrun=False) db = SQLDatabaseControl.SQLDatabaseController(sqldb_dpath=sqldb_dpath, sqldb_fname=sqldb_fname) return db
[docs]def TEST_SQL_MODIFY(): db = _make_empty_controller() ##################### # Add table ##################### add_table(db) # Verify table's schema colname_list = db.get_column_names('test') coltype_list = db.get_column_types('test') assert colname_list == ['test_rowid', 'test_text'], 'Actual values: %r ' % colname_list assert coltype_list == ['INTEGER PRIMARY KEY', 'TEXT NOT NULL'], 'Actual values: %r ' % coltype_list ##################### # Add data to table ##################### text_list = [ utool.random_nonce(8) for _ in range(10) ] rowid_list = add_text(db, text_list) text_list_ = get_text(db, 'test', rowid_list) assert text_list == text_list_, 'Actual values: %r ' % text_list_ ##################### # Add column ##################### add_column(db) # Verify table's schema colname_list = db.get_column_names('test') coltype_list = db.get_column_types('test') assert colname_list == ['test_rowid', 'test_text', 'test_integer'], 'Actual values: %r ' % colname_list assert coltype_list == [u'INTEGER PRIMARY KEY', u'TEXT NOT NULL', u'INTEGER'], 'Actual values: %r ' % coltype_list integer_list = [ random.randint(0, 100) for _ in range(10) ] set_integers(db, 'test', integer_list, rowid_list) integer_list_ = get_integers(db, 'test', rowid_list) assert integer_list == integer_list_, 'Actual values: %r ' % integer_list_ ##################### # Modify table ##################### modify_table(db) # Verify table's schema colname_list = db.get_column_names('test') coltype_list = db.get_column_types('test') assert colname_list == ['test_rowid', 'test_text', 'test_integer'], 'Actual values: %r ' % colname_list assert coltype_list == [u'INTEGER PRIMARY KEY', u'TEXT NOT NULL', u'TEXT'], 'Actual values: %r ' % coltype_list integer_list = get_integers(db, 'test', rowid_list) assert integer_list == [ converter(integer) for integer in integer_list_ ], 'Actual values: %r ' % integer_list ##################### # Duplicate table ##################### duplicate_table(db) # Verify table's schema colname_list = db.get_column_names('test2') coltype_list = db.get_column_types('test2') assert colname_list == ['test_rowid', 'test_text', 'test_integer'], 'Actual values: %r ' % colname_list assert coltype_list == ['INTEGER PRIMARY KEY', 'TEXT NOT NULL', 'TEXT'], 'Actual values: %r ' % coltype_list text_list_ = get_text(db, 'test2', rowid_list) assert text_list == text_list_, 'Actual values: %r ' % text_list_ integer_list_ = get_integers(db, 'test2', rowid_list) assert integer_list == integer_list_, 'Actual values: %r ' % integer_list_ ##################### # Duplicate column ##################### duplicate_column(db) # Verify table's schema colname_list = db.get_column_names('test2') coltype_list = db.get_column_types('test2') assert colname_list == ['test_rowid', 'test_text', 'test_integer', 'test_integer2'], 'Actual values: %r ' % colname_list assert coltype_list == [u'INTEGER PRIMARY KEY', u'TEXT NOT NULL', u'TEXT', u'TEXT'], 'Actual values: %r ' % coltype_list integer_list = get_integers(db, 'test2', rowid_list) integer2_list = get_integers2(db, 'test2', rowid_list) assert integer_list == integer2_list ##################### # Rename table ##################### rename_table(db) tablename_list = db.get_table_names() assert tablename_list == ['metadata', 'test', 'test3'], 'Actual values: %r ' % tablename_list ##################### # Rename column ##################### rename_column(db) # Verify table's schema colname_list = db.get_column_names('test3') coltype_list = db.get_column_types('test3') assert colname_list == ['test_rowid', 'test_text', 'test_integer', 'test_integer3'], 'Actual values: %r ' % colname_list assert coltype_list == [u'INTEGER PRIMARY KEY', u'TEXT NOT NULL', u'TEXT', u'TEXT'], 'Actual values: %r ' % coltype_list ######################### # Reorder table's columns ######################### colname_original_list = db.get_column_names('test') coltype_original_list = db.get_column_types('test') order_list = range(len(colname_original_list)) while order_list == sorted(order_list): random.shuffle(order_list) reorder_columns(db, order_list) # Verify table's schema colname_list_ = db.get_column_names('test') coltype_list_ = db.get_column_types('test') # Find correct new order combined = sorted(list(zip(order_list, colname_original_list, coltype_original_list))) colname_list__ = [ name for i, name, type_ in combined ] coltype_list__ = [ type_ for i, name, type_ in combined ] assert colname_list_ == colname_list__, 'Actual values: %r ' % colname_list_ assert coltype_list_ == coltype_list__, 'Actual values: %r ' % coltype_list_ ##################### # Drop column ##################### drop_column(db) # Verify table's schema colname_list = db.get_column_names('test3') coltype_list = db.get_column_types('test3') assert colname_list == ['test_rowid', 'test_text', 'test_integer'], 'Actual values: %r ' % colname_list assert coltype_list == [u'INTEGER PRIMARY KEY', u'TEXT NOT NULL', u'TEXT'], 'Actual values: %r ' % coltype_list ##################### # Drop table ##################### drop_table(db) tablename_list = db.get_table_names() assert tablename_list == ['metadata', 'test'], 'Actual values: %r ' % tablename_list return locals()
if __name__ == '__main__': import multiprocessing multiprocessing.freeze_support() # For windows test_locals = utool.run_test(TEST_SQL_MODIFY) execstr = utool.execstr_dict(test_locals, 'test_locals') exec(execstr)