
FastAPI series: converting between synchronous and asynchronous code in practice

2022-04-29 16:27:28 Xiao Zhong

Preface

FastAPI is a hybrid framework: it can run our services both synchronously and asynchronously.

  • Synchronous endpoints achieve concurrency mainly through multithreading
  • Asynchronous endpoints achieve concurrency mainly through coroutines

A hybrid framework raises two main problems:

  • A synchronous function needs to call an asynchronous function (or run a coroutine object)
  • An asynchronous function needs to run synchronous code (time-consuming or blocking calls, or libraries that do not support coroutines)

Both scenarios require a conversion.

Basics

1 About the conversion

Converting from synchronous to asynchronous, or from asynchronous to synchronous, is relatively complex, and the conversion itself carries some performance cost, because it has to take thread safety or coroutine safety into account.

  • Synchronous to asynchronous:

    • The synchronous code is handed to a separate thread pool so that it does not block the event loop. Once a worker thread has finished running the synchronous function, its result is returned to the awaiting coroutine.
  • Asynchronous to synchronous:

    • The call to a coroutine function that runs on the main thread's event loop is wrapped as an ordinary function that can be called from a worker thread. Once the asynchronous function has finished, its result is returned to the worker thread that is running synchronously.

2 Functions involved

2.1 run_in_executor (synchronous to asynchronous)

  • Used inside an asynchronous function (one declared with async) to call a synchronous function. If the synchronous function blocks, calling it directly would block the whole single-threaded event loop; run_in_executor runs the call in a thread pool instead, so other tasks keep running in the event loop.
  • In short: the event loop stays on the main thread, and the blocking function runs on another thread.
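A minimal sketch of the idea, using only the standard library. The names blocking_io and handler are hypothetical stand-ins for a blocking call and an async endpoint; passing None as the executor selects the loop's default thread pool.

```python
import asyncio
import threading
import time


def blocking_io(seconds: float) -> str:
    """Stand-in for a blocking call, e.g. a synchronous HTTP request."""
    time.sleep(seconds)
    # Report which thread actually ran the blocking work.
    return threading.current_thread().name


async def handler() -> str:
    loop = asyncio.get_running_loop()
    # None means "use the loop's default ThreadPoolExecutor".
    return await loop.run_in_executor(None, blocking_io, 0.05)


main_thread_name = threading.current_thread().name
worker_thread_name = asyncio.run(handler())
print(main_thread_name, worker_thread_name)
```

The two printed names differ, which shows that the blocking function did not run on the thread that owns the event loop.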

2.2 asyncio.run_coroutine_threadsafe (asynchronous to synchronous)

  • Used to run a coroutine object from a synchronous function (await cannot be used outside a coroutine function). It submits the coroutine to an event loop running in another thread and, instead of an asyncio future, returns a concurrent.futures.Future.
  • In short: it schedules a coroutine onto a running loop from another thread. The parameters are a coroutine object and a loop object, the return value is a future, and future.result() gives the coroutine's return value.
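The following sketch shows the calling pattern with the standard library only. It assumes an event loop that is kept running in a background thread; fetch_value and sync_caller are hypothetical names used for illustration.

```python
import asyncio
import threading


async def fetch_value(x: int) -> int:
    """Stand-in for any coroutine function we need to call from sync code."""
    await asyncio.sleep(0.01)
    return x * 2


def sync_caller(loop: asyncio.AbstractEventLoop, x: int) -> int:
    # Submit the coroutine object to the loop running in the other thread.
    future = asyncio.run_coroutine_threadsafe(fetch_value(x), loop)
    # future is a concurrent.futures.Future, so result() blocks this thread.
    return future.result(timeout=5)


# Run an event loop forever in a background thread.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()

result = sync_caller(loop, 21)
print(result)  # 42

# Stop the loop from the outside and clean up.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```

Note that future.result() must not be called from the thread that runs the loop itself, since that thread would then block waiting for work only it can perform.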

2.3 asyncio.to_thread

This function is only available from Python 3.9. It is asyncio's helper for running work in threads: it runs the given function in a separate thread and returns a coroutine that can be awaited for the result.
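A short sketch of asyncio.to_thread, assuming Python 3.9 or later. slow_square is a hypothetical blocking function.

```python
import asyncio
import time


def slow_square(n: int) -> int:
    """Stand-in for blocking work."""
    time.sleep(0.05)
    return n * n


async def main() -> list:
    # Each call runs in a worker thread; gather awaits them concurrently
    # and returns the results in the order the awaitables were passed.
    return await asyncio.gather(
        *(asyncio.to_thread(slow_square, n) for n in (1, 2, 3))
    )


squares = asyncio.run(main())
print(squares)  # [1, 4, 9]
```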

Conversion in practice

Let's start with a specific case to illustrate.

1. Asynchronous to synchronous conversion

This scenario comes up fairly often, for example when you need to read the submitted request body in a synchronous (multithreaded) endpoint. FastAPI's reques.body() is a coroutine function, so it has to be called as await reques.body(), and that creates a conversion problem.

1.1 The problem

@app.post("/get/access_token")
def access_token(reques: Request, name=Body(...)):
    print(reques.body())  # would only print a coroutine object, not the body
    await reques.body()   # SyntaxError: 'await' outside async function
    return 'pk'

The await reques.body() above is clearly wrong: await cannot be used to call a coroutine function inside an ordinary function, so a conversion is needed. To solve this we bring in asgiref, the conversion package that comes from the Django project. It is a standalone package and can be installed on its own; when the framework is installed it is in fact usually installed along with it (if it is missing, pip install asgiref).

1.2 Introducing the asgiref package

asgiref provides two very convenient conversion functions that turn asynchronous functions into synchronous ones and vice versa.

  • AsyncToSync (async_to_sync) turns asynchronous into synchronous: it wraps an asynchronous (coroutine) function as an ordinary function so that it can be called from synchronous, threaded code
  • SyncToAsync (sync_to_async) turns synchronous into asynchronous: it runs a synchronous function in a separate thread and wraps it as a coroutine function

One point in the asgiref documentation deserves attention:

By default, for safety reasons, sync_to_async runs all synchronous code in the program in the same thread; you can disable this for higher performance with @sync_to_async(thread_sensitive=False), but make sure your code does not rely on anything bound to threads (such as database connections) when you do.

That description comes from the official documentation. If the code has no thread-bound state, setting thread_sensitive=False is recommended because it performs better. If it does depend on context bound to a thread, that is, on what is usually called thread-local state, the feature cannot be turned off.
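To see why thread-bound state matters, here is a standard-library analogy. It does not use asgiref: a single-worker ThreadPoolExecutor stands in for the "everything in the same thread" behaviour, and open_connection / use_connection are hypothetical functions that keep a fake connection in thread-local storage.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Thread-local storage: stands in for a thread-bound resource
# such as a database connection.
_local = threading.local()


def open_connection() -> int:
    _local.conn = "connection-1"
    return threading.get_ident()


def use_connection():
    # Returns None for the connection if this thread never opened one.
    return getattr(_local, "conn", None), threading.get_ident()


async def main():
    loop = asyncio.get_running_loop()
    # One worker: every synchronous call lands on the same thread.
    with ThreadPoolExecutor(max_workers=1) as single_thread:
        opened_on = await loop.run_in_executor(single_thread, open_connection)
        conn, used_on = await loop.run_in_executor(single_thread, use_connection)
    return opened_on, conn, used_on


opened_on, conn, used_on = asyncio.run(main())
print(conn, opened_on == used_on)
```

With a multi-worker pool the second call could be scheduled on a different thread, where the thread-local attribute does not exist. That is the kind of breakage the default setting guards against.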

1.3 Asynchronous to synchronous conversion with asgiref

from fastapi import Body, FastAPI, Request
from fastapi.responses import PlainTextResponse
from asgiref.sync import async_to_sync

# Define our app service object
app = FastAPI()


@app.post("/get/access_token")
def access_token(reques: Request, name=Body(...)):
    # reques.body is a coroutine function: wrap it, then call the wrapper
    body = async_to_sync(reques.body)()
    print(body)
    return PlainTextResponse(body.decode(encoding='utf-8'))


if __name__ == '__main__':
    import uvicorn
    # 'kkk:app' assumes this file is saved as kkk.py
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, reload=True)

In the code above, the key part is:

from asgiref.sync import async_to_sync
body = async_to_sync(reques.body)()

Note that async_to_sync takes an asynchronous callable (here the coroutine function reques.body itself, not the result of calling it) and returns an ordinary function, so a trailing () is needed to actually make the call. The final result is shown in the figure below:

[Figure: image.png]
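The "takes a coroutine function, returns an ordinary function" shape can be sketched in a few lines. This is a deliberately simplified stand-in and not how asgiref is implemented: it starts a fresh event loop with asyncio.run on every call, so it only works in a thread that is not already running a loop. to_sync and read_body are hypothetical names.

```python
import asyncio
import functools


def to_sync(async_fn):
    """Wrap a coroutine function so it can be called like a normal function."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # Calling async_fn(...) creates the coroutine object;
        # asyncio.run drives it to completion on a new event loop.
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


async def read_body() -> bytes:
    """Stand-in for an async API such as a request body reader."""
    await asyncio.sleep(0)
    return b'{"name": "demo"}'


# Same calling shape as async_to_sync(reques.body)():
# the first call wraps, the second () invokes.
body = to_sync(read_body)()
print(body.decode("utf-8"))
```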

2. Synchronous to asynchronous conversion

Why do we need to turn synchronous code into asynchronous code? Because many libraries still have no aio implementation. Take the HTTP library requests as an example (asynchronous alternatives such as aiohttp and httpx now exist; this is only for illustration). If the whole project is asynchronous, how do we call synchronous logic from asynchronous logic without blocking? An event loop runs its coroutines in a single process and a single thread, so a blocking synchronous call made directly inside a coroutine blocks everything else on that loop. That is why a conversion is needed.
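The blocking effect can be observed directly with the standard library. In this sketch (Python 3.9+ for asyncio.to_thread), a hypothetical ticker coroutine counts how often it gets to run while time.sleep is called first on the event loop thread and then in a worker thread.

```python
import asyncio
import time


async def ticker(counter: list) -> None:
    """Increment a counter every 10 ms for as long as the loop lets us run."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def main():
    counter = [0]
    task = asyncio.create_task(ticker(counter))
    await asyncio.sleep(0.05)  # give the ticker a chance to start

    before = counter[0]
    time.sleep(0.2)  # blocking call made directly on the event loop thread
    ticks_while_blocked = counter[0] - before

    before = counter[0]
    await asyncio.to_thread(time.sleep, 0.2)  # same call, moved to a thread
    ticks_while_offloaded = counter[0] - before

    task.cancel()
    return ticks_while_blocked, ticks_while_offloaded


ticks_while_blocked, ticks_while_offloaded = asyncio.run(main())
print(ticks_while_blocked, ticks_while_offloaded)
```

The first number is zero because nothing else can run while the loop thread sleeps; the second is positive because the loop stays free while the worker thread sleeps.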

2.1 BackgroundTasks: an example from the source code

The most visible place where FastAPI converts synchronous code to asynchronous is background tasks (which FastAPI takes from Starlette). Their source code already shows clearly how a synchronous function is converted. The BackgroundTask source:

import asyncio
import typing

from starlette.concurrency import run_in_threadpool


class BackgroundTask:
    def __init__(
        self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)

In the code above, the is_async flag decides how the function is handled. If it is a coroutine function, it is awaited directly:

await self.func(*self.args, **self.kwargs)

If it is an ordinary function, it goes through

await run_in_threadpool(self.func, *self.args, **self.kwargs)

which runs it in a thread and returns an awaitable.

Following run_in_threadpool into its source code:


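# Module-level names the excerpt relies on
import functools
import typing

import anyio.to_thread

try:
    import contextvars  # not available on very old Python versions
except ImportError:  # pragma: no cover
    contextvars = None  # type: ignore

T = typing.TypeVar("T")


async def run_in_threadpool(
    func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
    if contextvars is not None:  # pragma: no cover
        # Ensure we run in the same context
        child = functools.partial(func, *args, **kwargs)
        context = contextvars.copy_context()
        func = context.run
        args = (child,)
    elif kwargs:  # pragma: no cover
        # run_sync doesn't accept 'kwargs', so bind them in here
        func = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(func, *args)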

We can see that the final conversion is done by

await anyio.to_thread.run_sync(func, *args)

If necessary, we can use this as a reference for writing our own conversion library.
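As a starting point for such a helper, here is a sketch built on the standard library instead of anyio. It follows the same two steps as the excerpt (copy the current context, run the function in a thread) but is not Starlette's implementation; run_in_thread, request_id and read_request_id are hypothetical names.

```python
import asyncio
import contextvars
import functools

# A context variable, e.g. something set per request by middleware.
request_id = contextvars.ContextVar("request_id", default="unset")


def read_request_id(prefix: str) -> str:
    """Synchronous function that depends on the caller's context."""
    return f"{prefix}:{request_id.get()}"


async def run_in_thread(func, *args, **kwargs):
    loop = asyncio.get_running_loop()
    # Copy the caller's context so the worker thread sees the same values.
    ctx = contextvars.copy_context()
    # run_in_executor accepts no keyword arguments for func, so bind them here.
    call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, call)


async def main() -> str:
    request_id.set("req-42")
    return await run_in_thread(read_request_id, prefix="id")


value = asyncio.run(main())
print(value)  # id:req-42
```

Without the copied context, the worker thread would see the default value of the context variable, because loop.run_in_executor does not carry the caller's context across by itself.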

2.2 Synchronous to asynchronous conversion with asgiref

As mentioned above, asgiref converts in both directions, so it also provides the synchronous-to-asynchronous conversion.

The asgiref documentation is at:

asgi.readthedocs.io/en/latest/

A specific case:


from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from asgiref.sync import sync_to_async
import requests

# Define our app service object
app = FastAPI()


def getdata():
    return requests.get('http://www.baidu.com').text


@app.get("/get/access_token")
async def access_token(reques: Request):
    # sync_to_async returns a coroutine function: call it, then await the result
    asds = await sync_to_async(getdata)()
    return HTMLResponse(asds)


if __name__ == '__main__':
    import uvicorn
    # 'kkk:app' assumes this file is saved as kkk.py
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, reload=True)

Note that sync_to_async returns a coroutine function, so it has to be called with () to produce a coroutine object before it can be awaited.

A quick note on await:

  • await is followed by a coroutine object or another awaitable. It suspends the current coroutine at that point until the awaited operation completes and control returns.
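The difference between a coroutine function and a coroutine object can be checked with the standard library's inspect module. get_data is a hypothetical coroutine function.

```python
import asyncio
import inspect


async def get_data() -> str:
    return "payload"


# get_data itself is a coroutine function: nothing has run yet.
is_function = inspect.iscoroutinefunction(get_data)

# Calling it creates a coroutine object, which is what await operates on.
coro = get_data()
is_object = inspect.iscoroutine(coro)

# Driving the coroutine object to completion yields the return value.
result = asyncio.run(coro)
print(is_function, is_object, result)
```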

2.3 asyncer: a conversion library written by FastAPI's author

Like asgiref, asyncer can make synchronous code asynchronous and asynchronous code synchronous.

The asyncer documentation is at:

asyncer.tiangolo.com/

Installing the library:

pip install asyncer

The official examples make it easy to use. The following example, which follows the official one, converts synchronous to asynchronous:

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from asyncer import asyncify
import requests

# Define our app service object
app = FastAPI()


def do_sync_work(name):
    return requests.get(f'http://www.baidu.com?name={name}').text


@app.get("/get/access_token")
async def access_token(reques: Request):
    # asyncify returns a coroutine function: call it, then await the result
    message = await asyncify(do_sync_work)(name="World")
    return HTMLResponse(message)


if __name__ == '__main__':
    import uvicorn
    # 'kkk:app' assumes this file is saved as kkk.py
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, reload=True)

Note that asyncify also returns a coroutine function, so it has to be called with () to produce a coroutine object before it can be awaited.

2.4 awaits: conversion in decorator form

Another interesting library is awaits, which does the conversion through decorators.

The awaits documentation is at:

github.com/pomponchik/…

If you are interested, consult that address for usage. The library looks worth studying.

2.5 aioify: a library that makes every function async and await-able

aioify, in its own words: make every function async and await-able.

Installing the library:

pip install aioify

A concrete example:

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from aioify import aioify
import requests

# Define our app service object
app = FastAPI()


def do_sync_work(name):
    return requests.get(f'http://www.baidu.com?name={name}').text


@app.get("/get/access_token")
async def access_token(reques: Request):
    # Wrap the synchronous function, call the wrapper, then await the result
    message = await aioify(do_sync_work)(name="World")
    print(message)
    return HTMLResponse(message)


if __name__ == '__main__':
    import uvicorn
    # 'kkk:app' assumes this file is saved as kkk.py
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, reload=True)

Most of these libraries work the same way: convert, then call. aioify can also be applied as a decorator directly on the synchronous function, as in the following sample code:

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from aioify import aioify
import requests

# Define our app service object
app = FastAPI()


@aioify
def do_sync_work(name):
    return requests.get(f'http://www.baidu.com?name={name}').text


@app.get("/get/access_token")
async def access_token(reques: Request):
    # The decorated function is already awaitable when called
    message = await do_sync_work(name="World")
    print(message)
    return HTMLResponse(message)


if __name__ == '__main__':
    import uvicorn
    # 'kkk:app' assumes this file is saved as kkk.py
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, reload=True)

2.6 Summary: the essence of synchronous to asynchronous conversion

Each library has its own strengths, but their internals arrive at the same place. The essence of synchronous-to-asynchronous conversion is calling the synchronous function in a thread, as the source of asyncify shows:

def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    cancellable: bool = False,
    limiter: Optional[anyio.CapacityLimiter] = None
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:

    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        partial_f = functools.partial(function, *args, **kwargs)
        return await anyio.to_thread.run_sync(
            partial_f, cancellable=cancellable, limiter=limiter
        )

    return wrapper

Both this and run_in_threadpool above do the work through await anyio.to_thread.run_sync.
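The same pattern can be reproduced with the standard library alone. The sketch below swaps anyio.to_thread.run_sync for asyncio.to_thread (Python 3.9+) and drops the cancellable and limiter options, so it is an illustration of the idea and not a replacement for asyncer. asyncify_sketch, greet and shout are hypothetical names.

```python
import asyncio
import functools


def asyncify_sketch(func):
    """Return a coroutine function that runs func in a worker thread."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def greet(name: str) -> str:
    return f"Hello, {name}"


@asyncify_sketch  # decorator form
def shout(name: str) -> str:
    return name.upper()


async def main():
    # Call form: wrap, call, await.
    greeting = await asyncify_sketch(greet)(name="World")
    # Decorator form: the function is already awaitable when called.
    shouted = await shout("world")
    return greeting, shouted


greeting, shouted = asyncio.run(main())
print(greeting, shouted)
```

Because the wrapper is an ordinary higher-order function, it supports both the call form and the decorator form used in the earlier examples.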

The above is only a set of practical study notes based on my own needs. If there are any mistakes, criticism and corrections are welcome. Thank you!

END

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

WeChat official account: search for 【Programmer Xiao Zhong】

Written by Xiao Zhong 【Welcome to learn and communicate together】 | QQ: 308711822

Copyright notice
Author: Xiao Zhong. Please include the original link when reprinting, thank you.
https://en.qdmana.com/2022/04/202204291627196411.html
