当前位置:首页 > CN2资讯 > 正文内容

windows python 高并发服务器

15小时前CN2资讯


前记

本文描述的是如何基于Asyncio.Future的特性编写一个语言级别的防缓存击穿的工具–Share,并介绍它的使用和高并发下的处理方法。

1.缓存击穿

在后端服务中,大部分的系统瓶颈都集中在DB上,为了提升服务性能和减轻DB压力,一般会添加一层缓存层,然后每个请求都会先从缓存层获取数据,如果获取不到数据则先从DB系统中获取数据,获取到数据后才把数据放置在缓存层中再返回数据。 不过缓存层缓存的数据是有过期时间的,当设置的值过期时会导致所有请求无法从缓存层获取到数据,进而蜂拥冲向DB系统获取数据,这就是缓存击穿。由于缓存击穿是瞬时性且量级很大,所以它容易快速的提升DB系统压力,甚至打崩DB系统,流程如图:

从图中可以看到,在缓存值失效后,所有请求会先后击穿缓存并请求到DB系统,DB系统从被击穿到缓存值被设置的这段时间会执行大量相同的查询,这些查询除了浪费系统资源外还会提升系统压力,为此,大部分业务会使用加锁来解决这个问题。

在加锁后,整个请求的流程就会变为先访问缓存层,在发现缓存层没有对应数据时(缓存失效),请求会先去请求锁,当请求到锁的请求才可以去DB系统查询,并在缓存系统设置缓存值,而获取不到锁的请求只能等待锁释放后从缓存系统中获取值并返回,如图:

通过图可以看到,在加锁后,访问DB系统的同类请求只剩一个了,这样一来可以减轻DB系统的压力,但是在采用加锁逻辑后会把压力从DB系统转移给了负责锁的系统,只是锁系统能容忍的上限会比DB系统高很多。 此外,如果这个锁系统是一个分布式锁,那么此时的锁系统也是一个热点值,后端服务与分布式锁系统之间会因为大量的请求获取锁而产生许多IO。

2.语言级别的解决方案

为了在解决缓存击穿的问题,同时减少缓存击穿时导致不同系统的IO交互次数变多的情况,新的解决方案必须是编程语言级别的,而不是一个单独的组件。同时,这个解决方案除了能兜住大量缓存击穿的请求外,还需要做到只让其中的第一个访问的请求能够命中DB系统获取值再返回且拿到的值又能跟其他请求共享。 由于这个解决方案会在多个请求之间共享值,所以我取名为Share,它在系统架构中的位置如图:

通过图可以发现Share的位置与锁一样,不过具体逻辑却会有不同,如果仔细研究它的逻辑,会发现它的逻辑与Asyncio.Future类似。 比如在asyncio.Future的使用过程中,不同的协程可以通过await asyncio.Future()方法获取到已经被设置的结果,同时,如果这个值还没设置,其他协程在调用await asyncio.Future()时会一直被阻塞,直到其他协程通过set_result设置结果。

Note: 在下面介绍Share中将以某个协程调用代替请求的操作

有了asyncio.Future后,Share的实现会变得很轻松,只要再实现如何放行第一个协程的执行即可。Share实现的第一步是定义一个类似于如下的数据结构:

Dict[str, asyncio.Future]

这个数据结构是一个Dict,其中它的key是这类协程的标识,然后再根据这个数据结构添加对应的逻辑:

  • 当协程通过Share被调用时,根据key判断是否有同类协程
  • 如果没有则初始化一个asyncio.Future,然后再执行这个协程,在协程执行完毕时把协程的返回值设置在asyncio.Future中并从字典中删除这个key以及返回数据。
  • 如果有则调用await asyncio.Future等待第一个共享协程的返回值。

具体代码如下:

