class GatherTaskGroup(anyio.abc.TaskGroup):
    """
    A task group wrapper that records the return value of every task it starts.

    AnyIO has no built-in counterpart to `gather`, so this class layers result
    collection on top of the `TaskGroup` interface.
    See https://github.com/agronholm/anyio/issues/100

    Instances should be created with `create_gather_task_group`.
    """

    def __init__(self, task_group: anyio.abc.TaskGroup):
        # The concrete task group implementation that actually runs the tasks
        self._task_group: anyio.abc.TaskGroup = task_group
        # Task results, indexed by the key handed out from `start_soon`
        self._results: Dict[UUID, Any] = {}

    async def _run_and_store(self, key, fn, args):
        """Await `fn(*args)` and file its return value under `key`."""
        outcome = await fn(*args)
        self._results[key] = outcome

    def start_soon(self, fn, *args) -> UUID:
        """Schedule `fn(*args)` on the wrapped task group and return its result key."""
        task_key = uuid4()
        # Seed the slot with a marker so an early lookup can be told apart from a
        # finished task
        self._results[task_key] = GatherIncomplete
        self._task_group.start_soon(self._run_and_store, task_key, fn, args)
        return task_key

    async def start(self, fn, *args):
        """
        Unsupported: `start` is expected to return the value passed to
        `task_status.started()`, while this class would have to return the result
        key instead, so the method always raises for now.
        """
        raise RuntimeError("`GatherTaskGroup` does not support `start`.")

    def get_result(self, key: UUID) -> Any:
        """
        Return the stored result for `key`.

        Raises `GatherIncomplete` when the slot still holds the placeholder.
        """
        stored = self._results[key]
        if stored is not GatherIncomplete:
            return stored
        raise GatherIncomplete(
            "Task is not complete. "
            "Results should not be retrieved until the task group exits."
        )

    async def __aenter__(self):
        """Enter the wrapped task group and hand back this wrapper."""
        await self._task_group.__aenter__()
        return self

    async def __aexit__(self, *tb):
        """Exit the wrapped task group, then drop the reference to it."""
        try:
            return await self._task_group.__aexit__(*tb)
        finally:
            del self._task_group
Since start returns the result of task_status.started() but here we must
return the key instead, we just won't support this method for now.
Source code in src/prefect/utilities/asyncutils.py
426427428429430431
async def start(self, fn, *args):
    """
    Unsupported operation.

    `start` is expected to return the value passed to `task_status.started()`,
    while a gathering task group would have to return the result key instead,
    so this method always raises for now.
    """
    message = "`GatherTaskGroup` does not support `start`."
    raise RuntimeError(message)
Registers the given callable as a callback to run on event loop closure. The
callable must be a coroutine function; it will be awaited when the current event
loop is shutting down.
Requires use of asyncio.run() which waits for async generator shutdown by
default or explicit call of asyncio.shutdown_asyncgens(). If the application
is entered with asyncio.run_until_complete() and the user calls
asyncio.close() without the generator shutdown call, this will not trigger
callbacks.
asyncio does not provide any other way to clean up a resource when the event
loop is about to close.
Source code in src/prefect/utilities/asyncutils.py
async def add_event_loop_shutdown_callback(coroutine_fn: Callable[[], Awaitable]):
    """
    Registers the given callable as a callback to run on event loop closure. The
    callable must be a coroutine function. It will be awaited when the current event
    loop is shutting down.

    Requires use of `asyncio.run()` which waits for async generator shutdown by
    default or explicit call of `asyncio.shutdown_asyncgens()`. If the application
    is entered with `asyncio.run_until_complete()` and the user calls
    `asyncio.close()` without the generator shutdown call, this will not trigger
    callbacks.

    asyncio does not provide _any_ other way to clean up a resource when the event
    loop is about to close.

    Args:
        coroutine_fn: A zero-argument coroutine function; it is awaited when the
            helper async generator below receives `GeneratorExit`.
    """

    async def on_shutdown(key):
        # It appears that EVENT_LOOP_GC_REFS is somehow being garbage collected early.
        # We hold a reference to it so as to preserve it, at least for the lifetime of
        # this coroutine. See the issue below for the initial report/discussion:
        # https://github.com/PrefectHQ/prefect/issues/7709#issuecomment-1560021109
        _ = EVENT_LOOP_GC_REFS
        try:
            # Suspend here; the generator is intentionally never resumed normally
            yield
        except GeneratorExit:
            # Closing the suspended generator is what triggers the user callback
            await coroutine_fn()
            # Remove self from the garbage collection set
            EVENT_LOOP_GC_REFS.pop(key)

    # Create the iterator and store it in a global variable so it is not garbage
    # collected. If the iterator is garbage collected before the event loop closes, the
    # callback will not run. Since this function does not know the scope of the event
    # loop that is calling it, a reference with global scope is necessary to ensure
    # garbage collection does not occur until after event loop closure.
    key = id(on_shutdown)
    EVENT_LOOP_GC_REFS[key] = on_shutdown(key)

    # Begin iterating so it will be cleaned up as an incomplete generator
    try:
        await EVENT_LOOP_GC_REFS[key].__anext__()

    # There is a poorly understood edge case we've seen in CI where the key is
    # removed from the dict before we begin generator iteration.
    except KeyError:
        # Best effort: registration failed, so only a warning is logged
        logger.warning("The event loop shutdown callback was not properly registered. ")
        pass
