^v^

知行合一 止于至善

Ceph AsyncMessenger笔记

07 May 2018 » Ceph


// stack.h
NetworkStack // 单例,抽象,具体实现为PosixNetworkStack
    |-- workers: Worker // 抽象,具体实现为PosixWorker
    |    |-- center: EventCenter
    |        |-- driver: EventDriver // 抽象,具体实现为EpollDriver
    |        |    |-- events
    |        |    |-- epfd
    |        |    |-- size
    |        |-- owner // pthread_self 线程
    |        |-- // 外部事件,不会触发epoll,需要pipe唤醒epoll等待线程
    |        |-- notify_receive_fd  // 读管道
    |        |-- notify_send_fd  // 写管道,唤醒epoll等待线程
    |        |-- external_events: deque<EventCallbackRef>  // 外部事件
    |        |-- process_events // 线程入口函数,处理各种事件
    |-- threads // 和work绑定的线程,线程回调函数参考add_threads

全局数据结构。 NetworkStack为1单例; global_centers持有所有EventCenter实例;

EventCenter能够处理3类事件:File事件、外部事件、定时事件。File事件注册到Epoll,在process_event时调用wait获取事件。 外部事件和定时事件都不注册到Epoll,也在process_event中进行处理,也就是说3类事件在同一个线程中进行处理。 因为不注册到Epoll,所以这两类事件可能被Epoll wait阻塞。 针对外部事件,添加事件后会执行唤醒操作,跳过Epoll wait。唤醒操作通过Pip的两个注册到Epoll的文件来实现。

workers的数量为ms_async_op_thread(3)个。

AsyncMessenger
    |-- processor: Processor // 数量为1
    |    |-- worker: Worker
    |    |-- listen_handler: EventCallback // Processor::accept
    |    |-- listen_socket: ServerSocket  // 监听端口的socket
    |-- dispatch_queue: DispatchQueue
    |-- accepting_conns: set<AsyncConnectionRef>


AsyncConnection
    |-- worker
    |-- dispatch_queue
    |-- read_handler: C_handle_read
    |-- write_handler: C_handle_write
    |-- tick_handler: C_tick_wakeup
    |-- wakeup_handler: C_time_wakeup
    |-- recv_buf

OSD对Messenger的使用:

  1. 创建;
  2. set_policy
  3. bind
  4. start
  5. wait

bind

  1. 端口被占用问题,在给定范围内依次尝试;
  2. AsyncMessenger把bind操作转交给Processor的bind操作,最终由PosixWorker的listen函数完成。异步操作,OSD主线程将任务通过C_submit_event事件转给Worker线程来处理;
EventCenter::submit_to() => EventCenter::dispatch_event_external() =>
EventCenter::process_events() => Worker::listen()

// 将listen_socket注册给epoll,回调函数Processor::accept
Messenger::add_dispatcher_head() => AsyncMessenger::ready() => Processor::start()

总体架构

asyncmessenger

NetworkStack实现对底层Posix、RDMA和DPDK的抽象和封装。 维护一组全局Worker,Worker为一个工作线程,负责事件通知、消息读取等任务。例如,有新连接、连接有新数据等。

AsyncMessenger的Processor和AsyncConnection是Worker的主要使用者。 Processor主要负责AsyncMessenger中监听Socket,包括bind、accept等事宜; AsyncConnection为一个端到端的连接,从连接中读取数据,发送数据等事宜。 AsyncMessenger创建Processor或者AsyncMessenger时向NetworkStack请求一个Worker,NetworkStack根据当前Worker的负载情况选择一个负载最轻的Worker给AsyncMessenger,AsyncMessenger将申请到的Worker同Processor或者AsyncMessenger绑定在一起。后面对Processor或AsyncMessenger的操作都有对应的Worker来完成。

PosixWorker是Worker的一个具体实现。 每个Worker都有一个EventCenter,EventCenter维护了3中类型的事件:file事件、time事件和外部事件。 file事件 为可以由底层EventDriver来监听通知的事件,例如socket、pipe;EventDriver报告事件,EventCenter维护fd和事件回调函数的对应关系,从而在事件发生时调用对应的回调进行处理; time事件 是一个定时器,指定在未来的某个时间点执行的任务(可能会被阻塞); 外部事件 为需要交给Worker线程立即执行的任务。例如,在接收到新连接时,Worker1线程接收新连接、创建新连接后需要从新连接中读取数据,此时将读取数据的操作移交给和该连接绑定的Worker2执行。

每个Worker是一个工作线程,线程入口函数为process_events,用于处理上述3类事件。 大部分情况,process_events函数会阻塞在等待file事件上。这对于外部事件来说是不能接受的,因为外部事件需要立即处理。 为此,注册了一个pipe的file事件,当向Worker加入一个外部事件后,调用pipe的写fd,唤醒线程。

连接 AsyncConnection

Client端

client

Server端

建立连接

// listenfd的Worker线程:
// 选择一个load最小的Worker与新建的AsyncConnection绑定
Processor::accept() => AsyncMessenger::add_accept() => AsyncConnection::accept()

// conn的Worker线程:
// 根据Connection的状态处理请求,将socket加入到EventCenter
// Connection的初始状态为 STATE_ACCEPTING,后面根据ceph通信协议解析消息;
// STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
=> AsyncConnection::process()

连接状态为STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH时,读取完整消息并decode,通过AsyncMessenger的DispatcherQueue分发到消息的处理函数。同时,将连接状态设置为STATE_OPEN

读取数据

AsyncConnection
    |-- state_buffer: char*     // 读取到的数据
    |-- state_offset: uint64_t  // state_buffer的偏移量
    |-- recv_buff: char*  // 保存预读的数据
    |-- recv_start: uint64_t  // recv_buff中有效数据的开始位置
    |-- recv_end: uint64_t    // recv_buff中有效数据的结束位置

读取消息内容,参考read_until函数。 函数返回值如果为0,代表读取到给定大小的数据; 如果小于0,代表读取出错; 如果大于0,代表暂时没有数据可读,或则读到的数据量小于给定的长度,需要等到下次有数据可读时继续读取。 如果待读取内容小于 ms_tcp_prefetch_max_size(4K)提供预读功能,否则不提供预读功能。

发送数据

AsyncConnection
    |-- // int为优先级,每个优先级下对应一组消息
    |-- out_q: map<int, list<pair<bufferlist, Message*> > >
    |-- outcoming_bl: bufferlist


AsyncConnection::handle_write()

消息发送者调用AsyncConnection的send_message函数将消息放入到out_q队列后就返回,连接的Worker工作线程负责后续的发送操作。

SimpleMessenger

SimpleMessenger
    |-- accepter: Accepter(Thread)
    |    |-- listen_fd  // 监听socket
    |-- pipes: set<Pipe*>
        |-- sd: int     // client socket
        |-- recv_buf
        |-- read_thread: Reader  // 入口函数pipe::reader()
        |-- write_thread: Writer // 入口函数pipe::writer()
        |-- in_q: DispatchQueue

// 接收新连接
Accepter::entry() => SimpleMessenger::add_accept_pipe()

simple messenger

Related Posts