import asyncio from typing import Any, Dict, Callable future_dict: Dict[str, asyncio.Future] = {} async def share(key: str, fn: Callable, param: Any = None) -> Any: if key not in future_dict: try: future = asyncio.Future() future_dict[key] = future future.set_result(await fn(*(param or ()))) finally: future_dict.pop(key, None) else: future = future_dict[key] return await future # 以下是测试share是否能够正常运行的程序 async def delay_return(duration: int) -> int: await asyncio.sleep(1) return duration async def main() -> None: task_list = [share("demo", delay_return, (i, ))for i in range(10)] done, _ = await asyncio.wait(task_list) print([future.result() for future in done]) asyncio.run(main())

在运行后可以发现,不同协程的初始化参数虽然是不同的,但是他们的结果是一样的(结果取决于哪个协程先运行),比如我这次运行后它的所有结果都为3,如下:

[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

这个结果意味着语言级别的兜底逻辑的没问题的,但是它还有一些问题仍然需要解决。

3.Share的基础实现

在一开始时,我也是简单的实现了一个工具函数来解决缓存击穿的问题,但是在线上运行一段时间后,发现这个工具函数仍有一些小问题需要解决,于是对它进行了一些复杂化处理,使其能够拓展并解决一些高并发的问题,同时也提升了易用性。

Share整个实现分为两部分,第一部份是一个名为Token的类,它的底层就是一个asyncio.Future,而提供的方法都是基于asyncio.Future的封装,代码如下:

class Token(Generic[_Tp]): def __init__(self, key: Any): self._key: Any = key self._future: Optional[asyncio.Future[_Tp]] = None def can_do(self) -> bool: # 初始化future以及判断是否执行后续操作 # 这个逻辑可能有点怪,但是暂时没有想到更好的办法 if not self._future: self._future = asyncio.Future() return True return False def is_done(self) -> bool: # 判断是否执行完成 return self._future is not None and self._future.done() async def await_done(self) -> _Tp: # 获取设置在future的结果 if not self._future: raise RuntimeError(f"You should use Token<{self._key}>.can_do() before Token<{self._key}>.await_done()") if not self._future.done(): await self._future return self._future.result() def set_result(self, result: Union[_Tp, Exception]) -> bool: # 设置结果到future中,需要注意的是,如果是异常,需要通过`set_exception`设置异常,否则在设置异常后调用`await asyncio.Futurte`时不会抛出错误。 if self._future and not self._future.done(): if isinstance(result, Exception): self._future.set_exception(result) else: self._future.set_result(result) return True return False

而第二部分就是Share的主体部分了,代码如下:

class Share(object): def __init__(self) -> None: # 初始化存储token的容器 self._token_dict: Dict[_ShareKeyType, Token] = dict() def _get_token(self, key: _ShareKeyType) -> Token: # 获取token的简单封装 if key not in self._token_dict: self._token_dict[key] = Token(key) return self._token_dict[key] async def _do_handle( self, key: _ShareKeyType, func: Callable[P, Coroutine[Any, Any, R_T]], args: P.args, kwargs: P.args ) -> R_T: token: Token = self._get_token(key) try: # 判断是否可以执行操作 if token.can_do(): # 如果可以则执行 try: token.set_result(await func(*(args or ()), **(kwargs or {}))) except Exception as e: # 存储异常值 token.set_result(e) # 通过token获取值并返回,没有值则会阻塞 return await token.await_done() finally: # 用完就删除掉 self._future_dict.pop(key, None) def do( self, key: _ShareKeyType, func: Callable[P, Coroutine[Any, Any, R_T]], args: P.args = None, kwargs: P.kwargs = None, ) -> Coroutine[Any, Any, R_T]: return self._do_handle(key, func, args, kwargs)

通过代码可以发现Share的主体逻辑非常简单,其中_do_handle的逻辑与第二节中的share函数类似,而新增的do方法只是_do_handle的一层封装,它在采用了PEP-612的类型标注后,使用者可以方便的从编辑器知道do的返回类型,接下来通过一段代码来检查Share是否正常,如下:

async def delay_return(duration: int) -> int: await asyncio.sleep(1) return duration async def main() -> None: share = Share() task_list = [share.do("demo", delay_return, (i, ))for i in range(10)] done, _ = await asyncio.wait(task_list) print([future.result() for future in done]) asyncio.run(main())

在运行代码后输出如下(值可能不同):

[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

通过结果可以发现Share运行正常,毕竟它的实现逻辑与share函数类似,但是当把鼠标移动到task_list上面可以发现,由于do方法采用了PEP-612的类型标注后,编辑器可以展示它的类型了,如下: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Au2vSQ1f-1692158232520)()]

