前言

QEMU 采用了基于事件驱动的架构,其事件循环机制基于 glib 实现,上一篇文章介绍了 glib 事件循环机制,本文将在此基础上以 8.0.0 版本的 QEMU RISC-V (qemu-system-riscv64) 为例深入分析 QEMU 事件循环机制。

概述

QEMU 的事件循环机制是其核心功能之一,用于协调虚拟机的各种异步操作。通过监听和调度各种事件源,它允许 QEMU 有效地响应外部输入、定时器事件等,从而实现高性能的虚拟化。

QEMU 的默认事件循环是位于文件 utils/main-loop.c 文件中的主循环 main-loop,我们也可以使用选项 –object iothread, id=<my_iothread_name> 自己创建事件循环。

QEMU 事件源

尽管 QEMU 的事件循环机制基于 glib 实现,但是 QEMU 没有直接使用 glib 提供的事件源,而是自定义了一个新的事件源 AioContext,主要有以下两种类型:

  • 监听各种事件,例如 iohandler_ctx
  • 处理块设备层的异步 I/O 请求,例如默认的 qemu_aio_context 或者模块自己创建的 AioContext
/* include/block/aio.h: 124 */

struct AioContext {
    GSource source;

    /* Used by AioContext users to protect from multi-threaded access.  */
    QemuRecMutex lock;

    /*
     * Keep track of readers and writers of the block layer graph.
     * This is essential to avoid performing additions and removal
     * of nodes and edges from block graph while some
     * other thread is traversing it.
     */
    BdrvGraphRWlock *bdrv_graph;

    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
    AioHandlerList aio_handlers;

    /* The list of AIO handlers to be deleted.  Protected by ctx->list_lock. */
    AioHandlerList deleted_aio_handlers;

    /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
     * only written from the AioContext home thread, or under the BQL in
     * the case of the main AioContext.  However, it is read from any
     * thread so it is still accessed with atomic primitives.
     *
     * If this field is 0, everything (file descriptors, bottom halves,
     * timers) will be re-evaluated before the next blocking poll() or
     * io_uring wait; therefore, the event_notifier_set call can be
     * skipped.  If it is non-zero, you may need to wake up a concurrent
     * aio_poll or the glib main event loop, making event_notifier_set
     * necessary.
     *
     * Bit 0 is reserved for GSource usage of the AioContext, and is 1
     * between a call to aio_ctx_prepare and the next call to aio_ctx_check.
     * Bits 1-31 simply count the number of active calls to aio_poll
     * that are in the prepare or poll phase.
     *
     * The GSource and aio_poll must use a different mechanism because
     * there is no certainty that a call to GSource's prepare callback
     * (via g_main_context_prepare) is indeed followed by check and
     * dispatch.  It's not clear whether this would be a bug, but let's
     * play safe and allow it---it will just cause extra calls to
     * event_notifier_set until the next call to dispatch.
     *
     * Instead, the aio_poll calls include both the prepare and the
     * dispatch phase, hence a simple counter is enough for them.
     */
    uint32_t notify_me;

    /* A lock to protect between QEMUBH and AioHandler adders and deleter,
     * and to ensure that no callbacks are removed while we're walking and
     * dispatching them.
     */
    QemuLockCnt list_lock;

    /* Bottom Halves pending aio_bh_poll() processing */
    BHList bh_list;

    /* Chained BH list slices for each nested aio_bh_poll() call */
    QSIMPLEQ_HEAD(, BHListSlice) bh_slice_list;

    /* Used by aio_notify.
     *
     * "notified" is used to avoid expensive event_notifier_test_and_clear
     * calls.  When it is clear, the EventNotifier is clear, or one thread
     * is going to clear "notified" before processing more events.  False
     * positives are possible, i.e. "notified" could be set even though the
     * EventNotifier is clear.
     *
     * Note that event_notifier_set *cannot* be optimized the same way.  For
     * more information on the problem that would result, see "#ifdef BUG2"
     * in the docs/aio_notify_accept.promela formal model.
     */
    bool notified;
    EventNotifier notifier;

    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
    QEMUBH *co_schedule_bh;

