코 루틴은 비동기 코드를 작성하는 강력한 도구입니다. 그들은 우리가 동시 운영을 처리하는 방법에 혁명을 일으켜 확장 가능하고 효율적인 응용 프로그램을보다 쉽게 구축 할 수 있도록했습니다. 나는 코 루틴으로 작업하는 데 많은 시간을 보냈으며, 맞춤형 비동기 프리미티브를 만드는 것에 대한 통찰력을 공유하게되어 기쁩니다.
기본부터 시작합시다. 코 루틴은 일시 정지 및 재개 될 수있는 특수 기능으로 협력적인 멀티 태스킹이 가능합니다. 그것들은 Python의 비동기/대기 구문의 기초입니다. 코 루틴을 정의 할 때는 본질적으로 이벤트 루프로 다시 제어 할 수있는 함수를 만들어 다른 작업이 실행될 수 있습니다.
사용자 정의 기다릴 수있는 객체를 만들려면 대기하는 메소드를 구현해야합니다. 이 방법은 반복기를 반환해야합니다. 간단한 예는 다음과 같습니다.
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
이 CustomaWaitable 클래스는 내장 된 Awaitables와 마찬가지로 Await 키워드와 함께 사용할 수 있습니다. 기다리면 제어가 한 번 제어를 한 다음 그 값을 반환합니다.
하지만 더 복잡한 비동기 프리미티브를 만들고 싶다면 어떻게해야합니까? 맞춤형 세마포어를 구현하는 것을 살펴 보겠습니다. 세마포어는 여러 코 루틴으로 공유 리소스에 대한 액세스를 제어하는 데 사용됩니다.
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value이 CustomsEmaphore 클래스는 Async Context Manager 프로토콜뿐만 아니라 획득 및 릴리스 방법을 구현합니다. 최대 2 개의 코 루틴이 세마포어를 동시에 얻을 수 있습니다. 이제 효율적인 이벤트 루프 생성에 대해 이야기합시다. Python의 Asyncio는 강력한 이벤트 루프 구현을 제공하지만 사용자 정의가 필요한 경우가있을 수 있습니다. 다음은 사용자 정의 이벤트 루프의 기본 예입니다.
가져 오기 시간 컬렉션에서 수입 Deque 클래스 customeventloop : def __init __ (self) : self._ready = deque () self._stopping = false def call_soon (자체, 콜백, *args) : self._ready.append ((콜백, args)) def run_forever (self) : self._stopping : self._run_once () def _run_once (self) : ntodo = len (self._ready) _ in range (ntodo) : 콜백, args = self._ready.popleft () 콜백 (*args) def stop (self) : self._stopping = true def run_until_complete (self, coro) : def _done_callback (FUT) : self.stop () task = self.create_task (coro) task.add_done_callback (_done_callback) self.run_forever () return task.result () def create_task (self, coro) : task = task (coro, self) self.call_soon (task._step) 반환 작업 수업 작업 : def __init __ (self, coro, loop) : self._coro = coro self._loop = 루프 self._done = 거짓 self._result = 없음 self._callbacks = [] def _step (self) : 노력하다: self._done : 반품 결과 = self._coro.send (없음) Isinstance (결과, SleepAndle) : result._task = self self._loop.call_soon (result._wake_up) 또 다른: self._loop.call_soon (self._step) e : stopiteration을 제외하고 : self.set_result (e.value) def set_result (자체, 결과) : self._result = 결과 self._done = true self._callbacks의 콜백 : self._loop.call_soon (콜백, 자기) def add_done_callback (자체, 콜백) : self._done : self._loop.call_soon (콜백, 자기) 또 다른: self._callbacks.append (콜백) def result (self) : self._done이 아닌 경우 : runtimeerror ya ( '작업이 완료되지 않음') Self._Result를 반환합니다 클래스 수면 핸들 : def __init __ (자기, 지속 시간) : self._duration = 기간 self._task = 없음 self._start_time = time.time () def _wake_up (self) : if time.time () - self._start_time> = self._duration : self._task._loop.call_soon (self._task._step) 또 다른: self._task._loop.call_soon (self._wake_up) 비동기 DEF 수면 (기간) : 수면 핸들 리턴 (시간) Async def example () : 인쇄 ( "시작") 잠을 기다리고 있습니다 (1) 인쇄 ( "1 초 후") 잠을 기다리고있다 (2) 인쇄 ( "2 초 후") "완료"반환 loop = customeventLoop () result = loop.run_until_complete (example ()) 인쇄 (결과)
이 사용자 정의 이벤트 루프는 실행 작업, 핸들링 코 루틴 및 간단한 수면 기능과 같은 기본 기능을 구현합니다. Python의 내장 이벤트 루프만큼 기능이 풍부하지는 않지만 핵심 개념을 보여줍니다.import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)자체를 구현할 수 있습니다.Import Asyncio를 가져옵니다 수입 Heapq Class PriorityEventLoop (asyncio.abstracteventLoop) : def __init __ (self) : self._ready = [] self._stopping = false self._clock = 0 def call_at (self, when, callback, *args, context = none) : 핸들 = asyncio.handle (콜백, args, 자기, 컨텍스트) heapq.heappush (self._ready, (언제, 핸들)) 반환 핸들 def call_later (자체, 지연, 콜백, *args, context = none) : return self.call_at (self._clock 지연, 콜백, *args, context = context) def call_soon (self, 콜백, *args, context = none) : return self.call_at (self._clock, 콜백, *args, context = context) def time (self) : Self._Clock을 반환합니다 def stop (self) : self._stopping = true def is_running (self) : 셀프를 반환하지 마십시오 ._stopping def run_forever (self) : self._ weel ready and self._stopping : self._run_once () def _run_once (self) : Self._ready가 아닌 경우 : 반품 언제, 핸들 = heapq.heappop (self._ready) self._clock = 언제 핸들 ._run () def create_task (self, coro) : return asyncio.task (coro, loop = self) def run_until_complete (자기, 미래) : asyncio.futures._chain_future (미래, self.create_future ()) self.run_forever () 미래가 아닌 경우 .done () : RIVE RUNTIMEERROR ( '이벤트 루프가 미래가 완료되기 전에 중지되었습니다.') Return Future.result () def create_future (self) : return asyncio.future (loop = self) Async def low_priority_task () : 인쇄 ( "낮은 우선 순위 작업 시작") Asyncio.sleep (2) 인쇄 ( "우선 순위가 낮은 작업 완료") Async def high_priority_task () : print ( "높은 우선 순위 작업 시작") Asyncio.sleep (1) 기다려 인쇄 ( "높은 우선 순위 작업 완료") Async def main () : loop = asyncio.get_event_loop () loop.call_later (0.1, loop.create_task, low_priority_task ()) loop.call_later (0, loop.create_task, high_priority_task ()) Asyncio.sleep (3) asyncio.run (main ())
이 PriorityEventLoop은 힙 큐를 사용하여 예정된 실행 시간을 기반으로 작업을 관리합니다. 지연으로 작업을 예약하여 우선 순위를 할당 할 수 있습니다.import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())Import Asyncio를 가져옵니다 Async def cancellable_operation () : 노력하다: print ( "작동 시작") Asyncio.sleep (5) print ( "작동 완료") asyncio.cancellederRor를 제외하고 : print ( "조작 취소") # 필요한 청소를 수행합니다 Raine # CancelledError를 다시 표시하십시오 Async def main () : task = asyncio.create_task (cancellable_operation ()) Asyncio.sleep (2) task.cancel () 노력하다: 작업을 기다리고 있습니다 asyncio.cancellederRor를 제외하고 : print ( "메인 : 작업이 취소되었습니다") asyncio.run (main ())
이 예에서 Cancellable_operation은 CancelledError를 잡고 필요한 청소를 수행 한 다음 예외를 다시 만들어냅니다. 이것은 여전히 취소 상태를 전파하면서 우아한 취급을 허용합니다.import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())Class asyncrange : def __init __ (self, start, stop, step = 1) : self.start = 시작 self.stop = 중지 self.step = 단계 def __aiter __ (self) : 자아를 반환하십시오 Async def __anext __ (self) : self.start> = self.stop 인 경우 : stopasynciteration을 높이십시오 value = self.start self.start = self.step Asyncio.sleep (0.1) # 일부 비동기 작업을 시뮬레이션합니다 반환 값 Async def main () : Asyncrange (0, 5)에서 I에 대한 비동기 : 인쇄 (i) asyncio.run (main ())
이 Asyncrange 클래스는 비동기 반복 프로토콜을 구현하여 루프에 비동기로 사용할 수 있습니다.class AsyncRange: def __init__(self, start, stop, step=1): self.start = start self.stop = stop self.step = step def __aiter__(self): return self async def __anext__(self): if self.start >= self.stop: raise StopAsyncIteration value = self.start self.start = self.step await asyncio.sleep(0.1) # Simulate some async work return value async def main(): async for i in AsyncRange(0, 5): print(i) asyncio.run(main())클래스 asyncresource : Async def __aenter __ (self) : 인쇄 ( "리소스 획득") Asyncio.sleep (1) # Async 획득 시뮬레이션을 기다립니다 자아를 반환하십시오 Async def __aexit __ (self, exc_type, exc, tb) : print ( "리소스 방출") Asyncio.sleep (1) # Async 릴리스 시뮬레이션을 기다립니다 Async def main () : 자원으로 Asyncresource ()가있는 비동기 : 인쇄 ( "리소스 사용") Asyncio.sleep (1) 기다려 asyncio.run (main ())
aenter
이 asyncresource 클래스는class AsyncResource: async def __aenter__(self): print("Acquiring resource") await asyncio.sleep(1) # Simulate async acquisition return self async def __aexit__(self, exc_type, exc, tb): print("Releasing resource") await asyncio.sleep(1) # Simulate async release async def main(): async with AsyncResource() as resource: print("Using resource") await asyncio.sleep(1) asyncio.run(main())aexit메소드를 구현하여 문과 비동기와 함께 사용할 수 있습니다. 결론적으로, Python의 Coroutine 시스템은 맞춤형 비동기 프리미티브를 구축하기위한 강력한 기반을 제공합니다. 기본 메커니즘과 프로토콜을 이해함으로써 특정 비동기 문제에 대한 맞춤형 솔루션을 만들고 복잡한 동시 시나리오에서 성능을 최적화하며 Python의 비동기 기능을 확장 할 수 있습니다. 이러한 사용자 정의 구현은 학습 및 특정 사용 사례에 적합하지만 Python의 내장 된 Asyncio 라이브러리는 최적화되어 있으며 대부분의 시나리오에 있어야합니다. 행복한 코딩!
우리의 창조물우리의 작품을 확인하십시오 :
투자자 센트럴
|
Smart Living| epochs & echoes | 수수께끼의 미스터리 | Hindutva | 엘리트 개발 | JS 학교 우리는 매체에 있습니다
Tech Koala Insights|
Epochs & Echoes World| 투자자 중앙 매체 | 당황스러운 미스터리 매체 | Science & Epochs Medium | Modern Hindutva
부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.
Copyright© 2022 湘ICP备2022001581号-3