此外,基于_do_handle可以开发出一个装饰器,这样用起来就非常方便了,使用方法如下:

async def main() -> None: share = Share() @share.wrapper_do() async def delay_return(duration: int) -> int: await asyncio.sleep(1) return duration task_list = [share.do("demo", delay_return, (i, ))for i in range(10)] done, _ = await asyncio.wait(task_list) print([future.result() for future in done]) asyncio.run(main())

对应的实现方法见源码

4.高并发下的问题

现在通过Share可以解决缓存击穿的问题了,但是与其他中间层一样,在引入Share之后会产生其他的严重的问题。

假设有这样一个场景,这个场景使用的DB系统有一个奇葩的Bug,这个Bug会导致每有1w次请求就有一个请求会被堵塞10秒,在未引入缓存击穿的保护逻辑之前,并不会有什么太大的影响,因为它的影响面很小,毕竟平均下来一个用户一天也就遇到几次,但是在引入缓存击穿保护的逻辑之后,就需要考虑这个问题对系统的影响了。 因为缓存击穿保护逻辑放行的请求在通过DB获取数据时,刚好遇到了Bug而堵塞了10秒,导致这个请求被堵住10秒后才能获取到值,这样会导致所有经过Share的请求在10秒内都被堵住,而这时影响面就非常大了。 首先它会影响到使用这个功能的所有接口在这10秒内这些功能无法使用,其次是这些请求会占用文件描述符和内存等资源,在占用过多时会影响其它服务的使用进而造成服务雪崩,为此需要对Share进行改进,防止单个请求异常而影响到其他地方的问题。

使用asyncio.wait之类的带有超时异常机制的方法来执行也是可以的,因为Python Asyncio的异常传递性,无论是asyncio.wait(share.do(xxx), timeout=xxx)还是share.do(asyncio.wait(xxx, timeout=xxx)),第一个被放行的协程在执行超时后抛出的异常会传递给其他协程。

4.1.放行指定比例的请求

目前的Share只会允许第一个协程能被真正的执行,如果可以按照一定的几率放行请求,那么就能在防止请求堵住与降低DB压力之间做到一个平衡。具体的代码实现如下(只列出变更的方法):

class Share(object): def __init__(self, rate: Optional[Tuple[int, int]] = None) -> None: # 当rate = (1, 100)时代表是百分之一 # 当rate = (1, 1000)时代表是千分之一 if rate and rate[0] > rate[1]: raise ValueError(f"rate[0] should less than rate[1], but {rate[0]} > {rate[1]}") self._rate: Optional[Tuple[int, int]] = rate self._token_dict: Dict[_ShareKeyType, Token] = dict() ... async def _do_handle( self, key: _ShareKeyType, func: Callable[P, Coroutine[Any, Any, R_T]], args: P.args, kwargs: P.args ) -> R_T: token: Token = self._get_token(key) try: can_do = token.can_do() if not can_do and self._rate: can_do = random.randint(self._rate[0], self._rate[1]) == self._rate[0] if can_do: try: # 多个请求也无所谓,Token会确保只有一个请求执行 token.set_result(await func(*(args or ()), **(kwargs or {}))) except Exception as e: token.set_result(e) return await token.await_done() finally: self._token_dict.pop(key, None)

