Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(shell-api): Fix support for sharded time series collections with getShardDistribution MONGOSH-1447 #2189

Merged
merged 13 commits into from
Oct 8, 2024
111 changes: 80 additions & 31 deletions packages/shell-api/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2076,6 +2076,43 @@ export default class Collection extends ShellApiWithMongoClass {
});
}

/**
 * Helper for looking up the config.collections entry of a sharded
 * timeseries collection.
 *
 * The entry is looked up by the buckets namespace (`timeseries.bucketsNs`)
 * reported in the collection's `$collStats` storage stats.
 *
 * @param config The `config` database used for the lookup.
 * @param collStats The per-shard `$collStats` results for the collection.
 * @returns The matching document from `config.collections`, or `null` when
 * no shard reports timeseries stats, no buckets namespace is present, or no
 * sharded collection entry exists for that namespace.
 */
async _getShardedTimeseriesCollectionInfo(
  config: Database,
  collStats: Document[]
): Promise<Document | null> {
  // Tolerate entries without `storageStats` instead of throwing a TypeError;
  // such entries simply cannot identify a timeseries collection.
  const timeseriesShardStats = collStats.find(
    (extractedShardStats) =>
      typeof extractedShardStats.storageStats?.timeseries !== 'undefined'
  );

  if (!timeseriesShardStats) {
    return null;
  }

  // `timeseries` passed the `undefined` check above but may still be null,
  // so keep the access optional.
  const timeseriesBucketNs: string | undefined =
    timeseriesShardStats.storageStats.timeseries?.bucketsNs;

  if (!timeseriesBucketNs) {
    return null;
  }

  return await config.getCollection('collections').findOne({
    _id: timeseriesBucketNs,
    ...onlyShardedCollectionsInConfigFilter,
  });
}

