Skip to content

Commit

Permalink
next
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi committed Nov 29, 2024
1 parent 22edbc2 commit 4fb5941
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,10 @@ public static Map<Integer, TSStatus> preReleaseTables(
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static Map<Integer, TSStatus> commitOrRollbackReleaseTables(
final String database,
final List<String> tableNames,
final ConfigManager configManager,
final boolean isRollback) {
public static Map<Integer, TSStatus> commitReleaseTables(
final String database, final List<String> tableNames, final ConfigManager configManager) {
final TUpdateTableReq req = new TUpdateTableReq();
req.setType(
isRollback
? TsTableInternalRPCType.ROLLBACK_UPDATE_TABLES.getOperationType()
: TsTableInternalRPCType.COMMIT_UPDATE_TABLES.getOperationType());
req.setType(TsTableInternalRPCType.COMMIT_UPDATE_TABLES.getOperationType());
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
ReadWriteIOUtils.write(database, outputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,10 @@ private void commitRelease(final ConfigNodeProcedureEnv env) {
return;
}
final Map<Integer, TSStatus> failedResults =
SchemaUtils.commitOrRollbackReleaseTables(
SchemaUtils.commitReleaseTables(
schema.getName(),
tables.stream().map(TsTable::getTableName).collect(Collectors.toList()),
env.getConfigManager(),
false);
env.getConfigManager());
if (!failedResults.isEmpty()) {
LOGGER.warn(
"Failed to commit release tables for alter database {} for altered tables {} to DataNode, failure results: {}",
Expand All @@ -178,56 +177,12 @@ public TDatabaseSchema getSchema() {
return schema;
}

@Override
protected boolean isRollbackSupported(final AlterDatabaseState state) {
return true;
}

@Override
protected void rollbackState(final ConfigNodeProcedureEnv env, final AlterDatabaseState state)
throws IOException, InterruptedException, ProcedureException {
final long startTime = System.currentTimeMillis();
try {
// TODO: Config write?
switch (state) {
case PRE_RELEASE:
LOGGER.info(
"Start rollback pre release info for tables {} when altering database {}",
tables,
schema.getName());
rollbackPreRelease(env);
break;
}
} finally {
LOGGER.info(
"Rollback SetTableProperties-{} costs {}ms.",
state,
(System.currentTimeMillis() - startTime));
}
}

protected void rollbackPreRelease(final ConfigNodeProcedureEnv env) {
if (tables.isEmpty()) {
return;
}
final Map<Integer, TSStatus> failedResults =
SchemaUtils.commitOrRollbackReleaseTables(
schema.getName(),
tables.stream().map(TsTable::getTableName).collect(Collectors.toList()),
env.getConfigManager(),
true);

if (!failedResults.isEmpty()) {
// All dataNodes must clear the related schema cache
LOGGER.warn(
"Failed to rollback pre-release tables for alter database {} for altered tables {} to DataNode, failure results: {}",
schema.getName(),
tables,
failedResults);
setFailure(
new ProcedureException(
new MetadataException("Rollback pre-release tables for alter database failed")));
}
// Do nothing
// Currently we do not store the original schema of the database, the table's consistency will
// be guaranteed by the table cache's fetch
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,14 +1573,6 @@ public TSStatus updateTable(final TUpdateTableReq req) {
.releaseWriteLock(SchemaLockType.TIMESERIES_VS_TABLE);
}
break;
case ROLLBACK_UPDATE_TABLES:
database = ReadWriteIOUtils.readString(req.tableInfo);
size = ReadWriteIOUtils.readInt(req.tableInfo);
for (int i = 0; i < size; ++i) {
DataNodeTableCache.getInstance()
.rollbackUpdateTable(database, ReadWriteIOUtils.readString(req.tableInfo));
}
break;
case COMMIT_UPDATE_TABLES:
database = ReadWriteIOUtils.readString(req.tableInfo);
size = ReadWriteIOUtils.readInt(req.tableInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public enum TsTableInternalRPCType {
ROLLBACK_UPDATE_TABLE((byte) 1),
COMMIT_UPDATE_TABLE((byte) 2),
PRE_UPDATE_TABLES((byte) 3),
ROLLBACK_UPDATE_TABLES((byte) 4),
COMMIT_UPDATE_TABLES((byte) 5);
COMMIT_UPDATE_TABLES((byte) 4);

private final byte operationType;

Expand Down Expand Up @@ -63,8 +62,6 @@ public static TsTableInternalRPCType getType(final byte type) {
case 3:
return PRE_UPDATE_TABLES;
case 4:
return ROLLBACK_UPDATE_TABLES;
case 5:
return COMMIT_UPDATE_TABLES;
default:
throw new IllegalArgumentException("Unknown table update operation type" + type);
Expand Down

0 comments on commit 4fb5941

Please sign in to comment.