代码中Share在__init__方法添加了一个新的参数rate,并在_do_handle方法中使用到,新的_do_handle方法除了会放行第一个协程外,其他的协程会通过rate来决定是否放行,具体的逻辑是调用者在通过Share的_do_handle执行协程时,_do_handle在判断不允许放行后会使用random模块根据rate生成一个随机数,如果生成的随机数与rate[0]相等时就会放行请求,现在改进第三节的测试代码以便验证rate的效果,具体代码如下:

async def delay_return(duration: int) -> int: # 由于结果只有一个,所以需要打印出来才能判断是否有多个协程被放行 print(f"I go it, {duration}") await asyncio.sleep(1) return duration async def main() -> None: # 设置有1/3的放行概率 share = Share(rate=(1, 3)) task_list = [share.do("demo", delay_return, (i, ))for i in range(10)] done, _ = await asyncio.wait(task_list) print([future.result() for future in done]) asyncio.run(main())

在运行程序后一般会看到有多条I go it, xxx的文本输出,如下:

I go it, 2 I go it, 1 I go it, 6 I go it, 3 [2, 2, 2, 2, 2, 2, 2, 2, 2, 2]

通过输出也可以看到经过Share的处理后,所有协程获取到的结果还是取决于第一个协程执行的结果,但是确实有4个协程得到了执行了,这样一来即使有第个协程被堵住,其他协程也能够正常执行。

除了自动的按照一定比例放行协程的执行外,Share还有两个方法可以手动放行协程的执行,调用者只需要自己根据业务场景在恰当的时间调用对应的方法也可以解决高并发下由于引入Share而引发的问题。

比如每隔n秒钟执行一次。

4.2.取消被堵住的请求

Token底层的asyncio.Future拥有一个cancel的方法,通过调用cancel方法后不仅可以取消Future还可以把取消异常传递给Future对应的协程,进而中断协程的运行。

对于取消机制和asyncio.Future可以参考:

  • Python Asyncio 库之从ChatGPT Bug了解Cancel机制
  • Python的可等待对象在Asyncio的作用

于是可以通过这个方法使Share拥有取消被堵住的请求功能, 具体的改进逻辑是先在Token暴露出一个cancel的方法,这个方法会尽最大的能力取消可以被取消的Future:

class Token(Generic[_Tp]): ... def cancel(self) -> bool: # 如果future可以被取消,则尽最大的努力取消future if self._future is not None and not self._future.done(): if self._future.cancelled(): self._future.cancel() else: self.set_result(asyncio.CancelledError()) return True return False

不过Token只是Future的封装,调用者无法接触到Token,所以需要在Share添加一个cancel方法使调用者可以通过这个方法取消因Share影响的协程,从而释放资源占用,具体修改代码如下:

class Share(object): ... def cancel(self, key: Optional[_ShareKeyType] = None) -> None: if not key: # 如果key为空,则取消所有相关token for token in self._token_dict.values(): token.cancel() else: # 不为空则按照Key取消指定的token self._token_dict[key].cancel()

修改完毕后编写一段代码进行验证:

import time async def delay_print(duration: int) -> int: await asyncio.sleep(1) return duration async def cancel_in_aio(share: "Share") -> None: await asyncio.sleep(0.1) share.cancel() async def main() -> None: share = Share() task_list: "List[Coroutine]" = [ share.do("test_cancel_in_aio", delay_print, args=[i]) for i in [11, 12, 13, 14, 15, 16, 17, 18, 19] ] # 添加一个取消share所有token的操作 task_list.append(cancel_in_aio(share)) t_list = [asyncio.Task(t) for t in task_list] await asyncio.sleep(1) # 统计有多少协程被取消 result = [] for t in t_list: # 不统计`cancel_in_aio` # 部分Python版本可能没有这个方法,需要注释掉,但是打印的结果会多出一个1 if t._coro.__name__ == "cancel_in_aio": # type: ignore continue try: await t result.append(1) except asyncio.CancelledError: result.append(0) print(result) asyncio.run(main())

