GC configuration options

  • rgw gc max objs: number of GC hint objects (default: 32)
  • rgw gc obj min wait: minimum time an object must wait before GC may remove it (default: 7200s)
  • rgw gc processor max time: how long a GC hint object stays locked while one pass processes it (default: 3600s)
  • rgw gc processor period: interval at which the GC worker runs (default: 3600s)
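
These options can be tuned in ceph.conf; a minimal example follows (the section name matches the RGW instance used in the commands below, and the values are purely illustrative):

[client.rgw.ceph-2.rgw0]
rgw_gc_max_objs = 64
rgw_gc_obj_min_wait = 3600
rgw_gc_processor_max_time = 1800
rgw_gc_processor_period = 1800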

To view the GC configuration of a running RGW instance:

cd /var/run/ceph/
ceph daemon /var/run/ceph/ceph-client.rgw.ceph-2.rgw0.28965.93983884214936.asok config show | grep gc
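
The same admin socket can also read or change a single option at runtime, for example:

ceph daemon /var/run/ceph/ceph-client.rgw.ceph-2.rgw0.28965.93983884214936.asok config get rgw_gc_max_objs
ceph daemon /var/run/ceph/ceph-client.rgw.ceph-2.rgw0.28965.93983884214936.asok config set rgw_gc_processor_period 1800

A changed rgw_gc_processor_period should be picked up on the worker's next round, since the option is re-read in every loop iteration (see entry() below); rgw_gc_max_objs, by contrast, is only read once in initialize().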

The GC data structure

class RGWGC : public DoutPrefixProvider {
  //handles for reaching down into RADOS
  CephContext *cct;
  RGWRados *store;

  //value of the rgw gc max objs config option
  int max_objs;

  //array of GC hint object names
  string *obj_names;

  //set to true when the GC worker should shut down
  std::atomic<bool> down_flag = { false };
  int tag_index(const string& tag);

  //the GC worker thread
  class GCWorker : public Thread {
    const DoutPrefixProvider *dpp;
    CephContext *cct;
    RGWGC *gc;
    Mutex lock;
    Cond cond;
  public:
    GCWorker(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), cct(_cct), gc(_gc), lock("GCWorker") {}
    void *entry() override;
    void stop();
  };

  GCWorker *worker;
};
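
tag_index() (declared above) maps a GC entry's tag to one of the max_objs hint objects, so that new entries are spread evenly across the shards. A minimal sketch of the idea, not the actual implementation; the hash used here is only a stand-in:

#include <functional>
#include <string>

// Illustrative only: map a deletion tag to a shard index in [0, max_objs).
// RGWGC uses its own hash function; std::hash is just a placeholder.
static int tag_to_shard(const std::string& tag, int max_objs) {
  return static_cast<int>(std::hash<std::string>{}(tag) % max_objs);
}

// e.g. tag_to_shard(info.tag, 32) decides which "gc.N" object the entry lives on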

GC source code analysis

  • gc initialize // rgw/rgw_gc.cc
void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
  cct = _cct;
  store = _store;

  //read the rgw gc max objs setting (capped by rgw_shards_max())
  max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());
  //allocate the array of GC hint object names
  obj_names = new string[max_objs];

  for (int i = 0; i < max_objs; i++) {
    obj_names[i] = gc_oid_prefix;
    char buf[32];
    snprintf(buf, 32, ".%d", i);
    obj_names[i].append(buf);   //each obj_names[i] ends up as "gc.0" ... "gc.31"
  }
}
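
Assuming the default pool layout of recent releases, where the GC hint objects live in the <zone>.rgw.log pool under the gc namespace (older releases used a dedicated .rgw.gc pool), the generated hint objects and their pending entries can be inspected from the command line:

rados -p default.rgw.log --namespace gc ls | sort -V
radosgw-admin gc list --include-all
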
  • The GC worker loop
void *RGWGC::GCWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;

    //run one GC pass
    int r = gc->process(true);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
    }
    ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl;

    //check whether the GC worker is being shut down
    if (gc->going_down())
      break;

    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_gc_processor_period;

    //the pass we just finished took a full GC period or longer: start the next pass immediately
    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    //otherwise sleep for the remaining part of the GC period before the next pass
    lock.Lock();
    cond.WaitInterval(lock, utime_t(secs, 0));
    lock.Unlock();
  } while (!gc->going_down());

  return NULL;
}
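
The scheduling at the end of entry() boils down to: if the pass overran rgw_gc_processor_period, start the next pass right away; otherwise sleep for the remainder of the period. A standalone sketch of that computation (the function name is made up for illustration):

// Seconds the GC worker should sleep before the next pass.
// elapsed_secs: how long the pass we just finished took.
// period_secs:  rgw_gc_processor_period.
static int gc_sleep_secs(int elapsed_secs, int period_secs) {
  if (period_secs <= elapsed_secs)
    return 0;                        // pass overran the period: run again immediately
  return period_secs - elapsed_secs; // otherwise wait until the next period starts
}

// e.g. gc_sleep_secs(900, 3600) == 2700, gc_sleep_secs(4000, 3600) == 0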


//process (outer pass over all GC hint objects)
int RGWGC::process(bool expired_only)
{
  //how long one GC hint object may be processed (and its lock held)
  int max_secs = cct->_conf->rgw_gc_processor_max_time;

  //pick a random starting offset into the obj_names array
  const int start = ceph::util::generate_random_number(0, max_objs - 1);

  //io_manager gathers the async removal operations issued during this pass
  RGWGCIOManager io_manager(this, store->ctx(), this);

  for (int i = 0; i < max_objs; i++) {
    int index = (i + start) % max_objs; //rotate through all hint objects starting at the random offset

    /*
    The random start only changes the order in which the hint objects are
    visited; each of the max_objs hint objects is still processed once per
    pass. Randomizing the order mainly spreads the per-shard lock contention
    when several RGW instances run GC at the same time.
    */
    int ret = process(index, max_secs, expired_only, io_manager);
    if (ret < 0)
      return ret;
  }

  //drain() waits for the scheduled removals and flushes the pending tag removals
  if (!going_down()) {
    io_manager.drain();
  }

  return 0;
}
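
The (i + start) % max_objs arithmetic only rotates the visiting order; every hint object is processed exactly once per pass. A quick illustration:

#include <cstdio>

int main() {
  const int max_objs = 8;   // 32 in the default configuration
  const int start = 5;      // stands in for the random offset
  for (int i = 0; i < max_objs; i++) {
    // prints: gc.5 gc.6 gc.7 gc.0 gc.1 gc.2 gc.3 gc.4
    printf("gc.%d ", (i + start) % max_objs);
  }
  printf("\n");
  return 0;
}
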
//process (one GC hint object)
int RGWGC::process(int index, int max_secs, bool expired_only, RGWGCIOManager& io_manager)
{
  .......

  rados::cls::lock::Lock l(gc_index_lock_name);

  //take an exclusive lock on obj_names[index], e.g. the string "gc.3" (index runs 0~31)
  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  
  string marker;
  string next_marker;
  bool truncated;
  IoCtx *ctx = new IoCtx;
  do {
    int max = 100;
    //the pending GC entries stored on the current obj_names[index] (e.g. "gc.3"); they can also be inspected directly in the GC pool
    std::list<cls_rgw_gc_obj_info> entries;

    //read the list of to-be-deleted object entries from the hint object's omap into entries
    ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max,
			  expired_only, entries, &truncated, next_marker);
	......
    marker = next_marker;
    string last_pool;
    std::list<cls_rgw_gc_obj_info>::iterator iter;

    //walk the entries we just read
    for (iter = entries.begin(); iter != entries.end(); ++iter) {

      cls_rgw_gc_obj_info& info = *iter;
      std::list<cls_rgw_obj>::iterator liter;
      cls_rgw_obj_chain& chain = info.chain;

      utime_t now = ceph_clock_now();
      if (now >= end) {
        goto done;
      }

      if (chain.objs.empty()) {
        io_manager.schedule_tag_removal(index, info.tag); //empty chain: just schedule removal of the tag entry itself
      }
      else
      {
        io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
		for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
	  		cls_rgw_obj& obj = *liter;

	  		if (obj.pool != last_pool) {
	    		delete ctx;
	    		ctx = new IoCtx;
	    		ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
	    		if (ret < 0) {
	      			last_pool = "";
	      			ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
					obj.pool << dendl;
	      			continue;
	    		}
	    		last_pool = obj.pool;
	  		}

	  		ctx->locator_set_key(obj.loc);

	  		const string& oid = obj.key.name; /* just stored raw oid there */

	  		ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
	    						":" << obj.key.name << dendl;
	  		ObjectWriteOperation op;
	  		cls_refcount_put(op, info.tag, true); 
	  	
	  		//queue this op for asynchronous execution below; cls_refcount_put
	  		//drops the reference held by info.tag, and the underlying rados
	  		//object is removed once its last reference is gone
	  		ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
	  		if (ret < 0) {
	    		ldpp_dout(this, 0) << "WARNING: failed to schedule deletion for oid=" << oid << dendl;
	  		}

		} // chains loop
      } // else -- chains not empty
    } // entries loop
  } while (truncated);

done:
  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive
   */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
  delete ctx;

  return 0;
}
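
The per-shard listing can also be reproduced outside RGW. A minimal sketch, assuming librados plus the cls_rgw client headers from the Ceph source tree are available and the default log pool / gc namespace is used (error handling trimmed):

#include <rados/librados.hpp>
#include "cls/rgw/cls_rgw_client.h"
#include <iostream>
#include <list>
#include <string>

int main() {
  librados::Rados cluster;
  cluster.init2("client.admin", "ceph", 0);
  cluster.conf_read_file(nullptr);
  cluster.connect();

  librados::IoCtx ioctx;
  cluster.ioctx_create("default.rgw.log", ioctx); // gc_pool of the default zone
  ioctx.set_namespace("gc");

  std::string oid = "gc.3";                       // one GC hint object
  std::string marker, next_marker;
  bool truncated = false;
  std::list<cls_rgw_gc_obj_info> entries;

  // Same call RGWGC::process() makes for a single shard.
  int ret = cls_rgw_gc_list(ioctx, oid, marker, 100,
                            true /* expired_only */, entries, &truncated, next_marker);
  if (ret < 0)
    return 1;

  for (const auto& info : entries)
    std::cout << info.tag << " objs=" << info.chain.objs.size() << std::endl;

  return 0;
}
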
  • remove / drain (RGWGCIOManager)
  //flush all IOs and tag removals scheduled during this pass
  void drain() {
    drain_ios();
    flush_remove_tags();
    /* the tags draining might have generated more ios, drain those too */
    drain_ios();
  }

  void drain_ios() {
    while (!ios.empty()) {
      if (gc->going_down()) {
        return;
      }

      
      handle_next_completion();  //wait for the oldest in-flight AIO to finish and handle its result
    }
  }
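
handle_next_completion() pops the oldest in-flight IO, waits for its librados completion, and on success schedules removal of the corresponding tag from the hint object. The underlying pattern, a queue of AioCompletions drained front to back, looks roughly like this; an illustrative sketch, not the actual RGWGCIOManager code:

#include <rados/librados.hpp>
#include <deque>
#include <string>

// Illustrative drain pattern: keep a queue of in-flight async operations and
// wait for them front to back.
struct PendingIO {
  std::string oid;
  librados::AioCompletion* c;
};

static void drain(std::deque<PendingIO>& ios) {
  while (!ios.empty()) {
    PendingIO io = ios.front();
    ios.pop_front();
    io.c->wait_for_complete();        // block until RADOS finished this op
    int r = io.c->get_return_value(); // < 0 on error, e.g. -ENOENT
    io.c->release();
    (void)r; // a real implementation would act on the result here
  }
}

// Scheduling side (conceptually what io_manager.schedule_io does):
//   librados::AioCompletion* c = librados::Rados::aio_create_completion();
//   ioctx.aio_operate(oid, c, &op);
//   ios.push_back({oid, c});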