请选择 进入手机版 | 继续访问电脑版
查看: 159|回复: 0

Python利用contextvars实现管理上下文变量

[复制链接]

2198

主题

0

回帖

7027

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
7027
发表于 2022-10-9 04:01:52 | 显示全部楼层 |阅读模式
Python 在 3.7 的时候引入了一个模块:contextvars,从名字上很容易看出它指的是上下文变量(Context Variables),所以在介绍 contextvars 之前我们需要先了解一下什么是上下文(Context)。
Context 是一个包罗了相关信息内容的对象,举个例子:"好比一部 13 集的动漫,你直接点进第八集,看到女主角在男主角面前流泪了"。相信此时你是不知道为什么女主角会流泪的,因为你没有看前面几集的内容,缺失了相关的上下文信息。
所以 Context 并不是什么神奇的东西,它的作用就是携带一些指定的信息。

web 框架中的 request

我们以 fastapi 和 sanic 为例,看看当一个请求过来的时候,它们是如何解析的。
  1. # fastapi
  2. from fastapi import FastAPI, Request
  3. import uvicorn

  4. app = FastAPI()


  5. @app.get("/index")
  6. async def index(request: Request):
  7.     name = request.query_params.get("name")
  8.     return {"name": name}


  9. uvicorn.run("__main__:app", host="127.0.0.1", port=5555)

  10. # -------------------------------------------------------

  11. # sanic
  12. from sanic import Sanic
  13. from sanic.request import Request
  14. from sanic import response

  15. app = Sanic("sanic")


  16. @app.get("/index")
  17. async def index(request: Request):
  18.     name = request.args.get("name")
  19.     return response.json({"name": name})


  20. app.run(host="127.0.0.1", port=6666)
复制代码
发请求测试一下,看看结果是否正确。
可以看到请求都是乐成的,而且对于 fastapi 和 sanic 而言,其 request 和 视图函数是绑定在一起的。也就是在请求到来的时候,会被封装成一个 Request 对象、然后传递到视图函数中。
但对于 flask 而言则不是这样子的,我们看一下 flask 是如何接收请求参数的。
  1. from flask import Flask, request

  2. app = Flask("flask")


  3. @app.route("/index")
  4. def index():
  5.     name = request.args.get("name")
  6.     return {"name": name}


  7. app.run(host="127.0.0.1", port=7777)
复制代码
我们看到对于 flask 而言则是通过 import request 的方式,如果不需要的话就不消 import,当然我这里并不是在比力哪种方式好,主要是为了引出我们今天的主题。首先对于 flask 而言,如果我再定义一个视图函数的话,那么获取请求参数依旧是相同的方式,但是这样问题就来了,差异的视图函数内部使用同一个 request,难道不会发生辩说吗?
显然根据我们使用 flask 的经验来说,答案是不会的,至于原因就是 ThreadLocal。

ThreadLocal

ThreadLocal,从名字上看可以得出它肯定是和线程相关的。没错,它专门用来创建局部变量,而且创建的局部变量是和线程绑定的。
  1. import threading

  2. # 创建一个 local 对象
  3. local = threading.local()

  4. def get():
  5.     name = threading.current_thread().name
  6.     # 获取绑定在 local 上的 value
  7.     value = local.value
  8.     print(f"线程: {name}, value: {value}")

  9. def set_():
  10.     name = threading.current_thread().name
  11.     # 为不同的线程设置不同的值
  12.     if name == "one":
  13.         local.value = "ONE"
  14.     elif name == "two":
  15.         local.value = "TWO"
  16.     # 执行 get 函数
  17.     get()

  18. t1 = threading.Thread(target=set_, name="one")
  19. t2 = threading.Thread(target=set_, name="two")
  20. t1.start()
  21. t2.start()
  22. """
  23. 线程 one, value: ONE
  24. 线程 two, value: TWO
  25. """
复制代码
可以看到两个线程之间是互不影响的,因为每个线程都有自己唯一的 id,在绑定值的时候会绑定在当前的线程中,获取也会从当前的线程中获取。可以把 ThreadLocal 想象成一个字典:
  1. {
  2.     "one": {"value": "ONE"},
  3.     "two": {"value": "TWO"}
  4. }