Source code in src/prefect/utilities/asyncutils.py
454455456457458459
def create_gather_task_group() -> GatherTaskGroup:
    """Build a `GatherTaskGroup` wrapped around a fresh AnyIO task group."""
    # Mirrors the AnyIO API, which exposes a factory callable: the concrete task
    # group class depends on the async backend in use and is only known at runtime
    backend_group = anyio.create_task_group()
    return GatherTaskGroup(backend_group)
Unlike asyncio.gather this expects to receive callables not coroutines.
This matches anyio semantics.
Source code in src/prefect/utilities/asyncutils.py
462463464465466467468469470471472473
async def gather(*calls: Callable[[], Coroutine[Any, Any, T]]) -> List[T]:
    """
    Run the given calls concurrently and return their results in call order.

    In contrast to `asyncio.gather`, the arguments are _callables_ that produce
    coroutines rather than coroutine objects, which matches `anyio` semantics.
    """
    keys = []
    async with create_gather_task_group() as tg:
        # Schedule every call and remember the key each one was filed under
        keys.extend(tg.start_soon(call) for call in calls)
    # The task group has exited here, so results are read back afterwards
    return [tg.get_result(key) for key in keys]
See https://github.com/microsoft/pyright/issues/2142 for an example use
Source code in src/prefect/utilities/asyncutils.py
616263646566676869707172
def is_async_fn(
    func: Union[Callable[P, R], Callable[P, Awaitable[R]]],
) -> TypeGuard[Callable[P, Awaitable[R]]]:
    """
    Report whether `func` is a coroutine function.

    Wrappers are peeled off by following `__wrapped__` before the check.

    See https://github.com/microsoft/pyright/issues/2142 for an example use
    """
    target = func
    while True:
        try:
            target = target.__wrapped__
        except AttributeError:
            # Reached the innermost callable
            break
    return inspect.iscoroutinefunction(target)
Source code in src/prefect/utilities/asyncutils.py
7576777879808182
def is_async_gen_fn(func):
    """
    Report whether `func` is an async generator function.

    Wrappers are peeled off by following `__wrapped__` before the check.
    """
    target = func
    while True:
        try:
            target = target.__wrapped__
        except AttributeError:
            # Reached the innermost callable
            break
    return inspect.isasyncgenfunction(target)
This will not interrupt long-running system calls like sleep or wait.
Source code in src/prefect/utilities/asyncutils.py
141142143144145146147148149150151
def raise_async_exception_in_thread(thread: Thread, exc_type: Type[BaseException]):
    """
    Raise an exception in a thread asynchronously.

    This will not interrupt long-running system calls like `sleep` or `wait`.

    Args:
        thread: The thread to raise the exception in. It must have been started.
        exc_type: The exception class to raise in the target thread.

    Raises:
        ValueError: If the thread was never started (it has no identifier) or the
            interpreter finds no thread state for its identifier.
        SystemError: If the interpreter reports that more than one thread state
            was modified; the pending exception is cleared again first.
    """
    thread_id = thread.ident
    if thread_id is None:
        # An unstarted thread has no identifier; fail with the same error as an
        # unknown thread instead of an opaque ctypes conversion error
        raise ValueError("Thread not found.")

    # The C API takes the thread id as an `unsigned long`
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )
    if ret == 0:
        raise ValueError("Thread not found.")
    if ret > 1:
        # More than one thread state was touched: per the C API documentation the
        # call must be repeated with a NULL exception to revert the effect
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_id), None)
        raise SystemError("PyThreadState_SetAsyncExc affected multiple threads.")
Runs an async function in the main thread's event loop, blocking the worker
thread until completion.
Source code in src/prefect/utilities/asyncutils.py
214215216217218219220221222
def run_async_from_worker_thread(
    __fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any
) -> T:
    """
    Execute an async function via `anyio.from_thread.run`, blocking the calling
    worker thread until the call completes.
    """
    # Bind the arguments up front so the call is handed over as a zero-argument
    # callable
    return anyio.from_thread.run(partial(__fn, *args, **kwargs))
