”工欲善其事,必先利其器。“—孔子《论语.录灵公》
首页 > 编程 > 使用异步生成器在 TypeScript 中异步迭代事件发射器

使用异步生成器在 TypeScript 中异步迭代事件发射器

发布于2024-11-08
浏览:279

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

介绍

在现代 Web 开发中,我们经常处理事件,无论是处理传入的 WebSocket 消息、服务器发送的事件 (SSE) 还是来自 Redis Pub/Sub 等服务的数据流。虽然 Node.js 提供了事件驱动的功能,但它缺乏一种开箱即用的方法来使用 for wait...of 循环异步迭代事件。

在这篇文章中,我将引导您通过一种简单而强大的方法来使用 TypeScript 和 AsyncGenerator 创建异步事件迭代器。这种方法旨在允许您以干净且可预测的方式使用来自任何类型事件发射器的事件,并完全控制取消和清理逻辑。

用例:Redis Pub/Sub

在我最近的一个项目中,我需要监听 Redis Pub/Sub 通道并将服务器发送的事件 (SSE) 异步分派给连接的客户端。面临的挑战是在不压垮系统的情况下处理传入事件,同时允许消费者随时取消事件流。

解决方案?一个事件迭代器,可将任何事件发射器(例如 Redis Pub/Sub)转换为异步迭代器。这使我们能够以受控的方式处理事件,并在必要时优雅地处理取消。

让我们深入了解实施。

守则

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

它是如何运作的

此函数接受订阅者函数,您可以将其挂接到任何事件发射器或发布/订阅系统中。订阅者提供了两个基本方法:

  1. emit:允许订阅者将新事件推送到迭代器中。
  2. 取消:提供一种方式来指示迭代应该停止。

该函数返回一个 AsyncGenerator,允许您使用 for wait...of 循环迭代事件。

分解代码

  1. 上下文对象:
    Context 类型提供了一个接口来发出新事件或取消订阅。订阅者使用此上下文来控制事件流。

  2. 事件队列:
    events: T[] 数组用作存储发出的事件的缓冲区。生成器将一一处理这些事件。如果队列中没有事件,它将等待下一个事件被发出。

  3. 发出逻辑:
    发出函数将新事件添加到队列并解决任何待处理的承诺(即,如果生成器正在等待新事件)。

  4. 消除
    如果调用 cancel 函数,它会设置一个标志 (cancelled = true) 来指示循环应该退出。在生成器完成之前,队列中的任何剩余事件仍将被处理。

  5. 清理
    取消后,生成器将调用取消订阅函数(如果提供)来执行任何必要的清理。这对于取消订阅 Redis 等外部系统或清理资源尤其重要。

示例:收听 Redis Pub/Sub

让我们看看如何使用此事件迭代器来监听 Redis Pub/Sub 并异步迭代传入的消息。

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

在此示例中,我们使用 createEventIterator 订阅 Redis Pub/Sub 通道并异步迭代消息。每次有新消息到达时,它都会被发送到生成器中,我们可以在那里实时处理它。如果收到特定消息(例如“STOP”),我们将中断循环并取消订阅 Redis。

示例:使用 EventEmitter

以下是如何将 createEventIterator 与 Node.js 的 EventEmitter 结合使用:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

在此示例中:

  • 我们使用EventEmitter来发出事件,这些事件由createEventIterator捕获。
  • 迭代器监听“data”事件并异步处理它。
  • 与Redis示例类似,我们可以在收到特定事件('STOP')时停止迭代。

这种方法的好处

  • 异步控制:通过利用AsyncGenerator,我们可以异步处理事件,按照自己的节奏处理它们,并在需要时暂停处理。

  • 取消:随时取消事件流的能力使这种方法变得灵活,特别是在可能需要正常关闭连接的现实场景中。

  • 通用:此迭代器可用于任何事件发射器或 Pub/Sub 系统,使其适用于不同的应用程序。

结论

事件驱动架构是许多现代 Web 应用程序的基石,但当我们需要异步控制事件流时,它们的管理可能会变得棘手。借助 TypeScript 中 AsyncGenerator 的强大功能,您可以构建像此事件迭代器这样的优雅解决方案,使您的事件处理代码更干净、更易于维护。

我希望这篇文章可以帮助您开始为您自己的事件发射器使用异步迭代器。如果您有任何问题或想法,请随时在评论中分享!

