Skip to content

Commit

Permalink
feat(shell-api): add shardedDataDistribution to sh.status() MONGOSH-1326
Browse files Browse the repository at this point in the history
  • Loading branch information
gagik authored Oct 25, 2024
1 parent 4ced968 commit e91cd29
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 35 deletions.
47 changes: 43 additions & 4 deletions packages/shell-api/src/helpers.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ShardedDataDistribution } from './helpers';
import {
assertArgsDefinedType,
coerceToJSNumber,
Expand All @@ -19,6 +20,7 @@ import sinon from 'ts-sinon';
import chai, { expect } from 'chai';
import { EventEmitter } from 'events';
import sinonChai from 'sinon-chai';
import { stub } from 'sinon';
chai.use(sinonChai);

const fakeConfigDb = makeFakeConfigDatabase(
Expand Down Expand Up @@ -133,6 +135,21 @@ describe('getPrintableShardStatus', function () {
let serviceProvider: ServiceProvider;
let inBalancerRound = false;

const mockedShardedDataDistribution: ShardedDataDistribution = [
{
ns: 'test.ns',
shards: [
{
shardName: 'test',
numOrphanedDocs: 1,
numOwnedDocuments: 5,
orphanedSizeBytes: 20,
ownedSizeBytes: 80,
},
],
},
];

beforeEach(async function () {
serviceProvider = await NodeDriverServiceProvider.connect(
await testServer.connectionString(),
Expand Down Expand Up @@ -186,6 +203,20 @@ describe('getPrintableShardStatus', function () {
});

it('returns an object with sharding information', async function () {
const mockedAdminDb = {
aggregate: stub()
.withArgs([{ $shardedDataDistribution: {} }])
.resolves({
toArray: stub().resolves(mockedShardedDataDistribution),
}),
};
const getSiblingDB = stub();
getSiblingDB.withArgs('admin').returns(mockedAdminDb);
getSiblingDB.withArgs('config').returns(configDatabase);

configDatabase.getSiblingDB = getSiblingDB;
configDatabase._maybeCachedHello = stub().returns({ msg: 'isdbgrid' });

const status = await getPrintableShardStatus(configDatabase, false);
expect(status.shardingVersion.clusterId).to.be.instanceOf(bson.ObjectId);
expect(status.shards.map(({ host }: { host: string }) => host)).to.include(
Expand All @@ -202,6 +233,10 @@ describe('getPrintableShardStatus', function () {
);
expect(status.databases).to.have.lengthOf(1);
expect(status.databases[0].database._id).to.equal('config');

expect(status.shardedDataDistribution).to.equal(
mockedShardedDataDistribution
);
});

describe('hides all internal deprecated fields in shardingVersion', function () {
Expand All @@ -214,7 +249,9 @@ describe('getPrintableShardStatus', function () {
]) {
it(`does not show ${hiddenField} in shardingVersion`, async function () {
const status = await getPrintableShardStatus(configDatabase, false);
expect(status.shardingVersion[hiddenField]).to.equal(undefined);
expect((status.shardingVersion as any)[hiddenField]).to.equal(
undefined
);
});
}
});
Expand All @@ -235,8 +272,10 @@ describe('getPrintableShardStatus', function () {

it('returns an object with verbose sharding information if requested', async function () {
const status = await getPrintableShardStatus(configDatabase, true);
expect(status['most recently active mongoses'][0].up).to.be.a('number');
expect(status['most recently active mongoses'][0].waiting).to.be.a(
expect((status['most recently active mongoses'][0] as any).up).to.be.a(
'number'
);
expect((status['most recently active mongoses'][0] as any).waiting).to.be.a(
'boolean'
);
});
Expand Down Expand Up @@ -281,7 +320,7 @@ describe('getPrintableShardStatus', function () {
status.balancer['Collections with active migrations']
).to.have.lengthOf(1);
expect(
status.balancer['Collections with active migrations'].join('')
status.balancer['Collections with active migrations']?.join('')
).to.include('asdf');
});

Expand Down
136 changes: 115 additions & 21 deletions packages/shell-api/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type {
bson,
} from '@mongosh/service-provider-core';
import type { ClientSideFieldLevelEncryptionOptions } from './field-level-encryption';
import { type AutoEncryptionOptions } from 'mongodb';
import type { AutoEncryptionOptions, Long, ObjectId, Timestamp } from 'mongodb';
import { shellApiType } from './enums';
import type { AbstractCursor } from './abstract-cursor';
import type ChangeStreamCursor from './change-stream-cursor';
Expand Down Expand Up @@ -226,8 +226,8 @@ export function processDigestPassword(
export async function getPrintableShardStatus(
configDB: Database,
verbose: boolean
): Promise<Document> {
const result = {} as any;
): Promise<ShardingStatusResult> {
const result = {} as ShardingStatusResult;

// configDB is a DB object that contains the sharding metadata of interest.
const mongosColl = configDB.getCollection('mongos');
Expand Down Expand Up @@ -259,9 +259,12 @@ export async function getPrintableShardStatus(
);
}

result.shardingVersion = version;
result.shardingVersion = version as {
_id: number;
clusterId: ObjectId;
};

result.shards = shards;
result.shards = shards as ShardingStatusResult['shards'];

// (most recently) active mongoses
const mongosActiveThresholdMs = 60000;
Expand All @@ -280,9 +283,8 @@ export async function getPrintableShardStatus(
}
}

mongosAdjective = `${mongosAdjective} mongoses`;
if (mostRecentMongosTime === null) {
result[mongosAdjective] = 'none';
result[`${mongosAdjective} mongoses`] = 'none';
} else {
const recentMongosQuery = {
ping: {
Expand All @@ -295,25 +297,27 @@ export async function getPrintableShardStatus(
};

if (verbose) {
result[mongosAdjective] = await (await mongosColl.find(recentMongosQuery))
result[`${mongosAdjective} mongoses`] = await (
await mongosColl.find(recentMongosQuery)
)
.sort({ ping: -1 })
.toArray();
} else {
result[mongosAdjective] = (
result[`${mongosAdjective} mongoses`] = (
(await (
await mongosColl.aggregate([
{ $match: recentMongosQuery },
{ $group: { _id: '$mongoVersion', num: { $sum: 1 } } },
{ $sort: { num: -1 } },
])
).toArray()) as any[]
).toArray()) as { _id: string; num: number }[]
).map((z: { _id: string; num: number }) => {
return { [z._id]: z.num };
});
}
}

const balancerRes: Record<string, any> = {};
const balancerRes = {} as ShardingStatusResult['balancer'];
await Promise.all([
(async (): Promise<void> => {
// Is autosplit currently enabled
Expand All @@ -331,13 +335,13 @@ export async function getPrintableShardStatus(
})(),
(async (): Promise<void> => {
// Is the balancer currently active
let balancerRunning = 'unknown';
let balancerRunning: 'yes' | 'no' | 'unknown' = 'unknown';
try {
const balancerStatus = await configDB.adminCommand({
balancerStatus: 1,
});
balancerRunning = balancerStatus.inBalancerRound ? 'yes' : 'no';
} catch (err: any) {
} catch {
// pass, ignore all error messages
}
balancerRes['Currently running'] = balancerRunning;
Expand All @@ -364,7 +368,7 @@ export async function getPrintableShardStatus(
if (activeLocks?.length > 0) {
balancerRes['Collections with active migrations'] = activeLocks.map(
(lock) => {
return `${lock._id} started at ${lock.when}`;
return `${lock._id} started at ${lock.when}` as const;
}
);
}
Expand Down Expand Up @@ -418,8 +422,23 @@ export async function getPrintableShardStatus(
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);

type MigrationResult =
| {
_id: 'Success';
count: number;
from: never;
to: never;
}
// Failed migration
| {
_id: string;
count: number;
from: string;
to: string;
};

// Successful migrations.
let migrations = await (
let migrations = (await (
await changelogColl.aggregate([
{
$match: {
Expand All @@ -437,11 +456,11 @@ export async function getPrintableShardStatus(
},
},
])
).toArray();
).toArray()) as MigrationResult[];

// Failed migrations.
migrations = migrations.concat(
await (
(await (
await changelogColl.aggregate([
{
$match: {
Expand Down Expand Up @@ -472,11 +491,12 @@ export async function getPrintableShardStatus(
},
},
])
).toArray()
).toArray()) as MigrationResult[]
);

const migrationsRes: Record<number, string> = {};
migrations.forEach((x: any) => {
const migrationsRes: ShardingStatusResult['balancer']['Migration Results for the last 24 hours'] =
{};
migrations.forEach((x) => {
if (x._id === 'Success') {
migrationsRes[x.count] = x._id;
} else {
Expand All @@ -500,7 +520,7 @@ export async function getPrintableShardStatus(
// All databases in config.databases + those implicitly referenced
// by a sharded collection in config.collections
// (could become a single pipeline using $unionWith when we drop 4.2 server support)
const [databases, collections] = await Promise.all([
const [databases, collections, shardedDataDistribution] = await Promise.all([
(async () =>
await (await configDB.getCollection('databases').find())
.sort({ _id: 1 })
Expand All @@ -513,7 +533,22 @@ export async function getPrintableShardStatus(
)
.sort({ _id: 1 })
.toArray())(),
(async () => {
try {
// $shardedDataDistribution is available since >= 6.0.3
const adminDB = configDB.getSiblingDB('admin');
return (await (
await adminDB.aggregate([{ $shardedDataDistribution: {} }])
).toArray()) as ShardedDataDistribution;
} catch {
// Pass, most likely an older version.
return undefined;
}
})(),
]);

result.shardedDataDistribution = shardedDataDistribution;

// Special case the config db, since it doesn't have a record in config.databases.
databases.push({ _id: 'config', primary: 'config', partitioned: true });

Expand Down Expand Up @@ -648,6 +683,65 @@ export async function getPrintableShardStatus(
return result;
}

export type ShardingStatusResult = {
shardingVersion: {
_id: number;
clusterId: ObjectId;
/** This gets deleted when it is returned from getPrintableShardStatus */
currentVersion?: number;
};
shards: {
_id: string;
host: string;
state: number;
tags: string[];
topologyTime: Timestamp;
replSetConfigVersion: Long;
}[];
[mongoses: `${string} mongoses`]:
| 'none'
| {
[version: string]:
| number
| {
up: number;
waiting: boolean;
};
}[];
autosplit: {
'Currently enabled': 'yes' | 'no';
};
balancer: {
'Currently enabled': 'yes' | 'no';
'Currently running': 'yes' | 'no' | 'unknown';
'Failed balancer rounds in last 5 attempts': number;
'Migration Results for the last 24 hours':
| 'No recent migrations'
| {
[count: number]:
| 'Success'
| `Failed with error '${string}', from ${string} to ${string}`;
};
'Balancer active window is set between'?: `${string} and ${string} server local time`;
'Last reported error'?: string;
'Time of Reported error'?: string;
'Collections with active migrations'?: `${string} started at ${string}`[];
};
shardedDataDistribution?: ShardedDataDistribution;
databases: { database: Document; collections: Document }[];
};

export type ShardedDataDistribution = {
ns: string;
shards: {
shardName: string;
numOrphanedDocs: number;
numOwnedDocuments: number;
orphanedSizeBytes: number;
ownedSizeBytes: number;
}[];
}[];

export async function getConfigDB(db: Database): Promise<Database> {
const helloResult = await db._maybeCachedHello();
if (helloResult.msg !== 'isdbgrid') {
Expand Down
Loading

0 comments on commit e91cd29

Please sign in to comment.