handle osd map
Analysis of the handle_osd_map function (ceph 10.0)
void OSD::handle_osd_map(MOSDMap *m)
- Check the message source: an MOSDMap may only come from a monitor or another OSD. Otherwise, drop the message, release the session, and return.
Session *session = static_cast<Session *>(m->get_connection()->get_priv());
if (session && !(session->entity_name.is_mon() || session->entity_name.is_osd())) {
//not enough perms!
dout(10) << "got osd map from Session " << session
<< " which we can't take maps from (not a mon or osd)" << dendl;
m->put();
session->put();
return;
}
if (session)
session->put();
- Forward the map to the OSD's internal Objecter (the client-side dispatcher the OSD uses for its own outgoing ops), which also needs new maps to retarget in-flight requests.
service.objecter->handle_osd_map(m);
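To see why the Objecter cares about new maps, here is a conceptual sketch only (a hypothetical MiniObjecter, not Ceph's real Objecter code) of a client-side dispatcher re-targeting its in-flight ops when a newer epoch arrives:
// Conceptual sketch; MiniObjecter and its members are hypothetical.
#include <cstdint>
#include <map>

using epoch_t = uint32_t;

struct Op { int target_osd; };

struct MiniObjecter {
  epoch_t epoch = 0;
  std::map<uint64_t, Op> inflight;            // tid -> in-flight op

  int calc_target(uint64_t tid) { return 0; } // stand-in for CRUSH mapping
  void resend(uint64_t tid) {}                // stand-in for requeueing

  void handle_osd_map(epoch_t new_epoch) {
    if (new_epoch <= epoch)
      return;                                 // stale map, nothing to do
    epoch = new_epoch;
    for (auto &kv : inflight) {
      int t = calc_target(kv.first);
      if (t != kv.second.target_osd) {        // primary moved under the new map
        kv.second.target_osd = t;
        resend(kv.first);
      }
    }
  }
};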
- Read the smallest epoch (first) and the largest epoch (last) carried by the MOSDMap. If our current OSDMap epoch is already >= last, there is nothing new to process, so drop the message and return.
epoch_t first = m->get_first();
epoch_t last = m->get_last();
dout(3) << "handle_osd_map epochs [" << first << "," << last << "], i have "
<< osdmap->get_epoch()
<< ", src has [" << m->oldest_map << "," << m->newest_map << "]"
<< dendl;
logger->inc(l_osd_map);
logger->inc(l_osd_mape, last - first + 1);
if (first <= osdmap->get_epoch())
logger->inc(l_osd_mape_dup, osdmap->get_epoch() - first + 1);
// make sure there is something new, here, before we bother flushing the queues and such
if (last <= osdmap->get_epoch()) {
dout(10) << " no new maps here, dropping" << dendl;
m->put();
return;
}
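As a worked example of the bookkeeping above (a standalone sketch with hypothetical names, not OSD code): if we are at epoch 10 and a message covers [8,12], epochs 8..10 count as duplicates and 11..12 are new; a message covering [5,9] is dropped entirely.
// Standalone sketch of the epoch checks above; names are hypothetical.
#include <cstdint>
#include <iostream>

using epoch_t = uint32_t;

// Mirrors the logic above: count duplicate epochs, and report whether the
// message carries anything newer than what we already hold.
bool nothing_new(epoch_t my_epoch, epoch_t first, epoch_t last,
                 epoch_t *dups) {
  *dups = (first <= my_epoch) ? my_epoch - first + 1 : 0;
  return last <= my_epoch;
}

int main() {
  epoch_t dups;
  bool drop = nothing_new(10, 8, 12, &dups);
  std::cout << drop << " " << dups << "\n";  // 0 3: keep; epochs 8..10 are dups
  drop = nothing_new(10, 5, 9, &dups);
  std::cout << drop << " " << dups << "\n";  // 1 6: drop the whole message
}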
- If the message skips epochs (first > our epoch + 1), request the missing OSDMaps directly from the monitor via osdmap_subscribe() rather than applying a discontiguous range.
// missing some?
bool skip_maps = false;
if (first > osdmap->get_epoch() + 1) {
dout(10) << "handle_osd_map message skips epochs " << osdmap->get_epoch() + 1
<< ".." << (first-1) << dendl;
if (m->oldest_map <= osdmap->get_epoch() + 1) {
osdmap_subscribe(osdmap->get_epoch()+1, true);
m->put();
return;
}
// always try to get the full range of maps--as many as we can. this
// 1- is good to have
// 2- is at present the only way to ensure that we get a *full* map as
// the first map!
if (m->oldest_map < first) {
osdmap_subscribe(m->oldest_map - 1, true);
m->put();
return;
}
skip_maps = true;
}
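The decision above can be summarized as a small function (a simplified sketch, not the real OSD code; std::optional stands in for the early-return paths):
// Simplified sketch of the gap handling above; not the real OSD code.
#include <cstdint>
#include <optional>

using epoch_t = uint32_t;

// Returns the epoch to subscribe from at the monitor, or nothing if the
// message can be applied as-is (possibly with skip_maps = true).
std::optional<epoch_t> subscribe_from(epoch_t my_epoch,
                                      epoch_t first,       // m->get_first()
                                      epoch_t oldest_map)  // m->oldest_map
{
  if (first <= my_epoch + 1)
    return std::nullopt;          // contiguous: nothing missing
  if (oldest_map <= my_epoch + 1)
    return my_epoch + 1;          // the gap is still available: fetch it
  if (oldest_map < first)
    return oldest_map - 1;        // fetch the widest range, incl. a full map
  return std::nullopt;            // gap already trimmed: skip_maps and move on
}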
- Persist every new OSDMap from start to last in the ObjectStore and pin it in the OSDMap cache (pinned_maps).
ObjectStore::Transaction *_t = new ObjectStore::Transaction;
ObjectStore::Transaction &t = *_t;
// store new maps: queue for disk and put in the osdmap cache
epoch_t start = MAX(osdmap->get_epoch() + 1, first);
for (epoch_t e = start; e <= last; e++) {
map<epoch_t,bufferlist>::iterator p;
// handle full osdmap
p = m->maps.find(e);
if (p != m->maps.end()) {
dout(10) << "handle_osd_map got full map for epoch " << e << dendl;
OSDMap *o = new OSDMap;
bufferlist& bl = p->second;
o->decode(bl);
ghobject_t fulloid = get_osdmap_pobject_name(e);
t.write(coll_t::meta(), fulloid, 0, bl.length(), bl);
pin_map_bl(e, bl);
// add into osdmap cache
pinned_maps.push_back(add_map(o));
got_full_map(e);
continue;
}
// handle incremental osdmap
p = m->incremental_maps.find(e);
if (p != m->incremental_maps.end()) {
dout(10) << "handle_osd_map got inc map for epoch " << e << dendl;
bufferlist& bl = p->second;
ghobject_t oid = get_inc_osdmap_pobject_name(e);
t.write(coll_t::meta(), oid, 0, bl.length(), bl);
pin_map_inc_bl(e, bl);
OSDMap *o = new OSDMap;
if (e > 1) {
bufferlist obl;
get_map_bl(e - 1, obl);
o->decode(obl);
}
OSDMap::Incremental inc;
bufferlist::iterator p = bl.begin();
inc.decode(p);
if (o->apply_incremental(inc) < 0) {
derr << "ERROR: bad fsid? i have " << osdmap->get_fsid() << " and inc has " << inc.fsid << dendl;
assert(0 == "bad fsid");
}
bufferlist fbl;
o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
bool injected_failure = false;
if (g_conf->osd_inject_bad_map_crc_probability > 0 &&
(rand() % 10000) < g_conf->osd_inject_bad_map_crc_probability*10000.0) {
derr << __func__ << " injecting map crc failure" << dendl;
injected_failure = true;
}
if ((inc.have_crc && o->get_crc() != inc.full_crc) || injected_failure) {
dout(2) << "got incremental " << e
<< " but failed to encode full with correct crc; requesting"
<< dendl;
clog->warn() << "failed to encode map e" << e << " with expected crc\n";
dout(20) << "my encoded map was:\n";
fbl.hexdump(*_dout);
*_dout << dendl;
delete o;
request_full_map(e, last);
last = e - 1;
break;
}
got_full_map(e);
// add into osdmap cache
ghobject_t fulloid = get_osdmap_pobject_name(e);
t.write(coll_t::meta(), fulloid, 0, fbl.length(), fbl);
pin_map_bl(e, fbl);
pinned_maps.push_back(add_map(o));
continue;
}
assert(0 == "MOSDMap lied about what maps it had?");
}
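The incremental branch follows a general pattern worth isolating: apply the delta to the previous full map, re-encode, and compare CRCs; on mismatch, fall back to requesting the full map. A minimal sketch with hypothetical Map/Incremental types:
// Minimal sketch of the apply-and-verify pattern; types are hypothetical.
#include <cstdint>

struct Incremental {
  bool have_crc = false;
  uint32_t full_crc = 0;       // CRC the sender computed for the full map
};

struct Map {
  uint32_t crc = 0;            // CRC recomputed while encoding
  void apply(const Incremental &) { /* mutate state, bump epoch, ... */ }
  void encode()                { /* serialize and update this->crc */ }
};

// Returns false when the caller must fall back to request_full_map().
bool apply_and_verify(Map &m, const Incremental &inc) {
  m.apply(inc);
  m.encode();                  // recompute our CRC of the resulting full map
  return !(inc.have_crc && m.crc != inc.full_crc);
}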
- Tell the monitor client that our "osdmap" subscription has been satisfied up to last.
// even if this map isn't from a mon, we may have satisfied our subscription
monc->sub_got("osdmap", last);
- Update the OSD superblock: trim old maps below both the sender's oldest map and the cache's lower bound, then record the new oldest/newest epochs.
// update the osdmap of superblock
if (superblock.oldest_map) {
  int num = 0;
  epoch_t min(
    MIN(m->oldest_map, service.map_cache.cached_key_lower_bound()));
  for (epoch_t e = superblock.oldest_map; e < min; ++e) {
    dout(20) << " removing old osdmap epoch " << e << dendl;
    t.remove(coll_t::meta(), get_osdmap_pobject_name(e));
    t.remove(coll_t::meta(), get_inc_osdmap_pobject_name(e));
    superblock.oldest_map = e + 1;
    num++;
    if (num >= cct->_conf->osd_target_transaction_size &&
        (uint64_t)num > (last - first))  // make sure we at least keep pace with incoming maps
      break;
  }
}
if (!superblock.oldest_map || skip_maps)
  superblock.oldest_map = first;
superblock.newest_map = last;
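The trim bound and throttle can be read off as two small helpers (a sketch with hypothetical names): old maps are removed only below min(m->oldest_map, cache lower bound), and trimming stops after osd_target_transaction_size removals unless that would fall behind the rate at which maps arrive.
// Sketch of the trim policy above; helper names are hypothetical.
#include <algorithm>
#include <cstdint>

using epoch_t = uint32_t;

// Maps strictly below this epoch may be removed from disk.
epoch_t trim_upper_bound(epoch_t msg_oldest, epoch_t cache_lower_bound) {
  return std::min(msg_oldest, cache_lower_bound);
}

// Throttle: cap work per transaction, but never trim slower than maps
// arrive (num must exceed last - first before we are allowed to stop).
bool keep_trimming(int num, int target_txn_size, epoch_t first, epoch_t last) {
  return !(num >= target_txn_size && (uint64_t)num > (uint64_t)(last - first));
}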
- Advance through each new OSDMap in the MOSDMap (advance_map() is covered in a separate note).
Note the call to service.await_reserved_maps(): if map_reservations still holds references to older cached maps, it blocks until they are released before any OSD is noted down.
// advance through the new maps
for (epoch_t cur = start; cur <= superblock.newest_map; cur++) {
dout(10) << " advance to epoch " << cur << " (<= newest " << superblock.newest_map << ")" << dendl;
OSDMapRef newmap = get_map(cur);
assert(newmap); // we just cached it above!
// start blacklisting messages sent to peers that go down.
service.pre_publish_map(newmap);
// kill connections to newly down osds
// find the down osds
bool waited_for_reservations = false;
set<int> old;
osdmap->get_all_osds(old);
for (set<int>::iterator p = old.begin(); p != old.end(); ++p) {
// find osds present in the old map but missing or down in the new one
if (*p != whoami &&
osdmap->have_inst(*p) && // in old map
(!newmap->exists(*p) || !newmap->is_up(*p))) { // but not the new one
if (!waited_for_reservations) {
// if map_reservations still holds older osdmaps, block and wait
service.await_reserved_maps();
waited_for_reservations = true;
}
note_down_osd(*p);
}
}
// update the current osdmap
osdmap = newmap;
superblock.current_epoch = cur;
advance_map();
had_map_since = ceph_clock_now(cct);
}
- If the new map shows this OSD up at the address we are currently bound to (and we bound before it came up), finish booting: transition booting -> active and set the Objecter client incarnation so the osd_reqid_t values we generate stay unique across restarts.
epoch_t _bind_epoch = service.get_bind_epoch();
if (osdmap->is_up(whoami) &&
osdmap->get_addr(whoami) == client_messenger->get_myaddr() &&
_bind_epoch < osdmap->get_up_from(whoami)) {
if (is_booting()) {
dout(1) << "state: booting -> active" << dendl;
set_state(STATE_ACTIVE);
// set incarnation so that osd_reqid_t's we generate for our
// objecter requests are unique across restarts.
service.objecter->set_client_incarnation(osdmap->get_epoch());
}
}
- Handle address mismatches in the newest OSDMap: if the map wrongly marks us down or records stale addresses for any of our messengers, rebind to fresh ports and restart the boot process.
bool do_shutdown = false;
bool do_restart = false;
if (osdmap->get_epoch() > 0 &&
is_active()) {
if (!osdmap->exists(whoami)) {
dout(0) << "map says i do not exist. shutting down." << dendl;
do_shutdown = true; // don't call shutdown() while we have everything paused
} else if (!osdmap->is_up(whoami) ||
!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
!osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()) ||
(osdmap->get_hb_front_addr(whoami) != entity_addr_t() &&
!osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))) {
//
if (!osdmap->is_up(whoami)) {
if (service.is_preparing_to_stop() || service.is_stopping()) {
service.got_stop_ack();
} else {
clog->warn() << "map e" << osdmap->get_epoch()
<< " wrongly marked me down";
}
}
else if (!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()))
clog->error() << "map e" << osdmap->get_epoch()
<< " had wrong client addr (" << osdmap->get_addr(whoami)
<< " != my " << client_messenger->get_myaddr() << ")";
else if (!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()))
clog->error() << "map e" << osdmap->get_epoch()
<< " had wrong cluster addr (" << osdmap->get_cluster_addr(whoami)
<< " != my " << cluster_messenger->get_myaddr() << ")";
else if (!osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()))
clog->error() << "map e" << osdmap->get_epoch()
<< " had wrong hb back addr (" << osdmap->get_hb_back_addr(whoami)
<< " != my " << hb_back_server_messenger->get_myaddr() << ")";
else if (osdmap->get_hb_front_addr(whoami) != entity_addr_t() &&
!osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))
clog->error() << "map e" << osdmap->get_epoch()
<< " had wrong hb front addr (" << osdmap->get_hb_front_addr(whoami)
<< " != my " << hb_front_server_messenger->get_myaddr() << ")";
if (!service.is_stopping()) {
epoch_t up_epoch = 0;
epoch_t bind_epoch = osdmap->get_epoch();
service.set_epochs(NULL,&up_epoch, &bind_epoch);
do_restart = true;
// the osd is not healthy at this point; wait for it to recover
start_waiting_for_healthy();
set<int> avoid_ports;
avoid_ports.insert(cluster_messenger->get_myaddr().get_port());
avoid_ports.insert(hb_back_server_messenger->get_myaddr().get_port());
avoid_ports.insert(hb_front_server_messenger->get_myaddr().get_port());
// rebind to fresh ports; if any rebind fails, shut down
int r = cluster_messenger->rebind(avoid_ports);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
r = hb_back_server_messenger->rebind(avoid_ports);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
r = hb_front_server_messenger->rebind(avoid_ports);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
hbclient_messenger->mark_down_all();
reset_heartbeat_peers();
}
}
}
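The rebind step avoids reusing the ports we were just bound to, because peers may still hold the old addr:port marked down. A simplified sketch with an assumed Messenger interface (the real Ceph Messenger API differs):
// Simplified sketch; this Messenger interface is assumed, not Ceph's.
#include <set>

struct Messenger {
  int port() const { return 0; }                        // current bound port
  int rebind(const std::set<int> &avoid) { return 0; }  // 0 on success
};

// Collect all currently used ports, then rebind each messenger away from
// them; as in the code above, every rebind is attempted and any failure
// forces shutdown.
bool rebind_all(Messenger &cluster, Messenger &hb_back, Messenger &hb_front) {
  std::set<int> avoid_ports = {cluster.port(), hb_back.port(), hb_front.port()};
  bool ok = true;
  if (cluster.rebind(avoid_ports) != 0)  ok = false;
  if (hb_back.rebind(avoid_ports) != 0)  ok = false;
  if (hb_front.rebind(avoid_ports) != 0) ok = false;
  return ok;
}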
- Write the superblock and queue the transaction for commit.
// note in the superblock that we were clean thru the prior epoch
epoch_t boot_epoch = service.get_boot_epoch();
if (boot_epoch && boot_epoch >= superblock.mounted) {
superblock.mounted = boot_epoch;
superblock.clean_thru = osdmap->get_epoch();
}
// superblock and commit
write_superblock(t);
store->queue_transaction(
service.meta_osr.get(),
_t,
new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
0, 0);
service.publish_superblock(superblock);
map_lock.put_write();
check_osdmap_features(store);
- consume_map(): this kicks off peering for every PG and can trigger data migration.
// yay!
consume_map();
- activate_map() (covered in a separate note)
if (is_active() || is_waiting_for_healthy())
maybe_update_heartbeat_peers();
if (!is_active()) {
dout(10) << " not yet active; waiting for peering wq to drain" << dendl;
peering_wq.drain();
} else {
activate_map();
}
if (m->newest_map && m->newest_map > last) {
dout(10) << " msg say newest map is " << m->newest_map << ", requesting more" << dendl;
osdmap_subscribe(osdmap->get_epoch()+1, true);
}
else if (is_booting()) {
start_boot(); // retry
}
else if (do_restart)
start_boot();
osd_lock.Unlock();
if (do_shutdown)
shutdown();
osd_lock.Lock();
m->put();