运行代码后可以看到输出结果如下,通过输出结果可以知道所有协程都被取消了:

[0, 0, 0, 0, 0, 0, 0, 0, 0]

4.3.忘记被堵住的请求

直接取消同一类请求也可能太狠了,它属于一个应急的方法,在业务场景中该操作可能导致多数用户在同一时间内都收到异常响应,为此Share还引入一个forget的功能,使Share能忘掉当前托管的Token,使后续的请求访问Share时,Share能够另起炉灶一个新的Token,这样一来新的请求被会之前的Token影响到。这个功能对应的改造很简单,只需要动到Share类,如下:

class Share(object): ... def forget(self, key: _ShareKeyType) -> None: if key not in self._token_dict: raise KeyError(f"Token {key} not found") token = self._token_dict[key] if self._token_dict[key].is_done(): raise RuntimeError(f"{token} is done") self._token_dict.pop(key, None)

接着在修改对应的老朋友–验证代码,如下:

async def delay_return(duration: int) -> int: await asyncio.sleep(1) return duration async def main() -> None: share = Share() a_task = asyncio.Task(asyncio.wait([share.do("demo", delay_return, (i, ))for i in range(10)])) await asyncio.sleep(0.01) share.forget("demo") b_task = asyncio.Task(asyncio.wait([share.do("demo", delay_return, (i, ))for i in range(10, 20)])) await asyncio.sleep(0.1) print({future.result() for future in (await a_task)[0]}) print({future.result() for future in (await b_task)[0]}) asyncio.run(main())

这段代码会执行两批协程,第一批返回的值只有可能是0-9,而第二批的值只有可能是10-19,它们的运行间隔只有0.01秒,但是运行时长是一样的。 此外,在执行第二批之前会先调用share.forget("demo"),使Share忘记了自己托管过第一批协程,在运行代码后可以看到如下输出:

{4} {15}

通过输出可以发现,第一批协程执行时间与第二批协程执行的时间虽然是一样的,但是他们共享的是不同的结果,Share会正常的忘记掉第一批协程。

不过这个功能还是有点缺陷,假设第二批协程都能正常执行,但第一批协程还是因为被放行的协程在执行时被堵住而全都堵塞了,这是一种糟糕的情况。 大部分场景下都希望第二批协程执行完毕后,第一批协程也能共享到第一批协程的执行结果(被卡住的协程除外),于是需要对forget的功能进行升级。 首先是Token的改造,Token需要在被forget后又能在下个协程调用时重新被记起来,改造的代码如下:

class Token(Generic[_Tp]): def __init__(self, key: Any): # 标记Token是否处于被忘记 self.is_forget = False self._key: Any = key self._future: Optional[asyncio.Future[_Tp]] = None def can_do(self) -> bool: if not self._future: self._future = asyncio.Future() return True if self.is_forget: # 如果该Token被忘记了,但是future还存在,那就重新记得Token,并放行该协程 self.is_forget = False return True return False ...

接着就是Share的改造,主要是添加一个参数用于判断在调用forget时是否为强制忘记,代码如下:

class Share(object): def forget(self, key: _ShareKeyType, force: bool = True) -> None: # 添加一个参数用于是否强制忘记 if key not in self._token_dict: raise KeyError(f"Token {key} not found") token = self._token_dict[key] if self._token_dict[key].is_done(): raise RuntimeError(f"{token} is done") if force: # 如果是强制忘记则像之前一样移除Token self._token_dict.pop(key, None) else: # 不是强制忘记则只标记Token的属性为忘记,等待重新被记起来 token.is_forget = True

接着运行如下测试代码:

