QuXiao's Blog

Life && Tech && Thoughts

Memcached源码学习——线程模型

Written on

Memcached中有以下几类线程:

  • 主线程
  • 工作线程
  • 维护线程

主线程,又可以叫分发线程,除了完成程序的各种参数、以及其他线程的初始化以外,还会listen端口,新建连接,并且将该连接分发到其他的工作线程。

工作线程,大部分实际的工作都是他们干的,包括读取请求的协议内容、解析、进行具体存、取、更新、删除kv的操作,最后返回结果。

另外,还有一个维护线程,它的工作就是在需要的时候(存放的item大于总量的2/3)对hash表进行扩展。

Memcached处理请求时,采用的是单进程多线程的Master-Worker模型,通过libevent这个事件响应库来实现的。

首先来看一下主线程和工作线程之间是怎么交互的吧:

工作线程在初始化的时候,会建立一个pipe(管道),两端分别为:notify_receive_fd,以及notify_send_fd

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

也就是说当其他线程向notify_send_fd文件描述符写内容的时候,notify_receive_fd就可以接受到。

接着,就用到了libevent的API:

    me->base = event_init();

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

每个工作线程都新建一个libevent实例(me->base),并且将notify_event绑定在这个实例上。

notify_event什么时候触发? 当notify_receive_fd有内容的时候被触发。

触发了执行什么函数? 执行thread_libevent_process (me)函数。

那在哪个地方会写notify_send_fd呢? 在主线程将新建的连接分发给工作时,就会向某个线程的notify_send_fd写一个空的字符串用来唤醒这个线程。下面的代码一目了然:

    void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                           int read_buffer_size, enum network_transport transport) {
        CQ_ITEM *item = cqi_new();
        /*这就是所谓的round robin*/
        int tid = (last_thread + 1) % settings.num_threads;

        LIBEVENT_THREAD *thread = threads + tid;

        last_thread = tid;

        item->sfd = sfd;
        /* ... */

        cq_push(thread->new_conn_queue, item);

        if (write(thread->notify_send_fd, "", 1) != 1) {
            perror("Writing to thread notify pipe");
        }
    }

首先来看看主线程,当他把其他工作线程、维护线程启起来之后,就开始侦听socket端口了(可以在memcached的源码中看出tcp和udp在处理逻辑上有很多不同的地方,但我不知道为什么不一样,就只看了处理tcp部分的代码,看来改补一补网络通信的知识了……),主要逻辑在server_sockets函数中:

    if (IS_UDP(transport)) {
        int c;

        for (c = 0; c < settings.num_threads_per_udp; c++) {
            /* this is guaranteed to hit all threads because we round-robin */
            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                              UDP_READ_BUFFER_SIZE, transport);
        }
    } else {
        if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                         EV_READ | EV_PERSIST, 1,
                                         transport, main_base))) {
            fprintf(stderr, "failed to create listening connection\n");
            exit(EXIT_FAILURE);
        }
        listen_conn_add->next = listen_conn;
        listen_conn = listen_conn_add;
    }

conn_new函数建立了连接之后,将socket文件描述符于event_handler函数绑定,当有socket请求过来的时候,就执行event_handler。在event_handler中,就是直接调用drive_machine这个大大的状态转移函数,一次连接的所有状态就都在这个函数里面处理了。

主线程首先达到drive_machine中的conn_listenning状态,然后通过dispatch_conn_new将这次的连接分配给某个工作线程,工作线程再经历其他的状态,完成一次请求。

drive_machine的具体逻辑比较复杂,这里就不讲了。

好,最后简单回顾下这个工作流程:

  1. 主线程接受到memcached客户端的请求(是在哪儿一直接收请求的?)
  2. 主线程通过round robin找到一个工作线程
  3. 主线程将创建的连接push到工作线程的连接队列中,然后唤醒这个工作线程
  4. 工作线程被唤醒后,从自己的线程队列中取出一个连接
  5. 解析请求、对hash表进行相应的操作,写入返回

整体看下来,感觉memcached的源码并没有什么高深的算法,用的都是很朴素的链表、底层的网络通信、线程间互斥、字符串解析等等,感觉除了网络通信比较繁琐以外,其他的地方都是一个刚毕业的计算机专业的学生可以也应该掌握的。通过对一些简单、实用的东西进行有效的组合,就可以获取一个功能更强大的合成体。

--EOF--

comments powered by Disqus