Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(shell-api): Fix support for sharded time series collections with getShardDistribution MONGOSH-1447 #2189

Merged
merged 13 commits into from
Oct 8, 2024
12 changes: 10 additions & 2 deletions packages/shell-api/src/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2258,8 +2258,16 @@ describe('Collection', function () {
it('throws when collection is not sharded', async function () {
const serviceProviderCursor = stubInterface<ServiceProviderCursor>();
serviceProviderCursor.limit.returns(serviceProviderCursor);
serviceProviderCursor.tryNext.resolves(null);
serviceProvider.find.returns(serviceProviderCursor as any);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
serviceProviderCursor.tryNext.returns(null as any);
serviceProvider.find.returns(serviceProviderCursor);

const tryNext = sinon.stub();
tryNext.onCall(0).resolves({ storageStats: {} });
tryNext.onCall(1).resolves(null);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
serviceProvider.aggregate.returns({ tryNext } as any);

const error = await collection.getShardDistribution().catch((e) => e);

expect(error).to.be.instanceOf(MongoshInvalidInputError);
Expand Down
109 changes: 79 additions & 30 deletions packages/shell-api/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2076,31 +2076,73 @@ export default class Collection extends ShellApiWithMongoClass {
});
}

@returnsPromise
@topologies([Topologies.Sharded])
@apiVersions([])
async getShardDistribution(): Promise<CommandResult> {
this._emitCollectionApiCall('getShardDistribution', {});

await getConfigDB(this._database); // Warns if not connected to mongos

const result = {} as Document;
const config = this._mongo.getDB('config');
/**
* Helper for getting collection info for sharded collections.
* @throws If the collection is not sharded.
* @returns collection info based on given collStats.
*/
async _getShardedCollectionInfo(
config: Database,
collStats: Document[]
): Promise<Document> {
const ns = `${this._database._name}.${this._name}`;

const configCollectionsInfo = await config
const existingConfigCollectionsInfo = await config
.getCollection('collections')
.findOne({
_id: ns,
...onlyShardedCollectionsInConfigFilter,
});
if (!configCollectionsInfo) {

if (existingConfigCollectionsInfo !== null) {
return existingConfigCollectionsInfo;
}

// If the collection info is not found, check whether this is a time series collection and, if so, look up its buckets namespace instead
const timeseriesShardStats = collStats.find(
(extractedShardStats) =>
typeof extractedShardStats.storageStats.timeseries !== 'undefined'
);

if (!timeseriesShardStats) {
throw new MongoshInvalidInputError(
`Collection ${this._name} is not sharded`,
ShellApiErrors.NotConnectedToShardedCluster
);
}

const { storageStats } = timeseriesShardStats;

const timeseries: Document = storageStats.timeseries;
const timeseriesBucketNs: string = timeseries.bucketsNs;

const timeseriesCollectionInfo = await config
.getCollection('collections')
.findOne({
_id: timeseriesBucketNs,
...onlyShardedCollectionsInConfigFilter,
});

if (!timeseriesCollectionInfo) {
throw new MongoshRuntimeError(
`Error finding collection information for ${timeseriesBucketNs}`,
CommonErrors.CommandFailed
);
}

return timeseriesCollectionInfo;
}

@returnsPromise
@topologies([Topologies.Sharded])
@apiVersions([])
async getShardDistribution(): Promise<CommandResult> {
this._emitCollectionApiCall('getShardDistribution', {});

await getConfigDB(this._database); // Warns if not connected to mongos

const result = {} as Document;
const config = this._mongo.getDB('config');

const collStats = await (
await this.aggregate({ $collStats: { storageStats: {} } })
).toArray();
Expand All @@ -2115,12 +2157,15 @@ export default class Collection extends ShellApiWithMongoClass {
avgObjSize: number;
}[] = [];

const configCollectionsInfo = await this._getShardedCollectionInfo(
config,
collStats
);

await Promise.all(
collStats.map((extShardStats) =>
collStats.map((extractedShardStats) =>
(async (): Promise<void> => {
// Extract and store only the relevant subset of the stats for this shard
const { shard } = extShardStats;

const { shard } = extractedShardStats;
// If we have an UUID, use that for lookups. If we have only the ns,
// use that. (On 5.0+ servers, config.chunks uses the UUID; before
// that it had the ns).
Expand All @@ -2131,39 +2176,43 @@ export default class Collection extends ShellApiWithMongoClass {
const [host, numChunks] = await Promise.all([
config
.getCollection('shards')
.findOne({ _id: extShardStats.shard }),
.findOne({ _id: extractedShardStats.shard }),
config.getCollection('chunks').countDocuments(countChunksQuery),
]);
const shardStats = {
shardId: shard,
host: host !== null ? host.host : null,
size: extShardStats.storageStats.size,
count: extShardStats.storageStats.count,
size: extractedShardStats.storageStats.size,
count: extractedShardStats.storageStats.count,
numChunks: numChunks,
avgObjSize: extShardStats.storageStats.avgObjSize,
avgObjSize: extractedShardStats.storageStats.avgObjSize,
};

const key = `Shard ${shardStats.shardId} at ${shardStats.host}`;

const estChunkData =
// In sharded timeseries collections we do not have a count
// so we intentionally pass NaN as a result to the client.
const shardStatsCount: number = shardStats.count ?? NaN;

const estimatedChunkDataPerChunk =
shardStats.numChunks === 0
? 0
: shardStats.size / shardStats.numChunks;
const estChunkCount =
const estimatedDocsPerChunk =
shardStats.numChunks === 0
? 0
: Math.floor(shardStats.count / shardStats.numChunks);
: Math.floor(shardStatsCount / shardStats.numChunks);

result[key] = {
data: dataFormat(coerceToJSNumber(shardStats.size)),
docs: shardStats.count,
docs: shardStatsCount,
chunks: shardStats.numChunks,
'estimated data per chunk': dataFormat(estChunkData),
'estimated docs per chunk': estChunkCount,
'estimated data per chunk': dataFormat(estimatedChunkDataPerChunk),
'estimated docs per chunk': estimatedDocsPerChunk,
};

totals.size += coerceToJSNumber(shardStats.size);
totals.count += coerceToJSNumber(shardStats.count);
totals.count += coerceToJSNumber(shardStatsCount);
totals.numChunks += coerceToJSNumber(shardStats.numChunks);

conciseShardsStats.push(shardStats);
Expand Down Expand Up @@ -2326,7 +2375,7 @@ export default class Collection extends ShellApiWithMongoClass {
return await this._mongo._serviceProvider.getSearchIndexes(
this._database._name,
this._name,
indexName as string | undefined,
indexName,
{ ...(await this._database._baseOptions()), ...options }
);
}
Expand Down Expand Up @@ -2355,7 +2404,7 @@ export default class Collection extends ShellApiWithMongoClass {
this._name,
[
{
name: (indexName as string | undefined) ?? 'default',
name: indexName ?? 'default',
// Omitting type when it is 'search' for compat with older servers
...(type &&
type !== 'search' && { type: type as 'search' | 'vectorSearch' }),
Expand Down
116 changes: 116 additions & 0 deletions packages/shell-api/src/shard.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2554,6 +2554,122 @@ describe('Shard', function () {
});
});
});

describe('collection.getShardDistribution()', function () {
  const dbName = 'get-shard-distribution-test';
  const ns = `${dbName}.test`;
  let db: Database;

  /**
   * Lists the keys of a shard distribution report, leaving out the
   * aggregate `Totals` entry so only per-shard entries remain.
   */
  const perShardKeys = (distribution: Document): string[] =>
    Object.keys(distribution).filter((field) => field !== 'Totals');

  // Every test starts from a database holding a single indexed document
  // in the `test` collection.
  beforeEach(async function () {
    db = sh._database.getSiblingDB(dbName);
    await db.getCollection('test').insertOne({ key: 1 });
    await db.getCollection('test').createIndex({ key: 1 });
  });

  afterEach(async function () {
    await db.dropDatabase();
  });

  context('unsharded collections', function () {
    it('throws an error', async function () {
      // Resolve with the rejection value so its message can be inspected.
      const caughtError = await db
        .getCollection('test')
        .getShardDistribution()
        .catch((e) => e);

      expect(caughtError.message).includes('Collection test is not sharded');
    });
  });

  context('sharded collections', function () {
    beforeEach(async function () {
      const enableShardingResult = await sh.enableSharding(dbName);
      expect(enableShardingResult.ok).to.equal(1);

      const shardCollectionResult = await sh.shardCollection(ns, { key: 1 });
      expect(shardCollectionResult.collectionsharded).to.equal(ns);
    });

    it('returns the correct StatsResult', async function () {
      const result = await db.getCollection('test').getShardDistribution();
      expect(result.type).to.equal('StatsResult');

      const distribution = result.value as Document;
      const shardKeys = perShardKeys(distribution);
      expect(shardKeys.length).to.equal(1);

      const [onlyShardKey] = shardKeys;
      expect(distribution[onlyShardKey]['estimated docs per chunk']).to.equal(
        1
      );

      expect(distribution.Totals.docs).to.equal(1);
      expect(distribution.Totals.chunks).to.equal(1);
    });
  });

  // Sharded time series collections get their own coverage because
  // getShardDistribution falls back to the bucket collection information
  // for them.
  context('sharded timeseries collections', function () {
    skipIfServerVersion(mongos, '< 5.1');

    const timeseriesCollectionName = 'getShardDistributionTS';
    const timeseriesNS = `${dbName}.${timeseriesCollectionName}`;

    beforeEach(async function () {
      const enableShardingResult = await sh.enableSharding(dbName);
      expect(enableShardingResult.ok).to.equal(1);

      const shardCollectionResult = await sh.shardCollection(
        timeseriesNS,
        { 'metadata.bucketId': 1 },
        {
          timeseries: {
            timeField: 'timestamp',
            metaField: 'metadata',
            granularity: 'hours',
          },
        }
      );
      expect(shardCollectionResult.collectionsharded).to.equal(timeseriesNS);

      // Seed a single measurement in the time series collection.
      await db.getCollection(timeseriesCollectionName).insertOne({
        metadata: {
          bucketId: 1,
          type: 'temperature',
        },
        timestamp: new Date('2021-05-18T00:00:00.000Z'),
        temp: 12,
      });
    });

    it('returns the correct StatsResult', async function () {
      const result = await db
        .getCollection(timeseriesCollectionName)
        .getShardDistribution();
      expect(result.type).to.equal('StatsResult');

      const distribution = result.value as Document;
      const shardKeys = perShardKeys(distribution);
      expect(shardKeys.length).to.equal(1);

      const [onlyShardKey] = shardKeys;

      // Document counts are reported as NaN for time series collections.
      expect(distribution[onlyShardKey]['estimated docs per chunk']).to.be.NaN;

      expect(distribution.Totals.docs).to.be.NaN;
      expect(distribution.Totals.chunks).to.equal(1);
    });
  });
});

describe('collection.stats()', function () {
let db: Database;
let hasTotalSize: boolean;
Expand Down
Loading