复制代码
更准确的说 key 应该是线程的 id,为了直观我们就用线程的 name 代替了,但总之在获取的时候只会获取绑定在该线程上的变量的值。
而 flask 内部也是这么设计的,只不外它没有直接用 threading.local,而是自己实现了一个 Local 类,除了支持线程之外还支持 greenlet 的协程,那么它是怎么实现的呢?首先我们知道 flask 内部存在 "请求 context" 和 "应用 context",它们都是通过栈来维护的(两个差异的栈)。
  1. # flask/globals.py
  2. _request_ctx_stack = LocalStack()
  3. _app_ctx_stack = LocalStack()
  4. current_app = LocalProxy(_find_app)
  5. request = LocalProxy(partial(_lookup_req_object, "request"))
  6. session = LocalProxy(partial(_lookup_req_object, "session"))
复制代码
每个请求都会绑定在当前的 Context 中,等到请求结束之后再销毁,这个过程由框架完成,开发者只需要直接使用 request 即可。所以请求的具体细节流程可以点进源码中检察,这里我们重点关注一个对象:werkzeug.local.Local,也就是上面说的 Local 类,它是变量的设置和获取的关键。直接看部门源码:
  1. # werkzeug/local.py

  2. class Local(object):
  3.     __slots__ = ("__storage__", "__ident_func__")

  4.     def __init__(self):
  5.         # 内部有两个成员:__storage__ 是一个字典,值就存在这里面
  6.         # __ident_func__ 只需要知道它是用来获取线程 id 的即可
  7.         object.__setattr__(self, "__storage__", {})
  8.         object.__setattr__(self, "__ident_func__", get_ident)

  9.     def __call__(self, proxy):
  10.         """Create a proxy for a name."""
  11.         return LocalProxy(self, proxy)

  12.     def __release_local__(self):
  13.         self.__storage__.pop(self.__ident_func__(), None)

  14.     def __getattr__(self, name):
  15.         try:
  16.             # 根据线程 id 得到 value(一个字典)
  17.             # 然后再根据 name 获取对应的值
  18.             # 所以只会获取绑定在当前线程上的值
  19.             return self.__storage__[self.__ident_func__()][name]
  20.         except KeyError:
  21.             raise AttributeError(name)

  22.     def __setattr__(self, name, value):
  23.         ident = self.__ident_func__()
  24.         storage = self.__storage__
  25.         try:
  26.             # 将线程 id 作为 key,然后将值设置在对应的字典中
  27.             # 所以只会将值设置在当前的线程中
  28.             storage[ident][name] = value
  29.         except KeyError:
  30.             storage[ident] = {name: value}

  31.     def __delattr__(self, name):
  32.         # 删除逻辑也很简单
  33.         try:
  34.             del self.__storage__[self.__ident_func__()][name]
  35.         except KeyError:
  36.             raise AttributeError(name)
复制代码
所以我们看到 flask 内部的逻辑其实很简单,通过 ThreadLocal 实现了线程之间的隔离。每个请求都会绑定在各自的 Context 中,获取值的时候也会从各自的 Context 中获取,因为它就是用来生存相关信息的(重要的是同时也实现了隔离)。
相应此刻你已经理解了上下文,但是问题来了,不管是 threading.local 也好、还是类似于 flask 自己实现的 Local 也罢,它们都是针对线程的。如果是使用 async def 定义的协程该怎么办呢?如何实现每个协程的上下文隔离呢?所以终于引出了我们的主角:contextvars。

contextvars

该模块提供了一组接口,可用于在协程中管理、设置、访问局部 Context 的状态。
  1. import asyncio
  2. import contextvars

  3. c = contextvars.ContextVar("只是一个标识, 用于调试")

  4. async def get():
  5.     # 获取值
  6.     return c.get() + "~~~"

  7. async def set_(val):
  8.     # 设置值
  9.     c.set(val)
  10.     print(await get())

  11. async def main():
  12.     coro1 = set_("协程1")
  13.     coro2 = set_("协程2")
  14.     await asyncio.gather(coro1, coro2)


  15. asyncio.run(main())
  16. """
  17. 协程1~~~
  18. 协程2~~~
  19. """
