Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE - Add range port #203

Merged
merged 10 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions exegol/console/cli/SyntaxFormat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from enum import Enum


class SyntaxFormat(Enum):
port_sharing = "[default green not bold][<host_ipv4>:]<host_port>[-<end_host_port>][:<container_port>[-<end_container_port>]][:<protocol>][/default green not bold]"
desktop_config = "[blue]proto[:ip[:port]][/blue]"
volume = "/path/on/host/:/path/in/container/[blue][:ro|rw][/blue]"

def __str__(self):
return self.value
7 changes: 4 additions & 3 deletions exegol/console/cli/actions/GenericParameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from exegol.config.UserConfig import UserConfig
from exegol.console.cli.ExegolCompleter import ContainerCompleter, ImageCompleter, VoidCompleter, DesktopConfigCompleter
from exegol.console.cli.SyntaxFormat import SyntaxFormat
from exegol.console.cli.actions.Command import Option, GroupArg


Expand Down Expand Up @@ -191,12 +192,12 @@ def __init__(self, groupArgs: List[GroupArg]):
action="append",
default=[],
dest="volumes",
help="Share a new volume between host and exegol (format: --volume /path/on/host/:/path/in/container/[blue][:ro|rw][/blue])")
help=f"Share a new volume between host and exegol (format: --volume {SyntaxFormat.volume})")
self.ports = Option("-p", "--port",
action="append",
default=[],
dest="ports",
help="Share a network port between host and exegol (format: --port [<host_ipv4>:]<host_port>[:<container_port>][:<protocol>]. This configuration will disable the shared network with the host.",
help=f"Share a network port between host and exegol (format: --port {SyntaxFormat.port_sharing}). This configuration will disable the default host network.",
completer=VoidCompleter)
self.hostname = Option("--hostname",
dest="hostname",
Expand Down Expand Up @@ -274,7 +275,7 @@ def __init__(self, groupArgs: List[GroupArg]):
default="",
action="store",
help=f"Configure your exegol desktop ([blue]{'[/blue] or [blue]'.join(UserConfig.desktop_available_proto)}[/blue]) and its exposure "
f"(format: [blue]proto[:ip[:port]][/blue]) "
f"(format: {SyntaxFormat.desktop_config}) "
f"(default: [blue]{UserConfig().desktop_default_proto}[/blue]:[blue]{'127.0.0.1' if UserConfig().desktop_default_localhost else '0.0.0.0'}[/blue]:[blue]<random>[/blue])",
completer=DesktopConfigCompleter)
groupArgs.append(GroupArg({"arg": self.desktop, "required": False},
Expand Down
177 changes: 138 additions & 39 deletions exegol/model/ContainerConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from docker.types import Mount
from rich.prompt import Prompt

from exegol.console.cli.SyntaxFormat import SyntaxFormat
from exegol.config.ConstantConfig import ConstantConfig
from exegol.config.EnvInfo import EnvInfo
from exegol.config.UserConfig import UserConfig
Expand Down Expand Up @@ -85,7 +86,7 @@ def __init__(self, container: Optional[Container] = None):
self.__sysctls: Dict[str, str] = {}
self.__envs: Dict[str, str] = {}
self.__labels: Dict[str, str] = {}
self.__ports: Dict[str, Optional[Union[int, Tuple[str, int], List[int], List[Dict[str, Union[int, str]]]]]] = {}
self.__ports: Dict[str, Optional[Union[int, Tuple[str, int], List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]]]]] = {}
self.__extra_host: Dict[str, str] = {}
self.interactive: bool = True
self.tty: bool = True
Expand Down Expand Up @@ -512,7 +513,7 @@ def enableDesktop(self, desktop_config: str = ""):

def configureDesktop(self, desktop_config: str, create_mode: bool = False):
"""Configure the exegol desktop feature from user parameters.
Accepted format: 'mode:host:port'
Accepted format: 'proto:host:port'
"""
self.__desktop_proto = UserConfig().desktop_default_proto
self.__desktop_host = "127.0.0.1" if UserConfig().desktop_default_localhost else "0.0.0.0"
Expand All @@ -537,7 +538,7 @@ def configureDesktop(self, desktop_config: str, create_mode: bool = False):
except ValueError:
logger.critical(f"Invalid desktop port: '{data}' is not a valid port.")
else:
logger.critical(f"Your configuration is invalid, please use the following format:[green]mode:host:port[/green]")
logger.critical(f"Your configuration is invalid, please use the following format: {SyntaxFormat.desktop_config}")