_is_first: bool = True async def delay_return(duration: int) -> int: global _is_first if _is_first: # 第一个执行的协程耗费的时间会比较久一点 _is_first = False print(f"{duration} is first") await asyncio.sleep(3) else: await asyncio.sleep(1) return duration async def main() -> None: share = Share() a_task = asyncio.Task(asyncio.wait([share.do("demo", delay_return, (i, ))for i in range(10)])) await asyncio.sleep(0.01) share.forget("demo", force=False) b_task = asyncio.Task(asyncio.wait([share.do("demo", delay_return, (i, ))for i in range(10, 20)])) await asyncio.sleep(0.1) # a_task会执行比较久,所以先打印b_task print({future.result() for future in (await b_task)[0]}, asyncio.get_event_loop().time()) print({future.result() for future in (await a_task)[0]}, asyncio.get_event_loop().time()) asyncio.run(main())

然后可以在终端中看到如下输出:

4 is first {12} 291267.247258633 {12} 291269.238582831

通过输出可以发现,4是最先执行的,但是最后a和b任务的结果都是12(第二批的值),同时第一批执行完毕的时间是比第二批晚了3秒钟。


    你可能想看:

    扫描二维码推送至手机访问。

    版权声明:本文由皇冠云发布,如需转载请注明出处。

    本文链接:https://www.idchg.com/info/30267.html

    分享给朋友:

    “windows python 高并发服务器” 的相关文章

    选择合适的服务器购买攻略:性能、预算与品牌分析

    在购买服务器之前,进行充分的准备至关重要。首先,我喜欢明确自己购买服务器的目的。是否只是用来搭建网站,还是用于复杂的数据处理,抑或是作为云计算的基础设施?这些需求会直接影响我的选择。明确目标后,我可以更好地针对我的具体需求进行规划。 接着,我必须考虑预算。无论是想购买入门级的服务器,还是高性能的旗舰...

    VPSDime评测:高性价比的VPS服务选择

    VPSDime概述 在如今互联网发展的浪潮中,各种主机服务商层出不穷,VPSDime作为一家成立于2013年的海内外主机服务商,引起了我的关注。它隶属于Nodisto IT,专注于VPS业务,提供多种类型的虚拟专用服务器。这对我这样的用户来说,选择合适的主机服务显得尤为重要,尤其是对于需要高性能和高...

    如何利用阿里云24元优惠活动体验云计算服务

    阿里云是一家全球知名的云计算服务提供商,致力于为用户提供多样化的云计算产品与服务。最近推出的24元优惠活动,更是为不少用户带来了新的机遇。这项优惠活动的主要目标是让更多的个人和企业体验到优质的云服务,尤其是在数字化转型日益重要的今天。用户可以通过这一活动以超低价格体验阿里云的强大功能。 在参与这个优...

    探索阿什本:全球数据中心之都的科技与美食之旅

    阿什本,这个名字或许在很多人耳中听起来并不陌生。作为美国弗吉尼亚州劳登郡的一部分,它距离华盛顿特区仅34英里,恰如其分地威尔士着城市的繁华。在我踏上这片土地的那一刻,便被它的快速发展与活力所吸引。阿什本不仅是一个城市,更是全球数据中心的中心,称其为“全球数据中心之都”可谓名至实归。 在阿什本,互联网...

    如何选择适合你的匿名服务器以保护隐私和数据安全

    在当今互联网时代,保护个人隐私和数据安全变得尤为重要。匿名服务器的概念应运而生,成为许多人实现在线安全和隐私的一种方式。简单来说,匿名服务器是一种特殊的服务器,能够隐藏用户的真实IP地址,从而在用户上网时保护其身份和活动。这对于那些希望自由浏览网络、避免被追踪的用户尤其重要。 匿名服务器通常与虚拟私...

    怎么看VPS的路由好不好:评估与优化路由性能的方法

    如何评估VPS路由性能 VPS的路由性能对网站的加载速度和用户体验至关重要。评估VPS的路由性能,我通常会关注几个关键指标,包括延迟、丢包率和带宽。了解这些内容能够帮助我判断服务器能否在高流量时段保持稳定运行。 在这过程中,我特别重视使用一些专业的测试工具。这些工具可以帮助我全面了解VPS的网络性能...