复制代码
ContextVar 提供了两个方法,分别是 get 和 set,用于获取值和设置值。我们看到效果和 ThreadingLocal 类似,数据在协程之间是隔离的,不会受到相互的影响。
但我们再仔细观察一下,我们是在 set_ 函数中设置的值,然后在 get 函数中获取值。可 await get() 相当于是开启了一个新的协程,那么意味着设置值和获取值不是在同一个协程当中。但即便如此,我们依旧可以获取到希望的结果。因为 Python 的协程是无栈协程,通过 await 可以实现级联调用。
我们不妨再套一层:
  1. import asyncio
  2. import contextvars

  3. c = contextvars.ContextVar("只是一个标识, 用于调试")

  4. async def get1():
  5.     return await get2()

  6. async def get2():
  7.     return c.get() + "~~~"

  8. async def set_(val):
  9.     # 设置值
  10.     c.set(val)
  11.     print(await get1())
  12.     print(await get2())

  13. async def main():
  14.     coro1 = set_("协程1")
  15.     coro2 = set_("协程2")
  16.     await asyncio.gather(coro1, coro2)


  17. asyncio.run(main())
  18. """
  19. 协程1~~~
  20. 协程1~~~
  21. 协程2~~~
  22. 协程2~~~
  23. """
复制代码
我们看到不管是 await get1() 还是 await get2(),得到的都是 set_ 中设置的结果,说明它是可以嵌套的。
而且在这个过程当中,可以重新设置值。
  1. import asyncio
  2. import contextvars

  3. c = contextvars.ContextVar("只是一个标识, 用于调试")

  4. async def get1():
  5.     c.set("重新设置")
  6.     return await get2()

  7. async def get2():
  8.     return c.get() + "~~~"

  9. async def set_(val):
  10.     # 设置值
  11.     c.set(val)
  12.     print("------------")
  13.     print(await get2())
  14.     print(await get1())
  15.     print(await get2())
  16.     print("------------")

  17. async def main():
  18.     coro1 = set_("协程1")
  19.     coro2 = set_("协程2")
  20.     await asyncio.gather(coro1, coro2)


  21. asyncio.run(main())
  22. """
  23. ------------
  24. 协程1~~~
  25. 重新设置~~~
  26. 重新设置~~~
  27. ------------
  28. ------------
  29. 协程2~~~
  30. 重新设置~~~
  31. 重新设置~~~
  32. ------------
  33. """
复制代码
先 await get2() 得到的就是 set_ 函数中设置的值,这是符合预期的。但是我们在 get1 中将值重新设置了,那么之后不管是 await get1() 还是直接 await get2(),得到的都是新设置的值。
这也说明了,一个协程内部 await 另一个协程,另一个协程内部 await 另另一个协程,不管套娃(await)多少次,它们获取的值都是一样的。而且在任意一个协程内部都可以重新设置值,然后获取会得到最后一次设置的值。再举个栗子:
  1. import asyncio
  2. import contextvars

  3. c = contextvars.ContextVar("只是一个标识, 用于调试")

  4. async def get1():
  5.     return await get2()

  6. async def get2():
  7.     val = c.get() + "~~~"
  8.     c.set("重新设置啦")
  9.     return val

  10. async def set_(val):
  11.     # 设置值
  12.     c.set(val)
  13.     print(await get1())
  14.     print(c.get())

  15. async def main():
  16.     coro = set_("古明地觉")
  17.     await coro

  18. asyncio.run(main())
  19. """
  20. 古明地觉~~~
  21. 重新设置啦
  22. """
