Interface and implementations of the Ray Task Runner.
Task runners in Prefect are responsible for managing the execution of Prefect task runs.
Generally speaking, users are not expected to interact with
task runners outside of configuring and initializing them for a flow.
A parallel task runner that submits tasks to Ray.
By default, a temporary Ray cluster is created for the duration of the flow run.
Alternatively, if you already have a Ray instance running, you can provide
the connection URL via the `address` kwarg.
Args:
address (string, optional): Address of a currently running ray instance; if
one is not provided, a temporary instance will be created.
init_kwargs (dict, optional): Additional kwargs to use when calling ray.init.
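Examples:
Using a temporary local Ray cluster:

    from prefect import flow
    from prefect_ray.task_runners import RayTaskRunner

    @flow(task_runner=RayTaskRunner())
    def my_flow():
        ...

Connecting to an existing Ray instance:

    RayTaskRunner(address="ray://192.0.2.255:8786")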
class RayTaskRunner(BaseTaskRunner):
    """
    A parallel task_runner that submits tasks to `ray`.
    By default, a temporary Ray cluster is created for the duration of the flow run.
    Alternatively, if you already have a `ray` instance running, you can provide
    the connection URL via the `address` kwarg.

    Args:
        address (string, optional): Address of a currently running `ray` instance; if
            one is not provided, a temporary instance will be created.
        init_kwargs (dict, optional): Additional kwargs to use when calling `ray.init`.

    Examples:
        Using a temporary local ray cluster:
        ```python
        from prefect import flow
        from prefect_ray.task_runners import RayTaskRunner

        @flow(task_runner=RayTaskRunner())
        def my_flow():
            ...
        ```
        Connecting to an existing ray instance:
        ```python
        RayTaskRunner(address="ray://192.0.2.255:8786")
        ```
    """

    def __init__(
        self,
        address: Optional[str] = None,
        init_kwargs: Optional[dict] = None,
    ):
        # Store settings
        self.address = address
        # Copy so that applying defaults below never mutates the caller's dict.
        self.init_kwargs = init_kwargs.copy() if init_kwargs else {}
        self.init_kwargs.setdefault("namespace", "prefect")

        # Runtime attributes: maps the task-run key passed to `submit` to the
        # Ray object reference of the submitted remote call.
        self._ray_refs: Dict[UUID, "ray.ObjectRef"] = {}

        super().__init__()

    def duplicate(self):
        """
        Return a new task runner of the same type with the same settings as this one.
        """
        return type(self)(address=self.address, init_kwargs=self.init_kwargs)

    def __eq__(self, other: object) -> bool:
        """
        Check if an instance has the same settings as this task runner.
        """
        if type(self) == type(other):
            return (
                self.address == other.address
                and self.init_kwargs == other.init_kwargs
            )
        else:
            return NotImplemented

    @property
    def concurrency_type(self) -> TaskConcurrencyType:
        return TaskConcurrencyType.PARALLEL

    async def submit(
        self,
        key: UUID,
        call: Callable[..., Awaitable[State[R]]],
    ) -> None:
        """
        Submit `call` to Ray as a remote function and record its object reference.

        Args:
            key: Identifier under which the resulting Ray object reference is stored;
                later passed to `wait`.
            call: The task call; its `.keywords` (including "task_run") and `.func`
                are read.

        Raises:
            RuntimeError: If the task runner has not been started.
        """
        if not self._started:
            raise RuntimeError(
                "The task runner must be started before submitting work."
            )

        call_kwargs, upstream_ray_obj_refs = self._exchange_prefect_for_ray_futures(
            call.keywords
        )

        # Per-call Ray options (if any) come from the active RemoteOptionsContext.
        remote_options = RemoteOptionsContext.get().current_remote_options
        if remote_options:
            ray_decorator = ray.remote(**remote_options)
        else:
            ray_decorator = ray.remote

        # Ray does not support the submission of async functions and we must create a
        # sync entrypoint, hence `sync_compatible(call.func)`.
        self._ray_refs[key] = (
            ray_decorator(self._run_prefect_task)
            .options(name=call.keywords["task_run"].name)
            .remote(sync_compatible(call.func), *upstream_ray_obj_refs, **call_kwargs)
        )

    def _exchange_prefect_for_ray_futures(self, kwargs_prefect_futures):
        """
        Exchanges Prefect futures for Ray futures.

        Every `PrefectFuture` found in `kwargs_prefect_futures` whose key has a
        recorded Ray object reference is replaced by that reference; futures
        without a recorded reference are left unchanged.

        Returns:
            A tuple of (the kwargs with futures exchanged, the list of Ray object
            references that were substituted).
        """

        upstream_ray_obj_refs = []

        def exchange_prefect_for_ray_future(expr):
            """Exchanges Prefect future for Ray future."""
            if isinstance(expr, PrefectFuture):
                ray_future = self._ray_refs.get(expr.key)
                if ray_future is not None:
                    upstream_ray_obj_refs.append(ray_future)
                    return ray_future

            return expr

        kwargs_ray_futures = visit_collection(
            kwargs_prefect_futures,
            visit_fn=exchange_prefect_for_ray_future,
            return_data=True,
        )

        return kwargs_ray_futures, upstream_ray_obj_refs

    @staticmethod
    def _run_prefect_task(func, *upstream_ray_obj_refs, **kwargs):
        """Resolves Ray futures before calling the actual Prefect task function.

        Passing upstream_ray_obj_refs directly as args enables Ray to wait for
        upstream tasks before running this remote function.
        This variable is otherwise unused as the ray object refs are also
        contained in kwargs.
        """

        def resolve_ray_future(expr):
            """Resolves Ray future."""
            if isinstance(expr, ray.ObjectRef):
                return ray.get(expr)
            return expr

        kwargs = visit_collection(kwargs, visit_fn=resolve_ray_future, return_data=True)

        return func(**kwargs)

    async def wait(self, key: UUID, timeout: Optional[float] = None) -> Optional[State]:
        """
        Wait for the Ray task submitted under `key` and return its resulting state.

        Args:
            key: The key previously passed to `submit`.
            timeout: Maximum number of seconds to wait; `None` waits indefinitely.

        Returns:
            The awaited result, a crashed state if awaiting raised, or `None` if
            `timeout` elapsed first.
        """
        ref = self._get_ray_ref(key)

        result = None

        with anyio.move_on_after(timeout) as scope:
            # We await the reference directly instead of using `ray.get` so we can
            # avoid blocking the event loop
            try:
                result = await ref
            except RayTaskError as exc:
                # unwrap the original exception that caused task failure, except for
                # KeyboardInterrupt, which unwraps as TaskCancelledError
                result = await exception_to_crashed_state(exc.cause)
            except BaseException as exc:
                if (
                    isinstance(exc, anyio.get_cancelled_exc_class())
                    and scope.cancel_called
                ):
                    # Our own timeout fired. Re-raise so `move_on_after` absorbs the
                    # cancellation and `None` is returned, instead of reporting the
                    # timeout as a crashed task.
                    raise
                result = await exception_to_crashed_state(exc)

        return result

    async def _start(self, exit_stack: AsyncExitStack):
        """
        Start the task runner and prep for context exit.

        - If `address` is set (and is not "auto"), connects to the Ray instance there.
        - Otherwise, if Ray is already initialized in this process, reuses that
          instance and returns without registering anything on `exit_stack`.
        - Otherwise, calls `ray.init` without an address to create a local instance.
        - The context returned by `ray.init` is pushed onto `exit_stack` so its
          `__exit__` runs when the stack unwinds.
        """
        if self.address and self.address != "auto":
            self.logger.info(
                f"Connecting to an existing Ray instance at {self.address}"
            )
            init_args = (self.address,)
        elif ray.is_initialized():
            self.logger.info(
                "Local Ray instance is already initialized. "
                "Using existing local instance."
            )
            return
        else:
            self.logger.info("Creating a local Ray instance")
            init_args = ()

        context = ray.init(*init_args, **self.init_kwargs)
        dashboard_url = getattr(context, "dashboard_url", None)
        exit_stack.push(context)

        # Display some information about the cluster
        nodes = ray.nodes()
        living_nodes = [node for node in nodes if node.get("alive")]
        self.logger.info(f"Using Ray cluster with {len(living_nodes)} nodes.")

        if dashboard_url:
            self.logger.info(
                f"The Ray UI is available at {dashboard_url}",
            )

    async def _shutdown_ray(self):
        """
        Shuts down the cluster.
        """
        self.logger.debug("Shutting down Ray cluster...")
        ray.shutdown()

    def _get_ray_ref(self, key: UUID) -> "ray.ObjectRef":
        """
        Retrieve the ray object reference corresponding to a prefect future.

        Raises:
            KeyError: If nothing was submitted under `key`.
        """
        return self._ray_refs[key]
Return a new task runner of the same type with the same settings as this one.
Source code in prefect_ray/task_runners.py
Lines 132–136
def duplicate(self):
    """
    Return a new task runner of the same type with the same settings as this one.

    The copy is constructed by calling ``type(self)`` with this runner's
    ``address`` and ``init_kwargs``, so subclasses duplicate as their own type.
    """
    return type(self)(address=self.address, init_kwargs=self.init_kwargs)