fix: ttl merge (#3859)
* fix: ttl merge

* swap refactor, more simple

* fix doc

* cleanup

* format

* fix ut

---------

Co-authored-by: Huang Wei <[email protected]>
vagetablechicken and Huang Wei authored Apr 18, 2024
1 parent 66a0dda commit e1369fb
Showing 9 changed files with 239 additions and 125 deletions.
8 changes: 3 additions & 5 deletions docs/en/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md
@@ -206,7 +206,7 @@ CREATE TABLE t1 LIKE PARQUET 'file://t1.parquet';
-- SUCCEED
```

### ColumnIndex (optional
### ColumnIndex (optional)

```sql
ColumnIndex ::=
@@ -237,13 +237,11 @@ The index key must be configured, and other configuration items are optional. Th
| ----------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `ABSOLUTE` | The value of TTL represents the expiration time. The configuration value is a time period such as `100m, 12h, 1d, 365d`. The maximum configurable expiration time is `15768000m` (i.e. 30 years) | When a record expires, it is eliminated. | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=absolute, TTL=100m)`<br />OpenMLDB will delete data older than 100 minutes. |
| `LATEST` | The value of TTL represents the maximum number of surviving records, that is, the maximum number of records allowed under the same index. Up to 1000 can be configured | When the number of records exceeds the maximum, records are eliminated. | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=LATEST, TTL=10)`. OpenMLDB will only keep the last 10 records and delete the previous records. |
| `ABSORLAT` | It defines the expiration time and the maximum number of live records. The configuration value is a 2-tuple of the form `(100m, 10), (1d, 1)`. The maximum can be configured `(15768000m, 1000)`. | Eliminates if and only if the record expires** or if the record exceeds the maximum number of records. | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absorlat)`. When the record exceeds 100, **OR** when the record expires, it will be eliminated |
| `ABSANDLAT` | It defines the expiration time and the maximum number of live records. The configuration value is a 2-tuple of the form `(100m, 10), (1d, 1)`. The maximum can be configured `(15768000m, 1000)`. | When records expire **OR** records exceed the maximum number of records, records will be eliminated. | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absandlat)`. When there are more than 100 records, **OR** the records expire, they will also be eliminated. |

| `ABSORLAT` | It defines the expiration time and the maximum number of live records. The configuration value is a 2-tuple of the form `(100m, 10), (1d, 1)`. The maximum can be configured `(15768000m, 1000)`. | Records will be eliminated if either the time expires **or** the number of records exceeds the maximum limit. | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absorlat)`. Records will be eliminated when either the number of records exceeds 100 **or** the records expire. |
| `ABSANDLAT` | It defines the expiration time and the maximum number of live records. The configuration value is a 2-tuple of the form `(100m, 10), (1d, 1)`. The maximum can be configured `(15768000m, 1000)`. | Records will only be eliminated when both the time expires **and** the number of records exceeds the maximum limit. | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absandlat)`. Records will only be eliminated when the number of records exceeds 100 **and** the records expire. |
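
The elimination rules in the table above can be sketched as a single predicate. This is only an illustration of the documented semantics, not OpenMLDB's implementation: `TtlType` and `ShouldEvict` are hypothetical names, `expired` is assumed to mean the record is older than the configured time, and `over_limit` is assumed to mean the record falls outside the newest N records under its index key.

```cpp
#include <cassert>

// Hypothetical enum for illustration; not OpenMLDB's actual type.
enum class TtlType { kAbsolute, kLatest, kAbsOrLat, kAbsAndLat };

// Returns true if a record should be eliminated under the given TTL type.
// expired:    the record is older than the configured expiration time.
// over_limit: the record is beyond the configured maximum record count.
bool ShouldEvict(TtlType type, bool expired, bool over_limit) {
    switch (type) {
        case TtlType::kAbsolute:
            return expired;
        case TtlType::kLatest:
            return over_limit;
        case TtlType::kAbsOrLat:
            return expired || over_limit;  // either condition is enough
        case TtlType::kAbsAndLat:
            return expired && over_limit;  // both conditions are required
    }
    assert(false && "unknown TTL type");
    return false;
}
```

Under `ABSANDLAT`, an expired record that is still among the newest N records is kept, which is the behavior the corrected rows describe.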

#### Example


**Example 1**

The following SQL example creates a table with a single-column index.
7 changes: 4 additions & 3 deletions docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md
@@ -233,12 +233,13 @@ IndexOption ::=
| ----------- | ------------------------------------------------------------ | ---------------------------------------------------- | ------------------------------------------------------------ |
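| `ABSOLUTE` | The TTL value is the expiration time. It is configured as a time period such as `100m, 12h, 1d, 365d`. The maximum configurable expiration time is `15768000m` (i.e. 30 years) | When a record expires, it is eliminated. | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=absolute, TTL=100m)`<br />OpenMLDB will delete data older than 100 minutes. |
| `LATEST` | The TTL value is the maximum number of surviving records, that is, the maximum number of records allowed under the same index. At most 1000 can be configured | When the number of records exceeds the maximum, records are eliminated. | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=LATEST, TTL=10)`. OpenMLDB will keep only the latest 10 records and delete earlier ones. |
| `ABSORLAT` | Configures an expiration time and a maximum number of surviving records. The value is a 2-tuple such as `(100m, 10), (1d, 1)`. The maximum configurable value is `(15768000m, 1000)` | A record is eliminated if and only if the record has expired****the number of records exceeds the maximum | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absorlat)`. When there are more than 100 records, **or** when a record expires, it is eliminated |
| `ABSANDLAT` | Configures an expiration time and a maximum number of surviving records. The value is a 2-tuple such as `(100m, 10), (1d, 1)`. The maximum configurable value is `(15768000m, 1000)` | When a record has expired****the number of records exceeds the maximum, the record is eliminated | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absandlat)`. When there are more than 100 records **and** the record has expired, it is eliminated |
| `ABSORLAT` | Configures an expiration time and a maximum number of surviving records. The value is a 2-tuple such as `(100m, 10), (1d, 1)`. The maximum configurable value is `(15768000m, 1000)` | A record is eliminated if either the time has expired **or** the number of records exceeds the maximum | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absorlat)`. When there are more than 100 records, **or** when a record expires, it is eliminated |
| `ABSANDLAT` | Configures an expiration time and a maximum number of surviving records. The value is a 2-tuple such as `(100m, 10), (1d, 1)`. The maximum configurable value is `(15768000m, 1000)` | A record is eliminated only when it has expired **and** the number of records exceeds the maximum | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absandlat)`. A record is eliminated only when there are more than 100 records **and** the record has expired. |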

```{note}
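The limits on the maximum expiration time and the maximum number of surviving records exist for performance reasons. If you must configure a larger TTL value, use UpdateTTL to increase it (the max limits can be ignored), or adjust the nameserver configuration `absolute_ttl_max` and `latest_ttl_max`, which takes effect after a restart
The limits on the maximum expiration time and the maximum number of surviving records exist for performance reasons. If you must configure a larger TTL value, first create the table with a temporary TTL value that is within the limits, then use the nameserver's UpdateTTL interface to adjust it to the desired value (the max limits can be ignored); the change takes effect after one gc interval. Alternatively, adjust the nameserver configuration `absolute_ttl_max` and `latest_ttl_max`, restart for it to take effect, and then create the table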
```

#### Example
**Example 1: Create a table with a single-column index**

141 changes: 85 additions & 56 deletions src/base/ddl_parser.cc
@@ -497,7 +497,7 @@ void IndexMapBuilder::Report(absl::string_view db, absl::string_view table, absl

index_map_[index] = ttl;

LOG(INFO) << "suggest creating index for " << db << "." << table << ": " << index << ", " << ttl->DebugString();
LOG(INFO) << "suggest creating index for " << db << "." << table << ": " << index << ", " << ttl->ShortDebugString();
}

int64_t AbsTTLConvert(int64_t time_ms, bool zero_eq_unbounded) {
@@ -569,7 +569,7 @@ MultiDBIndexMap IndexMapBuilder::ToMap() {
pair.second->set_ttl_type(::openmldb::type::TTLType::kLatestTime);
pair.second->set_lat_ttl(1);
}
auto[db, table, idx_str, column_key] = Decode(pair.first);
auto [db, table, idx_str, column_key] = Decode(pair.first);
DLOG(INFO) << "decode index '" << pair.first << "': " << db << " " << table << " " << idx_str << " "
<< column_key.ShortDebugString();
auto& idx_map_of_table = tmp_map[db][table];
@@ -637,7 +637,7 @@ std::tuple<std::string, std::string, std::string, common::ColumnKey> IndexMapBui
return {};
}

const auto[db_name, table_name] = GetTable(index_str);
const auto [db_name, table_name] = GetTable(index_str);

common::ColumnKey column_key;
auto key_sep = index_str.find(KEY_MARK);
@@ -677,69 +677,98 @@ void TTLValueMerge(const common::TTLSt& old_ttl, const common::TTLSt& new_ttl, c
result->set_lat_ttl(tmp_result);
}

// 4(same type merge): same type and max ttls
// 12(A(4,2)): see in code
common::TTLSt stdTTL(const common::TTLSt& ttl) {
common::TTLSt result(ttl);
DCHECK(result.has_ttl_type() && result.ttl_type() != type::TTLType::kRelativeTime)
<< "invalid ttl type" << ttl.ShortDebugString();
if (result.ttl_type() == type::TTLType::kAbsoluteTime) {
// if no lat ttl, set a default 0
DCHECK(!result.has_lat_ttl() || result.lat_ttl() == 0);
result.set_lat_ttl(0);
} else if (result.ttl_type() == type::TTLType::kLatestTime) {
// if no abs ttl, set a default 0
DCHECK(!result.has_abs_ttl() || result.abs_ttl() == 0);
result.set_abs_ttl(0);
} else if (result.ttl_type() == type::TTLType::kAbsAndLat) {
DCHECK(result.has_abs_ttl() && result.has_lat_ttl());
// if any one is 0, won't expire any data, just set abs 0
if (result.abs_ttl() == 0 || result.lat_ttl() == 0) {
result.set_abs_ttl(0);
result.set_lat_ttl(0);
result.set_ttl_type(type::TTLType::kAbsoluteTime);
}
} else if (result.ttl_type() == type::TTLType::kAbsOrLat) {
DCHECK(result.has_abs_ttl() && result.has_lat_ttl());
// if any one is 0, just use the another one, if both 0, set abs 0
if (result.lat_ttl() == 0) {
result.set_ttl_type(type::TTLType::kAbsoluteTime);
} else if (result.abs_ttl() == 0) {
result.set_ttl_type(type::TTLType::kLatestTime);
}
}
return result;
}

bool TTLMerge(const common::TTLSt& old_ttl, const common::TTLSt& new_ttl, common::TTLSt* result) {
// TTLSt has type and two values, updated is complex, so we just check result==old_ttl in the end
result->CopyFrom(old_ttl);

// merge case 1. same type, just merge values(max ttls)
// it's ok to merge both abs and lat ttl value even type is only abs or lat(just unused and default is 0)
// Standardize both TTLs first, e.g. absorlat(10,0) -> abs(10).
// Otherwise merging absorlat(1,0) with absorlat(0,2) would give absorlat(0,0), which is too large.
// Standardizing also sets a missing abs (or lat) value to 0, so comparisons need no has_xxx_ttl checks.
auto left = stdTTL(old_ttl);
auto right = stdTTL(new_ttl);
using type::TTLType;
// complex ttl(absandlat or absorlat) won't have ttl value 0, it has been converted to simple ttl
// merge case 1. same type, just merge values(0 means max)
// merge case 2. different type
if (old_ttl.ttl_type() == new_ttl.ttl_type()) {
TTLValueMerge(old_ttl, new_ttl, result);
if (left.ttl_type() == right.ttl_type()) {
// it's ok to merge both abs and lat ttl value even type is only abs or lat, just 0 merge 0
result->set_ttl_type(left.ttl_type());
TTLValueMerge(left, right, result);
} else {
// type is different
if (old_ttl.ttl_type() == type::TTLType::kAbsAndLat) {
// 3 cases, abs&lat + ?. ?: abs / lat / abs||lat(new_ttl type != abs&lat). Use new ttl, merge values.
// abs&lat + abs -> abs, abs&lat + lat -> lat, abs&lat + abs||lat -> abs||lat. Use the max range, it's ok to
// merge two ttl(e.g. abs 10s&lat 1 + abs 10s(lat 0) -> abs 10s), we won't use the another ttl if result is
// lat or abs.
result->set_ttl_type(new_ttl.ttl_type());
TTLValueMerge(old_ttl, new_ttl, result);
} else if (new_ttl.ttl_type() == type::TTLType::kAbsAndLat) {
// 3 cases, ? + abs&lat. ?: abs / lat / abs||lat
result->set_ttl_type(old_ttl.ttl_type());
TTLValueMerge(old_ttl, new_ttl, result);
} else if (old_ttl.ttl_type() == type::TTLType::kAbsOrLat) {
// 2 cases, abs||lat + ? -> abs||lat. Stay old, merge values
// ?: abs / lat 2 cases(abs||lat + abs&lat is in 2)
TTLValueMerge(old_ttl, new_ttl, result);
} else if (new_ttl.ttl_type() == type::TTLType::kAbsOrLat) {
// 2 cases, abs + abs||lat -> abs||lat, lat + abs||lat -> abs||lat. Use new, merge can't use lat ttl(0) if
// type is abs abs&lat + abs||lat is in 1
result->set_ttl_type(type::TTLType::kAbsOrLat);
if (old_ttl.ttl_type() == type::TTLType::kAbsoluteTime) {
result->set_abs_ttl(TTLValueMerge(old_ttl.abs_ttl(), new_ttl.abs_ttl()));
result->set_lat_ttl(new_ttl.lat_ttl());
// old type != new type, and absandlat or absorlat won't have ttl value 0
// swap first, try to make left type is complex type or (abs + lat)
if (right.ttl_type() == TTLType::kAbsAndLat ||
(right.ttl_type() == TTLType::kAbsOrLat && left.ttl_type() != TTLType::kAbsAndLat)) {
std::swap(left, right);
}
if (left.ttl_type() == TTLType::kLatestTime && right.ttl_type() == TTLType::kAbsoluteTime) {
std::swap(left, right);
}

if (left.ttl_type() == TTLType::kAbsAndLat) {
// 3 cases
// absandlat(x,y)+abs(z), absandlat(x,y)+abs(0): don't merge lat (an abs type has lat 0), use absandlat's lat.
// absandlat(x,y)+lat(z), absandlat(x,y)+lat(0): the same, use absandlat's abs.
// absandlat(x,y)+absorlat(k,j): we need to store more to avoid deleting valid records, so merge both.
// No 0 value here, so don't worry about setting the TTL too large.
result->CopyFrom(left);
if (right.ttl_type() == TTLType::kAbsoluteTime) {
result->set_abs_ttl(TTLValueMerge(left.abs_ttl(), right.abs_ttl()));
} else if (right.ttl_type() == TTLType::kLatestTime) {
result->set_lat_ttl(TTLValueMerge(left.lat_ttl(), right.lat_ttl()));
} else {
result->set_abs_ttl(new_ttl.abs_ttl());
result->set_lat_ttl(TTLValueMerge(old_ttl.lat_ttl(), new_ttl.lat_ttl()));
DCHECK(right.ttl_type() == TTLType::kAbsOrLat);
TTLValueMerge(left, right, result);
}
} else if (left.ttl_type() == TTLType::kAbsOrLat) {
// 2 cases
// absorlat + abs/lat = abs/lat: keep the simple type, ignore the other value
// the merged result will be standardized, so don't worry about the new value of the ignored type
DCHECK(right.ttl_type() == TTLType::kAbsoluteTime || right.ttl_type() == TTLType::kLatestTime);
result->set_ttl_type(right.ttl_type());
TTLValueMerge(left, right, result);
} else {
// 2 cases, abs + lat -> abs||lat, lat + abs -> abs||lat. Set type, merge can't use lat ttl(0) if type is
// abs
result->set_ttl_type(type::TTLType::kAbsOrLat);
if (old_ttl.ttl_type() == type::TTLType::kAbsoluteTime) {
DCHECK(new_ttl.ttl_type() == type::TTLType::kLatestTime);
result->set_abs_ttl(old_ttl.abs_ttl());
result->set_lat_ttl(new_ttl.lat_ttl());
} else {
DCHECK(old_ttl.ttl_type() == type::TTLType::kLatestTime);
result->set_abs_ttl(new_ttl.abs_ttl());
result->set_lat_ttl(old_ttl.lat_ttl());
}
DCHECK(left.ttl_type() == TTLType::kAbsoluteTime && right.ttl_type() == TTLType::kLatestTime);
// 1 case
// abs + lat -> absandlat: set type, merge can't use lat ttl(0) if type is abs, so custom merge
result->set_ttl_type(TTLType::kAbsAndLat);
result->set_abs_ttl(left.abs_ttl());
result->set_lat_ttl(right.lat_ttl());
}
}
// after merge, may get complex ttl with 0
result->CopyFrom(stdTTL(*result));

// old ttl may not have one ttl value, but the result must have, so fix the cmp
common::TTLSt old_ttl_fixed(old_ttl);
if (!old_ttl_fixed.has_abs_ttl()) {
old_ttl_fixed.set_abs_ttl(0);
}
if (!old_ttl_fixed.has_lat_ttl()) {
old_ttl_fixed.set_lat_ttl(0);
}
return !google::protobuf::util::MessageDifferencer::Equals(old_ttl_fixed, *result);
return !google::protobuf::util::MessageDifferencer::Equals(old_ttl, *result);
}
} // namespace openmldb::base
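
The standardization step that `stdTTL` performs before and after merging can be sketched without protobuf. This is a simplified illustration under stated assumptions: `Ttl`, `TtlType`, and `Standardize` are hypothetical stand-ins for `common::TTLSt`, `type::TTLType`, and `stdTTL`, and a value of 0 is assumed to mean "no limit", as the comments in the diff suggest.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for type::TTLType and common::TTLSt.
enum class TtlType { kAbsolute, kLatest, kAbsOrLat, kAbsAndLat };

struct Ttl {
    TtlType type;
    int64_t abs;  // time limit; 0 is assumed to mean "no time limit"
    int64_t lat;  // record-count limit; 0 is assumed to mean "no count limit"
};

// Reduce a TTL to its simplest equivalent form, mirroring the branches of stdTTL.
Ttl Standardize(Ttl t) {
    assert(t.abs >= 0 && t.lat >= 0);
    switch (t.type) {
        case TtlType::kAbsolute:
            t.lat = 0;  // the unused value defaults to 0
            break;
        case TtlType::kLatest:
            t.abs = 0;  // the unused value defaults to 0
            break;
        case TtlType::kAbsAndLat:
            // If either limit is 0, nothing can ever satisfy both conditions: abs(0).
            if (t.abs == 0 || t.lat == 0) {
                t = Ttl{TtlType::kAbsolute, 0, 0};
            }
            break;
        case TtlType::kAbsOrLat:
            // If one limit is 0, only the other applies; if both are 0, this yields abs(0).
            if (t.lat == 0) {
                t.type = TtlType::kAbsolute;
            } else if (t.abs == 0) {
                t.type = TtlType::kLatest;
            }
            break;
    }
    return t;
}
```

With this form, `absorlat(10, 0)` becomes `abs(10)`, so a later same-type merge never has to reason about a complex type that carries a 0.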
4 changes: 2 additions & 2 deletions src/base/ddl_parser.h
@@ -138,8 +138,8 @@ class DDLParser {
};

// return true if updated, else false and the result is the same as old_index
// 16 cases: 4(same type merge) + 12(A(4,2), we can get `updated` flag by old==result, and old & new are swapable, but
// in code, we can only check old & new one by one, so not C(4,2))
// 10 cases: 4(same type merge) + 6(C(4,2)), we can get `updated` flag by old==result
// we may update ttl when old_ttl is not standard, e.g. absandlat(0,0)->abs(0)
bool TTLMerge(const common::TTLSt& old_ttl, const common::TTLSt& new_ttl, common::TTLSt* result);

} // namespace openmldb::base
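
The "10 cases" count in the comment on `TTLMerge` can be checked with a small sketch. It assumes, from reading the swap logic in `ddl_parser.cc`, that the two operands are reordered so the left one is the more complex type, which amounts to ranking the types as absandlat > absorlat > abs > lat. `TtlType`, `Canonical`, and `CountMergeCases` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <set>
#include <utility>

// Hypothetical enum; the numeric values encode the assumed ordering used by the swap.
enum class TtlType { kLatest = 0, kAbsolute = 1, kAbsOrLat = 2, kAbsAndLat = 3 };

// Order a pair so that the higher-ranked type comes first.
std::pair<TtlType, TtlType> Canonical(TtlType left, TtlType right) {
    if (static_cast<int>(left) < static_cast<int>(right)) {
        std::swap(left, right);
    }
    return {left, right};
}

// Count the distinct merge cases across all 16 ordered (old, new) type pairs.
int CountMergeCases() {
    std::set<std::pair<TtlType, TtlType>> cases;
    for (int l = 0; l < 4; ++l) {
        for (int r = 0; r < 4; ++r) {
            cases.insert(Canonical(static_cast<TtlType>(l), static_cast<TtlType>(r)));
        }
    }
    return static_cast<int>(cases.size());
}
```

Because the order of old and new no longer matters after the swap, the 12 ordered mixed-type pairs collapse to 6, and together with the 4 same-type pairs this gives the 10 cases the comment mentions.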