    int thread_pool_min;
    int thread_pool_max;
    /* Thread pool for performing work and receiving completion callbacks.
     * Has its own locking.
     */
    struct ThreadPool *thread_pool;

#ifdef CONFIG_LINUX_AIO
    /*
     * State for native Linux AIO.  Uses aio_context_acquire/release for
     * locking.
     */
    struct LinuxAioState *linux_aio;
#endif
#ifdef CONFIG_LINUX_IO_URING
    /*
     * State for Linux io_uring.  Uses aio_context_acquire/release for
     * locking.
     */
    struct LuringState *linux_io_uring;

    /* State for file descriptor monitoring using Linux io_uring */
    struct io_uring fdmon_io_uring;
    AioHandlerSList submit_list;
#endif

    /* TimerLists for calling timers - one per clock type.  Has its own
     * locking.
     */
    QEMUTimerListGroup tlg;

    int external_disable_cnt;

    /* Number of AioHandlers without .io_poll() */
    int poll_disable_cnt;

    /* Polling mode parameters */
    int64_t poll_ns;        /* current polling time in nanoseconds */
    int64_t poll_max_ns;    /* maximum polling time in nanoseconds */
    int64_t poll_grow;      /* polling time growth factor */
    int64_t poll_shrink;    /* polling time shrink factor */

    /* AIO engine parameters */
    int64_t aio_max_batch;  /* maximum number of requests in a batch */

    /*
     * List of handlers participating in userspace polling.  Protected by
     * ctx->list_lock.  Iterated and modified mostly by the event loop thread
     * from aio_poll() with ctx->list_lock incremented.  aio_set_fd_handler()
     * only touches the list to delete nodes if ctx->list_lock's count is zero.
     */
    AioHandlerList poll_aio_handlers;

    /* Are we in polling mode or monitoring file descriptors? */
    bool poll_started;

    /* epoll(7) state used when built with CONFIG_EPOLL */
    int epollfd;

    const FDMonOps *fdmon_ops;
};

上述结构体较为复杂,下面主要分析几个较为关键的成员变量的含义与作用:

  • GSource source: glib 事件源,每一个 glib 自定义事件源的第一个成员必须是 GSource 结构的成员
  • QemuRecMutex lock: 用于保护 AioContext 的递归互斥锁,防止多线程访问
  • AioHandlerList aio_handlers: 一个链表头,其链表中的数据类型为 AioHandler,所有加入到 AioContext 事件源的文件 fd 的事件处理函数都挂载到该链表上
  • uint32_t notify_me:用于避免在 aio_notify 中进行不必要的 event_notifier_set 调用
  • bool notified:notify_menotified 都与 aio_notify 相关,主要用于在块设备层的 I/O 同步时处理 QEMU 下半部 (Bottom Halvs,BH)
  • BHList bh_list: QEMU 下半部链表,用来连接挂到该事件源的下半部,QEMU 的 BH 默认挂载到 qemu_aio_context
  • EventNotifier notifier: 事件通知对象,在块设备进行同步且需要调用 BH 的时候需要用到该成员
  • QEMUTimerListGroup tlg: 管理挂到该事件源的定时器

AioContext 拓展了 glib 中事件源的功能,不但支持 fd 的事件处理,还模拟内核中的下半部机制,实现了 QEMU 中的下半部以及定时器的管理。AioContext 可以通过调用 aio_context_new 函数进行创建并初始化:

/* util/async.c: 542 */

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    QSLIST_INIT(&ctx->bh_list);
    QSIMPLEQ_INIT(&ctx->bh_slice_list);
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           aio_context_notifier_cb,
                           aio_context_notifier_poll,
                           aio_context_notifier_poll_ready);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif

#ifdef CONFIG_LINUX_IO_URING
    ctx->linux_io_uring = NULL;
#endif

    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    ctx->aio_max_batch = 0;

    ctx->thread_pool_min = 0;
    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;

    register_aiocontext(ctx);

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}

aio_context_new 函数返回一个指向新创建的 AioContext 的指针,并接受一个 Error 指针的指针,用于报告初始化过程中出现的错误。该函数首先调用 g_source_new 等函数完成 AioContext 的创建、设置以及 QEMU 下半部列表的初始化,接着初始化事件通知器及其回调函数、初始化锁并设置协程调度,然后完成线程池、计时器列表、轮询参数等其他内容的初始化工作,最后注册新创建的 AioContext

AioContext 的创建函数中,aio_set_event_notifer 函数调用了 aio_set_fd_handler 函数,后者用于添加或者删除 AioContext 事件源中的 fd,如果是添加则会设置 fd 对应的读写函数。添加事件源中 fd 监听处理的具体过程如下:

/* util/aio-posix.c: 100 */

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    if (io_poll && !io_poll_ready) {
        io_poll = NULL; /* polling only makes sense if there is a handler */
    }

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->io_poll_ready = io_poll_ready;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    qatomic_set(&ctx->poll_disable_cnt,
               qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

该函数接受以下 6 个参数:

  • AioContext *ctx: 待添加 fd 的异步 I/O 上下文
  • int fd: 待处理的文件描述符
  • bool is_external: 用于块设备层,指示是否为外部事件,对于事件监听的 fd 都设置为 false
  • IOHandler *io_read: 读取回调函数
  • IOHandler *io_write: 写入回调函数
  • AioPollFn *io_poll: 轮询回调函数
  • IOHandler *io_poll_ready: 轮询就绪回调函数
  • void *opaque: 传递给回调函数的透明指针

aio_set_fd_handler 函数首先调用 find_aio_handler 查找当前事件源中是否已有 fd,对于新加入的情况,这里会创建一个名为 nodeAioHandler,使用 fd 初始化 node->pfd.fd,并将其插入到 ctx->aio_handlers 链表上,调用 glib 接口函数 g_source_add_poll 将该 fd 插入到了事件源监听 fd 列表中。最后设置 node 的事件读写函数为 io_readio_write,根据 io_readio_write 的有无设置 node->pfd.events,即要监听的事件。在 aio_set_fd_handler 执行完成之后,新的 fd 事件就加入到了事件源的 aio_handlers 链表上了,具体如下图所示:

状态机回调函数

QEMU 丰富了 glib 的事件源,状态机回调函数的实现逻辑变得更加复杂,但基本框架仍然遵循 glib 规范,aio_context_new 函数中的 aio_source_funcs 是状态机回调函数的声明:

/* util/async.c: 389 */

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

其中 aio_ctx_prepare 是状态机 prepare 阶段的回调函数,目的是准备 AioContext 进行轮询,计算超时,并检查是否有即时事件需要处理。如果有即时事件,它会返回 true,这意味着主事件循环应该立即处理这些事件而非等待。

/* util/async.c: 267 */

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) | 1);

    /*
     * Write ctx->notify_me before computing the timeout
     * (reading bottom half flags, etc.).  Pairs with
     * smp_mb in aio_notify().
     */
    smp_mb();

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

aio_ctx_check 是状态机 check 阶段的回调函数,主要负责检查 AioContext 是否有待处理的事件。如果有则返回 true。该函数首先使用原子操作清除 ctx->notify_me 的最低位,这是用于跟踪是否需要通知 AioContext 的标志。接着调用 aio_notify_accept 来处理任何挂起的通知,并通过 QSLIST_FOREACH_RCU 宏遍历 ctx->bh_list 中的所有下半部,如果找到一个已调度但未删除的下半部,函数返回 true。然后,使用 QSIMPLEQ_FOREACHQSLIST_FOREACH_RCU 宏遍历 ctx->bh_slice_list 中的所有下半部,如果在任何切片中找到一个已调度但未删除的下半部,函数返回 true。最后,使用 aio_pending 函数检查是否有其他挂起的事件。此外,函数还会检查 timerlistgroup_deadline_ns 的返回值是否为 0,这意味着有一个立即的定时器事件需要处理。

/* util/async.c: 291 */

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    BHListSlice *s;

    /* Finish computing the timeout before clearing the flag. */
    qatomic_store_release(&ctx->notify_me, qatomic_read(&ctx->notify_me) & ~1);
    aio_notify_accept(ctx);

    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            return true;
        }
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
                return true;
            }
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

aio_ctx_dispatch 是状态机 dispatch 阶段的回调函数,整体逻辑较为简单,在确保参数 callback 不为空的情况下直接调用 aio_dispatch 函数进行处理。

/* util/async.c: 318 */

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

aio_dispatch 函数是所有事件处理函数中最为重要的一个,主要负责事件的分派处理,具体负责:

  • 下半部处理
  • 处理文件 fd 列表中有事件的 fd
  • 调用定时器到期的函数

aio_dispatch 函数代码对上述三个行为进行了封装:

/* util/aio-posix.c: 418 */

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    timerlistgroup_run_timers(&ctx->tlg);
}

