handle osd map

Analysis of the handle_osd_map function (Ceph 10.0)

void OSD::handle_osd_map(MOSDMap *m)
  • Check the source of the message: an MOSDMap may only come from another OSD or from a Monitor. If it does not, release the session and return.
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session && !(session->entity_name.is_mon() || session->entity_name.is_osd())) {
    //not enough perms!
    dout(10) << "got osd map from Session " << session
             << " which we can't take maps from (not a mon or osd)" << dendl;
    m->put();
    session->put();
    return;
  }
  if (session)
    session->put();
  • Forward the map message to the objecter (service.objecter); what the objecter does with it is not covered here.
service.objecter->handle_osd_map(m);
  • Check the smallest epoch (first) and the largest epoch (last) carried in the MOSDMap. If the current OSDMap's epoch is already >= last, there is nothing new to process, so drop the message and return.
  epoch_t first = m->get_first();
  epoch_t last = m->get_last();
  dout(3) << "handle_osd_map epochs [" << first << "," << last << "], i have "
      << osdmap->get_epoch()
      << ", src has [" << m->oldest_map << "," << m->newest_map << "]"
      << dendl;

  logger->inc(l_osd_map);
  logger->inc(l_osd_mape, last - first + 1);
  if (first <= osdmap->get_epoch())
    logger->inc(l_osd_mape_dup, osdmap->get_epoch() - first + 1);

  // make sure there is something new, here, before we bother flushing the queues and such
  if (last <= osdmap->get_epoch()) {
    dout(10) << " no new maps here, dropping" << dendl;
    m->put();
    return;
  }
  • If the message skips epochs we do not yet have, request the missing OSDMaps directly from the monitor via osdmap_subscribe() and return; otherwise accept the gap (skip_maps). A simplified decision sketch follows the code below.
  // missing some?
  bool skip_maps = false;
  if (first > osdmap->get_epoch() + 1) {
    dout(10) << "handle_osd_map message skips epochs " << osdmap->get_epoch() + 1
         << ".." << (first-1) << dendl;
    if (m->oldest_map <= osdmap->get_epoch() + 1) {
      osdmap_subscribe(osdmap->get_epoch()+1, true);
      m->put();
      return;
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      osdmap_subscribe(m->oldest_map - 1, true);
      m->put();
      return;
    }
    skip_maps = true;
  }
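
The two checks above (drop the message when there is nothing new, subscribe when the message skips epochs we still need) can be summarized as one small decision function. The following is a standalone, simplified sketch for illustration only, not Ceph code; Decision and decide_epoch_range are invented names.

// Standalone sketch of the epoch-range handling above (not Ceph code).
#include <cstdint>
#include <iostream>

using epoch_t = uint32_t;

enum class Decision {
  Drop,                 // last <= my epoch: nothing new
  SubscribeFromNext,    // sender still has the gap: osdmap_subscribe(my_epoch + 1)
  SubscribeFromOldest,  // ask for the widest range the sender has: osdmap_subscribe(oldest - 1)
  ProcessSkippingGap,   // give up on the gap (skip_maps = true) and take [first, last]
  Process               // contiguous: take [my_epoch + 1, last]
};

Decision decide_epoch_range(epoch_t my_epoch,   // osdmap->get_epoch()
                            epoch_t first,      // m->get_first()
                            epoch_t last,       // m->get_last()
                            epoch_t oldest)     // m->oldest_map
{
  if (last <= my_epoch)
    return Decision::Drop;
  if (first > my_epoch + 1) {          // the message skips epochs we still need
    if (oldest <= my_epoch + 1)
      return Decision::SubscribeFromNext;
    if (oldest < first)
      return Decision::SubscribeFromOldest;
    return Decision::ProcessSkippingGap;
  }
  return Decision::Process;
}

int main() {
  // We are at epoch 100; the message carries [105, 110] and its sender keeps maps back to 90,
  // so we subscribe for epochs starting at 101 instead of accepting the gap.
  std::cout << (decide_epoch_range(100, 105, 110, 90) == Decision::SubscribeFromNext) << "\n";
  return 0;
}
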
  • For every OSDMap from first to last, queue it for disk in the ObjectStore transaction and put it into the osdmap cache (pinned_maps). A condensed sketch of this loop follows the code below.
  ObjectStore::Transaction *_t = new ObjectStore::Transaction;
  ObjectStore::Transaction &t = *_t;

  // store new maps: queue for disk and put in the osdmap cache
  epoch_t start = MAX(osdmap->get_epoch() + 1, first);
  for (epoch_t e = start; e <= last; e++) {
    map<epoch_t,bufferlist>::iterator p;

    // handle a full osdmap
    p = m->maps.find(e);
    if (p != m->maps.end()) {
      dout(10) << "handle_osd_map  got full map for epoch " << e << dendl;
      OSDMap *o = new OSDMap;
      bufferlist& bl = p->second;

      o->decode(bl);

      ghobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(coll_t::meta(), fulloid, 0, bl.length(), bl);
      pin_map_bl(e, bl);

      // add into osdmap cache
      pinned_maps.push_back(add_map(o));

      got_full_map(e);
      continue;
    }

    // handle incremental osdmap 
    p = m->incremental_maps.find(e);
    if (p != m->incremental_maps.end()) {
      dout(10) << "handle_osd_map  got inc map for epoch " << e << dendl;
      bufferlist& bl = p->second;
      ghobject_t oid = get_inc_osdmap_pobject_name(e);
      t.write(coll_t::meta(), oid, 0, bl.length(), bl);
      pin_map_inc_bl(e, bl);

      OSDMap *o = new OSDMap;
      if (e > 1) {
        bufferlist obl;
        get_map_bl(e - 1, obl);
        o->decode(obl);
      }

      OSDMap::Incremental inc;
      bufferlist::iterator p = bl.begin();
      inc.decode(p);
      if (o->apply_incremental(inc) < 0) {
        derr << "ERROR: bad fsid?  i have " << osdmap->get_fsid() << " and inc has " << inc.fsid << dendl;
        assert(0 == "bad fsid");
      }

      bufferlist fbl;
      o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);

      bool injected_failure = false;
      if (g_conf->osd_inject_bad_map_crc_probability > 0 &&
          (rand() % 10000) < g_conf->osd_inject_bad_map_crc_probability*10000.0) {
        derr << __func__ << " injecting map crc failure" << dendl;
        injected_failure = true;
      }

      if ((inc.have_crc && o->get_crc() != inc.full_crc) || injected_failure) {
        dout(2) << "got incremental " << e
                << " but failed to encode full with correct crc; requesting"
                << dendl;
        clog->warn() << "failed to encode map e" << e << " with expected crc\n";
        dout(20) << "my encoded map was:\n";
        fbl.hexdump(*_dout);
        *_dout << dendl;
        delete o;
        request_full_map(e, last);
        last = e - 1;
        break;
      }
      got_full_map(e);

      // add into osdmap cache 
      ghobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(coll_t::meta(), fulloid, 0, fbl.length(), fbl);
      pin_map_bl(e, fbl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    assert(0 == "MOSDMap lied about what maps it had?");
  }
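
To summarize the loop above: a full map in the message is stored as-is; otherwise the full map is rebuilt from the previous epoch plus the incremental, and if the CRC of the rebuilt map does not match the one carried by the incremental, full maps are requested from that epoch onward and last is pulled back to e - 1. Below is a condensed standalone sketch of that control flow, for illustration only; it is not Ceph code, and the types and helper names are invented.

// Condensed sketch of the store-maps loop above (not Ceph code).
#include <cstdint>
#include <functional>
#include <map>

using epoch_t = uint32_t;

struct Inc { bool have_crc; uint32_t full_crc; };

// Returns the last epoch actually stored. rebuilt_crc(e) stands for the CRC of the
// full map re-encoded from epoch e-1 plus the incremental for e.
epoch_t store_range(epoch_t start, epoch_t last,
                    const std::map<epoch_t, int>& full_maps,   // epochs with a full map
                    const std::map<epoch_t, Inc>& inc_maps,    // epochs with an incremental
                    const std::function<uint32_t(epoch_t)>& rebuilt_crc,
                    const std::function<void(epoch_t, epoch_t)>& request_full)
{
  for (epoch_t e = start; e <= last; ++e) {
    if (full_maps.count(e))
      continue;                                   // full map supplied: store it as-is
    const Inc& inc = inc_maps.at(e);              // otherwise an incremental must be present
    if (inc.have_crc && rebuilt_crc(e) != inc.full_crc) {
      request_full(e, last);                      // our rebuilt map is wrong: ask for full maps
      return e - 1;                               // keep only [start, e-1]
    }
    // store the rebuilt full map and the incremental
  }
  return last;
}

int main() {
  std::map<epoch_t, int> fulls = {{101, 0}};
  std::map<epoch_t, Inc> incs = {{102, {true, 0xabcdu}}, {103, {true, 0x1234u}}};
  // Epoch 102 rebuilds with the right CRC, 103 does not, so only 101..102 are kept.
  epoch_t kept = store_range(101, 103, fulls, incs,
                             [](epoch_t e) { return e == 102 ? 0xabcdu : 0u; },
                             [](epoch_t, epoch_t) { /* would call request_full_map() */ });
  return kept == 102 ? 0 : 1;
}
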
  • Record with the MonClient that our "osdmap" subscription has been satisfied up to last (monc->sub_got), so already-received maps are not requested again.

    // even if this map isn't from a mon, we may have satisfied our subscription
    monc->sub_got("osdmap", last);
    
  • Update the superblock: trim OSDMaps older than both the sender's oldest map and the map cache's lower bound, then record the new oldest_map and newest_map. A worked trimming example follows the code below.

    // trim old osdmaps and update the superblock's oldest_map
    if (superblock.oldest_map) {
      int num = 0;
      epoch_t min(
        MIN(m->oldest_map,
        service.map_cache.cached_key_lower_bound()));
      for (epoch_t e = superblock.oldest_map; e < min; ++e) {
        dout(20) << " removing old osdmap epoch " << e << dendl;
        t.remove(coll_t::meta(), get_osdmap_pobject_name(e));
        t.remove(coll_t::meta(), get_inc_osdmap_pobject_name(e));
        superblock.oldest_map = e+1;
        num++;
        if (num >= cct->_conf->osd_target_transaction_size &&
            (uint64_t)num > (last - first))  // make sure we at least keep pace with incoming maps
          break;
      }
    }
    
    if (!superblock.oldest_map || skip_maps)
      superblock.oldest_map = first;
    superblock.newest_map = last;
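
As a worked example of the trimming bound, here is a standalone sketch with made-up numbers (not Ceph code): with superblock.oldest_map = 100, m->oldest_map = 150, a map-cache lower bound of 140 and osd_target_transaction_size = 30, the bound min is 140, and 30 epochs (100..129) are removed in this transaction before the throttle kicks in.

// Standalone sketch of the trim-bound computation above (not Ceph code; values invented).
#include <algorithm>
#include <cstdint>
#include <iostream>

using epoch_t = uint32_t;

int main() {
  epoch_t superblock_oldest = 100;   // superblock.oldest_map
  epoch_t msg_oldest        = 150;   // m->oldest_map
  epoch_t cache_lower_bound = 140;   // service.map_cache.cached_key_lower_bound()
  int     txn_size_limit    = 30;    // cct->_conf->osd_target_transaction_size
  epoch_t maps_in_msg       = 6;     // last - first

  // Never trim maps that either the sender or our own cache may still need.
  epoch_t min = std::min(msg_oldest, cache_lower_bound);

  int num = 0;
  for (epoch_t e = superblock_oldest; e < min; ++e) {
    // remove full + incremental map objects for epoch e ...
    superblock_oldest = e + 1;
    ++num;
    // throttle the transaction, but always keep pace with the incoming maps
    if (num >= txn_size_limit && (epoch_t)num > maps_in_msg)
      break;
  }
  std::cout << "trimmed " << num << " epochs, oldest_map is now "
            << superblock_oldest << "\n";   // trims 30 epochs; oldest_map becomes 130
  return 0;
}
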
    
  • Advance through every new OSDMap carried in the MOSDMap (advance_map() itself is described in another document).
    Note the call to await_reserved_maps(); its exact purpose is not examined here. A simplified sketch of the newly-down-OSD detection follows this loop.

// advance through the new maps
  for (epoch_t cur = start; cur <= superblock.newest_map; cur++) {
    dout(10) << " advance to epoch " << cur << " (<= newest " << superblock.newest_map << ")" << dendl;

    OSDMapRef newmap = get_map(cur);
    assert(newmap);  // we just cached it above!

    // start blacklisting messages sent to peers that go down.
    service.pre_publish_map(newmap);

    // kill connections to newly down osds
    // find the down osds 
    bool waited_for_reservations = false;
    set<int> old;
    osdmap->get_all_osds(old);
    for (set<int>::iterator p = old.begin(); p != old.end(); ++p) {
      // osds that are in the old map but missing or down in the new one
      if (*p != whoami &&
          osdmap->have_inst(*p) &&                        // in old map
          (!newmap->exists(*p) || !newmap->is_up(*p))) {  // but not the new one
        if (!waited_for_reservations) {
          // block until outstanding reservations on older osdmaps are released
          service.await_reserved_maps();
          waited_for_reservations = true;
        }
        note_down_osd(*p);
      }
      }
    }

    // update the current osdmap 
    osdmap = newmap;

    superblock.current_epoch = cur;

    advance_map();
    had_map_since = ceph_clock_now(cct);
  }
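
The "kill connections to newly down osds" step above is essentially a set comparison between the previous map and the new one. A minimal standalone sketch (not Ceph code; SimpleMap and newly_down are invented stand-ins, and the have_inst() check is omitted):

// Standalone sketch of detecting OSDs that went away or went down between two maps
// (not Ceph code; SimpleMap is an invented stand-in for OSDMap).
#include <iostream>
#include <set>

struct SimpleMap {
  std::set<int> exists;   // osds present in the map
  std::set<int> up;       // osds marked up
  bool has(int o)   const { return exists.count(o) != 0; }
  bool is_up(int o) const { return up.count(o) != 0; }
};

// Returns the osds (other than ourselves) that were known in oldmap
// but are missing or down in newmap -- the ones note_down_osd() is called for.
std::set<int> newly_down(const SimpleMap& oldmap, const SimpleMap& newmap, int whoami) {
  std::set<int> down;
  for (int o : oldmap.exists) {
    if (o == whoami)
      continue;
    if (!newmap.has(o) || !newmap.is_up(o))
      down.insert(o);
  }
  return down;
}

int main() {
  SimpleMap oldmap{{0, 1, 2, 3}, {0, 1, 2, 3}};
  SimpleMap newmap{{0, 1, 3},    {0, 3}};        // osd.2 removed, osd.1 marked down
  for (int o : newly_down(oldmap, newmap, /*whoami=*/0))
    std::cout << "osd." << o << " is newly down\n"; // prints osd.1 and osd.2
  return 0;
}
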
  • If the new map shows this OSD as up at our current client address, and our bind epoch is older than the map's up_from epoch, the boot has taken effect: transition from booting to active and set the objecter's client incarnation.
  epoch_t _bind_epoch = service.get_bind_epoch();
  if (osdmap->is_up(whoami) &&
      osdmap->get_addr(whoami) == client_messenger->get_myaddr() &&
      _bind_epoch < osdmap->get_up_from(whoami)) {

    if (is_booting()) {
      dout(1) << "state: booting -> active" << dendl;
      set_state(STATE_ACTIVE);

      // set incarnation so that osd_reqid_t's we generate for our
      // objecter requests are unique across restarts.
      service.objecter->set_client_incarnation(osdmap->get_epoch());
    }
  }
  • Handle mismatches between the addresses recorded in the latest OSDMap and our own: the OSD may need to shut down, or rebind its messengers to new ports and restart (rebind). A compact decision sketch follows the code below.
  bool do_shutdown = false;
  bool do_restart = false;
  if (osdmap->get_epoch() > 0 &&
      is_active()) {
    if (!osdmap->exists(whoami)) {
      dout(0) << "map says i do not exist.  shutting down." << dendl;
      do_shutdown = true;   // don't call shutdown() while we have everything paused
    } else if (!osdmap->is_up(whoami) ||
           !osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
           !osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
           !osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()) ||
           (osdmap->get_hb_front_addr(whoami) != entity_addr_t() &&
                !osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))) {
       //
      if (!osdmap->is_up(whoami)) {
        if (service.is_preparing_to_stop() || service.is_stopping()) {
          service.got_stop_ack();
        } else {
          clog->warn() << "map e" << osdmap->get_epoch()
                       << " wrongly marked me down";
        }
      }
      else if (!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()))
        clog->error() << "map e" << osdmap->get_epoch()
                      << " had wrong client addr (" << osdmap->get_addr(whoami)
                      << " != my " << client_messenger->get_myaddr() << ")";
      else if (!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()))
        clog->error() << "map e" << osdmap->get_epoch()
                      << " had wrong cluster addr (" << osdmap->get_cluster_addr(whoami)
                      << " != my " << cluster_messenger->get_myaddr() << ")";
      else if (!osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()))
        clog->error() << "map e" << osdmap->get_epoch()
                      << " had wrong hb back addr (" << osdmap->get_hb_back_addr(whoami)
                      << " != my " << hb_back_server_messenger->get_myaddr() << ")";
      else if (osdmap->get_hb_front_addr(whoami) != entity_addr_t() &&
               !osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))
        clog->error() << "map e" << osdmap->get_epoch()
                      << " had wrong hb front addr (" << osdmap->get_hb_front_addr(whoami)
                      << " != my " << hb_front_server_messenger->get_myaddr() << ")";

      if (!service.is_stopping()) {
        epoch_t up_epoch = 0;
        epoch_t bind_epoch = osdmap->get_epoch();
        service.set_epochs(NULL, &up_epoch, &bind_epoch);
        do_restart = true;

        // the OSD is no longer considered healthy; wait for heartbeats to recover before rebooting
        start_waiting_for_healthy();

        set<int> avoid_ports;
        avoid_ports.insert(cluster_messenger->get_myaddr().get_port());
        avoid_ports.insert(hb_back_server_messenger->get_myaddr().get_port());
        avoid_ports.insert(hb_front_server_messenger->get_myaddr().get_port());

        // rebind the cluster and heartbeat messengers to new ports;
        // if any rebind fails, fall back to a full shutdown
        int r = cluster_messenger->rebind(avoid_ports);
        if (r != 0)
          do_shutdown = true;  // FIXME: do_restart?

        r = hb_back_server_messenger->rebind(avoid_ports);
        if (r != 0)
          do_shutdown = true;  // FIXME: do_restart?

        r = hb_front_server_messenger->rebind(avoid_ports);
        if (r != 0)
          do_shutdown = true;  // FIXME: do_restart?

        hbclient_messenger->mark_down_all();

        reset_heartbeat_peers();
      }
    }
  }
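
The whole branch above reduces to three outcomes: shut down if we are no longer in the map, restart on fresh ports if we are marked down or any recorded address no longer matches ours, and otherwise do nothing. A compact standalone sketch of that decision, for illustration only (not Ceph code; all names are invented):

// Compact sketch of the shutdown/restart decision above (not Ceph code).
#include <iostream>
#include <string>

enum class Action { None, Shutdown, Restart };

struct MyAddrs { std::string client, cluster, hb_back, hb_front; };

Action decide(bool exists_in_map, bool up_in_map,
              const MyAddrs& in_map, const MyAddrs& mine)
{
  if (!exists_in_map)
    return Action::Shutdown;                  // "map says i do not exist"
  bool addr_mismatch =
      in_map.client   != mine.client   ||
      in_map.cluster  != mine.cluster  ||
      in_map.hb_back  != mine.hb_back  ||
      (!in_map.hb_front.empty() && in_map.hb_front != mine.hb_front);
  if (!up_in_map || addr_mismatch)
    return Action::Restart;                   // log why, rebind messengers on new ports, reboot
  return Action::None;
}

int main() {
  MyAddrs recorded{"10.0.0.1:6800", "10.0.0.1:6801", "10.0.0.1:6802", ""};
  MyAddrs actual  {"10.0.0.1:6800", "10.0.0.1:6805", "10.0.0.1:6802", ""};
  // The cluster address in the map no longer matches ours, so the OSD restarts.
  std::cout << (decide(true, true, recorded, actual) == Action::Restart) << "\n";
  return 0;
}
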
  • write superblock and commit
  // note in the superblock that we were clean thru the prior epoch
  epoch_t boot_epoch = service.get_boot_epoch();
  if (boot_epoch && boot_epoch >= superblock.mounted) {
    superblock.mounted = boot_epoch;
    superblock.clean_thru = osdmap->get_epoch();
  }

  // superblock and commit
  write_superblock(t);
  store->queue_transaction(
    service.meta_osr.get(),
    _t,
    new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
    0, 0);
  service.publish_superblock(superblock);

  map_lock.put_write();

  check_osdmap_features(store);
  • consume_map() (this kicks off peering for every PG, which can lead to data migration).

    // yay!
    consume_map();
    
  • activate_map() (its behavior is described in another document).

  if (is_active() || is_waiting_for_healthy())
    maybe_update_heartbeat_peers();

  if (!is_active()) {
    dout(10) << " not yet active; waiting for peering wq to drain" << dendl;
    peering_wq.drain();
  } else {
    activate_map();
  }

  if (m->newest_map && m->newest_map > last) {
    dout(10) << " msg say newest map is " << m->newest_map << ", requesting more" << dendl;
    osdmap_subscribe(osdmap->get_epoch()+1, true);
  }
  else if (is_booting()) {
    start_boot();  // retry
  }
  else if (do_restart)
    start_boot();

  osd_lock.Unlock();
  if (do_shutdown)
    shutdown();
  osd_lock.Lock();

  m->put();