Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SP-CRA security solutions #331

Merged
merged 20 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6706600
Added 2_0 folder for MS 2.0
SelinaTII Sep 18, 2023
0525b29
Added 2_0 folder for MS 2.0
SelinaTII Sep 18, 2023
445b80d
Added folder for jamming
SelinaTII Sep 19, 2023
a54694e
Added the script performing Signal Processing assisted Authentication…
anshul-tii Oct 2, 2023
95b6360
Improved script with bug fixes and resolving the issue of infinite ex…
anshul-tii Nov 6, 2023
58f7cca
Functional and regression test cases for the Signal processing assist…
anshul-tii Nov 6, 2023
fceb884
Adding Basic performance test Cases for the SP-CRA Implementation
anshul-tii Nov 6, 2023
8f21dd2
Adding relevant security test cases for SP-CRA Implementation
anshul-tii Nov 6, 2023
02ca7a7
Minor Comments formating
anshul-tii Nov 6, 2023
474b17f
Added following changes: sendall() is used instead of send, fix the …
anshul-tii Nov 8, 2023
d2aba4e
Resolved space issue to maintian uniformity in code pointed by Mika
anshul-tii Nov 8, 2023
2bcd76d
Merge branch 'develop' into PHYSEC_SOLN
anshul-tii Nov 9, 2023
128d1c4
Modified the script to comply with the main script. THe entry point o…
anshul-tii Nov 17, 2023
c394ef1
main.py script acting as an entry point for all securiy features. THe…
anshul-tii Nov 17, 2023
36a7899
updating branch
anshul-tii Nov 17, 2023
0e75f1e
RSS Authentication Scripts
anshul-tii Nov 21, 2023
8fde56c
Modified the RSS_Auth Class implementation to do away with passing s…
anshul-tii Nov 21, 2023
ba1beff
Modified the main.py script and the feature.yaml file to consider RSS…
anshul-tii Nov 21, 2023
1193a9d
Added a debug flag option to control the disply of authentication tab…
anshul-tii Nov 21, 2023
afc25c7
Making all feature disbaled by default in accordance to Sebastien com…
anshul-tii Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modules/sc-mesh-secure-deployment/src/2_0/features.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
PHY: true
IDS: false
jamming: false

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import pytest
import socket
from unittest.mock import patch, MagicMock, mock_open
from SP_CRA_mod import PHYCRA
import numpy as np

@pytest.fixture
def phycra_instance():
with patch('SP_CRA_mod.socket.socket'):
instance = PHYCRA()
instance.acf = MagicMock()
instance.acf_client = MagicMock()
yield instance
# Ensure proper cleanup
instance.server.close()
if hasattr(instance, 'shutdown_server'):
instance.shutdown_server()
instance.listen_sock.close()

# Test for __init__ method
def test_init(phycra_instance):
assert phycra_instance.SERVER is not None
assert phycra_instance.BROADCAST_PORT == 5051
assert phycra_instance.PORT == 5050

# Test for log_authentication method
def test_log_authentication(phycra_instance):
with patch('builtins.open', mock_open()) as mock_file:
phycra_instance.log_authentication('127.0.0.1', '00:00:00:00:00:00', 'Success')
mock_file.assert_called_with('server_log.txt', 'a')
mock_file().write.assert_called()

# Test for display_table method
def test_display_table(phycra_instance, capsys):
with patch('builtins.open', mock_open(read_data='2023-11-03 12:00:00\t127.0.0.1\t00:00:00:00:00:00\tSuccess\n')):
phycra_instance.display_table()
captured = capsys.readouterr()
assert '127.0.0.1' in captured.out
assert '00:00:00:00:00:00' in captured.out
assert 'Success' in captured.out

# Test for server_start method
def test_server_start(phycra_instance):
with patch('SP_CRA_mod.socket.socket') as mock_socket, \
patch('SP_CRA_mod.threading.Thread') as mock_thread:
phycra_instance.server_start()
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_STREAM)
mock_thread.assert_called()

# Test for broadcast_status method
def test_broadcast_status(phycra_instance):
with patch('SP_CRA_mod.socket.socket') as mock_socket:
phycra_instance.broadcast_status()
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)

# Test for get_mac_address method
def test_get_mac_address(phycra_instance):
with patch('SP_CRA_mod.get_mac_address') as mock_get_mac:
mock_get_mac.return_value = '00:00:00:00:00:00'
assert phycra_instance.get_mac_address() == '00:00:00:00:00:00'