Runs a coroutine from a synchronous context. A thread will be spawned
to run the event loop if necessary, which allows coroutines to run in
environments like Jupyter notebooks where the event loop runs on the main
thread.
def run_sync(coroutine: Coroutine[Any, Any, T]) -> T:
    """
    Drive a coroutine to completion from synchronous code.

    When an event loop is already running in the calling thread (for example in a
    Jupyter notebook), the coroutine is executed with `asyncio.run` inside a worker
    thread; otherwise `asyncio.run` is invoked directly.

    Args:
        coroutine: The coroutine to run.

    Returns:
        The return value of the coroutine.

    Example:
        Basic usage:
        ```python
        async def my_async_function(x: int) -> int:
            return x + 1

        run_sync(my_async_function(1))
        ```
    """
    # Snapshot the context variables so the async frame sees the caller's values
    ctx = copy_context()

    try:
        active_loop = asyncio.get_running_loop()
    except RuntimeError:
        active_loop = None

    if not (active_loop and active_loop.is_running()):
        # No loop is running in this thread; run the coroutine right here
        return ctx.run(asyncio.run, coroutine)

    # A loop is already running here, so run the coroutine in a separate thread
    with ThreadPoolExecutor() as pool:
        pending = pool.submit(ctx.run, asyncio.run, coroutine)
        return cast(T, pending.result())
