RrUc@s!dZddlZddlmZddlZddlZddlZddlZddl Z dZ ej Z e adZdZdefdYZd efd YZd efd YZd ZdZdZe adadZdejfdYZejedS(s+ Implements ProcessPoolExecutor. The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Request Q" iN(t_bases"Brian Quinlan (brian@sweetapp.com)cCsmtatrttjnd}x!|D]\}}|jdq+Wx|D]\}}|jqOWdS(N((tTruet _shutdownt_threads_queuestlisttitemstputtNonetjoin(Rtttq((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyt _python_exitIs it _WorkItemcBseZdZRS(cCs(||_||_||_||_dS(N(tfuturetfntargstkwargs(tselfR RRR((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyt__init__Ys   (t__name__t __module__R(((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyR Xst _ResultItemcBseZdddZRS(cCs||_||_||_dS(N(twork_idt exceptiontresult(RRRR((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyR`s  N(RRRR(((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyR_st _CallItemcBseZdZRS(cCs(||_||_||_||_dS(N(RRRR(RRRRR((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyRfs   (RRR(((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyRescCsxtr|jdt}|dkr8|jddSy|j|j|j}Wn=tk rtj d}|jt |j d|qX|jt |j d|qWdS(sEvaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: call_queue: A multiprocessing.Queue of _CallItems that will be read and evaluated by the worker. result_queue: A multiprocessing.Queue of _ResultItems that will written to by the worker. shutdown: A multiprocessing.Event that will be set as a signal to the worker that it should exit when call_queue is empty. tblockNiRR( RtgetRRRRRt BaseExceptiontsystexc_infoRR(t call_queuet result_queuet call_itemtrte((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyt_process_workerls    cCsxtr|jrdSy|jdt}Wntjk rFdSX||}|jjr|jt ||j |j |j dtq||=qqWdS(sMFills call_queue with _WorkItems from pending_work_items. This function never blocks. Args: pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids are consumed and the corresponding _WorkItems from pending_work_items are transformed into _CallItems and put in call_queue. call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems. NR( RtfullRtFalsetqueuetEmptyR tset_running_or_notify_cancelRRRRR(tpending_work_itemstwork_idsRRt work_item((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyt_add_call_item_to_queues       c s&dgfd}xtr!t|||jdt}|dk r||j}||j=|jr|jj|jn|jj|j ~n|} t s| dks| j r|sx!dt |kr|qWx|D]} | j qWjdSn~ qWdS(sManages the communication between this process and the worker processes. This function is run in a local thread. Args: executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. process: A list of the multiprocessing.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. ics!jddcd7RERlRFtregister(((s;/usr/lib/fence-agents/bundled/concurrent/futures/process.pyt,s.          % B ^