@returnsPromise
@topologies([Topologies.Sharded])
@apiVersions([])
Expand All @@ -2086,20 +2123,6 @@ export default class Collection extends ShellApiWithMongoClass {

const result = {} as Document;
const config = this._mongo.getDB('config');
const ns = `${this._database._name}.${this._name}`;

const configCollectionsInfo = await config
.getCollection('collections')
.findOne({
_id: ns,
...onlyShardedCollectionsInConfigFilter,
});
if (!configCollectionsInfo) {
throw new MongoshInvalidInputError(
`Collection ${this._name} is not sharded`,
ShellApiErrors.NotConnectedToShardedCluster
);
}

const collStats = await (
await this.aggregate({ $collStats: { storageStats: {} } })
Expand All @@ -2115,12 +2138,35 @@ export default class Collection extends ShellApiWithMongoClass {
avgObjSize: number;
}[] = [];

const ns = `${this._database._name}.${this._name}`;
let configCollectionsInfo: Document;
const existingConfigCollectionsInfo = await config
.getCollection('collections')
.findOne({
_id: ns,
...onlyShardedCollectionsInConfigFilter,
});

if (!existingConfigCollectionsInfo) {
const timeseriesCollectionInfo =
await this._getShardedTimeseriesCollectionInfo(config, collStats);

if (!timeseriesCollectionInfo) {
throw new MongoshInvalidInputError(
gagik marked this conversation as resolved.
Show resolved Hide resolved
`Collection ${this._name} is not sharded`,
ShellApiErrors.NotConnectedToShardedCluster
);
}

configCollectionsInfo = timeseriesCollectionInfo;
} else {
configCollectionsInfo = existingConfigCollectionsInfo;
}
gagik marked this conversation as resolved.
Show resolved Hide resolved

await Promise.all(
collStats.map((extShardStats) =>
collStats.map((extractedShardStats) =>
(async (): Promise<void> => {
// Extract and store only the relevant subset of the stats for this shard
const { shard } = extShardStats;

const { shard } = extractedShardStats;
// If we have an UUID, use that for lookups. If we have only the ns,
// use that. (On 5.0+ servers, config.chunks uses the UUID; before
// that it used the ns).
Expand All @@ -2131,39 +2177,42 @@ export default class Collection extends ShellApiWithMongoClass {
const [host, numChunks] = await Promise.all([
config
.getCollection('shards')
.findOne({ _id: extShardStats.shard }),
.findOne({ _id: extractedShardStats.shard }),
config.getCollection('chunks').countDocuments(countChunksQuery),
]);
const shardStats = {
shardId: shard,
host: host !== null ? host.host : null,
size: extShardStats.storageStats.size,
count: extShardStats.storageStats.count,
size: extractedShardStats.storageStats.size,
count: extractedShardStats.storageStats.count,
numChunks: numChunks,
avgObjSize: extShardStats.storageStats.avgObjSize,
avgObjSize: extractedShardStats.storageStats.avgObjSize,
};

const key = `Shard ${shardStats.shardId} at ${shardStats.host}`;

const estChunkData =
// In sharded timeseries collections, count is 0.
const shardStatsCount = shardStats.count ?? 0;
gagik marked this conversation as resolved.
Show resolved Hide resolved

const estimatedChunkDataPerChunk =
shardStats.numChunks === 0
? 0
: shardStats.size / shardStats.numChunks;
const estChunkCount =
const estimatedDocsPerChunk =
shardStats.numChunks === 0
? 0
: Math.floor(shardStats.count / shardStats.numChunks);
: Math.floor(shardStatsCount / shardStats.numChunks);

result[key] = {
data: dataFormat(coerceToJSNumber(shardStats.size)),
docs: shardStats.count,
docs: shardStatsCount,
chunks: shardStats.numChunks,
'estimated data per chunk': dataFormat(estChunkData),
'estimated docs per chunk': estChunkCount,
'estimated data per chunk': dataFormat(estimatedChunkDataPerChunk),
'estimated docs per chunk': estimatedDocsPerChunk,
};

totals.size += coerceToJSNumber(shardStats.size);
totals.count += coerceToJSNumber(shardStats.count);
totals.count += coerceToJSNumber(shardStatsCount);
totals.numChunks += coerceToJSNumber(shardStats.numChunks);

conciseShardsStats.push(shardStats);
Expand Down Expand Up @@ -2326,7 +2375,7 @@ export default class Collection extends ShellApiWithMongoClass {
return await this._mongo._serviceProvider.getSearchIndexes(
this._database._name,
this._name,
indexName as string | undefined,
indexName,
{ ...(await this._database._baseOptions()), ...options }
);
}
Expand Down Expand Up @@ -2355,7 +2404,7 @@ export default class Collection extends ShellApiWithMongoClass {
this._name,
[
{
name: (indexName as string | undefined) ?? 'default',
name: indexName ?? 'default',
// Omitting type when it is 'search' for compat with older servers
...(type &&
type !== 'search' && { type: type as 'search' | 'vectorSearch' }),
Expand Down
116 changes: 116 additions & 0 deletions packages/shell-api/src/shard.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2554,6 +2554,122 @@ describe('Shard', function () {
});
});
});

describe('collection.getShardDistribution()', function () {
  const dbName = 'shard-stats-test';
  const ns = `${dbName}.test`;
  let db: Database;

  /**
   * Runs getShardDistribution() on the given collection and checks that it
   * reports a single shard with a single chunk and the expected doc counts.
   */
  const expectSingleShardDistribution = async (
    collectionName: string,
    expectedDocs: number
  ): Promise<void> => {
    const distribution = await db
      .getCollection(collectionName)
      .getShardDistribution();
    const stats = distribution.value as Document;

    expect(distribution.type).to.equal('StatsResult');

    // Every key except 'Totals' describes one shard.
    const shardKeys = Object.keys(stats).filter((name) => name !== 'Totals');
    expect(shardKeys.length).to.equal(1);
    const [onlyShardKey] = shardKeys;
    expect(stats[onlyShardKey]['estimated docs per chunk']).to.equal(
      expectedDocs
    );

    expect(stats.Totals.docs).to.equal(expectedDocs);
    expect(stats.Totals.chunks).to.equal(1);
  };

  beforeEach(async function () {
    db = sh._database.getSiblingDB(dbName);
    const testCollection = db.getCollection('test');
    await testCollection.insertOne({ key: 1 });
    await db.getCollection('test').createIndex({ key: 1 });
  });

  afterEach(async function () {
    await db.dropDatabase();
  });

  context('unsharded collections', function () {
    it('throws an error', async function () {
      const rejection = await db
        .getCollection('test')
        .getShardDistribution()
        .catch((e) => e);
      expect(rejection.message).includes('Collection test is not sharded');
    });
  });

  context('sharded collections', function () {
    beforeEach(async function () {
      const enableResult = await sh.enableSharding(dbName);
      expect(enableResult.ok).to.equal(1);

      const shardResult = await sh.shardCollection(ns, { key: 1 });
      expect(shardResult.collectionsharded).to.equal(ns);
    });

    it('returns the correct StatsResult', async function () {
      await expectSingleShardDistribution('test', 1);
    });
  });

  // Sharded time series collections are tested explicitly because the lookup
  // falls back to the bucket information for them.
  context('sharded timeseries collections', function () {
    skipIfServerVersion(mongos, '< 5.1');

    const timeseriesCollectionName = 'testTS';
    const timeseriesNS = `${dbName}.${timeseriesCollectionName}`;

    beforeEach(async function () {
      const enableResult = await sh.enableSharding(dbName);
      expect(enableResult.ok).to.equal(1);

      const shardResult = await sh.shardCollection(
        timeseriesNS,
        { 'metadata.bucketId': 1 },
        {
          timeseries: {
            timeField: 'timestamp',
            metaField: 'metadata',
            granularity: 'hours',
          },
        }
      );
      expect(shardResult.collectionsharded).to.equal(timeseriesNS);

      await db.getCollection(timeseriesCollectionName).insertOne({
        metadata: {
          bucketId: 1,
          type: 'temperature',
        },
        timestamp: new Date('2021-05-18T00:00:00.000Z'),
        temp: 12,
      });
    });

    it('returns the correct StatsResult', async function () {
      // Timeseries will have count 0
      await expectSingleShardDistribution(timeseriesCollectionName, 0);
    });
  });
});

describe('collection.stats()', function () {
let db: Database;
let hasTotalSize: boolean;
Expand Down
Loading