Skip to content

Commit

Permalink
correctly implement ReserveAbort for modified reservations
Browse files Browse the repository at this point in the history
- do not allow abort on initial reservations
- add invalid transition NSI exception
- rollback to previous version of reservation on successful abort
- add fixtures for modified reservation and connection
- add fixture for protobuf reserve modify request
- add unit test for disallowed abort on initial reservation
- add reservation rollback tests to existing reserve abort confirmed unit test
  • Loading branch information
hanstrompert committed Sep 26, 2024
1 parent f89163c commit 5016f8d
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 72 deletions.
40 changes: 27 additions & 13 deletions src/supa/connection/provider/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,29 +601,43 @@ def ReserveAbort(self, pb_reserve_abort_request: GenericRequest, context: Servic
),
)
else:
try:
rsm = ReservationStateMachine(reservation, state_field="reservation_state")
rsm.reserve_abort_request()
except TransitionNotAllowed as tna:
log.info("Not scheduling ReserveAbortJob", reason=str(tna))
if len(reservation.p2p_criteria_list) <= 1:
log.info("Cannot abort an initial reserve request, abort only allowed on modify")
reserve_abort_response = GenericAcknowledgment(
header=to_response_header(pb_reserve_abort_request.header),
service_exception=to_service_exception(
NsiException(
InvalidTransition,
str(tna),
{
Variable.CONNECTION_ID: str(connection_id),
Variable.RESERVATION_STATE: reservation.reservation_state,
},
"cannot abort an initial reserve request, abort only allowed on modify",
{Variable.CONNECTION_ID: str(connection_id)},
),
connection_id,
),
)
else:
reserve_abort_response = GenericAcknowledgment(
header=to_response_header(pb_reserve_abort_request.header)
)
try:
rsm = ReservationStateMachine(reservation, state_field="reservation_state")
rsm.reserve_abort_request()
except TransitionNotAllowed as tna:
log.info("Not scheduling ReserveAbortJob", reason=str(tna))
reserve_abort_response = GenericAcknowledgment(
header=to_response_header(pb_reserve_abort_request.header),
service_exception=to_service_exception(
NsiException(
InvalidTransition,
str(tna),
{
Variable.CONNECTION_ID: str(connection_id),
Variable.RESERVATION_STATE: reservation.reservation_state,
},
),
connection_id,
),
)
else:
reserve_abort_response = GenericAcknowledgment(
header=to_response_header(pb_reserve_abort_request.header)
)

