# -*- coding: utf-8 -*-
"""
Accepts and handles requests for tasks.
Each of the following runs in its own Thread/Process.
BASICALLY DO A CLIENT/SERVER TO SPAWN PROCESSES
AND THEN A PUBLISH SUBSCRIBE TO RETURN DATA
Accepter:
Receives tasks and requests
Delegates tasks and responds to requests
Tasks are delgated to an engine
Engine:
the engine accepts requests.
the engine immediately responds WHERE it will be ready.
the engine sends a message to the collector saying that something will be ready.
the engine then executes a task.
The engine is given direct access to the data.
Collector:
The collector accepts requests
The collector can respond:
* <ResultContent>
* Results are ready.
* Results are not ready.
* Unknown jobid.
* Error computing results.
* Progress percent.
References:
Simple task farm, with routed replies in pyzmq
http://stackoverflow.com/questions/7809200/implementing-task-farm-messaging-pattern-with-zeromq
https://gist.github.com/minrk/1358832
#python -m ibeis --tf test_zmq_task
python -m ibeis.web.zmq_task_queue --main
python -m ibeis.web.zmq_task_queue --main --bg
python -m ibeis.web.zmq_task_queue --main --fg
"""
from __future__ import absolute_import, division, print_function, unicode_literals
#if False:
# import os
# os.environ['UTOOL_NOCNN'] = 'True'
import six
import utool as ut
import time
import zmq
import uuid # NOQA
import itertools
import numpy as np
import functools
from functools import partial
from ibeis.control import accessor_decors, controller_inject
print, rrr, profile = ut.inject2(__name__, '[zmqstuff]')
CLASS_INJECT_KEY, register_ibs_method = (
controller_inject.make_ibs_register_decorator(__name__))
register_api = controller_inject.get_ibeis_flask_api(__name__)
ctx = zmq.Context.instance()
url = 'tcp://127.0.0.1'
_portgen = functools.partial(six.next, itertools.count(51381))
engine_url1 = url + ':' + six.text_type(_portgen())
engine_url2 = url + ':' + six.text_type(_portgen())
collect_url1 = url + ':' + six.text_type(_portgen())
collect_url2 = url + ':' + six.text_type(_portgen())
collect_pushpull_url = url + ':' + six.text_type(_portgen())
NUM_JOBS = 2
NUM_ENGINES = 1
VERBOSE_JOBS = ut.get_argflag('--bg') or ut.get_argflag('--fg')
[docs]def ensure_simple_server(port=5832):
r"""
CommandLine:
python -m ibeis.web.zmq_task_queue --exec-ensure_simple_server
python -m utool.util_web --exec-start_simple_webserver
Example:
>>> # DISABLE_DOCTEST
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> result = ensure_simple_server()
>>> print(result)
"""
if ut.is_local_port_open(port):
bgserver = ut.spawn_background_process(ut.start_simple_webserver, port=port)
return bgserver
else:
bgserver = ut.DynStruct()
bgserver.terminate2 = lambda: None
print('server is running elsewhere')
return bgserver
@accessor_decors.default_decorator
@register_api('/api/core/check_uuids/', methods=['GET', 'POST'])
@register_ibs_method
[docs]def web_check_uuids(ibs, image_uuid_list=[], annot_uuid_list=[]):
# Unique list
image_uuid_list = list(set(image_uuid_list))
annot_uuid_list = list(set(annot_uuid_list))
# Check for all annot UUIDs exist
missing_image_uuid_list = ibs.get_image_missing_uuid(image_uuid_list)
missing_annot_uuid_list = ibs.get_annot_missing_uuid(annot_uuid_list)
if len(missing_image_uuid_list) > 0 or len(missing_annot_uuid_list) > 0:
kwargs = {
'missing_image_uuid_list' : missing_image_uuid_list,
'missing_annot_uuid_list' : missing_annot_uuid_list,
}
raise controller_inject.WebMissingUUIDException(**kwargs)
@register_ibs_method
[docs]def initialize_job_manager(ibs):
"""
Run from the webserver
Example:
>>> # DISABLE_DOCTEST
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> import ibeis
>>> ibs = ibeis.opendb('testdb1')
Example:
>>> # WEB_DOCTEST
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> import ibeis
>>> web_instance = ibeis.opendb_bg_web(db='testdb1', wait=10)
>>> baseurl = 'http://127.0.1.1:5000'
>>> _payload = {'image_attrs_list': [], 'annot_attrs_list': []}
>>> payload = ut.map_dict_vals(ut.to_json, _payload)
>>> #resp = requests.post(baseurl + '/api/core/helloworld/?f=b', data=payload)
>>> resp = requests.post(baseurl + '/api/core/add_images_json/', data=payload)
>>> print(resp)
>>> web_instance.terminate()
>>> json_dict = resp.json()
>>> text = json_dict['response']
>>> print(text)
"""
ibs.job_manager = ut.DynStruct()
ibs.job_manager.jobiface = JobInterface(0)
if not ut.get_argflag('--fg'):
ibs.job_manager.reciever = JobBackend()
ibs.job_manager.reciever.initialize_background_processes(dbdir=ibs.get_dbdir())
ibs.job_manager.jobiface.initialize_client_thread()
#import ibeis
##dbdir = '/media/raid/work/testdb1'
#ibs = ibeis.opendb('testdb1', asproxy=True)
#from ibeis.web import app
#proc = ut.spawn_background_process(app.start_from_ibeis, ibs, port=5000)
@register_ibs_method
[docs]def close_job_manager(ibs):
ibs.job_manager = None
@accessor_decors.default_decorator
@register_api('/api/core/start_identify_annots/', methods=['GET', 'POST'])
@register_ibs_method
[docs]def start_identify_annots(ibs, qannot_uuid_list, dannot_uuid_list=None,
pipecfg={}, callback_url=None):
r"""
REST:
Method: GET
URL: /api/core/identify_annots/
Args:
qannot_uuid_list (list) : specifies the query annotations to
identify.
dannot_uuid_list (list) : specifies the annotations that the
algorithm is allowed to use for identification. If not
specified all annotations are used. (default=None)
pipecfg (dict) : dictionary of pipeline configuration arguments
(default=None)
CommandLine:
# Run as main process
python -m ibeis.web.zmq_task_queue --exec-start_identify_annots:0
# Run using server process
python -m ibeis.web.zmq_task_queue --exec-start_identify_annots:1
# Split into multiple processes
python -m ibeis.web.zmq_task_queue --main --bg
python -m ibeis.web.zmq_task_queue --exec-start_identify_annots:1 --fg
python -m ibeis.web.zmq_task_queue --exec-start_identify_annots:1 --domain http://52.33.105.88
python -m ibeis.web.zmq_task_queue --exec-start_identify_annots:1 --duuids=[]
python -m ibeis.web.zmq_task_queue --exec-start_identify_annots:1 --domain http://52.33.105.88 --duuids=03a17411-c226-c960-d180-9fafef88c880
Example:
>>> # DISABLE_DOCTEST
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> from ibeis.web import zmq_task_queue
>>> import ibeis
>>> ibs, qaids, daids = ibeis.testdata_expanded_aids(
>>> defaultdb='PZ_MTEST', a=['default:qsize=2,dsize=10'])
>>> qannot_uuid_list = ibs.get_annot_uuids(qaids)
>>> dannot_uuid_list = ibs.get_annot_uuids(daids)
>>> pipecfg = {}
>>> ibs.initialize_job_manager()
>>> jobid = ibs.start_identify_annots(qannot_uuid_list, dannot_uuid_list, pipecfg)
>>> result = ibs.wait_for_job_result(jobid, timeout=None, freq=2)
>>> print(result)
>>> import utool as ut
>>> print(ut.to_json(result))
>>> ibs.close_job_manager()
Example:
>>> # WEB_DOCTEST
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> import ibeis
>>> web_ibs = ibeis.opendb_bg_web('testdb1', wait=3) # , domain='http://52.33.105.88')
>>> aids = web_ibs.send_ibeis_request('/api/annot/', 'get')[0:10]
>>> uuid_list = web_ibs.send_ibeis_request('/api/annot/uuids/', aid_list=aids)
>>> quuid_list = ut.get_argval('--quuids', type_=list, default=uuid_list)
>>> duuid_list = ut.get_argval('--duuids', type_=list, default=uuid_list)
>>> data = dict(
>>> qannot_uuid_list=quuid_list, dannot_uuid_list=duuid_list,
>>> pipecfg={},
>>> callback_url='http://127.0.1.1:5832'
>>> )
>>> # Start callback server
>>> bgserver = ensure_simple_server()
>>> # --
>>> jobid = web_ibs.send_ibeis_request('/api/core/start_identify_annots/', **data)
>>> waittime = 1
>>> while True:
>>> print('jobid = %s' % (jobid,))
>>> response1 = web_ibs.send_ibeis_request('/api/core/get_job_status/', jobid=jobid)
>>> if response1['jobstatus'] == 'completed':
>>> break
>>> time.sleep(waittime)
>>> waittime = 10
>>> print('response1 = %s' % (response1,))
>>> response2 = web_ibs.send_ibeis_request('/api/core/get_job_result/', jobid=jobid)
>>> print('response2 = %s' % (response2,))
>>> cmdict = ut.from_json(response2['json_result'])[0]
>>> print('Finished test')
>>> web_ibs.terminate2()
>>> bgserver.terminate2()
Ignore:
qaids = daids = ibs.get_valid_aids()
http://127.0.1.1:5000/api/core/start_identify_annots/'
jobid = ibs.start_identify_annots(**payload)
"""
# Check UUIDs
combined_annot_uuid_list = qannot_uuid_list + dannot_uuid_list
ibs.web_check_uuids(annot_uuid_list=combined_annot_uuid_list)
#import ibeis
#from ibeis.web import zmq_task_queue
#ibs.load_plugin_module(zmq_task_queue)
def ensure_uuid_list(list_):
if list_ is not None and len(list_) > 0 and isinstance(list_[0], six.string_types):
list_ = list(map(uuid.UUID, list_))
return list_
qannot_uuid_list = ensure_uuid_list(qannot_uuid_list)
dannot_uuid_list = ensure_uuid_list(dannot_uuid_list)
qaid_list = ibs.get_annot_aids_from_uuid(qannot_uuid_list)
if dannot_uuid_list is None:
daid_list = ibs.get_valid_aids()
#None
else:
if len(dannot_uuid_list) == 1 and dannot_uuid_list[0] is None:
# VERY HACK
daid_list = ibs.get_valid_aids()
else:
daid_list = ibs.get_annot_aids_from_uuid(dannot_uuid_list)
ibs.assert_valid_aids(qaid_list, msg='error in start_identify qaids', auuid_list=qannot_uuid_list)
ibs.assert_valid_aids(daid_list, msg='error in start_identify daids', auuid_list=dannot_uuid_list)
jobid = ibs.job_manager.jobiface.queue_job('query_chips_simple_dict', callback_url, qaid_list, daid_list, pipecfg)
#if callback_url is not None:
# #import requests
# #requests.
# #callback_url
return jobid
@accessor_decors.default_decorator
@register_api('/api/core/start_detect_image/', methods=['GET', 'POST'])
@register_ibs_method
[docs]def start_detect_image(ibs, image_uuid_list, species=None):
"""
REST:
Method: GET
URL: /api/core/start_detect_image/
Args:
image_uuid_list (list) : list of image uuids to detect on.
species (str) : species to detect
"""
raise NotImplementedError('add_images_json')
@register_api('/api/core/get_job_status/', methods=['GET', 'POST'])
@register_ibs_method
[docs]def get_job_status(ibs, jobid):
"""
Web call that returns the status of a job
CommandLine:
# Run Everything together
python -m ibeis.web.zmq_task_queue --exec-get_job_status
# Start job queue in its own process
python -m ibeis.web.zmq_task_queue --main --bg
# Start web server in its own process
./dev.py --web
pass
# Run foreground process
python -m ibeis.web.zmq_task_queue --exec-get_job_status:0 --fg
Example:
>>> # WEB_DOCTEST
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> import ibeis
>>> web_ibs = ibeis.opendb_bg_web('testdb1', wait=3) # , domain='http://52.33.105.88')
>>> # Test get status of a job id that does not exist
>>> response = web_ibs.send_ibeis_request('/api/core/get_job_status/', jobid='badjob')
>>> web_ibs.terminate2()
"""
status = ibs.job_manager.jobiface.get_job_status(jobid)
return status
@register_api('/api/core/get_job_result/', methods=['GET', 'POST'])
@register_ibs_method
[docs]def get_job_result(ibs, jobid):
"""
Web call that returns the result of a job
"""
result = ibs.job_manager.jobiface.get_job_result(jobid)
return result
@register_api('/api/core/wait_for_job_result/', methods=['GET', 'POST'])
@register_ibs_method
[docs]def wait_for_job_result(ibs, jobid, timeout=10, freq=.1):
ibs.job_manager.jobiface.wait_for_job_result(jobid, timeout=timeout, freq=freq)
result = ibs.job_manager.jobiface.get_unpacked_result(jobid)
return result
[docs]def test_zmq_task():
"""
CommandLine:
python -m ibeis.web.zmq_task_queue --exec-test_zmq_task
python -b -m ibeis.web.zmq_task_queue --exec-test_zmq_task
python -m ibeis.web.zmq_task_queue --main
python -m ibeis.web.zmq_task_queue --main --bg
python -m ibeis.web.zmq_task_queue --main --fg
Example:
>>> # SCRIPT
>>> from ibeis.web.zmq_task_queue import * # NOQA
>>> test_zmq_task()
"""
_init_signals()
# now start a few clients, and fire off some requests
client_id = np.random.randint(1000)
jobiface = JobInterface(client_id)
reciever = JobBackend()
if ut.get_argflag('--bg'):
from ibeis.init import sysres
dbdir = sysres.get_args_dbdir('cache', False, None, None,
cache_priority=False)
reciever.initialize_background_processes(dbdir)
print('[testzmq] parent process is looping forever')
while True:
time.sleep(1)
elif ut.get_argflag('--fg'):
jobiface.initialize_client_thread()
else:
dbdir = sysres.get_args_dbdir('cache', False, None, None,
cache_priority=False)
reciever.initialize_background_processes(dbdir)
jobiface.initialize_client_thread()
# Foreground test script
print('... waiting for jobs')
if ut.get_argflag('--cmd'):
ut.embed()
jobiface.queue_job()
else:
print('[test] ... emit test1')
jobid1 = jobiface.queue_job('helloworld', 1)
jobiface.wait_for_job_result(jobid1)
#jobiface.get_job_status(jobid1)
#jobid_list = [jobiface.queue_job('helloworld', 5) for _ in range(NUM_JOBS)]
#jobid_list += [jobiface.queue_job('get_valid_aids')]
jobid_list = []
#identify_jobid = jobiface.queue_job('query_chips', [1], [3, 4, 5], cfgdict={'K': 1})
identify_jobid = jobiface.queue_job('query_chips_simple_dict', [1], [3, 4, 5], cfgdict={'K': 1})
for jobid in jobid_list:
jobiface.wait_for_job_result(jobid)
jobiface.wait_for_job_result(identify_jobid)
print('FINISHED TEST SCRIPT')
[docs]class JobBackend(object):
def __init__(self):
#self.num_engines = 3
self.num_engines = NUM_ENGINES
self.engine_queue_proc = None
self.collect_queue_proc = None
self.engine_procs = None
self.collect_proc = None
# --
only_engine = ut.get_argflag('--only-engine')
self.spawn_collector = not only_engine
self.spawn_engine = not ut.get_argflag('--no-engine')
self.spawn_queue = not only_engine
def __del__(self):
if VERBOSE_JOBS:
print('Cleaning up job backend')
if self.engine_procs is not None:
for i in self.engine_procs:
i.terminate()
if self.engine_queue_proc is not None:
self.engine_queue_proc.terminate()
if self.collect_proc is not None:
self.collect_proc.terminate()
if self.collect_queue_proc is not None:
self.collect_queue_proc.terminate()
if VERBOSE_JOBS:
print('Killed external procs')
[docs] def initialize_background_processes(self, dbdir=None, wait=.5):
print = partial(ut.colorprint, color='fuchsia')
#if VERBOSE_JOBS:
print('Initialize Background Processes')
#_spawner = ut.spawn_background_process
def _spawner(func, *args, **kwargs):
if wait != 0:
print('Waiting for background process (%s) to spin up' % (ut.get_funcname(func,)))
proc = ut.spawn_background_process(func, *args, **kwargs)
time.sleep(wait)
assert proc.is_alive(), 'proc (%s) died too soon' % (ut.get_funcname(func,))
return proc
#_spawner = ut.spawn_background_daemon_thread
if self.spawn_queue:
self.engine_queue_proc = _spawner(engine_queue_loop)
self.collect_queue_proc = _spawner(collect_queue_loop)
if self.spawn_collector:
self.collect_proc = _spawner(collector_loop)
if self.spawn_engine:
self.engine_procs = [_spawner(engine_loop, i, dbdir)
for i in range(self.num_engines)]
# wait for processes to spin up
if self.spawn_queue:
assert self.engine_queue_proc.is_alive(), 'engine died too soon'
assert self.collect_queue_proc.is_alive(), 'collector queue died too soon'
if self.spawn_collector:
assert self.collect_proc.is_alive(), 'collector died too soon'
if self.spawn_engine:
for engine in self.engine_procs:
assert engine.is_alive(), 'engine died too soon'
#ut.embed()
[docs]class JobInterface(object):
def __init__(jobiface, id_):
jobiface.id_ = id_
jobiface.verbose = 2 if VERBOSE_JOBS else 1
[docs] def init(jobiface):
# Starts several new processes
jobiface.initialize_background_processes()
# Does not create a new process, but connects sockets on this process
jobiface.initialize_client_thread()
[docs] def initialize_client_thread(jobiface):
print = partial(ut.colorprint, color='blue')
if jobiface.verbose:
print('Initializing JobInterface')
jobiface.engine_deal_sock = ctx.socket(zmq.DEALER)
jobiface.engine_deal_sock.setsockopt_string(zmq.IDENTITY, 'client%s.engine.DEALER' % (jobiface.id_,))
jobiface.engine_deal_sock.connect(engine_url1)
if jobiface.verbose:
print('connect engine_url1 = %r' % (engine_url1,))
jobiface.collect_deal_sock = ctx.socket(zmq.DEALER)
jobiface.collect_deal_sock.setsockopt_string(zmq.IDENTITY, 'client%s.collect.DEALER' % (jobiface.id_,))
jobiface.collect_deal_sock.connect(collect_url1)
if jobiface.verbose:
print('connect collect_url1 = %r' % (collect_url1,))
[docs] def queue_job(jobiface, action, callback_url=None, *args, **kwargs):
r"""
IBEIS:
This is just a function that lives in the main thread and ships off
a job.
The client - sends messages, and receives replies after they
have been processed by the
"""
# NAME: job_client
with ut.Indenter('[client %d] ' % (jobiface.id_)):
print = partial(ut.colorprint, color='blue')
if jobiface.verbose >= 1:
print('----')
engine_request = {'action': action, 'args': args, 'kwargs': kwargs, 'callback_url': callback_url}
if jobiface.verbose >= 2:
print('Queue job: %s' % (engine_request))
# Flow of information tags:
# CALLS: engine_queue
jobiface.engine_deal_sock.send_json(engine_request)
if jobiface.verbose >= 3:
print('..sent, waiting for response')
# RETURNED FROM: job_client_return
reply_notify = jobiface.engine_deal_sock.recv_json()
if jobiface.verbose >= 2:
print('Got reply: %s' % ( reply_notify))
jobid = reply_notify['jobid']
return jobid
[docs] def get_job_status(jobiface, jobid):
with ut.Indenter('[client %d] ' % (jobiface.id_)):
print = partial(ut.colorprint, color='teal')
if jobiface.verbose >= 1:
print('----')
print('Request status of jobid=%r' % (jobid,))
pair_msg = dict(action='job_status', jobid=jobid)
# CALLS: collector_request_status
jobiface.collect_deal_sock.send_json(pair_msg)
if jobiface.verbose >= 3:
print('... waiting for collector reply')
reply = jobiface.collect_deal_sock.recv_json()
if jobiface.verbose >= 2:
print('got reply = %s' % (ut.repr2(reply, truncate=True),))
return reply
[docs] def get_job_result(jobiface, jobid):
with ut.Indenter('[client %d] ' % (jobiface.id_)):
if jobiface.verbose >= 1:
print = partial(ut.colorprint, color='teal')
print('----')
print('Request result of jobid=%r' % (jobid,))
pair_msg = dict(action='job_result', jobid=jobid)
# CALLER: collector_request_result
jobiface.collect_deal_sock.send_json(pair_msg)
if jobiface.verbose >= 3:
print('... waiting for collector reply')
reply = jobiface.collect_deal_sock.recv_json()
if jobiface.verbose >= 2:
print('got reply = %s' % (ut.repr2(reply, truncate=True),))
return reply
[docs] def get_unpacked_result(jobiface, jobid):
reply = jobiface.get_job_result(jobid)
json_result = reply['json_result']
result = ut.from_json(json_result)
#print('Job %r result = %s' % (jobid, ut.repr2(result, truncate=True),))
return result
[docs] def wait_for_job_result(jobiface, jobid, timeout=10, freq=.1):
t = ut.Timer(verbose=False)
t.tic()
while True:
reply = jobiface.get_job_status(jobid)
if reply['jobstatus'] == 'completed':
return
elif reply['jobstatus'] == 'exception':
result = jobiface.get_unpacked_result(jobid)
#raise Exception(result)
print('Exception occured in engine')
return result
elif reply['jobstatus'] == 'working':
pass
elif reply['jobstatus'] == 'unknown':
pass
else:
raise Exception('Unknown jobstatus=%r' % (reply['jobstatus'],))
time.sleep(freq)
if timeout is not None and t.toc() > timeout:
raise Exception('Timeout')
[docs]def make_queue_loop(iface1, iface2, name=None):
"""
Standard queue loop
Args:
iface1 (str): address for the client that deals
iface2 (str): address for the server that routes
name (None): (default = None)
"""
assert name is not None, 'must name queue'
queue_name = name + '_queue'
loop_name = queue_name + '_loop'
def queue_loop():
print = partial(ut.colorprint, color='green')
with ut.Indenter('[%s] ' % (queue_name,)):
if VERBOSE_JOBS:
print('Init make_queue_loop: name=%r' % (name,))
# bind the client dealer to the queue router
rout_sock = ctx.socket(zmq.ROUTER)
rout_sock.setsockopt_string(zmq.IDENTITY, 'queue.' + name + '.' + 'ROUTER')
rout_sock.bind(iface1)
if VERBOSE_JOBS:
print('bind %s_url2 = %r' % (name, iface1,))
# bind the server router to the queue dealer
deal_sock = ctx.socket(zmq.DEALER)
deal_sock.setsockopt_string(zmq.IDENTITY, 'queue.' + name + '.' + 'DEALER')
deal_sock.bind(iface2)
if VERBOSE_JOBS:
print('bind %s_url2 = %r' % (name, iface2,))
if 1:
# the remainder of this function can be entirely replaced with
zmq.device(zmq.QUEUE, rout_sock, deal_sock)
else:
# but this shows what is really going on:
poller = zmq.Poller()
poller.register(rout_sock, zmq.POLLIN)
poller.register(deal_sock, zmq.POLLIN)
while True:
evts = dict(poller.poll())
# poll() returns a list of tuples [(socket, evt), (socket, evt)]
# dict(poll()) turns this into {socket:evt, socket:evt}
if rout_sock in evts:
msg = rout_sock.recv_multipart()
# ROUTER sockets prepend the identity of the jobiface,
# for routing replies
if VERBOSE_JOBS:
print('ROUTER relayed %r via DEALER' % (msg,))
deal_sock.send_multipart(msg)
if deal_sock in evts:
msg = deal_sock.recv_multipart()
if VERBOSE_JOBS:
print('DEALER relayed %r via ROUTER' % (msg,))
rout_sock.send_multipart(msg)
if VERBOSE_JOBS:
print('Exiting %s' % (loop_name,))
ut.set_funcname(queue_loop, loop_name)
return queue_loop
collect_queue_loop = make_queue_loop(collect_url1, collect_url2, name='collect')
[docs]def engine_queue_loop():
"""
Specialized queue loop
"""
# Flow of information tags:
# NAME: engine_queue
iface1, iface2 = engine_url1, engine_url2
name = 'engine'
queue_name = name + '_queue'
loop_name = queue_name + '_loop'
print = partial(ut.colorprint, color='red')
with ut.Indenter('[%s] ' % (queue_name,)):
print('Init specialized make_queue_loop: name=%r' % (name,))
# bind the client dealer to the queue router
rout_sock = ctx.socket(zmq.ROUTER)
rout_sock.setsockopt_string(zmq.IDENTITY, 'special_queue.' + name + '.' + 'ROUTER')
rout_sock.bind(iface1)
if VERBOSE_JOBS:
print('bind %s_url2 = %r' % (name, iface1,))
# bind the server router to the queue dealer
deal_sock = ctx.socket(zmq.DEALER)
deal_sock.setsockopt_string(zmq.IDENTITY, 'special_queue.' + name + '.' + 'DEALER')
deal_sock.bind(iface2)
if VERBOSE_JOBS:
print('bind %s_url2 = %r' % (name, iface2,))
collect_deal_sock = ctx.socket(zmq.DEALER)
collect_deal_sock.setsockopt_string(zmq.IDENTITY, queue_name + '.collect.DEALER')
collect_deal_sock.connect(collect_url1)
if VERBOSE_JOBS:
print('connect collect_url1 = %r' % (collect_url1,))
job_counter = 0
# but this shows what is really going on:
poller = zmq.Poller()
poller.register(rout_sock, zmq.POLLIN)
poller.register(deal_sock, zmq.POLLIN)
while True:
evts = dict(poller.poll())
if rout_sock in evts:
# HACK GET REQUEST FROM CLIENT
job_counter += 1
# CALLER: job_client
idents, engine_request = rcv_multipart_json(rout_sock, num=1, print=print)
#jobid = 'result_%s' % (id_,)
#jobid = 'result_%s' % (uuid.uuid4(),)
jobid = 'jobid-%04d' % (job_counter,)
if VERBOSE_JOBS:
print('Creating jobid %r' % (jobid,))
# Reply immediately with a new jobid
reply_notify = {
'jobid': jobid,
'status': 'ok',
'text': 'job accepted',
'action': 'notification',
}
engine_request = engine_request
engine_request['jobid'] = jobid
if VERBOSE_JOBS:
print('...notifying collector about new job')
# CALLS: collector_notify
collect_deal_sock.send_json(reply_notify)
if VERBOSE_JOBS:
print('... notifying client that job was accepted')
# RETURNS: job_client_return
send_multipart_json(rout_sock, idents, reply_notify)
if VERBOSE_JOBS:
print('... notifying backend engine to start')
# CALL: engine_
send_multipart_json(deal_sock, idents, engine_request)
if deal_sock in evts:
pass
if VERBOSE_JOBS:
print('Exiting %s' % (loop_name,))
[docs]def engine_loop(id_, dbdir=None):
r"""
IBEIS:
This will be part of a worker process with its own IBEISController
instance.
Needs to send where the results will go and then publish the results there.
The engine_loop - receives messages, performs some action, and sends a reply,
preserving the leading two message parts as routing identities
"""
# NAME: engine_
# CALLED_FROM: engine_queue
import ibeis
#base_print = print # NOQA
print = partial(ut.colorprint, color='darkred')
with ut.Indenter('[engine %d] ' % (id_)):
if VERBOSE_JOBS:
print('Initializing engine')
print('connect engine_url2 = %r' % (engine_url2,))
assert dbdir is not None
#ibs = ibeis.opendb(dbname)
ibs = ibeis.opendb(dbdir=dbdir, use_cache=False, web=False)
engine_rout_sock = ctx.socket(zmq.ROUTER)
engine_rout_sock.connect(engine_url2)
collect_deal_sock = ctx.socket(zmq.DEALER)
collect_deal_sock.setsockopt_string(zmq.IDENTITY, 'engine.collect.DEALER')
collect_deal_sock.connect(collect_url1)
if VERBOSE_JOBS:
print('connect collect_url1 = %r' % (collect_url1,))
print('engine is initialized')
while True:
idents, engine_request = rcv_multipart_json(engine_rout_sock, print=print)
action = engine_request['action']
jobid = engine_request['jobid']
args = engine_request['args']
kwargs = engine_request['kwargs']
callback_url = engine_request['callback_url']
# Start working
if VERBOSE_JOBS:
print('starting job=%r' % (jobid,))
# Map actions to IBEISController calls here
if action == 'helloworld':
def helloworld(time_=0, *args, **kwargs):
time.sleep(time_)
retval = ('HELLO time_=%r ' % (time_,)) + ut.repr2((args, kwargs))
return retval
action_func = helloworld
else:
# check for ibs func
action_func = getattr(ibs, action)
if VERBOSE_JOBS:
print('resolving to ibeis function')
try:
result = action_func(*args, **kwargs)
exec_status = 'ok'
except Exception as ex:
result = ut.formatex(ex, keys=['jobid'], tb=True)
result = ut.strip_ansi(result)
exec_status = 'exception'
json_result = ut.to_json(result)
engine_result = dict(
exec_status=exec_status,
json_result=json_result,
jobid=jobid,
)
# Store results in the collector
collect_request = dict(
idents=idents,
action='store',
jobid=jobid,
engine_result=engine_result,
callback_url=callback_url,
)
if VERBOSE_JOBS:
print('...done working. pushing result to collector')
# CALLS: collector_store
collect_deal_sock.send_json(collect_request)
# ----
if VERBOSE_JOBS:
print('Exiting engine loop')
[docs]def collector_loop():
"""
Service that stores completed algorithm results
"""
print = partial(ut.colorprint, color='yellow')
with ut.Indenter('[collect] '):
collect_rout_sock = ctx.socket(zmq.ROUTER)
collect_rout_sock.setsockopt_string(zmq.IDENTITY, 'collect.ROUTER')
collect_rout_sock.connect(collect_url2)
if VERBOSE_JOBS:
print('connect collect_url2 = %r' % (collect_url2,))
collecter_data = {}
awaiting_data = {}
while True:
# several callers here
# CALLER: collector_notify
# CALLER: collector_store
# CALLER: collector_request_status
# CALLER: collector_request_result
idents, collect_request = rcv_multipart_json(collect_rout_sock, print=print)
reply = {}
action = collect_request['action']
if VERBOSE_JOBS:
print('...building action=%r response' % (action,))
if action == 'notification':
# From the Queue
jobid = collect_request['jobid']
awaiting_data[jobid] = collect_request['text']
elif action == 'store':
# From the Engine
engine_result = collect_request['engine_result']
callback_url = collect_request['callback_url']
jobid = engine_result['jobid']
collecter_data[jobid] = engine_result
if callback_url is not None:
if VERBOSE_JOBS:
print('calling callback_url')
try:
import requests
# requests.get(callback_url)
requests.post(callback_url, data={'jobid': jobid})
except Exception as ex:
msg = 'ERROR in collector. Tried to call callback_url=%r' % (callback_url,)
print(msg)
ut.printex(ex, msg)
#requests.post(callback_url)
if VERBOSE_JOBS:
print('stored result')
elif action == 'job_status':
# From a Client
jobid = collect_request['jobid']
if jobid in collecter_data:
engine_result = collecter_data[jobid]
reply['jobstatus'] = 'completed'
reply['exec_status'] = engine_result['exec_status']
elif jobid in awaiting_data:
reply['jobstatus'] = 'working'
else:
reply['jobstatus'] = 'unknown'
reply['status'] = 'ok'
reply['jobid'] = jobid
elif action == 'job_result':
# From a Client
jobid = collect_request['jobid']
try:
engine_result = collecter_data[jobid]
json_result = engine_result['json_result']
reply['jobid'] = jobid
reply['status'] = 'ok'
# reply['json_result'] = json_result
# We want to parse the JSON result here, since we need to live in
# Python land for the rest of the call until the API wrapper
# converts the Python objcets to JSON before the response is
# generated. This prevents the API from converting a Python
# string of JSON to a JSON string of JSON, which is bad.
reply['json_result'] = ut.from_json(json_result)
except KeyError:
reply['jobid'] = jobid
reply['status'] = 'invalid'
reply['json_result'] = None
else:
# Other
print('...error unknown action=%r' % (action,))
reply['status'] = 'error'
reply['text'] = 'unknown action'
send_multipart_json(collect_rout_sock, idents, reply)
[docs]def send_multipart_json(sock, idents, reply):
"""
helper
"""
reply_json = ut.to_json(reply).encode('utf-8')
multi_reply = idents + [reply_json]
sock.send_multipart(multi_reply)
[docs]def rcv_multipart_json(sock, num=2, print=print):
"""
helper
"""
# note that the first two parts will be ['Controller.ROUTER', 'Client.<id_>']
# these are needed for the reply to propagate up to the right client
multi_msg = sock.recv_multipart()
if VERBOSE_JOBS:
print('----')
print('RCV Json: %s' % (ut.repr2(multi_msg, truncate=True),))
idents = multi_msg[:num]
request_json = multi_msg[num]
request = ut.from_json(request_json)
return idents, request
def _on_ctrl_c(signal, frame):
print('[ibeis.zmq] Caught ctrl+c')
print('[ibeis.zmq] sys.exit(0)')
import sys
sys.exit(0)
def _init_signals():
import signal
signal.signal(signal.SIGINT, _on_ctrl_c)
if __name__ == '__main__':
"""
CommandLine:
python -m ibeis.web.zmq_task_queue
python -m ibeis.web.zmq_task_queue --allexamples
python -m ibeis.web.zmq_task_queue --allexamples --noface --nosrc
"""
import multiprocessing
multiprocessing.freeze_support() # for win32
if ut.get_argflag('--main'):
with ut.Timer('full'):
test_zmq_task()
else:
import utool as ut # NOQA
ut.doctest_funcs()