if self.__desktop_port is None:
logger.debug(f"Desktop port will be set automatically")
Expand Down Expand Up @@ -1117,9 +1118,13 @@ def addPort(self,
if protocol.lower() not in ['tcp', 'udp', 'sctp']:
raise ProtocolNotSupported(f"Unknown protocol '{protocol}'")
logger.debug(f"Adding port {host_ip}:{port_host} -> {port_container}/{protocol}")
self.__ports[f"{port_container}/{protocol}"] = (host_ip, port_host)
# Casting type because at this stage, the data is only controlled by the wrapper itself.
existing_config = self.__ports.get(f"{port_container}/{protocol}", [])
assert type(existing_config) is list
existing_config.append((host_ip, port_host))
self.__ports[f"{port_container}/{protocol}"] = existing_config

def getPorts(self) -> Dict[str, Optional[Union[int, Tuple[str, int], List[int], List[Dict[str, Union[int, str]]]]]]:
def getPorts(self) -> Dict[str, Optional[Union[int, Tuple[str, int], List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]]]]]:
"""Ports config getter"""
return self.__ports

Expand Down Expand Up @@ -1234,26 +1239,46 @@ def addUserDevice(self, user_device_config: str):
self.__addDevice(user_device_config)

def addRawPort(self, user_test_port: str):
"""Add port config from user input.
Format must be [<host_ipv4>:]<host_port>[:<container_port>][:<protocol>]
"""Add port config or range of ports from user input.
Format must be [<host_ipv4>:]<host_port>[-<end_host_port>][:<container_port>[-<end_container_port>]][:<protocol>]
If host_ipv4 is not set, default to 0.0.0.0
If container_port is not set, default is the same as host port
If container_port is not set, the same port(s) as host port(s) will be used
If protocol is not set, default is 'tcp'"""
match = re.search(r"^((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):)?(\d+):?(\d+)?:?(udp|tcp|sctp)?$", user_test_port)
# Regex to capture port ranges and protocols correctly
match = re.search(r"^((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):)?(\d+)(-(\d+))?:?(\d+)?(-(\d+))?:?(udp|tcp|sctp)?$", user_test_port)
if match is None:
logger.critical(f"Incorrect port syntax ({user_test_port}). Please use this format: [green][<host_ipv4>:]<host_port>[:<container_port>][:<protocol>][/green]")
logger.critical(f"Incorrect port syntax ({user_test_port}). Please use this format: '{SyntaxFormat.port_sharing}'")
return
host_ip = "0.0.0.0" if match.group(2) is None else match.group(2)
protocol = "tcp" if match.group(5) is None else match.group(5)
protocol = match.group(9) if match.group(9) else 'tcp'
try:
host_port = int(match.group(3))
container_port = host_port if match.group(4) is None else int(match.group(4))
if host_port > 65535 or container_port > 65535:
raise ValueError
except ValueError:
logger.critical(f"The syntax for opening prot in NAT is incorrect. The ports must be numbers between 0 and 65535. ({match.group(3)}:{match.group(4)})")
start_host_port = int(match.group(3))
end_host_port = int(match.group(5)) if match.group(5) else start_host_port
start_container_port_defined = match.group(6) is not None
end_container_port_defined = match.group(8) is not None
start_container_port = int(match.group(6)) if start_container_port_defined else start_host_port
# If start_container_port is not defined, use end_host_port, otherwise use start_container_port or end_container_port if defined
if not start_container_port_defined:
end_container_port = end_host_port
else:
end_container_port = int(match.group(8)) if match.group(8) else start_container_port
# check port consistency
if (len(range(start_host_port, end_host_port)) != len(range(start_container_port, end_container_port))) or (
start_host_port != end_host_port and (not start_container_port_defined and end_container_port_defined)) or (
start_host_port != end_host_port and (start_container_port_defined and not end_container_port_defined)):
logger.info(
f"Port sharing configuration does not respect standard usage ({user_test_port}). The configuration in the 'Container sumamry' below will be applied. Please consult the help section for more information on using the -p/--port option.")
# Check if start port is lower than end port
if end_host_port < start_host_port or end_container_port < start_container_port:
raise ValueError("End port cannot be less than start port.")
# Check if any port in the range exceeds the valid range
if end_host_port > 65535 or end_container_port > 65535:
raise ValueError(f"The syntax for opening port in NAT is incorrect. The ports must be numbers between 0 and 65535. ({end_host_port}:{end_container_port})")
except ValueError as e:
logger.critical(e)
return
self.addPort(host_port, container_port, protocol=protocol, host_ip=host_ip)
for host_port, container_port in zip(range(start_host_port, end_host_port + 1), range(start_container_port, end_container_port + 1)):
self.addPort(host_port, container_port, protocol=protocol, host_ip=host_ip)

def addRawEnv(self, env: str):
"""Parse and add an environment variable from raw user input"""
Expand Down Expand Up @@ -1375,35 +1400,109 @@ def getTextPorts(self) -> str:
Dict Port key = container port/protocol
Dict Port Values:
None = Random port
int = open port ont he host
int = open port on the host
tuple = (host_ip, port)
list of int = open multiple host port
list of dict = open one or more ports on host, key ('HostIp' / 'HostPort') and value ip or port"""
result = ''
# TODO if network bridge and container not started, ports config cannot be printed: add a user warning message