# Test for connect_to_server method
def test_connect_to_server(phycra_instance):
with patch('SP_CRA_mod.socket.socket') as mock_socket, \
patch('SP_CRA_mod.pickle.loads') as mock_pickle_loads:
mock_socket.return_value.recv.return_value = b'pickle_data'
mock_pickle_loads.return_value = np.array([1, 2, 3])
phycra_instance.acf_client = np.array([[1, 2, 3]])
phycra_instance.connect_to_server('127.0.0.1')
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_STREAM)

# Test for listen_for_broadcast method
def test_listen_for_broadcast(phycra_instance):
with patch('SP_CRA_mod.socket.socket') as mock_socket, \
patch('SP_CRA_mod.PHYCRA.connect_to_server') as mock_connect:
phycra_instance.listen_for_broadcast()
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)
mock_connect.assert_not_called()

# Test for get_server_ip method
def test_get_server_ip(phycra_instance):
with patch('SP_CRA_mod.socket.socket') as mock_socket:
mock_socket.return_value.getsockname.return_value = ('127.0.0.1', 0)
assert phycra_instance.get_server_ip() == '127.0.0.1'

@patch('SP_CRA_mod.socket.socket')
def test_handle_client_success(mock_socket, phycra_instance):
# Set up
conn = MagicMock()
addr = ('192.168.1.1', 5050)
phycra_instance.acf = ['dummy_acf']
random_index = 0

with patch('SP_CRA_mod.random.randint', return_value=random_index), \
patch('SP_CRA_mod.pickle.dumps', return_value=b'encoded_acf'), \
patch('SP_CRA_mod.PHYCRA.log_authentication') as mock_log_auth:

conn.recv.side_effect = [
(2).to_bytes(2, 'big'), # rx_index_length
str(random_index).encode(phycra_instance.FORMAT), # rx_index
(2).to_bytes(2, 'big'), # mac_address_length
'AA:BB:CC:DD:EE:FF'.encode(phycra_instance.FORMAT) # mac_address
]

# Invoke
phycra_instance.handle_client(conn, addr)

# Check
conn.sendall.assert_called_with(b'encoded_acf')
mock_log_auth.assert_called_with(addr[0], 'AA:BB:CC:DD:EE:FF', "Success")

@patch('SP_CRA_mod.socket.socket')
def test_handle_client_fail(mock_socket, phycra_instance):
# Set up
conn = MagicMock()
addr = ('192.168.1.1', 5050)
phycra_instance.acf = ['dummy_acf']
random_index = 0
wrong_index = 1 # Different index to simulate failure

with patch('SP_CRA_mod.random.randint', return_value=random_index), \
patch('SP_CRA_mod.pickle.dumps', return_value=b'encoded_acf'), \
patch('SP_CRA_mod.PHYCRA.log_authentication') as mock_log_auth:

conn.recv.side_effect = [
(2).to_bytes(2, 'big'), # rx_index_length
str(wrong_index).encode(phycra_instance.FORMAT), # rx_index
(2).to_bytes(2, 'big'), # mac_address_length
'AA:BB:CC:DD:EE:FF'.encode(phycra_instance.FORMAT) # mac_address
]

# Invoke
phycra_instance.handle_client(conn, addr)

# Check
conn.sendall.assert_called_with(b'encoded_acf')
mock_log_auth.assert_called_with(addr[0], 'AA:BB:CC:DD:EE:FF', "Access denied")

# Main function to run the tests
if __name__ == '__main__':
options = ['-v', '-rA']
pytest.main(options)

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import threading
import time
import psutil
from SP_CRA_v7 import PHYCRA # SP_CRA_v7 is the main script/implementation

class PerformanceTest:
def __init__(self, num_clients):
self.num_clients = num_clients
self.phycra_instance = PHYCRA()
self.client_threads = []
self.start_time = None
self.end_time = None
self.cpu_usage = []
self.memory_usage = []
self.network_usage_start = psutil.net_io_counters()
self.network_usage_end = None

def run_test(self):
self.start_time = time.time()
for _ in range(self.num_clients):
client_thread = threading.Thread(target=self.simulate_client)
self.client_threads.append(client_thread)
client_thread.start()

# Monitor system resources in a separate thread
monitor_thread = threading.Thread(target=self.monitor_resources)
monitor_thread.start()

for thread in self.client_threads:
thread.join()