这里我们重点关注 aio_dispatch_handlers 函数的执行逻辑:

/* util/aio-posix.c: 406 */

static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}

aio_dispatch_handlers 函数的行为主要分为 4 个步骤:

  1. 使用 QLIST_FOREACH_SAFE_RCU 宏安全地遍历 ctx->aio_handlers 列表,检查监听 fd 上的事件是否发生。fd 发生的事件存在 node->pfd.revents 中,注册时指定需要接受的事件存放在 node->pfd.events 中,revents 变量保存了 fd 接收到的事件。
  2. 处理通知,如果事件没有被删除,其事件标志被设置,且有一个通知回调函数,那么该回调函数将被调用。
  3. 处理读写事件,对应 G_IO_IN 可读事件来说,会调用注册的 fd 的 io_read 回调,对 G_IN_OUT 可写事件来说,会调用注册的 fd 的 io_write 函数。
  4. 如果当前的 fd 已经被标记删除了,则会删除这个节点并释放锁。

aio_ctx_finalize 函数是事件源不再被引用时的回调函数,其主要目的是确保与 AioContext 关联的所有资源都被正确地清理和释放,以防止发生内存泄漏等问题。aio_ctx_finalize 函数的整体逻辑相对而言比较简单,这里不再赘述。

poll 机制

应用程序对文件描述符的读写,一般有两种方式:

  • 阻塞式 IO: 如果 fd 的某个状态(比如可读,可写)没有准备就绪,进程将永远阻塞在这个 IO 上
  • 非阻塞式 IO: 进程 IO 立即执行,如果 fd 没有准备就绪,直接返回错误,表示如果继续执行 IO 将阻塞。

通常来说,上述两种方式能够满足大多数应用程序对文件描述符的读写需求,但是在更复杂的场景下,比如一个不允许阻塞的进程需要读写一个可能永远阻塞进程的 fd,这时就需要使用内核提供的 poll 机制。poll 机制不直接读写 IO,而是先探测 fd 的 IO 状态是否就绪。这个探测行为可以设置超时时间:如果在超时时间到达前 IO 状态准备就绪,poll 接口立即返回,指示应用程序可以进行读写操作;如果 poll 超过了设置的超时时间,但应用需要的 IO 状态仍未完全就绪,poll 接口也会返回,同时把已经就绪的部分 IO 状态返回。这种机制使得应用程序能够根据内核返回的 poll 信息确定下一步的操作,避免应用程序被永远阻塞在某一 IO 上。

QEMU 基于 glib 事件循环的框架,定制了 glib 事件循环机制中几乎所有的接口。需要注意的是,QEMU 并没有调用 glib 的接口 g_main_loop_run 执行事件循环,而是直接调用了 glib 提供的 g_poll 函数实现 poll 的功能。g_poll 函数具有可移植性,在有 poll 接口的平台上由 poll 模拟,在没有 poll 接口的平台上由 select 模拟。当用户不想执行整个 glib 事件循环,又想实现阻塞一段时间的高级 IO,就可以通过直接调用 g_poll 实现。

QEMU 主事件循环的具体逻辑由函数 qemu_main_loop 体现:

/* softmmu/runstate.c: 720 */

int qemu_main_loop(void)
{
    int status = EXIT_SUCCESS;
#ifdef CONFIG_PROFILER
    int64_t ti;
#endif

    while (!main_loop_should_exit(&status)) {
#ifdef CONFIG_PROFILER
        ti = profile_getclock();
#endif
        main_loop_wait(false);
#ifdef CONFIG_PROFILER
        dev_time += profile_getclock() - ti;
#endif
    }

    return status;
}

观察上述代码我们可以发现,QEMU 使用了一条 while 语句循环调用 main_loop_wait 函数,再通过一串函数调用链最终调用 g_poll 接口。g_poll 接口接受的参数是一个文件描述符的数组,因此它能够同时 poll 多个 fd,poll 的时候是休眠的,一旦其中一个 fd 准备好,poll 就会被唤醒,同时 GPollFD.revent 会被内核设置,以表明 fd 的状态。QEMU 对 g_poll 接口的封装实现除了以纳秒为单位计算超时之外,其他方面均与 glib 的 g_poll 接口保持一致:

/* util/qemu-timer.c: 335 */