复制代码
await get1() 的时候会执行 await get2(),然后在里面拿到 c.set 设置的值,打印 "古明地觉~~~"。但是在 get2 里面,又将值重新设置了,所以第二个 print 打印的就是新设置的值。\
如果在 get 之前没有先 set,那么会抛出一个 LookupError,所以 ContextVar 支持默认值:
  1. import asyncio
  2. import contextvars

  3. c = contextvars.ContextVar("只是一个标识, 用于调试",
  4.                            default="哼哼")

  5. async def set_(val):
  6.     print(c.get())
  7.     c.set(val)
  8.     print(c.get())

  9. async def main():
  10.     coro = set_("古明地觉")
  11.     await coro

  12. asyncio.run(main())
  13. """
  14. 哼哼
  15. 古明地觉
  16. """
复制代码
除了在 ContextVar 中指定默认值之外,也可以在 get 中指定:
  1. import asyncio
  2. import contextvars

  3. c = contextvars.ContextVar("只是一个标识, 用于调试",
  4.                            default="哼哼")

  5. async def set_(val):
  6.     print(c.get("古明地恋"))
  7.     c.set(val)
  8.     print(c.get())

  9. async def main():
  10.     coro = set_("古明地觉")
  11.     await coro

  12. asyncio.run(main())
  13. """
  14. 古明地恋
  15. 古明地觉
  16. """
复制代码
所以结论如下,如果在 c.set 之前使用 c.get:

  • 当 ContextVar 和 get 中都没有指定默认值,会抛出 LookupError;
  • 只要有一方设置了,那么会得到默认值;
  • 如果都设置了,那么以 get 为准;
如果 c.get 之前执行了 c.set,那么无论 ContextVar 和 get 有没有指定默认值,获取到的都是 c.set 设置的值。
所以总的来说还是比力好理解的,而且 ContextVar 除了可以作用在协程上面,它也可以用在线程上面。没错,它可以替代 threading.local,我们来试一下:
  1. import threading
  2. import contextvars

  3. c = contextvars.ContextVar("context_var")

  4. def get():
  5.     name = threading.current_thread().name
  6.     value = c.get()
  7.     print(f"线程 {name}, value: {value}")

  8. def set_():
  9.     name = threading.current_thread().name
  10.     if name == "one":
  11.         c.set("ONE")
  12.     elif name == "two":
  13.         c.set("TWO")
  14.     get()

  15. t1 = threading.Thread(target=set_, name="one")
  16. t2 = threading.Thread(target=set_, name="two")
  17. t1.start()
  18. t2.start()
  19. """
  20. 线程 one, value: ONE
  21. 线程 two, value: TWO
  22. """
复制代码
和 threading.local 的体现是一样的,但是更建议使用 ContextVars。不外前者可以绑定任意多个值,而后者只能绑定一个值(可以通过传递字典的方式解决这一点)。

c.Token

当我们调用 c.set 的时候,其实会返回一个 Token 对象:
  1. import contextvars

  2. c = contextvars.ContextVar("context_var")
  3. token = c.set("val")
  4. print(token)
  5. """
  6. <Token var=<ContextVar name='context_var' at 0x00..> at 0x00...>
  7. """
复制代码
Token 对象有一个 var 属性,它是只读的,会返回指向此 token 的 ContextVar 对象。
  1. import contextvars

  2. c = contextvars.ContextVar("context_var")
  3. token = c.set("val")

  4. print(token.var is c)  # True
  5. print(token.var.get())  # val

  6. print(
  7.     token.var.set("val2").var.set("val3").var is c
  8. )  # True
  9. print(c.get())  # val3
复制代码
Token 对象还有一个 old_value 属性,它会返回上一次 set 设置的值,如果是第一次 set,那么会返回一个 <Token.MISSING>。
  1. import contextvars

  2. c = contextvars.ContextVar("context_var")
  3. token = c.set("val")

  4. # 该 token 是第一次 c.set 所返回的
  5. # 在此之前没有 set,所以 old_value 是 <Token.MISSING>
  6. print(token.old_value)  # <Token.MISSING>

  7. token = c.set("val2")
  8. print(c.get())  # val2
  9. # 返回上一次 set 的值
  10. print(token.old_value)  # val