start_host_ip = None
start_host_port = None
previous_host_port: Optional[Union[str, int]] = None

start_container_protocole = None
start_container_port = None
previous_container_port = None

previous_entry = None

for container_config, host_config in self.__ports.items():
host_info = "Unknown"
# Parse config
current_container_port = int(container_config.split('/')[0])
current_container_protocole = container_config.split('/')[-1]
# We might have multiple host context config at the same time for the same container config
current_host_contexts: List[Dict[str, Union[str, int]]] = []
# Init range context, container side
if start_container_port is None:
start_container_port = current_container_port
previous_container_port = current_container_port
start_container_protocole = current_container_protocole

# Parse host config multiple format
if host_config is None:
host_info = "0.0.0.0:<Random port>"
elif type(host_config) is int:
host_info = f"0.0.0.0:{host_config}"
elif type(host_config) is tuple:
assert len(host_config) == 2
host_info = f"{host_config[0]}:{host_config[1]}"
elif type(host_config) is list:
sub_info = []
for sub_host_config in host_config:
if type(sub_host_config) is int:
sub_info.append(f"0.0.0.0:{sub_host_config}")
elif type(sub_host_config) is dict:
sub_port = sub_host_config.get('HostPort', '<Random port>')
current_host_contexts.append({"ip": "0.0.0.0",
"port": "<Random port>"})
else:
if type(host_config) is list:
host_configs: List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]] = host_config
else:
host_configs = cast(List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]], [host_config])

for current_host_config in host_configs:
if type(current_host_config) is int:
current_host_contexts.append({"ip": "0.0.0.0",
"port": current_host_config})
elif type(current_host_config) is tuple:
assert len(current_host_config) == 2
current_host_contexts.append({"ip": current_host_config[0],
"port": int(current_host_config[1])})
elif type(current_host_config) is dict:
sub_port = current_host_config.get('HostPort')
if sub_port is None:
sub_port = "<Random port>"
sub_info.append(f"{sub_host_config.get('HostIp', '0.0.0.0')}:{sub_port}")
if len(sub_info) > 0:
host_info = ", ".join(sub_info)
else:
logger.debug(f"Unknown port config: {type(host_config)}={host_config}")
result += f"{host_info} :right_arrow: {container_config}{os.linesep}"
elif type(sub_port) is str:
sub_port = int(sub_port)
current_host_contexts.append({"ip": current_host_config.get('HostIp', '0.0.0.0'),
"port": sub_port})
else:
logger.debug(f"Unknown port config: {type(host_config)}={host_config} :right_arrow: {container_config}")
continue

for current_context in current_host_contexts:
current_host_port = current_context.get("port")
current_host_ip = current_context.get('ip')

# Init range context
if start_host_port is None:
start_host_port = current_host_port
previous_host_port = current_host_port
start_host_ip = current_host_ip
# Check if range continue
elif (start_host_ip == current_host_ip and
current_container_protocole == start_container_protocole and
(current_host_port == previous_host_port or
current_host_port == previous_host_port + 1) and
(current_container_port == previous_container_port or
current_container_port == previous_container_port + 1)):
previous_host_port = current_host_port
previous_container_port = current_container_port
# If range exit, submit previous entry + reset new range context
else:
# Register previous range
if previous_entry:
result += previous_entry
# reset context host and container side
start_host_port = current_host_port
previous_host_port = current_host_port
start_host_ip = current_host_ip
start_container_port = current_container_port
previous_container_port = current_container_port
start_container_protocole = current_container_protocole

# Register last range
range_host_port = ""
if type(start_host_port) is int:
assert type(previous_host_port) is int
range_host_port = "" if previous_host_port - start_host_port <= 0 else f"-{previous_host_port}"
range_container_port = "" if previous_container_port - start_container_port <= 0 else f"-{previous_container_port}"
previous_entry = (f"{start_host_ip}:{start_host_port}{range_host_port} :right_arrow: "
f"{start_container_port}{range_container_port}/{start_container_protocole}{os.linesep}")

# Submit last entry is any
if previous_entry:
result += previous_entry

return result

def __str__(self):
Expand Down