int qemu_poll_ns(GPollFD *fds, guint nfds, int64_t timeout)
{
#ifdef CONFIG_PPOLL
    if (timeout < 0) {
        return ppoll((struct pollfd *)fds, nfds, NULL, NULL);
    } else {
        struct timespec ts;
        int64_t tvsec = timeout / 1000000000LL;
        /* Avoid possibly overflowing and specifying a negative number of
         * seconds, which would turn a very long timeout into a busy-wait.
         */
        if (tvsec > (int64_t)INT32_MAX) {
            tvsec = INT32_MAX;
        }
        ts.tv_sec = tvsec;
        ts.tv_nsec = timeout % 1000000000LL;
        return ppoll((struct pollfd *)fds, nfds, &ts, NULL);
    }
#else
    return g_poll(fds, nfds, qemu_timeout_ns_to_ms(timeout));
#endif
}

如果 QEMU 配置了 PPOLL,qemu_poll_ns 函数会使用 PPOLL 实现 poll 探测,否则直接调用 g_poll 函数实现 poll 探测。

主事件循环

一般流程

QEMU 事件循环执行流程遵循 glib 标准,与普通应用的执行流程类似,相关函数的调用关系如下图所示:

QEMU 的 main 函数定义在 softmmu/ vl.c 文件中,主函数在调用 qemu_init() 完成所有初始化工作之后会直接跳转函数 qemu_main 开始执行主事件循环。

初始化

QEMU 主事件循环初始化由 qemu_init_main_loop 函数负责:

/* util/main-loop.c: 156 */

int qemu_init_main_loop(Error **errp)
{
    int ret;
    GSource *src;

    init_clocks(qemu_timer_notify_cb);

    ret = qemu_signal_init(errp);
    if (ret) {
        return ret;
    }

    qemu_aio_context = aio_context_new(errp);
    if (!qemu_aio_context) {
        return -EMFILE;
    }
    qemu_set_current_aio_context(qemu_aio_context);
    qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
    gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
    src = aio_get_g_source(qemu_aio_context);
    g_source_set_name(src, "aio-context");
    g_source_attach(src, NULL);
    g_source_unref(src);
    src = iohandler_get_g_source();
    g_source_set_name(src, "io-handler");
    g_source_attach(src, NULL);
    g_source_unref(src);
    return 0;
}

观察上述代码可以发现,QEMU 事件初始化通过 glib 接口封装出本文开头所介绍的两种自定义事件源 qemu_aio_contextiohandler_ctx,然后将它们加入到 glib 默认的事件循环 default GMainContext 中。此外,一个上下文只在一个线程中运行,而 qemu_aio_contextiohandler_ctx 属于同一个上下文,因此可以运行在同一线程中。

关键函数

在上述函数调用关系图中,较为关键的是 os_host_main_loop_wait 函数,具体执行对 fd 的监听行为。需要注意的是,main_loop_wait 在调用 os_host_main_loop_wait 函数前,会调用 qemu_soonest_timeout 函数先计算一个最小的 timeout 值,该值是从定时器列表中获取的,表示监听事件的时候最多让主事件循环阻塞的时间,timeout 的存在使得 QEMU 能够及时处理系统中的定时器到期事件。

os_host_main_loop_wait 函数定义位于 util/main-loop.c 文件中,下面针对代码进行具体分析:

/* util/main-loop.c: 296 */

static int os_host_main_loop_wait(int64_t timeout)
{
    GMainContext *context = g_main_context_default();
    int ret;

    g_main_context_acquire(context);

    glib_pollfds_fill(&timeout);

    qemu_mutex_unlock_iothread();
    replay_mutex_unlock();

    ret = qemu_poll_ns((GPollFD *)gpollfds->data, gpollfds->len, timeout);

    replay_mutex_lock();
    qemu_mutex_lock_iothread();

    glib_pollfds_poll();

    g_main_context_release(context);

    return ret;
}

在上述代码中,我们需要重点关注以下三个函数:

  • glib_pollfds_fill: 检索事件源中待监控的 fd 集合
  • qemu_poll_ns: QEMU 对 g_poll 接口的封装实现,以纳秒为单位计算超时时间
  • glib_pollfds_poll: 有事件发生,准备开始执行回调

其中 qemu_poll_ns 函数上文已经分析过,这里重点关注另外两个函数。

/* util/main-loop.c: 255 */

