lifecycle源码分析
lc 总流程
lc的处理框架和gc的几乎一样, 总体流程如下:
lc handle 图
lc 源码分析
总处理框架
// Runs one lifecycle pass over every LC shard object (indices 0 .. max_objs-1).
// Returns 0 when every shard was processed, or the first negative value
// returned by process(index, max_secs), which stops the pass immediately.
int RGWLC::process()
{
  const int lock_secs = cct->_conf->rgw_lc_lock_max_time;
  // Begin at a randomly chosen shard and wrap around, so the visiting order
  // differs from one call to the next.
  const int first = ceph::util::generate_random_number(0, max_objs - 1);

  int visited = 0;
  while (visited < max_objs) {
    const int index = (visited + first) % max_objs;  // shard handled in this step
    const int ret = process(index, lock_secs);
    if (ret < 0) {
      return ret;
    }
    ++visited;
  }
  return 0;
}
单个 lc.index 的处理
// Processes the LC shard object obj_names[index]: repeatedly takes the shard
// lock, picks the next entry after the marker stored in the shard head, marks
// it as processing, releases the lock and runs the bucket-level work.
// Every return visible in this excerpt yields 0, including error paths; part
// of the body is elided ("......"), so other paths may exist.
int RGWLC::process(int index, int max_lock_secs)
{
// Lock object named by lc_index_lock_name (per the original note: "lc_process").
rados::cls::lock::Lock l(lc_index_lock_name); // lc_index_lock_name = "lc_process"
do {
utime_t now = ceph_clock_now();
// entry.first: the bucket key string later handed to bucket_lc_process();
// entry.second: the LC status value of that bucket.
pair<string, int > entry;
// (code elided in the original excerpt)
......
// Take the exclusive lock on the shard object obj_names[index].
int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
<< obj_names[index] << ", sleep 5, try again" << dendl;
sleep(5);
continue;
}
// Any other lock failure ends work on this shard; the error is not
// propagated (0 is returned) and no unlock is attempted.
if (ret < 0)
return 0;
// Read the head of the shard object; it carries start_date and the marker
// from which the next entry of this shard is fetched.
cls_rgw_lc_obj_head head;
ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
<< obj_names[index] << ", ret=" << ret << dendl;
goto exit;
}
// if_already_run_today() is false for head.start_date: begin a new round.
if(!if_already_run_today(head.start_date)) {
// Stamp the head with the current time.
head.start_date = now;
// Reset the marker so the round starts from the first entry.
head.marker.clear();
// Reset every entry of this shard to lc_uninitial before the new round
// (see bucket_lc_prepare()).
ret = bucket_lc_prepare(index);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
<< obj_names[index] << ", ret=" << ret << dendl;
goto exit; // release the lock and leave
}
}
// Fetch the entry that follows head.marker in this shard.
ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
<< obj_names[index] << dendl;
goto exit;
}
// An empty key ends the loop for this shard (presumably: no entry left
// after the marker -- confirm against cls_rgw_lc_get_next_entry).
if (entry.first.empty())
goto exit;
// Mark the entry as being processed and persist that status.
entry.second = lc_processing;
ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index]
<< " (" << entry.first << "," << entry.second << ")" << dendl;
goto exit;
}
// Advance the marker to this entry and write the head back to the shard object.
head.marker = entry.first;
ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
goto exit;
}
// The shard lock is released before the bucket itself is processed.
l.unlock(&store->lc_pool_ctx, obj_names[index]);
ret = bucket_lc_process(entry.first);
// Hand the outcome to bucket_lc_post(); what it does with it is not shown here.
bucket_lc_post(index, max_lock_secs, entry, ret);
}while(1);
exit:
// Common exit for paths that still hold the lock: unlock, then report 0.
l.unlock(&store->lc_pool_ctx, obj_names[index]);
return 0;
}
bucket_lc_prepare
int RGWLC::bucket_lc_prepare(int index)
{
map<string, int > entries;
string marker; //空
#define MAX_LC_LIST_ENTRIES 100
do {
//entries中每个对象<string, int>中string是bucket sharding id, 结构:bucket_name:bucket_id
//从当前marker处取出100条lc item放到 entries中, 每个entry格式:pair<string(bucket sharding id), int(status: uninitial, complete,..)>
int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
if (ret < 0)
return ret;
map<string, int>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
pair<string, int > entry(iter->first, lc_uninitial);
//逐条设置为 lc_uninitial
ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
<< obj_names[index] << dendl;
return ret;
}
}
if (!entries.empty()) { //如果 entries 非空
marker = std::move(entries.rbegin()->first); //更新一下 marker
}
} while (!entries.empty());
return 0;
}
bucket内的核心处理
// Applies the lifecycle configuration of one bucket.
//
// shard_id is the LC entry key; it is split on ':' into three fields:
// tenant, bucket name and bucket marker.
//
// Returns:
//   0        nothing to do or finished (no RGW_ATTR_LC attribute, the object
//            lister reports -ENOENT, shutdown detected via going_down(), or
//            handle_multipart_expiration() succeeded)
//   -EINVAL  shard_id does not contain the three expected fields
//   -ENOENT  the bucket's current marker differs from the one in shard_id
//            (stale entry)
//   -1       the stored lifecycle configuration could not be decoded
//   other    negative value from get_bucket_info(), the lister's init() or
//            handle_multipart_expiration()
int RGWLC::bucket_lc_process(string& shard_id)
{
  RGWLifecycleConfiguration config(cct);
  RGWBucketInfo bucket_info;
  map<string, bufferlist> bucket_attrs;
  auto obj_ctx = store->svc.sysobj->init_obj_ctx();

  vector<std::string> result;
  boost::split(result, shard_id, boost::is_any_of(":"));
  // Guard the indexing below: a malformed key would otherwise read past the
  // end of `result`.
  if (result.size() < 3) {
    ldpp_dout(this, 0) << "LC: malformed lc entry key " << shard_id
                       << ", expected tenant:bucket_name:marker" << dendl;
    return -EINVAL;
  }
  string bucket_tenant = result[0];
  string bucket_name = result[1];
  string bucket_marker = result[2];

  // Load the bucket info (out-reference) and the bucket attributes (out-pointer).
  int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
  if (ret < 0) {
    ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
    return ret;
  }

  // The key was recorded for a bucket instance with a different marker:
  // report the entry as stale.
  if (bucket_info.bucket.marker != bucket_marker) {
    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
                       << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
                       << " orig_marker=" << bucket_marker << dendl;
    return -ENOENT;
  }

  RGWRados::Bucket target(store, bucket_info);

  // The lifecycle configuration lives in the bucket attribute RGW_ATTR_LC;
  // without it there is nothing to do.
  map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
  if (aiter == bucket_attrs.end())
    return 0;

  bufferlist::const_iterator iter{&aiter->second};
  try {
    config.decode(iter);
  } catch (const buffer::error& e) {
    ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl;
    return -1;
  }

  // Map from object-name prefix to the lifecycle operation configured for it.
  multimap<string, lc_op>& prefix_map = config.get_prefix_map();
  ldpp_dout(this, 10) << __func__ << "() prefix_map size="
                      << prefix_map.size()
                      << dendl;

  // NOTE(review): pre_marker/next_marker are only ever assigned to each other
  // in the visible code and are not passed to the lister -- confirm whether
  // they are still needed.
  rgw_obj_key pre_marker;
  rgw_obj_key next_marker;
  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
    auto& op = prefix_iter->second;
    if (!is_valid_op(op)) {
      continue;
    }
    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;

    // When this prefix starts with the previous prefix, rewind next_marker
    // to pre_marker; otherwise remember next_marker in pre_marker.
    if (prefix_iter != prefix_map.begin() &&
        (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
      next_marker = pre_marker;
    } else {
      pre_marker = next_marker;
    }

    // List the bucket's objects under this prefix.
    LCObjsLister ol(store, bucket_info);
    ol.set_prefix(prefix_iter->first);
    ret = ol.init();
    if (ret < 0) {
      if (ret == (-ENOENT))
        return 0;
      ldpp_dout(this, 0) << "ERROR: store->list_objects():" << dendl;
      return ret;
    }

    // Build the rule handler for this operation.
    op_env oenv(op, store, this, bucket_info, ol);
    LCOpRule orule(oenv);
    orule.build();

    rgw_bucket_dir_entry o;
    for (; ol.get_obj(&o); ol.next()) {
      ldpp_dout(this, 20) << __func__ << "(): key=" << o.key << dendl;
      // Apply the rule to this object (the original note names expiration
      // and transition). A failure is logged and the loop moves on.
      const int op_ret = orule.process(o);
      if (op_ret < 0) {
        ldpp_dout(this, 20) << "ERROR: orule.process() returned ret="
                            << op_ret
                            << dendl;
      }
      // Stop early, reporting success, once shutdown has been requested.
      if (going_down()) {
        return 0;
      }
    }
  }

  // Finally handle expiration of multipart uploads for this bucket.
  ret = handle_multipart_expiration(&target, prefix_map);
  return ret;
}