// stack.h
NetworkStack // 单例,抽象,具体实现为PosixNetworkStack
|-- workers: Worker // 抽象,具体实现为PosixWorker
| |-- center: EventCenter
| |-- driver: EventDriver // 抽象,具体实现为EpollDriver
| | |-- events
| | |-- epfd
| | |-- size
| |-- owner // pthread_self 线程
| |-- // 外部事件,不会触发epoll,需要pipe唤醒epoll等待线程
| |-- notify_receive_fd // 读管道
| |-- notify_send_fd // 写管道,唤醒epoll等待线程
| |-- external_events: deque<EventCallbackRef> // 外部事件
| |-- process_events // 线程入口函数,处理各种事件
|-- threads // 和work绑定的线程,线程回调函数参考add_threads
全局数据结构。 NetworkStack为1单例; global_centers
持有所有EventCenter实例;
EventCenter能够处理3类事件:File事件、外部事件、定时事件。File事件注册到Epoll,在process_event
时调用wait获取事件。 外部事件和定时事件都不注册到Epoll,也在process_event
中进行处理,也就是说3类事件在同一个线程中进行处理。 因为不注册到Epoll,所以这两类事件可能被Epoll wait阻塞。 针对外部事件,添加事件后会执行唤醒操作,跳过Epoll wait。唤醒操作通过Pip的两个注册到Epoll的文件来实现。
workers的数量为ms_async_op_thread
(3)个。
AsyncMessenger
|-- processor: Processor // 数量为1
| |-- worker: Worker
| |-- listen_handler: EventCallback // Processor::accept
| |-- listen_socket: ServerSocket // 监听端口的socket
|-- dispatch_queue: DispatchQueue
|-- accepting_conns: set<AsyncConnectionRef>
AsyncConnection
|-- worker
|-- dispatch_queue
|-- read_handler: C_handle_read
|-- write_handler: C_handle_write
|-- tick_handler: C_tick_wakeup
|-- wakeup_handler: C_time_wakeup
|-- recv_buf
OSD对Messenger的使用:
- 创建;
set_policy
- bind
- start
- wait
bind
- 端口被占用问题,在给定范围内依次尝试;
- AsyncMessenger把bind操作转交给Processor的bind操作,最终由PosixWorker的
listen
函数完成。异步操作,OSD主线程将任务通过C_submit_event
事件转给Worker线程来处理;
EventCenter::submit_to() => EventCenter::dispatch_event_external() =>
EventCenter::process_events() => Worker::listen()
// 将listen_socket注册给epoll,回调函数Processor::accept
Messenger::add_dispatcher_head() => AsyncMessenger::ready() => Processor::start()
总体架构
NetworkStack实现对底层Posix、RDMA和DPDK的抽象和封装。 维护一组全局Worker,Worker为一个工作线程,负责事件通知、消息读取等任务。例如,有新连接、连接有新数据等。
AsyncMessenger的Processor和AsyncConnection是Worker的主要使用者。 Processor主要负责AsyncMessenger中监听Socket,包括bind、accept等事宜; AsyncConnection为一个端到端的连接,从连接中读取数据,发送数据等事宜。 AsyncMessenger创建Processor或者AsyncMessenger时向NetworkStack请求一个Worker,NetworkStack根据当前Worker的负载情况选择一个负载最轻的Worker给AsyncMessenger,AsyncMessenger将申请到的Worker同Processor或者AsyncMessenger绑定在一起。后面对Processor或AsyncMessenger的操作都有对应的Worker来完成。
PosixWorker是Worker的一个具体实现。 每个Worker都有一个EventCenter,EventCenter维护了3中类型的事件:file事件、time事件和外部事件。 file事件 为可以由底层EventDriver来监听通知的事件,例如socket、pipe;EventDriver报告事件,EventCenter维护fd和事件回调函数的对应关系,从而在事件发生时调用对应的回调进行处理; time事件 是一个定时器,指定在未来的某个时间点执行的任务(可能会被阻塞); 外部事件 为需要交给Worker线程立即执行的任务。例如,在接收到新连接时,Worker1线程接收新连接、创建新连接后需要从新连接中读取数据,此时将读取数据的操作移交给和该连接绑定的Worker2执行。
每个Worker是一个工作线程,线程入口函数为process_events
,用于处理上述3类事件。 大部分情况,process_events
函数会阻塞在等待file事件上。这对于外部事件来说是不能接受的,因为外部事件需要立即处理。 为此,注册了一个pipe的file事件,当向Worker加入一个外部事件后,调用pipe的写fd,唤醒线程。
连接 AsyncConnection
Client端
Server端
建立连接
// listenfd的Worker线程:
// 选择一个load最小的Worker与新建的AsyncConnection绑定
Processor::accept() => AsyncMessenger::add_accept() => AsyncConnection::accept()
// conn的Worker线程:
// 根据Connection的状态处理请求,将socket加入到EventCenter
// Connection的初始状态为 STATE_ACCEPTING,后面根据ceph通信协议解析消息;
// STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
=> AsyncConnection::process()
连接状态为STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
时,读取完整消息并decode,通过AsyncMessenger的DispatcherQueue分发到消息的处理函数。同时,将连接状态设置为STATE_OPEN
。
读取数据
AsyncConnection
|-- state_buffer: char* // 读取到的数据
|-- state_offset: uint64_t // state_buffer的偏移量
|-- recv_buff: char* // 保存预读的数据
|-- recv_start: uint64_t // recv_buff中有效数据的开始位置
|-- recv_end: uint64_t // recv_buff中有效数据的结束位置
读取消息内容,参考read_until
函数。 函数返回值如果为0,代表读取到给定大小的数据; 如果小于0,代表读取出错; 如果大于0,代表暂时没有数据可读,或则读到的数据量小于给定的长度,需要等到下次有数据可读时继续读取。 如果待读取内容小于 ms_tcp_prefetch_max_size
(4K)提供预读功能,否则不提供预读功能。
发送数据
AsyncConnection
|-- // int为优先级,每个优先级下对应一组消息
|-- out_q: map<int, list<pair<bufferlist, Message*> > >
|-- outcoming_bl: bufferlist
AsyncConnection::handle_write()
消息发送者调用AsyncConnection的send_message
函数将消息放入到out_q
队列后就返回,连接的Worker工作线程负责后续的发送操作。
SimpleMessenger
SimpleMessenger
|-- accepter: Accepter(Thread)
| |-- listen_fd // 监听socket
|-- pipes: set<Pipe*>
|-- sd: int // client socket
|-- recv_buf
|-- read_thread: Reader // 入口函数pipe::reader()
|-- write_thread: Writer // 入口函数pipe::writer()
|-- in_q: DispatchQueue
// 接收新连接
Accepter::entry() => SimpleMessenger::add_accept_pipe()