static void glib_pollfds_fill(int64_t *cur_timeout)
{
    GMainContext *context = g_main_context_default();
    int timeout = 0;
    int64_t timeout_ns;
    int n;

    g_main_context_prepare(context, &max_priority);

    glib_pollfds_idx = gpollfds->len;
    n = glib_n_poll_fds;
    do {
        GPollFD *pfds;
        glib_n_poll_fds = n;
        g_array_set_size(gpollfds, glib_pollfds_idx + glib_n_poll_fds);
        pfds = &g_array_index(gpollfds, GPollFD, glib_pollfds_idx);
        n = g_main_context_query(context, max_priority, &timeout, pfds,
                                 glib_n_poll_fds);
    } while (n != glib_n_poll_fds);

    if (timeout < 0) {
        timeout_ns = -1;
    } else {
        timeout_ns = (int64_t)timeout * (int64_t)SCALE_MS;
    }

    *cur_timeout = qemu_soonest_timeout(timeout_ns, *cur_timeout);
}

glib_pollfds_fill 函数的主要工作是获取所有需要进行监听的 fd,并且计算一个最小的超时时间。该函数首先调用 g_main_context_prepare 开始为主循环的监听做准备。接着在一个循环中调用 g_main_context_query 获取需要监听的 fd,所有 fd 保存在全局变量 gpollfds 数组中,需要监听的 fd 的数量保存在 glib_n_poll_fds 中,g_main_context_query 还会返回 fd 时间最小的 timeout,该值用来与传入的参数 cur_timeout 进行比较,选取较小值表示主循环的最大阻塞时间。

/* util/main-loop.c: 284 */

static void glib_pollfds_poll(void)
{
    GMainContext *context = g_main_context_default();
    GPollFD *pfds = &g_array_index(gpollfds, GPollFD, glib_pollfds_idx);

    if (g_main_context_check(context, max_priority, pfds, glib_n_poll_fds)) {
        g_main_context_dispatch(context);
    }
}

glib_pollfds_poll 函数负责对事件进行分发处理,它的代码结构比较简单,首先调用 glib 框架的 g_main_context_check 检测事件,然后再调用 g_main_context_dispatch 对事件进行分发处理。

事件处理

signalfd 是 Linux 的一个系统调用,可以将特定的信号与一个 fd 绑定,当有信号到达的时 fd 就会产生对应的可读事件。下面以 signalfd 为例梳理总结 QEMU 事件处理的过程。

初始化阶段主要逻辑如下:

  1. 调用 qemu_signal_init 将一个 fd 与一组信号关联起来,qemu_signal_init 调用 qemu_set_fd_handler 函数将该 signalfd 对应的可读回调函数设置为 sigfd_handler
  2. qemu_set_fd_handler 在首次调用时会调用 iohandler_init 创建一个全局的 iohandler_ctx 事件源,这个事件源的作用是监听 QEMU 中的各类事件
  3. qemu_signal_initiohandlers_ctxaio_handlers 上挂载一个 AioHandler 节点,其 fd 为 signalfd,其 io_read 函数为 sigfd_handler
  4. 调用 aio_context_new 创建一个全局的 qemu_aio_context 事件源,这个事件源主要用于处理 BH 和块设备层的同步使用
  5. 调用 aio_get_g_sourceiohandler_get_g_source 分别获取 qemu_aio_contextiohandler_ctxGSource 成员变量,以该变量为参数调用 g_source_attach 两个 AioContext 加入主循环

将信号对应的 fd 加入事件源并将事件源加入主循环之后,QEMU 就会进入上文所述的 qemu_main_loop 函数循环进行事件监听。当使用 kill 命令向 QEMU 进程发送 SIGALARM 信号时,signalfd 就会有可读信号,从而导致主循环返回调用 g_main_context_dispatch 进行事件分发,这会调用 aio_ctx_dispatch,最终会调用 qemu_signal_init 注册的可读处理函数 sigfd_handler 进行事件处理。

总结

本文在 glib 事件循环机制的基础上,介绍了 QEMU 事件循环机制的基本组成,包括事件源、状态机回调函数以及 poll 机制,详细分析了 QEMU 主事件循环的运行原理,并以 signalfd 为例梳理了 QEMU 事件处理流程。在下一篇文章中,我们将继续深入分析 QEMU 下半部(Bottom Halvs,BH)机制。

参考资料