a XC?hz@sdZdZddlZddlmZddlZddlZddlZddl m Z ddl Z ddl Z ddl mZddlZddlZddlZe ZdaGdd d Zd d Ze ed Zd ZGdddeZGdddZddZGdddeZGdddeZ GdddeZ!Gddde Z"ddZ#ddZ$d.d d!Z%d"d#Z&Gd$d%d%e j'Z(da)da*d&d'Z+d(d)Z,Gd*d+d+ej-Z.Gd,d-d-ej/Z0dS)/a- Implements ProcessPoolExecutor. The following diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | | | | Call Q | | Process | | | +----------+ | | +-----------+ | Pool | | | | ... | | | | ... | +---------+ | | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Result Q" z"Brian Quinlan (brian@sweetapp.com)N)_base)Queue)partialFc@s,eZdZddZddZddZddZd S) _ThreadWakeupcCsd|_tjdd\|_|_dS)NF)Zduplex)_closedmpZPipe_reader_writerselfr ?/opt/alt/python39/lib64/python3.9/concurrent/futures/process.py__init__Csz_ThreadWakeup.__init__cCs$|js d|_|j|jdSNT)rr closerr r r r rGs z_ThreadWakeup.closecCs|js|jddS)N)rr Z send_bytesr r r r wakeupMsz_ThreadWakeup.wakeupcCs |js|jr|jqdSN)rrZpollZ recv_bytesr r r r clearQs z_ThreadWakeup.clearN)__name__ __module__ __qualname__rrrrr r r r rBsrcCs@datt}|D]\}}|q|D]\}}|q*dSr)_global_shutdownlist_threads_wakeupsitemsrjoin)r_ thread_wakeuptr r r _python_exitWs     r =c@seZdZddZddZdS)_RemoteTracebackcCs ||_dSrtb)r r%r r r rwsz_RemoteTraceback.__init__cCs|jSrr$r r r r __str__ysz_RemoteTraceback.__str__N)rrrrr&r r r r r#vsr#c@seZdZddZddZdS)_ExceptionWithTracebackcCs8tt|||}d|}||_d|j_d||_dS)Nz """ %s""") tracebackformat_exceptiontyperexc __traceback__r%)r r,r%r r r r}s  z _ExceptionWithTraceback.__init__cCst|j|jffSr) _rebuild_excr,r%r r r r __reduce__sz"_ExceptionWithTraceback.__reduce__N)rrrrr/r r r r r'|sr'cCst||_|Sr)r# __cause__)r,r%r r r r.s r.c@seZdZddZdS) _WorkItemcCs||_||_||_||_dSr)futurefnargskwargs)r r2r3r4r5r r r rsz_WorkItem.__init__Nrrrrr r r r r1sr1c@seZdZdddZdS) _ResultItemNcCs||_||_||_dSr)work_id exceptionresult)r r8r9r:r r r rsz_ResultItem.__init__)NNr6r r r r r7sr7c@seZdZddZdS) _CallItemcCs||_||_||_||_dSr)r8r3r4r5)r r8r3r4r5r r r rsz_CallItem.__init__Nr6r r r r r;sr;cs.eZdZdZdfdd ZfddZZS) _SafeQueuez=Safe Queue set exception to the future object linked to a jobrcs&||_||_||_tj||ddS)N)ctx)pending_work_items shutdown_lockrsuperr)r max_sizer=r>r?r __class__r r rsz_SafeQueue.__init__cst|trtt|||j}tdd||_ |j |j d}|j |jWdn1sj0Y|dur|j|nt||dS)Nz """ {}"""r() isinstancer;r)r*r+r-r#formatrr0r>popr8r?rrr2 set_exceptionr@_on_queue_feeder_error)r eobjr% work_itemrBr r rHs (z!_SafeQueue._on_queue_feeder_error)r)rrr__doc__rrH __classcell__r r rBr r<sr<cgs,t|}tt||}|s dS|VqdS)z, Iterates over zip()ed iterables in chunks. N)ziptuple itertoolsislice) chunksize iterablesitchunkr r r _get_chunkss rVcsfdd|DS)z Processes a chunk of an iterable passed to map. Runs the function passed to map() on a chunk of the iterable passed to map. This function is run in a separate process. csg|] }|qSr r ).0r4r3r r rz"_process_chunk..r )r3rUr rXr _process_chunks rZc Cs`z|t|||dWnBtyZ}z*t||j}|t||dWYd}~n d}~00dS)z.Safely send back the given result or exception)r:r9r9N)putr7 BaseExceptionr'r-) result_queuer8r:r9rIr,r r r _sendback_results   r_c Cs|dur:z ||Wn$ty8tjjdddYdS0|jdd}|dur`|tdSz|j|j i|j }Wn@ty}z(t ||j }t ||j|dWYd}~nd}~00t ||j|d~~q:dS)aEvaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: call_queue: A ctx.Queue of _CallItems that will be read and evaluated by the worker. result_queue: A ctx.Queue of _ResultItems that will written to by the worker. initializer: A callable initializer, or None initargs: A tuple of args for the initializer NzException in initializer:T)exc_infoblockr[)r:)r]rZLOGGERZcriticalgetr\osgetpidr3r4r5r'r-r_r8) call_queuer^ initializerinitargsZ call_itemrrIr,r r r _process_workers$     &rjcspeZdZdZfddZddZddZdd Zd d Zd d Z ddZ ddZ ddZ ddZ ddZZS)_ExecutorManagerThreadatManages the communication between this process and the worker processes. The manager is run in a local thread. Args: executor: A reference to the ProcessPoolExecutor that owns this thread. A weakref will be own by the manager as well as references to internal objects used to introspect the state of the executor. csf|j|_|j|_|j|jfdd}t|||_|j|_|j |_ |j |_ |j |_|j|_tdS)NcSs<tjd||Wdn1s.0YdS)Nz?Executor collected: triggering callback for QueueManager wakeup)rutildebugr)rrr?r r r weakref_cbs z3_ExecutorManagerThread.__init__..weakref_cb)_executor_manager_thread_wakeupr_shutdown_lockr?weakrefrefexecutor_reference _processes processes _call_queuerf _result_queuer^ _work_idswork_ids_queue_pending_work_itemsr>r@r)r executorrnrBr r rs  z_ExecutorManagerThread.__init__cCs||\}}}|r(||dS|durX||~|}|durV|j~|r||j s| dSqdSr) add_call_item_to_queuewait_result_broken_or_wakeupterminate_brokenprocess_result_itemrs_idle_worker_semaphorereleaseis_shutting_downflag_executor_shutting_downr>join_executor_internals)r result_item is_brokencauser{r r r run:s"   z_ExecutorManagerThread.runcCs~|jrdSz|jjdd}Wntjy6YdS0|j|}|jrn|jj t ||j |j |j ddq|j|=qqdS)NFraT)rfZfullryrcqueueEmptyr>r2Zset_running_or_notify_cancelr\r;r3r4r5)r r8rKr r r r|Zs"    z-_ExecutorManagerThread.add_call_item_to_queuec Cs|jj}|jjrJ|jj}||g}ddt|jD}tj ||}d}d}d}||vrz| }d}Wqt y} z t t| | | j}WYd} ~ qd} ~ 00n ||vrd}|j|jWdn1s0Y|||fS)NcSsg|] }|jqSr )sentinelrWpr r r rY{rzG_ExecutorManagerThread.wait_result_broken_or_wakeup..TF)r^rrrrruvaluesrZ connectionwaitZrecvr]r)r*r+r-r?r) r Z result_readerZ wakeup_readerZreadersZworker_sentinelsZreadyrrrrIr r r r}qs( ,(z3_ExecutorManagerThread.wait_result_broken_or_wakeupcCs~t|tr>|sJ|j|}||jsz|dSn<|j|jd}|durz|j rl|j |j n|j |j dSr)rDintrrurFrrr>r8r9r2rGZ set_resultr:)r rrrKr r r rs   z*_ExecutorManagerThread.process_result_itemcCs|}tp|dup|jSr)rsr_shutdown_thread)r r{r r r rs z'_ExecutorManagerThread.is_shutting_downcCs|}|dur d|_d|_d}td}|durHtdd|d|_|jD]\}}|j |~qR|j |j D] }|q~|dS)NzKA child process terminated abruptly, the process pool is not usable anymoreTz^A process in the process pool was terminated abruptly while the future was running or pending.z ''' r(z''')rs_brokenrBrokenProcessPoolr#rr0r>rr2rGrrurZ terminater)r rr{Zbper8rKrr r r r~s"   z'_ExecutorManagerThread.terminate_brokencCs|}|dur|d|_|jr|i}|jD]\}}|js*|||<q*||_z|jWqLt j yrYqvYqL0qLd|_dS)NTF) rsr_cancel_pending_futuresr>rr2ZcancelryZ get_nowaitrr)r r{Znew_pending_work_itemsr8rKr r r rs   z2_ExecutorManagerThread.flag_executor_shutting_downc Csl|}d}||krh|dkrht||D]8}z|jd|d7}Wq,tjybYq Yq,0q,q dS)Nrr!)get_n_children_aliverangerfZ put_nowaitrZFull)r Zn_children_to_stopZn_sentinels_sentir r r shutdown_workerss   z'_ExecutorManagerThread.shutdown_workerscCsh||j|j|j|jWdn1sB0Y|jD] }|qVdSr) rrfrZ join_threadr?rrurrr rr r r rs  (z._ExecutorManagerThread.join_executor_internalscCstdd|jDS)Ncss|]}|VqdSr)Zis_aliverr r r rz>_ExecutorManagerThread.get_n_children_alive..)sumrurr r r r r sz+_ExecutorManagerThread.get_n_children_alive)rrrrLrrr|r}rrr~rrrrrMr r rBr rks +  & rkc Cshtrtrttdaztd}Wnttfy:YdS0|dkrHdS|dkrTdSd|attdS)NTSC_SEM_NSEMS_MAXz@system provides too few semaphores (%d available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorrdsysconfAttributeError ValueError)Z nsems_maxr r r _check_system_limitss rccs&|D]}||r|VqqdS)z Specialized implementation of itertools.chain.from_iterable. Each item in *iterable* should be a list. This function is careful not to keep references to yielded objects. N)reverserF)iterableZelementr r r _chain_from_iterable_of_lists,src@seZdZdZdS)rzy Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. N)rrrrLr r r r r8srcs~eZdZdddZddZddZd d Zd d Zd dZe j jj e_ dddfdd Z dddddZ e j j j e _ ZS)ProcessPoolExecutorNr cCsJt|dur6tpd|_tjdkrntt|j|_n8|dkrHtdn tjdkrh|tkrhtdt||_|dur~t }||_ |j j ddd k|_ |durt|std ||_||_d|_i|_d|_t|_td|_d|_d|_i|_d|_t|_|jt }t!||j |j|j|jd |_"d |j"_#|$|_%t&'|_(dS) aSInitializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. mp_context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. Nr!Zwin32rz"max_workers must be greater than 0zmax_workers must be <= F)Z allow_noneforkzinitializer must be a callable)rAr=r>r?rT))rrd cpu_count _max_workerssysplatformmin_MAX_WINDOWS_WORKERSrrZ get_context _mp_contextZget_start_method#_safe_to_dynamically_spawn_childrencallable TypeError _initializer _initargs_executor_manager_threadrtr threadingZLockrpZ Semaphorerr _queue_countrzrrroEXTRA_QUEUED_CALLSr<rvZ _ignore_epipeZ SimpleQueuerwrrrx)r Z max_workersZ mp_contextrgrhZ queue_sizer r r r@sZ         zProcessPoolExecutor.__init__cCs<|jdur8|js|t||_|j|jt|j<dSr)rr_launch_processesrkstartrorr r r r _start_executor_manager_threads   z2ProcessPoolExecutor._start_executor_manager_threadcCs2|jjddrdSt|j}||jkr.|dS)NF)Zblocking)racquirelenrtr_spawn_process)r Z process_countr r r _adjust_process_counts   z)ProcessPoolExecutor._adjust_process_countcCs2|jrJdtt|j|jD] }|q dS)NzhProcesses cannot be fork()ed after the thread has started, deadlock in the child processes could result.)rrrrtrr)r rr r r rs z%ProcessPoolExecutor._launch_processescCs8|jjt|j|j|j|jfd}|||j|j <dS)N)targetr4) rZProcessrjrvrwrrrrtpidrr r r rsz"ProcessPoolExecutor._spawn_processcOs|j|jrt|j|jr&tdtr2tdt}t||||}||j |j <|j |j |j d7_ |j |jr|||WdS1s0YdS)Nz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdownr!)rprrr RuntimeErrorrrZFuturer1rzrrxr\rorrrr)r r3r4r5fwr r r submits"   zProcessPoolExecutor.submitr!)timeoutrRcs:|dkrtdtjtt|t|d|i|d}t|S)ajReturns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. r!zchunksize must be >= 1.rR)r)rr@maprrZrVr)r r3rrRrSZresultsrBr r rs zProcessPoolExecutor.mapTF)cancel_futurescCs|j0||_d|_|jdur(|jWdn1s<0Y|jdur^|r^|jd|_d|_|jdur|r|j d|_d|_ d|_dSr) rprrrorrrrvrwrrt)r rrr r r shutdowns (  zProcessPoolExecutor.shutdown)NNNr )T)rrrrrrrrrrExecutorrLrrrMr r rBr r?s U   r)NN)1rL __author__rdZconcurrent.futuresrrZmultiprocessingrZmultiprocessing.connectionZmultiprocessing.queuesrrrq functoolsrrPrr)WeakKeyDictionaryrrrr Z_register_atexitrr Exceptionr#r'r.objectr1r7r;r<rVrZr_rjZThreadrkrrrrZBrokenExecutorrrrr r r r sP*       )