Runs a sync function in a new interruptible worker thread so that the main
thread's event loop is not blocked
Unlike the anyio function, this performs best-effort cancellation of the
thread using the C API. Cancellation will not interrupt system calls like
sleep.
Source code in src/prefect/utilities/asyncutils.py
async def run_sync_in_interruptible_worker_thread(
    __fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:
    """
    Runs a sync function in a new interruptible worker thread so that the main
    thread's event loop is not blocked

    Unlike the anyio function, this performs best-effort cancellation of the
    thread using the C API. Cancellation will not interrupt system calls like
    `sleep`.

    Args:
        __fn: The synchronous callable to execute in the worker thread.
        *args: Positional arguments forwarded to `__fn`.
        **kwargs: Keyword arguments forwarded to `__fn`.

    Returns:
        The value returned by `__fn`.
    """

    # Sentinel marking "the worker has not produced a result yet"
    class NotSet:
        pass

    thread: Thread = None
    result = NotSet
    event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def capture_worker_thread_and_result():
        # Captures the worker thread that AnyIO is using to execute the function so
        # the main thread can perform actions on it
        nonlocal thread, result
        try:
            thread = threading.current_thread()
            result = __fn(*args, **kwargs)
        except BaseException as exc:
            # Record the failure as the result, then let it propagate
            result = exc
            raise
        finally:
            # The event is set through the loop's thread-safe scheduling call
            # because this code runs in the worker thread
            loop.call_soon_threadsafe(event.set)

    async def send_interrupt_to_thread():
        # This task waits until the result is returned from the thread, if cancellation
        # occurs during that time, we will raise the exception in the thread as well
        try:
            await event.wait()
        except anyio.get_cancelled_exc_class():
            # NOTE: We could send a SIGINT here which allow us to interrupt system
            # calls but the interrupt bubbles from the child thread into the main thread
            # and there is not a clear way to prevent it.
            raise_async_exception_in_thread(thread, anyio.get_cancelled_exc_class())
            raise

    async with anyio.create_task_group() as tg:
        tg.start_soon(send_interrupt_to_thread)
        tg.start_soon(
            partial(
                anyio.to_thread.run_sync,
                capture_worker_thread_and_result,
                cancellable=True,
                limiter=get_thread_limiter(),
            )
        )

    # Sanity check that the worker assigned `result` before the task group exited
    assert result is not NotSet
    return result
Runs a sync function in a new worker thread so that the main thread's event loop
is not blocked
Unlike the anyio function, this defaults to a cancellable thread and does not allow
passing arguments to the anyio function so users can pass kwargs to their function.
Note that cancellation of threads will not result in interrupted computation, the
thread may continue running — the outcome will just be ignored.
Source code in src/prefect/utilities/asyncutils.py
async def run_sync_in_worker_thread(
    __fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:
    """
    Execute a sync function in a worker thread so the event loop of the calling
    thread stays unblocked.

    Differs from the anyio function in two ways: the thread is cancellable by
    default, and no options can be forwarded to anyio, which leaves keyword
    arguments free for the wrapped function.

    Cancelling the thread does not interrupt the computation; the thread may keep
    running and its outcome is simply ignored.
    """
    bound_fn = partial(__fn, *args, **kwargs)
    outcome = await anyio.to_thread.run_sync(
        bound_fn, cancellable=True, limiter=get_thread_limiter()
    )
    return outcome
Call an async function from a synchronous context. Block until completion.
If in an asynchronous context, we will run the code in a separate loop instead of
failing but a warning will be displayed since this is not recommended.
Source code in src/prefect/utilities/asyncutils.py
def sync(
    __async_fn: Callable[P, Awaitable[T]], *args: P.args, **kwargs: P.kwargs
) -> T:
    """
    Invoke an async function from synchronous code and block until it finishes.

    When called from an asynchronous context the call is run in a separate loop
    rather than failing, and a warning is emitted because this is not recommended.
    """
    if in_async_main_thread():
        warnings.warn(
            "`sync` called from an asynchronous context; "
            "you should `await` the async function directly instead."
        )
        with anyio.start_blocking_portal() as portal:
            return portal.call(partial(__async_fn, *args, **kwargs))

    if in_async_worker_thread():
        # Sync context with access to the event loop thread: hand the call to the
        # parent
        return run_async_from_worker_thread(__async_fn, *args, **kwargs)

    # Sync context without any event loop: spin one up for the call, then tear it
    # down
    return run_async_in_new_loop(__async_fn, *args, **kwargs)
def sync_compatible(async_fn: T) -> T:
    """
    Converts an async function into a dual async and sync function.

    When the returned function is called, we will attempt to determine the best way
    to enter the async function.

    - If in a thread with a running event loop, we will return the coroutine for the
        caller to await. This is normal async behavior.
    - If in a blocking worker thread with access to an event loop in another thread, we
        will submit the async method to the event loop.
    - If we cannot find an event loop, we will create a new one and run the async method
        then tear down the loop.

    Args:
        async_fn: The coroutine function to wrap.

    Returns:
        The wrapper function; the original function is attached to it as `.aio`.

    Raises:
        ValueError: If `async_fn` is an async generator function.
        TypeError: If `async_fn` is not async.
    """

    @wraps(async_fn)
    def coroutine_wrapper(*args, **kwargs):
        # These project imports are resolved at call time rather than module import
        from prefect._internal.concurrency.api import create_call, from_sync
        from prefect._internal.concurrency.calls import get_current_call, logger
        from prefect._internal.concurrency.event_loop import get_running_loop
        from prefect._internal.concurrency.threads import get_global_loop
        from prefect.settings import PREFECT_EXPERIMENTAL_DISABLE_SYNC_COMPAT

        # When the setting is truthy, skip all dispatch and return the coroutine
        if PREFECT_EXPERIMENTAL_DISABLE_SYNC_COMPAT:
            return async_fn(*args, **kwargs)

        global_thread_portal = get_global_loop()
        current_thread = threading.current_thread()
        current_call = get_current_call()
        current_loop = get_running_loop()

        if current_thread.ident == global_thread_portal.thread.ident:
            logger.debug(f"{async_fn} --> return coroutine for internal await")
            # In the prefect async context; return the coro for us to await
            return async_fn(*args, **kwargs)
        elif in_async_main_thread() and (
            not current_call or is_async_fn(current_call.fn)
        ):
            # In the main async context; return the coro for them to await
            logger.debug(f"{async_fn} --> return coroutine for user await")
            return async_fn(*args, **kwargs)
        elif in_async_worker_thread():
            # In a sync context but we can access the event loop thread; send the async
            # call to the parent
            return run_async_from_worker_thread(async_fn, *args, **kwargs)
        elif current_loop is not None:
            logger.debug(f"{async_fn} --> run async in global loop portal")
            # An event loop is already present but we are in a sync context, run the
            # call in Prefect's event loop thread
            return from_sync.call_soon_in_loop_thread(
                create_call(async_fn, *args, **kwargs)
            ).result()
        else:
            logger.debug(f"{async_fn} --> run async in new loop")
            # Run in a new event loop, but use a `Call` for nested context detection
            call = create_call(async_fn, *args, **kwargs)
            return call()

    # TODO: This is breaking type hints on the callable... mypy is behind the curve
    #       on argument annotations. We can still fix this for editors though.
    if is_async_fn(async_fn):
        wrapper = coroutine_wrapper
    elif is_async_gen_fn(async_fn):
        raise ValueError("Async generators cannot yet be marked as `sync_compatible`")
    else:
        raise TypeError("The decorated function must be async.")

    # Expose the undecorated coroutine function on the wrapper
    wrapper.aio = async_fn
    return wrapper