Les coroutines en python sont un outil puissant pour écrire du code asynchrone. Ils ont révolutionné la façon dont nous gérons les opérations simultanées, ce qui facilite la création d'applications évolutives et efficaces. J'ai passé beaucoup de temps à travailler avec les coroutines, et je suis ravi de partager quelques informations sur la création de primitives asynchrones personnalisées.
Commençons par les bases. Les coroutines sont des fonctions spéciales qui peuvent être interrompues et reprendre, permettant un multitâche coopératif. Ils sont le fondement de la syntaxe asynchrone / attente de Python. Lorsque vous définissez une coroutine, vous créez essentiellement une fonction qui peut rendre le contrôle à la boucle d'événement, permettant à d'autres tâches de s'exécuter.
Pour créer un objet attendable personnalisé, vous devez implémenter la méthode Await . Cette méthode doit renvoyer un itérateur. Voici un exemple simple:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
Cette classe CustomAwaitable peut être utilisée avec le mot-clé Await, tout comme les attentes intégrées. Lorsqu'il attend, il donne un contrôle une fois, puis renvoie sa valeur.
Mais que se passe-t-il si nous voulons créer des primitives asynchrones plus complexes? Examinons la mise en œuvre d'un sémaphore personnalisé. Les sémaphores sont utilisés pour contrôler l'accès à une ressource partagée par plusieurs coroutines:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._valueCette classe CustomSemaphore implémente les méthodes d'acquisition et de libération, ainsi que le protocole Async Context Manager ( aenter et aexit ). Il permet à un maximum de deux coroutines d'acquérir le sémaphore simultanément.
Maintenant, parlons de la création de boucles d'événements efficaces. Bien qu'Asyncio de Python fournit une implémentation de boucle d'événements robuste, il peut y avoir des cas où vous avez besoin d'un personnal. Voici un exemple de base d'une boucle d'événement personnalisée:
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)Cette boucle d'événement personnalisée implémente les fonctionnalités de base comme l'exécution des tâches, la gestion des coroutines et même une fonction de sommeil simple. Ce n'est pas aussi riche en fonctionnalités que la boucle d'événement intégrée de Python, mais il démontre les concepts de base.
L'un des défis dans l'écriture du code asynchrone est de gérer les priorités des tâches. Alors que Asyncio de Python ne fournit pas de files d'attente de priorité intégrées pour les tâches, nous pouvons implémenter le nôtre:
import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())Cette priorityEventloop utilise une file d'attente de tas pour gérer les tâches en fonction de leur temps d'exécution planifié. Vous pouvez attribuer des priorités en planifiant des tâches avec différents retards.
Gestion de l'annulation gracieusement est un autre aspect important du travail avec les coroutines. Voici un exemple de la façon d'implémenter des tâches annuables:
import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())Dans cet exemple, le CANCELLABLABLE_OPERATION ACTIRE L'ANCELLEDEDERROR, effectue tout nettoyage nécessaire, puis remonte l'exception. Cela permet une manipulation gracieuse de l'annulation tout en propageant le statut d'annulation.
Explorons la mise en œuvre des itérateurs asynchronisés personnalisés. Ceux-ci sont utiles pour créer des séquences qui peuvent être itérées sur asynchrone:
class AsyncRange: def __init__(self, start, stop, step=1): self.start = start self.stop = stop self.step = step def __aiter__(self): return self async def __anext__(self): if self.start >= self.stop: raise StopAsyncIteration value = self.start self.start = self.step await asyncio.sleep(0.1) # Simulate some async work return value async def main(): async for i in AsyncRange(0, 5): print(i) asyncio.run(main())Cette classe Asyncrange implémente le protocole d'itulator asynchronisé, ce qui lui permet d'être utilisé en asynchronisation pour les boucles.
Enfin, examinons la mise en œuvre des gestionnaires de contexte asynchrones personnalisés. Ceux-ci sont utiles pour gérer les ressources qui doivent être acquises et libérées de manière asynchrone:
class AsyncResource: async def __aenter__(self): print("Acquiring resource") await asyncio.sleep(1) # Simulate async acquisition return self async def __aexit__(self, exc_type, exc, tb): print("Releasing resource") await asyncio.sleep(1) # Simulate async release async def main(): async with AsyncResource() as resource: print("Using resource") await asyncio.sleep(1) asyncio.run(main())Cette classe AsynCresource implémente les méthodes aenter et aexit , ce qui lui permet d'être utilisé avec l'async avec instruction.
En conclusion, le système Coroutine de Python fournit une base puissante pour construire des primitives asynchrones personnalisées. En comprenant les mécanismes et protocoles sous-jacents, vous pouvez créer des solutions sur mesure pour des défis asynchrones spécifiques, optimiser les performances dans des scénarios simultanés complexes et étendre les capacités asynchrones de Python. N'oubliez pas que si ces implémentations personnalisées sont idéales pour l'apprentissage et les cas d'utilisation spécifiques, la bibliothèque Asyncio intégrée de Python est très optimisée et devrait être votre choix pour la plupart des scénarios. Codage heureux!
Nos créations
Assurez-vous de consulter nos créations:
investisseur central | Smart Living | époques et échos | Mystères perplexes | hindutva | elite dev | js écoles
Nous sommes sur le milieu
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Mystères déroutants Medium | Science et époques Medium | Hindutva moderne
Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.
Copyright© 2022 湘ICP备2022001581号-3