Skip to content

Commit

Permalink
improve support for embedded csrs
Browse files Browse the repository at this point in the history
  • Loading branch information
Renato Riccio committed Nov 12, 2024
1 parent fb4b269 commit 89fd5d6
Showing 1 changed file with 54 additions and 24 deletions.
78 changes: 54 additions & 24 deletions mtools/mlaunch/mlaunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def run(self, arguments=None):
'several singles or replica sets. '
'Provide either list of shard names or '
'number of shards.'))
init_parser.add_argument('--config', action='store', default=1,
init_parser.add_argument('--config', action='store', default=-1,
type=int, metavar='NUM',
help=('adds NUM config servers to sharded '
'setup (requires --sharded, default=1)'))
Expand Down Expand Up @@ -663,8 +663,13 @@ def init(self):
sys.stderr.write('warning: server requires certificates but no'
' --tlsClientCertificateKeyFile provided\n')
# number of default config servers
if self.args['config'] == -1:
self.args['config'] = 1
if self.args['config'] == -1 and self.args["sharded"]:
if self.args['embeddedcsrs']:
print("Config members set to: " + str(self.args['nodes']))
self.args['config'] = self.args['nodes']
else:
print("Config members set to: 1")
self.args['config'] = 1

# add the 'csrs' parameter as default for MongoDB >= 3.3.0
if (version.parse(self.current_version) >= version.parse("3.3.0") or
Expand All @@ -675,6 +680,11 @@ def init(self):
print("--embeddedcsrs can only be used with MongoDB 8.0.0+")
sys.exit(1)

# Check if embedded is used, if it's the case the number of shards should be decremented by 1
if self.args['embeddedcsrs'] and 'sharded' in self.args and self.args['sharded']:
if len(self.args['sharded']) == 1:
self.args['sharded'][0] = str(int(self.args['sharded'][0]) - 1)

# construct startup strings
self._construct_cmdlines()

Expand Down Expand Up @@ -727,8 +737,6 @@ def init(self):
errmsg += (" * You can also specify a different port range with "
"an additional '--port <startport>'\n")
raise SystemExit(errmsg)



if self.args['sharded']:

Expand All @@ -746,20 +754,18 @@ def init(self):
print('Initiating config server replica set.')
members = sorted(self.get_tagged(["config"]))
self._initiate_replset(members[0], "configRepl")

if not (self.args['embeddedcsrs'] and int(self.args['sharded'][0]) == 1):
for shard in shard_names:
# initiate replica set on first member
if self.args['verbose']:
print('Initiating shard replica set %s.' % shard)
members = sorted(self.get_tagged([shard]))
self._initiate_replset(members[0], shard)
for shard in shard_names:
# initiate replica set on first member
if self.args['verbose']:
print('Initiating shard replica set %s.' % shard)
members = sorted(self.get_tagged([shard]))
self._initiate_replset(members[0], shard)

# add mongos
mongos = sorted(self.get_tagged(['mongos', 'down']))
self._start_on_ports(mongos, wait=True, override_auth=True)

if first_init and not (self.args['embeddedcsrs'] and int(self.args['sharded'][0]) == 1):
if first_init:
# add shards
mongos = sorted(self.get_tagged(['mongos']))
con = self.client('localhost:%i' % mongos[0])
Expand Down Expand Up @@ -804,10 +810,9 @@ def init(self):
time.sleep(1)

if self.args['embeddedcsrs']:
if self.args['verbose']:
print("Configuring embedded config servers")
print("Configuring embedded config servers")
con = self.client('localhost:%i' % mongos[0])
con.admin.command({"transitionFromDedicatedConfigServer": 1})
con.admin.command({'transitionFromDedicatedConfigServer': 1})

elif self.args['single']:
# just start node
Expand Down Expand Up @@ -1046,12 +1051,26 @@ def list(self):
print_docs.append(None)

# configs
# temporary list used to find the list of running nodes
tmp_config_list = []
string_config = "config server"
for node in sorted(self.get_tagged(['config'])):
doc = OrderedDict([('process', 'config server'),
if self.cluster_running[node]:
status = 'running'
if self._check_if_embeddedcsrs():
string_config = "config shard"
else:
status = 'down'

doc = OrderedDict([('process', string_config),
('port', node),
('status', 'running'
if self.cluster_running[node] else 'down')])
print_docs.append(doc)
('status', status)])
tmp_config_list.append(doc)

# construct the final list with the right config type
for item in tmp_config_list:
item['process'] = string_config
print_docs.append(item)

if len(self.get_tagged(['config'])) > 0:
print_docs.append(None)
Expand Down Expand Up @@ -1758,9 +1777,6 @@ def _get_shard_names(self, args):
# ... (only works with replica sets)
n_shards = int(args['sharded'][0])

if args["embeddedcsrs"]:
n_shards -= 1

shard_names = ['shard%.2i'
% (i + 1) for i in range(n_shards)]
except ValueError:
Expand Down Expand Up @@ -2207,6 +2223,20 @@ def _read_key_file(self, keyfile=None):
with open(keyfile, 'rb') as f:
return ''.join(f.readlines())

def _check_if_embeddedcsrs(self) -> bool:
"""
Returns True if embedded CSRS is used
"""
mongos = sorted(self.get_tagged(['mongos']))
con = self.client('localhost:%i' % mongos[0], readPreference="primaryPreferred")
try:
if con['config']['shards'].find_one({ '_id': 'config' }):
return True
return False
except OperationFailure:
print("WARNING: Unable to check if config server is embedded")
return False

def main():
tool = MLaunchTool()
tool.run()
Expand Down

0 comments on commit 89fd5d6

Please sign in to comment.