复制代码
那么这个 Token 对象有什么作用呢?从目前来看貌似没太大用处啊,其实它最大的用处就是和 reset 搭配使用,可以对状态进行重置。
  1. import contextvars
  2. ####
  3. c = contextvars.ContextVar("context_var")
  4. token = c.set("val")
  5. # 显然是可以获取的
  6. print(c.get())  # val

  7. # 将其重置为 token 之前的状态
  8. # 但这个 token 是第一次 set 返回的
  9. # 那么之前就相当于没有 set 了
  10. c.reset(token)
  11. try:
  12.     c.get()  # 此时就会报错
  13. except LookupError:
  14.     print("报错啦")  # 报错啦

  15. # 但是我们可以指定默认值
  16. print(c.get("默认值"))  # 默认值
复制代码
contextvars.Context

它负责生存 ContextVars 对象和设置的值之间的映射,但是我们不会直接通过 contextvars.Context 来创建,而是通过 contentvars.copy_context 函数来创建。
  1. import contextvars

  2. c1 = contextvars.ContextVar("context_var1")
  3. c1.set("val1")
  4. c2 = contextvars.ContextVar("context_var2")
  5. c2.set("val2")

  6. # 此时得到的是所有 ContextVar 对象和设置的值之间的映射
  7. # 它实现了 collections.abc.Mapping 接口
  8. # 因此我们可以像操作字典一样操作它
  9. context = contextvars.copy_context()
  10. # key 就是对应的 ContextVar 对象,value 就是设置的值
  11. print(context[c1])  # val1
  12. print(context[c2])  # val2
  13. for ctx, value in context.items():
  14.     print(ctx.get(), ctx.name, value)
  15.     """
  16.     val1 context_var1 val1
  17.     val2 context_var2 val2
  18.     """

  19. print(len(context))  # 2
复制代码
除此之外,context 还有一个 run 方法:
  1. import contextvars

  2. c1 = contextvars.ContextVar("context_var1")
  3. c1.set("val1")
  4. c2 = contextvars.ContextVar("context_var2")
  5. c2.set("val2")

  6. context = contextvars.copy_context()

  7. def change(val1, val2):
  8.     c1.set(val1)
  9.     c2.set(val2)
  10.     print(c1.get(), context[c1])
  11.     print(c2.get(), context[c2])

  12. # 在 change 函数内部,重新设置值
  13. # 然后里面打印的也是新设置的值
  14. context.run(change, "VAL1", "VAL2")
  15. """
  16. VAL1 VAL1
  17. VAL2 VAL2
  18. """

  19. print(c1.get(), context[c1])
  20. print(c2.get(), context[c2])
  21. """
  22. val1 VAL1
  23. val2 VAL2
  24. """
复制代码
我们看到 run 方法接收一个 callable,如果在里面修改了 ContextVar 实例设置的值,那么对于 ContextVar 而言只会在函数内部生效,一旦出了函数,那么还是原来的值。但是对于 Context 而言,它是会受到影响的,即便出了函数,也是新设置的值,因为它直接把内部的字典给修改了。

小结

以上就是 contextvars 模块的用法,在多个协程之间传递数据是非常方便的,而且也是并发安全的。如果你用过 Go 的话,你应该会发现和 Go 在 1.7 版本引入的 context 模块比力相似,当然 Go 的 context 模块功能要更强大一些,除了可以传递数据之外,对多个 goroutine 的级联管理也提供了非常清蒸的解决方案。
总之对于 contextvars 而言,它传递的数据应该是多个协程之间需要共享的数据,像 cookie, session, token 之类的,好比上游接收了一个 token,然后不绝地向下透传。但是不要把本应该作为函数参数的数据,也通过 contextvars 来传递,这样就有点舍本逐末了。
到此这篇关于Python利用contextvars实现管理上下文变量的文章就介绍到这了,更多相关Python contextvars管理变量内容请搜索趣UU以前的文章或继续浏览下面的相关文章希望大家以后多多支持趣UU!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
打赏作者
  • 0
  • 0
  • 0
  • 0
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表