版本声明 本文转载于:https://dev.to/redjohnsh/asynchronously-iterating-over-event-emitters-in-typescript-with-async-generators-3mk?1如有侵犯,请联系[email protected]删除
最新教程 更多>
  • 如何将PANDAS DataFrame列转换为DateTime格式并按日期过滤?
    如何将PANDAS DataFrame列转换为DateTime格式并按日期过滤?
    Transform Pandas DataFrame Column to DateTime FormatScenario:Data within a Pandas DataFrame often exists in various formats, including strings.使用时间数据时...
    编程 发布于2025-07-14
  • 反射动态实现Go接口用于RPC方法探索
    反射动态实现Go接口用于RPC方法探索
    在GO 使用反射来实现定义RPC式方法的界面。例如,考虑一个接口,例如:键入myService接口{ 登录(用户名,密码字符串)(sessionId int,错误错误) helloworld(sessionid int)(hi String,错误错误) } 替代方案而不是依靠反射...
    编程 发布于2025-07-14
  • 如何将多种用户类型(学生,老师和管理员)重定向到Firebase应用中的各自活动?
    如何将多种用户类型(学生,老师和管理员)重定向到Firebase应用中的各自活动?
    Red: How to Redirect Multiple User Types to Respective ActivitiesUnderstanding the ProblemIn a Firebase-based voting app with three distinct user type...
    编程 发布于2025-07-14
  • 如何使用Python的请求和假用户代理绕过网站块?
    如何使用Python的请求和假用户代理绕过网站块?
    如何使用Python的请求模拟浏览器行为,以及伪造的用户代理提供了一个用户 - 代理标头一个有效方法是提供有效的用户式header,以提供有效的用户 - 设置,该标题可以通过browser和Acterner Systems the equestersystermery和操作系统。通过模仿像Chro...
    编程 发布于2025-07-14
  • C++20 Consteval函数中模板参数能否依赖于函数参数?
    C++20 Consteval函数中模板参数能否依赖于函数参数?
    [ consteval函数和模板参数依赖于函数参数在C 17中,模板参数不能依赖一个函数参数,因为编译器仍然需要对非contexexpr futcoriations contim at contexpr function进行评估。 compile time。 C 20引入恒定函数,必须在编译时进行...
    编程 发布于2025-07-14
  • 在JavaScript中如何并发运行异步操作并正确处理错误?
    在JavaScript中如何并发运行异步操作并正确处理错误?
    同意操作execution 在执行asynchronous操作时,相关的代码段落会遇到一个问题,当执行asynchronous操作:此实现在启动下一个操作之前依次等待每个操作的完成。要启用并发执行,需要进行修改的方法。 第一个解决方案试图通过获得每个操作的承诺来解决此问题,然后单独等待它们: co...
    编程 发布于2025-07-14
  • 如何使用FormData()处理多个文件上传?
    如何使用FormData()处理多个文件上传?
    )处理多个文件输入时,通常需要处理多个文件上传时,通常是必要的。 The fd.append("fileToUpload[]", files[x]); method can be used for this purpose, allowing you to send multi...
    编程 发布于2025-07-14
  • 切换到MySQLi后CodeIgniter连接MySQL数据库失败原因
    切换到MySQLi后CodeIgniter连接MySQL数据库失败原因
    无法连接到mySQL数据库:故障排除错误消息要调试问题,建议将以下代码添加到文件的末尾.//config/database.php并查看输出: ... ... 回声'... echo '<pre>'; print_r($db['default']); echo '</pr...
    编程 发布于2025-07-14
  • 如何在Java中正确显示“ DD/MM/YYYY HH:MM:SS.SS”格式的当前日期和时间?
    如何在Java中正确显示“ DD/MM/YYYY HH:MM:SS.SS”格式的当前日期和时间?
    如何在“ dd/mm/yyyy hh:mm:mm:ss.ss”格式“ gormat 解决方案: args)抛出异常{ 日历cal = calendar.getInstance(); SimpleDateFormat SDF =新的SimpleDateFormat(“...
    编程 发布于2025-07-14
  • 如何克服PHP的功能重新定义限制?
    如何克服PHP的功能重新定义限制?
    克服PHP的函数重新定义限制在PHP中,多次定义一个相同名称的函数是一个no-no。尝试这样做,如提供的代码段所示,将导致可怕的“不能重新列出”错误。 但是,PHP工具腰带中有一个隐藏的宝石:runkit扩展。它使您能够灵活地重新定义函数。 runkit_function_renction_re...
    编程 发布于2025-07-14
  • Python元类工作原理及类创建与定制
    Python元类工作原理及类创建与定制
    python中的metaclasses是什么? Metaclasses负责在Python中创建类对象。就像类创建实例一样,元类也创建类。他们提供了对类创建过程的控制层,允许自定义类行为和属性。在Python中理解类作为对象的概念,类是描述用于创建新实例或对象的蓝图的对象。这意味着类本身是使用类关...
    编程 发布于2025-07-14
  • 如何从PHP中的数组中提取随机元素?
    如何从PHP中的数组中提取随机元素?
    从阵列中的随机选择,可以轻松从数组中获取随机项目。考虑以下数组:; 从此数组中检索一个随机项目,利用array_rand( array_rand()函数从数组返回一个随机键。通过将$项目数组索引使用此键,我们可以从数组中访问一个随机元素。这种方法为选择随机项目提供了一种直接且可靠的方法。
    编程 发布于2025-07-14
  • 如何实时捕获和流媒体以进行聊天机器人命令执行?
    如何实时捕获和流媒体以进行聊天机器人命令执行?
    在开发能够执行命令的chatbots的领域中,实时从命令执行实时捕获Stdout,一个常见的需求是能够检索和显示标准输出(stdout)在cath cath cant cant cant cant cant cant cant cant interfaces in Chate cant inter...
    编程 发布于2025-07-14
  • 在Python中如何创建动态变量?
    在Python中如何创建动态变量?
    在Python 中,动态创建变量的功能可以是一种强大的工具,尤其是在使用复杂的数据结构或算法时,Dynamic Variable Creation的动态变量创建。 Python提供了几种创造性的方法来实现这一目标。利用dictionaries 一种有效的方法是利用字典。字典允许您动态创建密钥并分...
    编程 发布于2025-07-14

免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。

Copyright© 2022 湘ICP备2022001581号-3