self.network_usage_end = psutil.net_io_counters()
self.end_time = time.time()
monitor_thread.join()
self.phycra_instance.stop_event.set()
self.phycra_instance.server_thread.join()
self.phycra_instance.listen_thread.join()
self.phycra_instance.broadcast_thread.join()
self.display_results()

def simulate_client(self):
self.phycra_instance.connect_to_server(self.phycra_instance.SERVER)

def monitor_resources(self):
while any(thread.is_alive() for thread in self.client_threads):
self.cpu_usage.append(psutil.cpu_percent(interval=1))
self.memory_usage.append(psutil.virtual_memory().percent)

def display_results(self):
total_time = self.end_time - self.start_time
avg_cpu_usage = sum(self.cpu_usage) / len(self.cpu_usage)
avg_memory_usage = sum(self.memory_usage) / len(self.memory_usage)
sent_bytes = self.network_usage_end.bytes_sent - self.network_usage_start.bytes_sent
recv_bytes = self.network_usage_end.bytes_recv - self.network_usage_start.bytes_recv
print(f"Total time for {self.num_clients} clients: {total_time:.2f} seconds")
print(f"Average time per client: {total_time / self.num_clients:.2f} seconds")
print(f"Average CPU usage during the test: {avg_cpu_usage:.2f}%")
print(f"Average memory usage during the test: {avg_memory_usage:.2f}%")
print(f"Total data sent: {sent_bytes} bytes")
print(f"Total data received: {recv_bytes} bytes")

if __name__ == "__main__":
num_clients = 10 # Change this to the number of simulated clients to be tested
performance_test = PerformanceTest(num_clients)
performance_test.run_test()

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
from unittest.mock import MagicMock
import numpy as np
from SP_CRA_v7 import PHYCRA

@pytest.fixture
def phycra_instance():
phycra = PHYCRA()
phycra.acf = np.array([[1, 2, 3], [4, 5, 6]]) # Simplified ACF for testing
phycra.acf_client = phycra.acf # Ensure client and server ACF are the same
return phycra

def test_handle_client_success(phycra_instance):
conn = MagicMock()
addr = ("127.0.0.1", 5050)
phycra_instance.handle_client(conn, addr)
assert conn.close.called

def test_handle_client_failure(phycra_instance):
# Simulate a client sending a wrong index
conn = MagicMock()
conn.recv.side_effect = [b'\x00\x01', b'1', b'\x00\x0c', b'fake-mac-address']
addr = ("127.0.0.1", 5050)
phycra_instance.handle_client(conn, addr)
assert conn.close.called

def test_handle_client_exception(phycra_instance):
# Simulate an exception during client handling
conn = MagicMock()
conn.recv.side_effect = Exception("Test Exception")
addr = ("127.0.0.1", 5050)
phycra_instance.handle_client(conn, addr)
assert conn.close.called

def test_handle_client_disconnection(phycra_instance):
# Simulate a client disconnecting unexpectedly
conn = MagicMock()
conn.recv.side_effect = [b'\x00\x01', b'0', b'\x00\x0c', ConnectionResetError("Client disconnected")]
addr = ("127.0.0.1", 5050)
phycra_instance.handle_client(conn, addr)
assert conn.close.called

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest
import socket
from unittest.mock import MagicMock, patch, call
from SP_CRA_v7 import PHYCRA

@pytest.fixture
def phycra_instance():
instance = PHYCRA()
instance.server_thread = MagicMock()
instance.listen_thread = MagicMock()
instance.broadcast_thread = MagicMock()
instance.stop_event = MagicMock()
instance.server = MagicMock()
# Mock the broadcast_sock if it doesn't exist in PHYCRA
instance.broadcast_sock = MagicMock()
return instance

# Mocking incorrect index transmission
def test_incorrect_index_transmission(phycra_instance):
with patch('socket.socket'):
conn = MagicMock()
phycra_instance.acf = MagicMock()
# Simulate receiving a malformed packet
conn.recv.return_value = b'\x00\x00\x00'
phycra_instance.handle_client(conn, ('127.0.0.1', 5050))
conn.send.assert_not_called() # Assuming it shouldn't send anything on error

# Basic security test cases
def test_replay_attack_prevention(phycra_instance):
with patch('socket.socket'):
conn = MagicMock()
addr = ('127.0.0.1', 5050)
phycra_instance.acf = MagicMock()
phycra_instance.acf.check_validity = MagicMock(return_value=False)
phycra_instance.handle_client(conn, addr)
conn.send.assert_not_called() # Assuming it shouldn't send anything if replay attack detected



Loading