if not reserve_abort_response.service_exception.connection_id:
from supa import scheduler
Expand Down
26 changes: 23 additions & 3 deletions src/supa/job/reserve.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def __call__(self) -> None:
The reservation will be aborted and
a ReserveAbortConfirmed message will be sent to the NSA/AG.
If the reservation state machine is not in the correct state for a ReserveCommit
If the reservation state machine is not in the correct state for a ReserveAbort
an NSI error is returned leaving the state machine unchanged.
"""
self.log.info("Reserve abort reservation")
Expand Down Expand Up @@ -659,7 +659,27 @@ def __call__(self) -> None:
else:
request = to_generic_confirmed_request(reservation)
reservation.reservation_timeout = False # would probably be better to add reservation state to fsm
rsm.reserve_abort_confirmed()
# only allowed to abort a reserve modify request, e.q. there is more than one criteria version
if len(reservation.p2p_criteria_list) > 1:
# 1. set connection.bandwidth to previous bandwidth
connection.bandwidth = reservation.p2p_criteria_list[-2].bandwidth
# 2. remove most recent version of criteria
session.delete(reservation.p2p_criteria_list[-1])
# 3. remove most recent version of schedule
session.delete(reservation.schedules[-1])
# 4. decrement reservation version with one
reservation.version -= 1
rsm.reserve_abort_confirmed()
else: # the server should have prevented that this code is reached
request = to_error_request(
to_header(reservation),
NsiException(
GenericInternalError,
"cannot abort an initial reserve request, should not have reached this code",
{Variable.CONNECTION_ID: str(self.connection_id)},
),
self.connection_id,
)

stub = requester.get_stub()
if isinstance(request, GenericConfirmedRequest):
Expand Down Expand Up @@ -768,7 +788,7 @@ def __call__(self) -> None:
)
else:
#
# TODO: release reserved resources(?)
# TODO: release reserved resources in NRM(?)
#
self.log.debug("set reservation timeout to true in db")
request = _to_reserve_timeout_request(reservation)
Expand Down
51 changes: 51 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,47 @@ def connection_id() -> Generator[UUID, None, None]:
# connection, schedule and p2p_criteria are deleted through cascade


@pytest.fixture()
def connection_id_modified(connection_id: UUID) -> None:
"""Transform a connection ID into a modified connection ID."""
from supa.db.session import db_session

with db_session() as session:
reservation = session.query(Reservation).filter(Reservation.connection_id == connection_id).one()
reservation.version = 1
reservation.schedules.append(
Schedule(
version=1,
start_time=datetime.now(timezone.utc) + timedelta(minutes=20),
end_time=datetime.now(timezone.utc) + timedelta(minutes=30),
)
)
reservation.p2p_criteria_list.append(
P2PCriteria(
version=1,
bandwidth=20,
symmetric=True,
src_domain="example.domain:2001",
src_topology="topology",
src_stp_id="port1",
src_vlans="1783",
src_selected_vlan=1783,
dst_domain="example.domain:2001",
dst_topology="topology",
dst_stp_id="port2",
dst_vlans="1783",
dst_selected_vlan=1783,
)
)
request = Request(
connection_id=connection_id,
correlation_id=uuid4(),
request_type=RequestType.Reserve, # should add specific request type
request_data=b"should add request message here",
)
session.add(request)


@pytest.fixture
def connection(connection_id: Column) -> None:
"""Add connection record for given connection_id."""
Expand Down Expand Up @@ -183,6 +224,16 @@ def connection(connection_id: Column) -> None:
session.add(connection)


@pytest.fixture
def connection_modified(connection_id: Column, connection: None) -> None:
"""Add connection record for given connection_id."""
from supa.db.session import db_session

with db_session() as session:
connection_from_db = session.query(Connection).filter(Connection.connection_id == connection_id).one()
connection_from_db.bandwidth = 20


@pytest.fixture()
def reserve_timeout_job(connection_id: Column) -> None:
"""Schedule a ReserveTimeoutJob for connection_id."""
Expand Down
101 changes: 67 additions & 34 deletions tests/connection/provider/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ def pb_reserve_request(
return pb_request


@pytest.fixture()
def pb_reserve_modify_request(pb_reserve_request: ReserveRequest, connection_id: UUID) -> ReserveRequest:
"""Create protobuf reserve modify request with connection_id added to request."""
pb_reserve_request.connection_id = str(connection_id)
return pb_reserve_request


@pytest.fixture()
def pb_reserve_request_end_time_before_start_time(pb_reserve_request: ReserveRequest) -> ReserveRequest:
"""Modify schedule of reserve request so that end time is before start time."""
Expand Down Expand Up @@ -185,14 +192,14 @@ def test_reserve_request_end_time_in_past(pb_reserve_request_end_time_in_past: R
assert "End time lies in the past" in caplog.text


def test_reserve_modify(pb_reserve_request: ReserveRequest, connection_id: UUID, connection: None, caplog: Any) -> None:
def test_reserve_modify(
pb_reserve_modify_request: ReserveRequest, connection_id: UUID, connection: None, caplog: Any
) -> None:
"""Test the connection provider Reserve Modify happy path."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
request_correlation_id = pb_reserve_request.header.correlation_id
# add existing connection_id to reservation to mark this a modify request
pb_reserve_request.connection_id = str(connection_id)
reserve_response = service.Reserve(pb_reserve_request, mock_context)
request_correlation_id = pb_reserve_modify_request.header.correlation_id
reserve_response = service.Reserve(pb_reserve_modify_request, mock_context)
assert request_correlation_id == reserve_response.header.correlation_id
assert not reserve_response.header.reply_to
assert reserve_response.connection_id == str(connection_id)
Expand All @@ -203,17 +210,15 @@ def test_reserve_modify(pb_reserve_request: ReserveRequest, connection_id: UUID,


def test_reserve_modify_illegal_version(
pb_reserve_request: ReserveRequest, connection_id: UUID, connection: None, caplog: Any
pb_reserve_modify_request: ReserveRequest, connection_id: UUID, connection: None, caplog: Any
) -> None:
"""Test the connection provider Reserve Modify returns UnsupportedParameter exception on illegal version."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
request_correlation_id = pb_reserve_request.header.correlation_id
# add existing connection_id to reservation to mark this a modify request
pb_reserve_request.connection_id = str(connection_id)
request_correlation_id = pb_reserve_modify_request.header.correlation_id
# criteria version may only be incremented by 1
pb_reserve_request.criteria.version = pb_reserve_request.criteria.version + 2
reserve_response = service.Reserve(pb_reserve_request, mock_context)
pb_reserve_modify_request.criteria.version = pb_reserve_modify_request.criteria.version + 2
reserve_response = service.Reserve(pb_reserve_modify_request, mock_context)
assert request_correlation_id == reserve_response.header.correlation_id
assert not reserve_response.header.reply_to
assert not reserve_response.connection_id
Expand All @@ -224,16 +229,16 @@ def test_reserve_modify_illegal_version(


def test_reserve_modify_unknown_connection_id(
pb_reserve_request: ReserveRequest, connection: None, caplog: Any
pb_reserve_modify_request: ReserveRequest, connection: None, caplog: Any
) -> None:
"""Test the connection provider Reserve returns ReservationNonExistent exception for unknown connection id."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
request_correlation_id = pb_reserve_request.header.correlation_id
request_correlation_id = pb_reserve_modify_request.header.correlation_id
# add unknown connection_id to this modify request
non_existing_connection_id = str(uuid4())
pb_reserve_request.connection_id = str(non_existing_connection_id)
reserve_response = service.Reserve(pb_reserve_request, mock_context)
pb_reserve_modify_request.connection_id = str(non_existing_connection_id)
reserve_response = service.Reserve(pb_reserve_modify_request, mock_context)
assert request_correlation_id == reserve_response.header.correlation_id
assert not reserve_response.header.reply_to
assert not reserve_response.connection_id
Expand All @@ -246,17 +251,17 @@ def test_reserve_modify_unknown_connection_id(


def test_reserve_modify_reservation_already_started(
pb_reserve_request: ReserveRequest, connection_id: UUID, start_now: None, connection: None, caplog: Any
pb_reserve_modify_request: ReserveRequest, connection_id: UUID, start_now: None, connection: None, caplog: Any
) -> None:
"""Test the connection provider Reserve returns UnsupportedParameter exception when already started."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
request_correlation_id = pb_reserve_request.header.correlation_id
# add existing connection_id to reservation to mark this a modify request
pb_reserve_request.connection_id = str(connection_id)
request_correlation_id = pb_reserve_modify_request.header.correlation_id
# change start time to 1 minute in te future
pb_reserve_request.criteria.schedule.start_time.FromDatetime(datetime.now(timezone.utc) + timedelta(minutes=1))
reserve_response = service.Reserve(pb_reserve_request, mock_context)
pb_reserve_modify_request.criteria.schedule.start_time.FromDatetime(
datetime.now(timezone.utc) + timedelta(minutes=1)
)
reserve_response = service.Reserve(pb_reserve_modify_request, mock_context)
assert request_correlation_id == reserve_response.header.correlation_id
assert not reserve_response.header.reply_to
assert not reserve_response.connection_id
Expand All @@ -267,15 +272,13 @@ def test_reserve_modify_reservation_already_started(


def test_reserve_modify_invalid_transition(
pb_reserve_request: ReserveRequest, connection_id: UUID, reserve_held: None, connection: None, caplog: Any
pb_reserve_modify_request: ReserveRequest, connection_id: UUID, reserve_held: None, connection: None, caplog: Any
) -> None:
"""Test the connection provider Reserve Modify returns InvalidTransition exception when not in modifiable state."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
request_correlation_id = pb_reserve_request.header.correlation_id
# add existing connection_id to reservation to mark this a modify request
pb_reserve_request.connection_id = str(connection_id)
reserve_response = service.Reserve(pb_reserve_request, mock_context)
request_correlation_id = pb_reserve_modify_request.header.correlation_id
reserve_response = service.Reserve(pb_reserve_modify_request, mock_context)
assert request_correlation_id == reserve_response.header.correlation_id
assert not reserve_response.header.reply_to
assert not reserve_response.connection_id
Expand All @@ -288,17 +291,15 @@ def test_reserve_modify_invalid_transition(


def test_reserve_modify_unset_criteria_version(
pb_reserve_request: ReserveRequest, connection_id: UUID, connection: None, caplog: Any
pb_reserve_modify_request: ReserveRequest, connection_id: UUID, connection: None, caplog: Any
) -> None:
"""Test the connection provider Reserve Modify returns InvalidTransition exception when not in modifiable state."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
request_correlation_id = pb_reserve_request.header.correlation_id
# add existing connection_id to reservation to mark this a modify request
pb_reserve_request.connection_id = str(connection_id)
request_correlation_id = pb_reserve_modify_request.header.correlation_id
# criteria version set to 0, is equivalent to unset (Python ProtoBuf is unable to distinguish)
pb_reserve_request.criteria.version = 0
reserve_response = service.Reserve(pb_reserve_request, mock_context)
pb_reserve_modify_request.criteria.version = 0
reserve_response = service.Reserve(pb_reserve_modify_request, mock_context)
assert request_correlation_id == reserve_response.header.correlation_id
assert not reserve_response.header.reply_to
assert reserve_response.connection_id == str(connection_id)
Expand Down Expand Up @@ -377,7 +378,14 @@ def test_reserve_commit_timed_out(
assert "Cannot commit a timed out reservation" in caplog.text


def test_reserve_abort(pb_reserve_abort_request: GenericRequest, reserve_held: None, caplog: Any) -> None:
def test_reserve_abort(
connection_id: UUID,
connection_id_modified: None,
connection: None,
pb_reserve_abort_request: GenericRequest,
reserve_held: None,
caplog: Any,
) -> None:
"""Test the connection provider ReserveAbort happy path."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
Expand Down Expand Up @@ -406,8 +414,33 @@ def test_reserve_abort_random_connection_id(pb_reserve_abort_request: GenericReq
assert "Connection ID does not exist" in caplog.text


def test_reserve_abort_initial_reserve_request(
connection_id: UUID,
pb_reserve_abort_request: GenericRequest,
caplog: Any,
) -> None:
"""Test the connection provider ReserveAbort returns service exception when aborting initial reserve request."""
service = ConnectionProviderService()
mock_context = unittest.mock.create_autospec(spec=ServicerContext)
reserve_abort_response = service.ReserveAbort(pb_reserve_abort_request, mock_context)
assert pb_reserve_abort_request.header.correlation_id == reserve_abort_response.header.correlation_id
assert pb_reserve_abort_request.connection_id == reserve_abort_response.service_exception.connection_id
assert not reserve_abort_response.header.reply_to
assert reserve_abort_response.HasField("service_exception")
assert reserve_abort_response.service_exception.error_id == "00201"
assert len(reserve_abort_response.service_exception.variables) == 1
assert reserve_abort_response.service_exception.variables[0].type == "connectionId"
assert reserve_abort_response.service_exception.variables[0].value == pb_reserve_abort_request.connection_id
assert "Cannot abort an initial reserve request, abort only allowed on modify" in caplog.text


def test_reserve_abort_invalid_transition(
pb_reserve_abort_request: GenericRequest, reserve_aborting: None, caplog: Any
connection_id: UUID,
connection_id_modified: None,
connection: None,
pb_reserve_abort_request: GenericRequest,
reserve_aborting: None,
caplog: Any,
) -> None:
"""Test the connection provider ReserveAbort returns service exception when in invalid state for request."""
service = ConnectionProviderService()
Expand Down
Loading

0 comments on commit 5016f8d

Please sign in to comment.