小編給大家分享一下如何實現(xiàn)ceph SimpleMessenger模塊消息的接收,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
創(chuàng)新互聯(lián)自2013年起,公司以成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、系統(tǒng)開發(fā)、網(wǎng)絡(luò)推廣、文化傳媒、企業(yè)宣傳、平面廣告設(shè)計等為主要業(yè)務(wù),適用行業(yè)近百種。服務(wù)企業(yè)客戶千余家,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設(shè)開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設(shè)、創(chuàng)意設(shè)計、宣傳推廣等服務(wù)。 通過專業(yè)的設(shè)計、獨特的風(fēng)格,為不同客戶提供各種風(fēng)格的特色服務(wù)。
OSD服務(wù)端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函數(shù)
|- 358 void add_dispatcher_head(Dispatcher *d) { || 359 bool first = dispatchers.empty(); || 360 dispatchers.push_front(d); || 361 if (d->ms_can_fast_dispatch_any()) || 362 fast_dispatchers.push_front(d); || 363 if (first) || 364 ready(); //如果dispatcher list空,啟動SimpleMessenger::ready,不為空證明SimpleMessenger已經(jīng)啟動了 || 365 }
在SimpleMessenger::ready()中,啟動DispatchQueue等待mqueue,如果綁定了端口就啟動 accepter接收線程
76 void SimpleMessenger::ready() - 77 { | 78 ldout(cct,10) << "ready " << get_myaddr() << dendl; | 79 dispatch_queue.start(); //啟動DispatchQueue,等待mqueue | 80 | 81 lock.Lock(); | 82 if (did_bind) | 83 accepter.start(); | 84 lock.Unlock(); | 85 }
Accepter是Thread的繼承類,Accepter::start()最終調(diào)用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe類中
void *Accepter::entry() { ... struct pollfd pfd; pfd.fd = listen_sd; pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { int r = poll(&pfd, 1, -1); if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) break; // accept entity_addr_t addr; socklen_t slen = sizeof(addr.ss_addr()); int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl; msgr->add_accept_pipe(sd); //注冊一個pipe,啟動讀線程,從該sd中讀取數(shù)據(jù) } else { ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ... return 0;
在SimpleMessenger::add_accept_pipe(int sd)中,申請一個Pipe類并把sd加入到Pipe中,開始Pipe::start_reader()
340 Pipe *SimpleMessenger::add_accept_pipe(int sd) - 341 { | 342 lock.Lock(); | 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL); | 344 p->sd = sd; | 345 p->pipe_lock.Lock(); | 346 p->start_reader(); | 347 p->pipe_lock.Unlock(); | 348 pipes.insert(p); | 349 accepting_pipes.insert(p); | 350 lock.Unlock(); | 351 return p; | 352 }
Pipe類內(nèi)部有一個Reader和Writer線程類,Pipe::start_reader()啟動Pipe::Reader::entry(),最終啟動Pipe::reader函數(shù)
134 void Pipe::start_reader() - 135 { | 136 assert(pipe_lock.is_locked()); | 137 assert(!reader_running); |- 138 if (reader_needs_join) { || 139 reader_thread.join(); || 140 reader_needs_join = false; || 141 } | 142 reader_running = true; | 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes); | 144 }
|- 48 class Reader : public Thread { || 49 Pipe *pipe; || 50 public: || 51 explicit Reader(Pipe *p) : pipe(p) {} || 52 void *entry() { pipe->reader(); return 0; } || 53 } reader_thread;
在Pipe::reader函數(shù)中根據(jù)tag接收不同類型的消息,如果是CEPH_MSGR_TAG_MSG類型消息調(diào)用read_message接收消息,并把消息加入到mqueue中
void Pipe::reader() { pipe_lock.Lock(); if (state == STATE_ACCEPTING) { accept(); //第一次進入此函數(shù)處理 assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { assert(pipe_lock.is_locked()); ...... ...... else if (tag == CEPH_MSGR_TAG_MSG) { ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m, auth_handler.get()); pipe_lock.Lock(); if (!m) { if (r < 0) fault(true); continue; } ...... ...... ...... // note last received message. in_seq = m->get_seq(); cond.Signal(); // wake up writer, to ack this ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; in_q->fast_preprocess(m); //mds 、mon不會進入此函數(shù),預(yù)處理 if (delay_thread) { utime_t release; if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { release = m->get_recv_stamp(); release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; } delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); reader_dispatching = false; if (state == STATE_CLOSED || notify_on_dispatch_done) { // there might be somebody waiting notify_on_dispatch_done = false; cond.Signal(); } } else { //mds進入此else in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中 } } } ...... ...... } // reap? reader_running = false; reader_needs_join = true; unlock_maybe_reap(); ldout(msgr->cct,10) << "reader done" << dendl; }
在Pipe::DispatchQueue::enqueue函數(shù)中加入到mqueue中
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) { Mutex::Locker l(lock); ldout(cct,20) << "queue " << m << " prio " << priority << dendl; add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( id, priority, QueueItem(m)); } else { mqueue.enqueue( id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); //喚醒dispatch_queue.start() 啟動的dispatchThread,進入entry進行處理 }
看完了這篇文章,相信你對“如何實現(xiàn)ceph SimpleMessenger模塊消息的接收”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
當(dāng)前文章:如何實現(xiàn)cephSimpleMessenger模塊消息的接收
網(wǎng)站鏈接:http://aaarwkj.com/article2/gipjoc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、手機網(wǎng)站建設(shè)、網(wǎng)站設(shè)計、網(wǎng)站建設(shè)、商城網(wǎng)站、軟件開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)