class PrefectAgent:
    """
    Polls work queues (or a work pool) for scheduled flow runs and submits them
    to infrastructure, and watches for flow runs awaiting cancellation so their
    infrastructure can be killed.

    Must be entered as an async context manager (``async with PrefectAgent(...)``);
    entering opens the API client and the task group in which submissions,
    cancellations and scheduled background tasks run.
    """

    @experimental_parameter(
        "work_pool_name", group="work_pools", when=lambda y: y is not None
    )
    def __init__(
        self,
        work_queues: List[str] = None,
        work_queue_prefix: Union[str, List[str]] = None,
        work_pool_name: str = None,
        prefetch_seconds: int = None,
        default_infrastructure: Infrastructure = None,
        default_infrastructure_document_id: UUID = None,
        limit: Optional[int] = None,
    ) -> None:
        """
        Args:
            work_queues: names of the work queues to poll. When
                ``work_queue_prefix`` is set this set is replaced by the
                prefix-matched queue names on every cache refresh.
            work_queue_prefix: one prefix or a list of prefixes used to match
                queue names. Prefix-matched queues are never created.
            work_pool_name: if set, queues are read from / created in this work
                pool and scheduled runs are fetched for the whole pool.
            prefetch_seconds: how far ahead to look for scheduled runs; a falsy
                value falls back to ``PREFECT_AGENT_PREFETCH_SECONDS``.
            default_infrastructure: infrastructure block used for deployments
                without an infrastructure document.
            default_infrastructure_document_id: block document id of the default
                infrastructure; mutually exclusive with ``default_infrastructure``.
                If neither is given, ``Process()`` is used.
            limit: maximum number of concurrently held flow-run slots; ``None``
                means unlimited.

        Raises:
            ValueError: if both default infrastructure arguments are provided.
        """
        if default_infrastructure and default_infrastructure_document_id:
            raise ValueError(
                "Provide only one of 'default_infrastructure' and"
                " 'default_infrastructure_document_id'."
            )

        self.work_queues: Set[str] = set(work_queues) if work_queues else set()
        self.work_pool_name = work_pool_name
        self.prefetch_seconds = prefetch_seconds
        self.submitting_flow_run_ids = set()
        self.cancelling_flow_run_ids = set()
        self.scheduled_task_scopes = set()
        self.started = False
        self.logger = get_logger("agent")
        self.task_group: Optional[anyio.abc.TaskGroup] = None
        self.limit: Optional[int] = limit
        self.limiter: Optional[anyio.CapacityLimiter] = None
        self.client: Optional[PrefectClient] = None

        # Normalize a single prefix into a list so both call sites can pass it on
        if isinstance(work_queue_prefix, str):
            work_queue_prefix = [work_queue_prefix]
        self.work_queue_prefix = work_queue_prefix

        self._work_queue_cache_expiration: pendulum.DateTime = None
        self._work_queue_cache: List[WorkQueue] = []

        if default_infrastructure:
            self.default_infrastructure_document_id = (
                default_infrastructure._block_document_id
            )
            self.default_infrastructure = default_infrastructure
        elif default_infrastructure_document_id:
            self.default_infrastructure_document_id = default_infrastructure_document_id
            self.default_infrastructure = None
        else:
            self.default_infrastructure = Process()
            self.default_infrastructure_document_id = None

    async def update_matched_agent_work_queues(self):
        """
        Refresh ``self.work_queues`` from the configured name prefixes.

        Does nothing when no ``work_queue_prefix`` is configured. Logs newly
        matched and no-longer-matched queue names.
        """
        if self.work_queue_prefix:
            if self.work_pool_name:
                matched_queues = await self.client.read_work_queues(
                    work_pool_name=self.work_pool_name,
                    work_queue_filter=WorkQueueFilter(
                        name=WorkQueueFilterName(startswith_=self.work_queue_prefix)
                    ),
                )
            else:
                matched_queues = await self.client.match_work_queues(
                    self.work_queue_prefix
                )

            matched_queues = set(q.name for q in matched_queues)
            if matched_queues != self.work_queues:
                new_queues = matched_queues - self.work_queues
                removed_queues = self.work_queues - matched_queues
                if new_queues:
                    self.logger.info(
                        f"Matched new work queues: {', '.join(new_queues)}"
                    )
                if removed_queues:
                    self.logger.info(
                        f"Work queues no longer matched: {', '.join(removed_queues)}"
                    )
            self.work_queues = matched_queues

    async def get_work_queues(self) -> AsyncIterator[WorkQueue]:
        """
        Loads the work queue objects corresponding to the agent's target work
        queues. If any of them don't exist, they are created.

        Results are cached for 30 seconds; while the cache is fresh the cached
        queues are yielded without contacting the API. Queues selected by
        ``work_queue_prefix`` are never created; if one cannot be read it is
        skipped.
        """
        # if the queue cache has not expired, yield queues from the cache
        now = pendulum.now("UTC")
        if (self._work_queue_cache_expiration or now) > now:
            for queue in self._work_queue_cache:
                yield queue
            return

        # otherwise clear the cache, set the expiration for 30 seconds, and
        # reload the work queues
        self._work_queue_cache.clear()
        self._work_queue_cache_expiration = now.add(seconds=30)

        await self.update_matched_agent_work_queues()

        for name in self.work_queues:
            try:
                work_queue = await self.client.read_work_queue_by_name(
                    work_pool_name=self.work_pool_name, name=name
                )
            except ObjectNotFound:
                if self.work_queue_prefix:
                    # Do not attempt to create work queues if the agent is polling
                    # for queues by prefix. Skip the queue instead of falling
                    # through: `work_queue` would otherwise be unbound (first
                    # iteration) or still hold the previous iteration's queue.
                    self.logger.debug(
                        f"Matched work queue {name!r} could not be found; skipping."
                    )
                    continue

                # if the work queue wasn't found, create it
                try:
                    work_queue = await self.client.create_work_queue(
                        work_pool_name=self.work_pool_name, name=name
                    )
                    if self.work_pool_name:
                        self.logger.info(
                            f"Created work queue {name!r} in work pool"
                            f" {self.work_pool_name!r}."
                        )
                    else:
                        self.logger.info(f"Created work queue '{name}'.")

                # if creating it raises an exception, it was probably just
                # created by some other agent; rather than entering a re-read
                # loop with new error handling, we log the exception and
                # continue.
                except Exception:
                    self.logger.exception(f"Failed to create work queue {name!r}.")
                    continue

            self._work_queue_cache.append(work_queue)
            yield work_queue

    async def get_and_submit_flow_runs(self) -> List[FlowRun]:
        """
        The principle method on agents. Queries for scheduled flow runs and
        submits them for execution in parallel.

        Returns:
            The fetched runs whose ids are in ``submitting_flow_run_ids`` when the
            method returns, ordered by ``next_scheduled_start_time``.

        Raises:
            RuntimeError: if the agent has not been started.
        """
        if not self.started:
            raise RuntimeError("Agent is not started. Use `async with PrefectAgent()...`")

        self.logger.debug("Checking for scheduled flow runs...")

        before = pendulum.now("utc").add(
            seconds=self.prefetch_seconds or PREFECT_AGENT_PREFETCH_SECONDS.value()
        )

        submittable_runs: List[FlowRun] = []

        if self.work_pool_name:
            responses = await self.client.get_scheduled_flow_runs_for_work_pool(
                work_pool_name=self.work_pool_name,
                work_queue_names=[wq.name async for wq in self.get_work_queues()],
                scheduled_before=before,
            )
            submittable_runs.extend([response.flow_run for response in responses])

        else:
            # load runs from each work queue
            async for work_queue in self.get_work_queues():
                # print a nice message if the work queue is paused
                if work_queue.is_paused:
                    self.logger.info(
                        f"Work queue {work_queue.name!r} ({work_queue.id}) is paused."
                    )
                else:
                    try:
                        queue_runs = await self.client.get_runs_in_work_queue(
                            id=work_queue.id, limit=10, scheduled_before=before
                        )
                        submittable_runs.extend(queue_runs)
                    except ObjectNotFound:
                        self.logger.error(
                            f"Work queue {work_queue.name!r} ({work_queue.id}) not"
                            " found."
                        )
                    except Exception as exc:
                        self.logger.exception(exc)

        submittable_runs.sort(key=lambda run: run.next_scheduled_start_time)

        for flow_run in submittable_runs:
            # don't resubmit a run
            if flow_run.id in self.submitting_flow_run_ids:
                continue

            try:
                if self.limiter:
                    self.limiter.acquire_on_behalf_of_nowait(flow_run.id)
            except anyio.WouldBlock:
                self.logger.info(
                    f"Flow run limit reached; {self.limiter.borrowed_tokens} flow runs"
                    " in progress."
                )
                break
            else:
                self.logger.info(f"Submitting flow run '{flow_run.id}'")
                self.submitting_flow_run_ids.add(flow_run.id)
                self.task_group.start_soon(
                    self.submit_run,
                    flow_run,
                )

        return list(
            filter(lambda run: run.id in self.submitting_flow_run_ids, submittable_runs)
        )

    async def check_for_cancelled_flow_runs(self):
        """
        Find flow runs awaiting cancellation and start a `cancel_run` task for each.

        Two queries are made: runs of type CANCELLED named "Cancelling", and runs
        of type CANCELLING. Runs already in ``cancelling_flow_run_ids`` are
        excluded. Results are scoped to the agent's work pool (or
        "default-agent-pool") and, when known, its work queues.

        Returns:
            The flow runs for which a cancellation task was started.

        Raises:
            RuntimeError: if the agent has not been started.
        """
        if not self.started:
            raise RuntimeError("Agent is not started. Use `async with PrefectAgent()...`")

        self.logger.debug("Checking for cancelled flow runs...")

        work_queue_filter = (
            WorkQueueFilter(name=WorkQueueFilterName(any_=list(self.work_queues)))
            if self.work_queues
            else None
        )

        work_pool_filter = (
            WorkPoolFilter(name=WorkPoolFilterName(any_=[self.work_pool_name]))
            if self.work_pool_name
            else WorkPoolFilter(name=WorkPoolFilterName(any_=["default-agent-pool"]))
        )
        named_cancelling_flow_runs = await self.client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.CANCELLED]),
                    name=FlowRunFilterStateName(any_=["Cancelling"]),
                ),
                # Avoid duplicate cancellation calls
                id=FlowRunFilterId(not_any_=list(self.cancelling_flow_run_ids)),
            ),
            work_pool_filter=work_pool_filter,
            work_queue_filter=work_queue_filter,
        )

        typed_cancelling_flow_runs = await self.client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.CANCELLING]),
                ),
                # Avoid duplicate cancellation calls
                id=FlowRunFilterId(not_any_=list(self.cancelling_flow_run_ids)),
            ),
            work_pool_filter=work_pool_filter,
            work_queue_filter=work_queue_filter,
        )

        cancelling_flow_runs = named_cancelling_flow_runs + typed_cancelling_flow_runs

        if cancelling_flow_runs:
            self.logger.info(
                f"Found {len(cancelling_flow_runs)} flow runs awaiting cancellation."
            )

        for flow_run in cancelling_flow_runs:
            self.cancelling_flow_run_ids.add(flow_run.id)
            self.task_group.start_soon(self.cancel_run, flow_run)

        return cancelling_flow_runs

    async def cancel_run(self, flow_run: FlowRun) -> None:
        """
        Cancel a flow run by killing its infrastructure

        The run is marked cancelled when it has no infrastructure pid, when the
        kill succeeds, or when the infrastructure is reported as not found. On an
        unexpected kill error the run id is removed from
        ``cancelling_flow_run_ids`` so a later poll retries it. In all other
        failure cases the id stays in the set to prevent repeated attempts.
        """
        if not flow_run.infrastructure_pid:
            self.logger.error(
                f"Flow run '{flow_run.id}' does not have an infrastructure pid"
                " attached. Cancellation cannot be guaranteed."
            )
            await self._mark_flow_run_as_cancelled(
                flow_run,
                state_updates={
                    "message": (
                        "This flow run is missing infrastructure tracking information"
                        " and cancellation cannot be guaranteed."
                    )
                },
            )
            return

        try:
            infrastructure = await self.get_infrastructure(flow_run)
        except Exception:
            self.logger.exception(
                f"Failed to get infrastructure for flow run '{flow_run.id}'. "
                "Flow run cannot be cancelled."
            )
            # Note: We leave this flow run in the cancelling set because it cannot be
            # cancelled and this will prevent additional attempts.
            return

        if not hasattr(infrastructure, "kill"):
            self.logger.error(
                f"Flow run '{flow_run.id}' infrastructure {infrastructure.type!r} "
                "does not support killing created infrastructure. "
                "Cancellation cannot be guaranteed."
            )
            return

        # Separate the infrastructure type from the pid in the log message
        self.logger.info(
            f"Killing {infrastructure.type} {flow_run.infrastructure_pid} for flow run "
            f"'{flow_run.id}'..."
        )
        try:
            await infrastructure.kill(flow_run.infrastructure_pid)
        except InfrastructureNotFound as exc:
            self.logger.warning(f"{exc} Marking flow run as cancelled.")
            await self._mark_flow_run_as_cancelled(flow_run)
        except InfrastructureNotAvailable as exc:
            self.logger.warning(f"{exc} Flow run cannot be cancelled by this agent.")
        except Exception:
            self.logger.exception(
                "Encountered exception while killing infrastructure for flow run "
                f"'{flow_run.id}'. Flow run may not be cancelled."
            )
            # We will try again on generic exceptions
            self.cancelling_flow_run_ids.remove(flow_run.id)
            return
        else:
            await self._mark_flow_run_as_cancelled(flow_run)
            self.logger.info(f"Cancelled flow run '{flow_run.id}'!")

    async def _mark_flow_run_as_cancelled(
        self, flow_run: FlowRun, state_updates: Optional[dict] = None
    ) -> None:
        """
        Force the flow run into a cancelled state.

        ``state_updates`` entries override the copied state's fields; "name"
        defaults to "Cancelled" and "type" to ``StateType.CANCELLED``. The run id
        is removed from ``cancelling_flow_run_ids`` ten minutes later, or
        immediately at shutdown.
        """
        # Copy so the caller's dict is not mutated by the defaults below
        state_updates = dict(state_updates or {})
        state_updates.setdefault("name", "Cancelled")
        state_updates.setdefault("type", StateType.CANCELLED)
        state = flow_run.state.copy(update=state_updates)

        await self.client.set_flow_run_state(flow_run.id, state, force=True)

        # Do not remove the flow run from the cancelling set immediately because
        # the API caches responses for the `read_flow_runs` and we do not want to
        # duplicate cancellations.
        await self._schedule_task(
            60 * 10, self.cancelling_flow_run_ids.remove, flow_run.id
        )

    async def get_infrastructure(self, flow_run: FlowRun) -> Infrastructure:
        """
        Build the infrastructure block for a flow run, prepared for that run.

        Deployments without an infrastructure document use the agent's default
        infrastructure and have no overrides applied. Otherwise the deployment's
        infrastructure document is loaded and ``deployment.infra_overrides``
        (dot-delimited paths into the block data) are applied before the block
        is reconstructed.
        """
        deployment = await self.client.read_deployment(flow_run.deployment_id)

        flow = await self.client.read_flow(deployment.flow_id)

        # overrides only apply when configuring known infra blocks
        if not deployment.infrastructure_document_id:
            if self.default_infrastructure:
                infra_block = self.default_infrastructure
            else:
                infra_document = await self.client.read_block_document(
                    self.default_infrastructure_document_id
                )
                infra_block = Block._from_block_document(infra_document)

            # Add flow run metadata to the infrastructure
            prepared_infrastructure = infra_block.prepare_for_flow_run(
                flow_run, deployment=deployment, flow=flow
            )
            return prepared_infrastructure

        ## get infra
        infra_document = await self.client.read_block_document(
            deployment.infrastructure_document_id
        )

        # this piece of logic applies any overrides that may have been set on the
        # deployment; overrides are defined as dot.delimited paths on possibly nested
        # attributes of the infrastructure block
        doc_dict = infra_document.dict()
        infra_dict = doc_dict.get("data", {})
        for override, value in (deployment.infra_overrides or {}).items():
            nested_fields = override.split(".")
            data = infra_dict
            for field in nested_fields[:-1]:
                data = data[field]

            # once we reach the end, set the value
            data[nested_fields[-1]] = value

        # reconstruct the infra block
        doc_dict["data"] = infra_dict
        infra_document = BlockDocument(**doc_dict)
        infrastructure_block = Block._from_block_document(infra_document)

        # TODO: Here the agent may update the infrastructure with agent-level settings

        # Add flow run metadata to the infrastructure
        prepared_infrastructure = infrastructure_block.prepare_for_flow_run(
            flow_run, deployment=deployment, flow=flow
        )

        return prepared_infrastructure

    async def submit_run(self, flow_run: FlowRun) -> None:
        """
        Submit a flow run to the infrastructure

        Proposes a Pending state first; if the server does not accept it, the
        concurrency slot is released and nothing is submitted. The run id is
        removed from ``submitting_flow_run_ids`` once submission is finished
        (the infrastructure may keep running in the background).
        """
        ready_to_submit = await self._propose_pending_state(flow_run)

        if ready_to_submit:
            try:
                infrastructure = await self.get_infrastructure(flow_run)
            except Exception as exc:
                self.logger.exception(
                    f"Failed to get infrastructure for flow run '{flow_run.id}'."
                )
                await self._propose_failed_state(flow_run, exc)
                if self.limiter:
                    self.limiter.release_on_behalf_of(flow_run.id)
            else:
                # Wait for submission to be completed. Note that the submission function
                # may continue to run in the background after this exits.
                readiness_result = await self.task_group.start(
                    self._submit_run_and_capture_errors, flow_run, infrastructure
                )

                if readiness_result and not isinstance(readiness_result, Exception):
                    try:
                        await self.client.update_flow_run(
                            flow_run_id=flow_run.id,
                            infrastructure_pid=str(readiness_result),
                        )
                    except Exception:
                        self.logger.exception(
                            "An error occured while setting the `infrastructure_pid`"
                            f" on flow run {flow_run.id!r}. The flow run will not be"
                            " cancellable."
                        )

                self.logger.info(f"Completed submission of flow run '{flow_run.id}'")

        else:
            # If the run is not ready to submit, release the concurrency slot
            if self.limiter:
                self.limiter.release_on_behalf_of(flow_run.id)

        self.submitting_flow_run_ids.remove(flow_run.id)

    async def _submit_run_and_capture_errors(
        self,
        flow_run: FlowRun,
        infrastructure: Infrastructure,
        task_status: anyio.abc.TaskStatus = None,
    ) -> Union[InfrastructureResult, Exception]:
        """
        Run the infrastructure, always reporting readiness through ``task_status``.

        Exceptions are logged and returned rather than raised so a failing
        submission cannot crash the agent's task group. The concurrency slot is
        released when the infrastructure finishes or fails. A non-zero status
        code is reported as a crashed state.
        """
        # Note: There is not a clear way to determine if task_status.started() has been
        # called without peeking at the internal `_future`. Ideally we could just
        # check if the flow run id has been removed from `submitting_flow_run_ids`
        # but it is not so simple to guarantee that this coroutine yields back
        # to `submit_run` to execute that line when exceptions are raised during
        # submission.
        try:
            result = await infrastructure.run(task_status=task_status)
        except Exception as exc:
            if not task_status._future.done():
                # This flow run was being submitted and did not start successfully
                self.logger.exception(
                    f"Failed to submit flow run '{flow_run.id}' to infrastructure."
                )
                # Mark the task as started to prevent agent crash
                task_status.started(exc)
                await self._propose_crashed_state(
                    flow_run, "Flow run could not be submitted to infrastructure"
                )
            else:
                self.logger.exception(
                    f"An error occured while monitoring flow run '{flow_run.id}'. "
                    "The flow run will not be marked as failed, but an issue may have "
                    "occurred."
                )
            return exc
        finally:
            if self.limiter:
                self.limiter.release_on_behalf_of(flow_run.id)

        if not task_status._future.done():
            self.logger.error(
                f"Infrastructure returned without reporting flow run '{flow_run.id}' "
                "as started or raising an error. This behavior is not expected and "
                "generally indicates improper implementation of infrastructure. The "
                "flow run will not be marked as failed, but an issue may have occurred."
            )
            # Mark the task as started to prevent agent crash
            task_status.started()

        if result.status_code != 0:
            await self._propose_crashed_state(
                flow_run,
                (
                    "Flow run infrastructure exited with non-zero status code"
                    f" {result.status_code}."
                ),
            )

        return result

    async def _propose_pending_state(self, flow_run: FlowRun) -> bool:
        """
        Propose a Pending state for the flow run.

        Returns:
            True only if the proposal succeeded and the resulting state is
            pending; False on an abort signal, any other error, or a non-pending
            result.
        """
        state = flow_run.state
        try:
            state = await propose_state(self.client, Pending(), flow_run_id=flow_run.id)
        except Abort as exc:
            self.logger.info(
                (
                    f"Aborted submission of flow run '{flow_run.id}'. "
                    f"Server sent an abort signal: {exc}"
                ),
            )
            return False
        except Exception:
            self.logger.error(
                f"Failed to update state of flow run '{flow_run.id}'",
                exc_info=True,
            )
            return False

        if not state.is_pending():
            self.logger.info(
                (
                    f"Aborted submission of flow run '{flow_run.id}': "
                    f"Server returned a non-pending state {state.type.value!r}"
                ),
            )
            return False

        return True

    async def _propose_failed_state(self, flow_run: FlowRun, exc: Exception) -> None:
        """Best-effort: propose a failed state built from ``exc``; never raises on Exception."""
        try:
            await propose_state(
                self.client,
                await exception_to_failed_state(message="Submission failed.", exc=exc),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # We've already failed, no need to note the abort but we don't want it to
            # raise in the agent process
            pass
        except Exception:
            self.logger.error(
                f"Failed to update state of flow run '{flow_run.id}'",
                exc_info=True,
            )

    async def _propose_crashed_state(self, flow_run: FlowRun, message: str) -> None:
        """Best-effort: propose a Crashed state with ``message``; logs if accepted."""
        try:
            state = await propose_state(
                self.client,
                Crashed(message=message),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # Flow run already marked as failed
            pass
        except Exception:
            self.logger.exception(f"Failed to update state of flow run '{flow_run.id}'")
        else:
            if state.is_crashed():
                self.logger.info(
                    f"Reported flow run '{flow_run.id}' as crashed: {message}"
                )

    async def _schedule_task(self, __in_seconds: int, fn, *args, **kwargs):
        """
        Schedule a background task to start after some time.

        These tasks will be run immediately when the agent exits instead of waiting.

        The function may be async or sync. Async functions will be awaited.
        """

        async def wrapper(task_status):
            # If we are shutting down, do not sleep; otherwise sleep until the scheduled
            # time or shutdown
            if self.started:
                with anyio.CancelScope() as scope:
                    # `shutdown` cancels every registered scope, cutting the sleep short
                    self.scheduled_task_scopes.add(scope)
                    task_status.started()
                    await anyio.sleep(__in_seconds)

                self.scheduled_task_scopes.remove(scope)
            else:
                task_status.started()

            result = fn(*args, **kwargs)
            if inspect.iscoroutine(result):
                await result

        await self.task_group.start(wrapper)

    # Context management ---------------------------------------------------------------

    async def start(self):
        """Open the task group and API client and create the limiter, if any."""
        self.started = True
        self.task_group = anyio.create_task_group()
        self.limiter = (
            anyio.CapacityLimiter(self.limit) if self.limit is not None else None
        )
        self.client = get_client()
        await self.client.__aenter__()
        await self.task_group.__aenter__()

    async def shutdown(self, *exc_info):
        """
        Stop the agent: run pending scheduled tasks, wait for the task group,
        close the client and reset in-memory state.
        """
        self.started = False
        # We must cancel scheduled task scopes before closing the task group
        for scope in self.scheduled_task_scopes:
            scope.cancel()
        try:
            await self.task_group.__aexit__(*exc_info)
        finally:
            # Close the client and reset state even if a task in the group failed,
            # so the client's connections are not leaked.
            await self.client.__aexit__(*exc_info)
            self.task_group = None
            self.client = None
            self.submitting_flow_run_ids.clear()
            self.cancelling_flow_run_ids.clear()
            self.scheduled_task_scopes.clear()
            self._work_queue_cache_expiration = None
            self._work_queue_cache = []

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *exc_info):
        await self.shutdown(*exc_info)
async def cancel_run(self, flow_run: FlowRun) -> None:
    """
    Cancel a flow run by killing its infrastructure

    The run is marked cancelled when it has no infrastructure pid, when the kill
    succeeds, or when the infrastructure reports it as not found. On an
    unexpected kill error the run id is removed from ``cancelling_flow_run_ids``
    so a later poll retries it. In the other failure cases the id is left in the
    set so no further attempts are made.
    """
    if not flow_run.infrastructure_pid:
        self.logger.error(
            f"Flow run '{flow_run.id}' does not have an infrastructure pid"
            " attached. Cancellation cannot be guaranteed."
        )
        await self._mark_flow_run_as_cancelled(
            flow_run,
            state_updates={
                "message": (
                    "This flow run is missing infrastructure tracking information"
                    " and cancellation cannot be guaranteed."
                )
            },
        )
        return

    try:
        infrastructure = await self.get_infrastructure(flow_run)
    except Exception:
        self.logger.exception(
            f"Failed to get infrastructure for flow run '{flow_run.id}'. "
            "Flow run cannot be cancelled."
        )
        # Note: We leave this flow run in the cancelling set because it cannot be
        # cancelled and this will prevent additional attempts.
        return

    if not hasattr(infrastructure, "kill"):
        self.logger.error(
            f"Flow run '{flow_run.id}' infrastructure {infrastructure.type!r} "
            "does not support killing created infrastructure. "
            "Cancellation cannot be guaranteed."
        )
        return

    # Separate the infrastructure type from the pid so the message is readable
    self.logger.info(
        f"Killing {infrastructure.type} {flow_run.infrastructure_pid} for flow run "
        f"'{flow_run.id}'..."
    )
    try:
        await infrastructure.kill(flow_run.infrastructure_pid)
    except InfrastructureNotFound as exc:
        self.logger.warning(f"{exc} Marking flow run as cancelled.")
        await self._mark_flow_run_as_cancelled(flow_run)
    except InfrastructureNotAvailable as exc:
        self.logger.warning(f"{exc} Flow run cannot be cancelled by this agent.")
    except Exception:
        self.logger.exception(
            "Encountered exception while killing infrastructure for flow run "
            f"'{flow_run.id}'. Flow run may not be cancelled."
        )
        # We will try again on generic exceptions
        self.cancelling_flow_run_ids.remove(flow_run.id)
        return
    else:
        await self._mark_flow_run_as_cancelled(flow_run)
        self.logger.info(f"Cancelled flow run '{flow_run.id}'!")
async def get_and_submit_flow_runs(self) -> List[FlowRun]:
    """
    Poll for scheduled flow runs and start a background submission for each.

    Runs are fetched for the whole work pool when ``work_pool_name`` is set,
    otherwise queue by queue (paused queues are skipped, at most 10 runs per
    queue). Runs are handled in ``next_scheduled_start_time`` order. Runs that
    are already being submitted are skipped, and polling stops at the first run
    that cannot get a slot from the limiter.

    Returns:
        The fetched runs whose ids are in ``submitting_flow_run_ids`` at return.

    Raises:
        RuntimeError: if the agent has not been started.
    """
    if not self.started:
        raise RuntimeError("Agent is not started. Use `async with PrefectAgent()...`")

    self.logger.debug("Checking for scheduled flow runs...")

    cutoff = pendulum.now("utc").add(
        seconds=self.prefetch_seconds or PREFECT_AGENT_PREFETCH_SECONDS.value()
    )

    candidates: List[FlowRun] = []

    if self.work_pool_name:
        queue_names = [queue.name async for queue in self.get_work_queues()]
        pool_responses = await self.client.get_scheduled_flow_runs_for_work_pool(
            work_pool_name=self.work_pool_name,
            work_queue_names=queue_names,
            scheduled_before=cutoff,
        )
        candidates.extend(item.flow_run for item in pool_responses)
    else:
        async for queue in self.get_work_queues():
            if queue.is_paused:
                # Tell the operator why nothing is pulled from this queue
                self.logger.info(f"Work queue {queue.name!r} ({queue.id}) is paused.")
                continue
            try:
                candidates.extend(
                    await self.client.get_runs_in_work_queue(
                        id=queue.id, limit=10, scheduled_before=cutoff
                    )
                )
            except ObjectNotFound:
                self.logger.error(f"Work queue {queue.name!r} ({queue.id}) not found.")
            except Exception as err:
                self.logger.exception(err)

    candidates.sort(key=lambda item: item.next_scheduled_start_time)

    for candidate in candidates:
        if candidate.id in self.submitting_flow_run_ids:
            continue  # already in flight; never submit a run twice

        if self.limiter:
            try:
                self.limiter.acquire_on_behalf_of_nowait(candidate.id)
            except anyio.WouldBlock:
                self.logger.info(
                    f"Flow run limit reached; {self.limiter.borrowed_tokens} flow runs"
                    " in progress."
                )
                break

        self.logger.info(f"Submitting flow run '{candidate.id}'")
        self.submitting_flow_run_ids.add(candidate.id)
        self.task_group.start_soon(self.submit_run, candidate)

    return [item for item in candidates if item.id in self.submitting_flow_run_ids]
async def get_work_queues(self) -> AsyncIterator[WorkQueue]:
    """
    Loads the work queue objects corresponding to the agent's target work
    queues. If any of them don't exist, they are created.

    Results are cached for 30 seconds; while the cache is fresh the cached
    queues are yielded without contacting the API. Queues selected by
    ``work_queue_prefix`` are never created; if one cannot be read it is
    skipped.
    """
    # if the queue cache has not expired, yield queues from the cache
    now = pendulum.now("UTC")
    if (self._work_queue_cache_expiration or now) > now:
        for queue in self._work_queue_cache:
            yield queue
        return

    # otherwise clear the cache, set the expiration for 30 seconds, and
    # reload the work queues
    self._work_queue_cache.clear()
    self._work_queue_cache_expiration = now.add(seconds=30)

    await self.update_matched_agent_work_queues()

    for name in self.work_queues:
        try:
            work_queue = await self.client.read_work_queue_by_name(
                work_pool_name=self.work_pool_name, name=name
            )
        except ObjectNotFound:
            if self.work_queue_prefix:
                # Do not attempt to create work queues if the agent is polling for
                # queues by prefix. Skip the queue instead of falling through:
                # `work_queue` would otherwise be unbound (first iteration) or
                # still hold the previous iteration's queue, yielding a duplicate.
                self.logger.debug(
                    f"Matched work queue {name!r} could not be found; skipping."
                )
                continue

            # if the work queue wasn't found, create it
            try:
                work_queue = await self.client.create_work_queue(
                    work_pool_name=self.work_pool_name, name=name
                )
                if self.work_pool_name:
                    self.logger.info(
                        f"Created work queue {name!r} in work pool"
                        f" {self.work_pool_name!r}."
                    )
                else:
                    self.logger.info(f"Created work queue '{name}'.")

            # if creating it raises an exception, it was probably just
            # created by some other agent; rather than entering a re-read
            # loop with new error handling, we log the exception and
            # continue.
            except Exception:
                self.logger.exception(f"Failed to create work queue {name!r}.")
                continue

        self._work_queue_cache.append(work_queue)
        yield work_queue
async def submit_run(self, flow_run: FlowRun) -> None:
    """
    Submit a flow run to the infrastructure

    A Pending state is proposed first. If it is not accepted, the limiter slot
    (if any) is released and nothing is submitted. If building the
    infrastructure fails, a failed state is proposed and the slot is released.
    Otherwise the run is started in the task group, and the reported
    infrastructure pid is stored on the flow run. In every path that returns
    normally, the run id is removed from ``submitting_flow_run_ids`` at the end.
    """
    if not await self._propose_pending_state(flow_run):
        # If the run is not ready to submit, release the concurrency slot
        if self.limiter:
            self.limiter.release_on_behalf_of(flow_run.id)
        self.submitting_flow_run_ids.remove(flow_run.id)
        return

    try:
        infrastructure = await self.get_infrastructure(flow_run)
    except Exception as err:
        self.logger.exception(
            f"Failed to get infrastructure for flow run '{flow_run.id}'."
        )
        await self._propose_failed_state(flow_run, err)
        if self.limiter:
            self.limiter.release_on_behalf_of(flow_run.id)
        self.submitting_flow_run_ids.remove(flow_run.id)
        return

    # Block only until the infrastructure reports readiness; the submission
    # coroutine itself may keep running in the background afterwards.
    ready_value = await self.task_group.start(
        self._submit_run_and_capture_errors, flow_run, infrastructure
    )

    got_pid = bool(ready_value) and not isinstance(ready_value, Exception)
    if got_pid:
        try:
            await self.client.update_flow_run(
                flow_run_id=flow_run.id,
                infrastructure_pid=str(ready_value),
            )
        except Exception:
            self.logger.exception(
                "An error occured while setting the `infrastructure_pid`"
                f" on flow run {flow_run.id!r}. The flow run will not be"
                " cancellable."
            )

    self.logger.info(f"Completed submission of flow run '{flow_run.id}'")
    self.submitting_flow_run_ids.remove(flow_run.id)