async def main(use_lsl: bool = True):
    """Run the example BIDS recording through the generator-based ``Stream`` API.

    Loads the bundled example data, builds the channel table, settings,
    ``DataWriter`` and ``DataProcessor``, picks one data generator and awaits
    ``Stream.run`` with them.

    Parameters
    ----------
    use_lsl : bool, default True
        If True (the behaviour of the original script), the recording is
        replayed by an ``LSLOfflinePlayer`` under the stream name
        ``"example_stream"`` and read back through ``MNELSLGenerator``.
        If False, the in-memory array is fed through ``RawDataGenerator``
        instead, which was previously only available as a commented-out line.

    Returns
    -------
    Whatever ``Stream.run`` returns; the original script assigned it to
    ``features`` and then dropped it.
    """
    RUN_NAME, PATH_RUN, PATH_BIDS, PATH_OUT, datatype = nm.io.get_paths_example_data()

    raw, data, sfreq, line_noise, coord_list, coord_names = nm.io.read_BIDS_data(
        PATH_RUN=PATH_RUN
    )

    channels = nm.utils.create_channels(
        ch_names=raw.ch_names,
        ch_types=raw.get_channel_types(),
        reference="default",
        bads=raw.info["bads"],
        new_names="default",
        used_types=("ecog", "dbs", "seeg"),
        target_keywords=["MOV_RIGHT"],
    )

    settings = nm.NMSettings.get_fast_compute()

    data_writer = nm.utils.data_writer.DataWriter(
        out_dir=PATH_OUT,
        save_csv=True,
        save_interval=10,
        experiment_name=RUN_NAME,
    )

    data_processor = DataProcessor(
        sfreq=sfreq,
        settings=settings,
        channels=channels,
        coord_names=coord_names,
        coord_list=coord_list,
        line_noise=line_noise,
        verbose=True,
    )

    if use_lsl:
        # The player has to be started before the generator is created:
        # the generator connects to the named stream in its constructor.
        lslplayer = LSLOfflinePlayer(stream_name="example_stream", raw=raw)
        lslplayer.start_player(chunk_size=30, n_repeat=5000)

        data_generator = MNELSLGenerator(
            segment_length_features_ms=settings.segment_length_features_ms,
            sampling_rate_features_hz=settings.sampling_rate_features_hz,
            stream_name="example_stream",
        )
    else:
        # Offline path: a single generator instead of the two identical,
        # unused instances the original built unconditionally.
        data_generator = RawDataGenerator(
            data,
            settings.sampling_rate_features_hz,
            settings.segment_length_features_ms,
            channels,
            sfreq,
        )

    stream = nm.Stream(verbose=True)

    # NOTE: asyncio.get_event_loop().run_until_complete(...) might be necessary
    # for calling run() outside of an async main function.
    return await stream.run(
        data_processor=data_processor,
        data_generator=data_generator,
        data_writer=data_writer,
    )


if __name__ == "__main__":
    asyncio.run(main())
channels=channels, settings=settings, @@ -105,9 +107,7 @@ # %% features = stream.run( - data=data, out_dir=PATH_OUT, - experiment_name=RUN_NAME, save_csv=True, ) diff --git a/examples/plot_3_example_sharpwave_analysis.py b/examples/plot_3_example_sharpwave_analysis.py index e7a9f3cb..9283c3b4 100644 --- a/examples/plot_3_example_sharpwave_analysis.py +++ b/examples/plot_3_example_sharpwave_analysis.py @@ -56,6 +56,8 @@ coord_names, ) = nm.io.read_BIDS_data(PATH_RUN=PATH_RUN) +print(data.shape) + # %% settings = NMSettings.get_fast_compute() @@ -69,7 +71,7 @@ for sw_feature in settings.sharpwave_analysis_settings.sharpwave_features.list_all(): settings.sharpwave_analysis_settings.estimator["mean"].append(sw_feature) -channels = nm.utils.set_channels( +channels = nm.utils.create_channels( ch_names=raw.ch_names, ch_types=raw.get_channel_types(), reference="default", @@ -79,7 +81,9 @@ target_keywords=["MOV_RIGHT"], ) -stream = nm.Stream( +data_plt = data[5, 1000:4000] + +data_processor = nm.DataProcessor( sfreq=sfreq, channels=channels, settings=settings, @@ -88,14 +92,12 @@ coord_names=coord_names, verbose=False, ) -sw_analyzer = cast( - SharpwaveAnalyzer, stream.data_processor.features.get_feature("sharpwave_analysis") -) + +sw_analyzer = data_processor.features.get_feature("sharpwave_analysis") + # %% # The plotted example time series, visualized on a short time scale, shows the relation of identified peaks, troughs, and estimated features: -data_plt = data[5, 1000:4000] - filtered_dat = fftconvolve(data_plt, sw_analyzer.list_filter[0][1], mode="same") troughs = signal.find_peaks(-filtered_dat, distance=10)[0] @@ -297,6 +299,7 @@ channels.loc[[3, 8], "used"] = 1 stream = nm.Stream( + data=data[:, :30000], sfreq=sfreq, channels=channels, settings=settings, @@ -306,7 +309,7 @@ verbose=True, ) -df_features = stream.run(data=data[:, :30000], save_csv=True) +df_features = stream.run(save_csv=True) # %% # We can then plot two exemplary features, prominence and interval, 
and see that the movement amplitude can be clustered with those two features alone: diff --git a/examples/plot_4_example_gridPointProjection.py b/examples/plot_4_example_gridPointProjection.py index 5caf9d2d..eaed7c45 100644 --- a/examples/plot_4_example_gridPointProjection.py +++ b/examples/plot_4_example_gridPointProjection.py @@ -54,7 +54,7 @@ settings.postprocessing.project_cortex = True -channels = nm.utils.set_channels( +channels = nm.utils.create_channels( ch_names=raw.ch_names, ch_types=raw.get_channel_types(), reference="default", @@ -65,6 +65,8 @@ ) stream = nm.Stream( + data=data[:, : int(sfreq * 5)], + experiment_name=RUN_NAME, sfreq=sfreq, channels=channels, settings=settings, @@ -75,9 +77,7 @@ ) features = stream.run( - data=data[:, : int(sfreq * 5)], out_dir=PATH_OUT, - experiment_name=RUN_NAME, save_csv=True, ) diff --git a/examples/plot_6_real_time_demo.py b/examples/plot_6_real_time_demo.py index 51198222..0863bbdd 100644 --- a/examples/plot_6_real_time_demo.py +++ b/examples/plot_6_real_time_demo.py @@ -95,6 +95,7 @@ def get_fast_compute_settings(): print("Computation time for single ECoG channel: ") data = np.random.random([1, 1000]) stream = nm.Stream(sfreq=1000, data=data, sampling_rate_features_hz=10, verbose=False) + print( f"{np.round(timeit.timeit(lambda: stream.data_processor.process(data), number=10)/10, 3)} s" ) diff --git a/examples/plot_7_lsl_example.py b/examples/plot_7_lsl_example.py index 915e4a70..cbdc5fe6 100644 --- a/examples/plot_7_lsl_example.py +++ b/examples/plot_7_lsl_example.py @@ -11,6 +11,7 @@ # %% from matplotlib import pyplot as plt import py_neuromodulation as nm +import time # %% # Let’s get the example data from the provided BIDS dataset and create the channels DataFrame. 
@@ -32,7 +33,7 @@ coord_names, ) = nm.io.read_BIDS_data(PATH_RUN=PATH_RUN) -channels = nm.utils.set_channels( +channels = nm.utils.create_channels( ch_names=raw.ch_names, ch_types=raw.get_channel_types(), reference="default", @@ -61,6 +62,9 @@ player = nm.stream.LSLOfflinePlayer(raw=raw, stream_name="example_stream") player.start_player(chunk_size=30) + +time.sleep(2) # Wait for stream to start + # %% # Creating the LSLStream object # ----------------------------- @@ -78,6 +82,9 @@ # %% stream = nm.Stream( sfreq=sfreq, + experiment_name=RUN_NAME, + is_stream_lsl=True, + stream_lsl_name="example_stream", channels=channels, settings=settings, coord_list=coord_list, @@ -87,13 +94,7 @@ # %% # We then simply have to set the `stream_lsl` parameter to be `True` and specify the `stream_lsl_name`. -features = stream.run( - is_stream_lsl=True, - plot_lsl=False, - stream_lsl_name="example_stream", - out_dir=PATH_OUT, - experiment_name=RUN_NAME, -) +features = stream.run(out_dir=PATH_OUT) # %% # We can then look at the computed features and check if the streamed data was processed correctly. diff --git a/py_neuromodulation/__init__.py b/py_neuromodulation/__init__.py index 688393fd..79607cfb 100644 --- a/py_neuromodulation/__init__.py +++ b/py_neuromodulation/__init__.py @@ -2,7 +2,6 @@ import platform from pathlib import PurePath from importlib.metadata import version -from py_neuromodulation.utils.logging import NMLogger ##################################### # Globals and environment variables # @@ -57,12 +56,6 @@ user_features = {} -###################################### -# Logger initialization and settings # -###################################### - -logger = NMLogger(__name__) # logger initialization first to prevent circular import - #################################### # API: Exposed classes and methods # #################################### @@ -75,6 +68,7 @@ from .utils import types from .utils import io +from .utils import data_writer from . import stream from . 
import analysis diff --git a/py_neuromodulation/analysis/decode.py b/py_neuromodulation/analysis/decode.py index 84d10644..829493b6 100644 --- a/py_neuromodulation/analysis/decode.py +++ b/py_neuromodulation/analysis/decode.py @@ -9,7 +9,7 @@ from pathlib import PurePath import pickle -from py_neuromodulation import logger +from py_neuromodulation.utils import logger from typing import Callable diff --git a/py_neuromodulation/analysis/plots.py b/py_neuromodulation/analysis/plots.py index 9afccf2b..3d6559a0 100644 --- a/py_neuromodulation/analysis/plots.py +++ b/py_neuromodulation/analysis/plots.py @@ -4,7 +4,7 @@ from matplotlib import gridspec import seaborn as sb from pathlib import PurePath -from py_neuromodulation import logger, PYNM_DIR +from py_neuromodulation.utils import logger, PYNM_DIR from py_neuromodulation.utils.types import _PathLike diff --git a/py_neuromodulation/features/__init__.py b/py_neuromodulation/features/__init__.py index f8ae2cb3..98424df3 100644 --- a/py_neuromodulation/features/__init__.py +++ b/py_neuromodulation/features/__init__.py @@ -28,4 +28,5 @@ FeatureProcessors, add_custom_feature, remove_custom_feature, + USE_FREQ_RANGES, ) diff --git a/py_neuromodulation/features/coherence.py b/py_neuromodulation/features/coherence.py index 21ca471b..4408fa80 100644 --- a/py_neuromodulation/features/coherence.py +++ b/py_neuromodulation/features/coherence.py @@ -11,7 +11,7 @@ FrequencyRange, NMBaseModel, ) -from py_neuromodulation import logger +from py_neuromodulation.utils import logger if TYPE_CHECKING: from py_neuromodulation import NMSettings diff --git a/py_neuromodulation/features/feature_processor.py b/py_neuromodulation/features/feature_processor.py index 9e970e8f..b1100b69 100644 --- a/py_neuromodulation/features/feature_processor.py +++ b/py_neuromodulation/features/feature_processor.py @@ -6,6 +6,16 @@ import numpy as np from py_neuromodulation import NMSettings +USE_FREQ_RANGES: list[FeatureName] = [ + "bandpass_filter", + "stft", 
+ "fft", + "welch", + "bursts", + "coherence", + "nolds", + "bispectrum", +] FEATURE_DICT: dict[FeatureName | str, str] = { "raw_hjorth": "Hjorth", diff --git a/py_neuromodulation/filter/notch_filter.py b/py_neuromodulation/filter/notch_filter.py index bf7dec53..b2fd58ae 100644 --- a/py_neuromodulation/filter/notch_filter.py +++ b/py_neuromodulation/filter/notch_filter.py @@ -2,7 +2,7 @@ from typing import cast from py_neuromodulation.utils.types import NMPreprocessor -from py_neuromodulation import logger +from py_neuromodulation.utils import logger class NotchFilter(NMPreprocessor): diff --git a/py_neuromodulation/gui/backend/app_backend.py b/py_neuromodulation/gui/backend/app_backend.py index 74514cea..d91f6c96 100644 --- a/py_neuromodulation/gui/backend/app_backend.py +++ b/py_neuromodulation/gui/backend/app_backend.py @@ -12,7 +12,6 @@ Query, WebSocket, ) -from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware @@ -71,7 +70,9 @@ def __init__( def push_features_to_frontend(self, feature_queue: Queue) -> None: while True: - time.sleep(0.002) # NOTE: should be adapted depending on feature sampling rate + time.sleep( + 0.002 + ) # NOTE: should be adapted depending on feature sampling rate if feature_queue.empty() is False: self.logger.info("data in feature queue") features = feature_queue.get() @@ -231,7 +232,6 @@ async def setup_offline_stream(data: dict): ####################### @self.get("/api/app-info") - # TODO: fix this function async def get_app_info(): metadata = importlib.metadata.metadata("py_neuromodulation") url_list = metadata.get_all("Project-URL") @@ -353,25 +353,4 @@ def quick_access(): ########################### @self.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): - # if self.websocket_manager.is_connected: - # self.logger.info( - # "WebSocket connection attempted while already connected" - # ) - # await websocket.close( - # code=1008, 
reason="Another client is already connected" - # ) - # return - - await self.websocket_manager.connect(websocket) - # # ####################### - # # ### SPA ENTRY POINT ### - # # ####################### - # if not self.dev: - - # @self.get("/app/{full_path:path}") - # async def serve_spa(request, full_path: str): - # # Serve the index.html for any path that doesn't match an API route - # print(Path.cwd()) - # return FileResponse("frontend/index.html") - - + await self.websocket_manager.connect(websocket) diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index f60c51c0..c989efe6 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -1,10 +1,9 @@ -import asyncio import logging import numpy as np -from multiprocessing import Process, Queue +from multiprocessing import Queue from py_neuromodulation.stream import Stream, NMSettings -from py_neuromodulation.utils import set_channels +from py_neuromodulation.utils import create_channels from py_neuromodulation.utils.io import read_mne_data @@ -26,9 +25,8 @@ async def start_run_function( self, out_dir: str = "", experiment_name: str = "sub", - websocket_manager_features=None, + websocket_manager=None, ) -> None: - # TODO: we should add a way to pass the output path and the foldername # Initialize the stream with as process with a queue that is passed to the stream # The stream will then put the results in the queue # there should be another websocket in which the results are sent to the frontend @@ -37,20 +35,6 @@ async def start_run_function( self.logger.info("setup stream Process") - # self.run_process = Process( - # target=self.stream.run, - # kwargs={ - # "out_dir": out_dir, - # "experiment_name": experiment_name, - # "feature_queue": feature_queue, - # "stream_handling_queue": stream_handling_queue, - # "is_stream_lsl": self.lsl_stream_name is not None, - # "stream_lsl_name": self.lsl_stream_name - # if 
self.lsl_stream_name is not None - # else "", - # }, - # ) - #asyncio.run( await self.stream.run( out_dir=out_dir, experiment_name=experiment_name, @@ -59,71 +43,23 @@ async def start_run_function( stream_lsl_name=self.lsl_stream_name if self.lsl_stream_name is not None else "", - websocket_featues=websocket_manager_features, + websocket_featues=websocket_manager, ) - # self.logger.info("initialized run process") - - # self.run_process.start() - - # import time - # time.sleep(2) - # self.logger.info(f"Stream running: {self.stream.is_running}") - - def setup_lsl_stream( self, lsl_stream_name: str | None = None, line_noise: float | None = None, sampling_rate_features: float | None = None, ): - from mne_lsl.lsl import resolve_streams - - self.logger.info("resolving streams") - lsl_streams = resolve_streams() - - for stream in lsl_streams: - if stream.name == lsl_stream_name: - self.logger.info(f"found stream {lsl_stream_name}") - # setup this stream - self.lsl_stream_name = lsl_stream_name - - ch_names = stream.get_channel_names() - if ch_names is None: - ch_names = ["ch" + str(i) for i in range(stream.n_channels)] - self.logger.info(f"channel names: {ch_names}") - - ch_types = stream.get_channel_types() - if ch_types is None: - ch_types = ["eeg" for i in range(stream.n_channels)] - - self.logger.info(f"channel types: {ch_types}") - - info_ = stream.get_channel_info() - self.logger.info(f"channel info: {info_}") - - channels = set_channels( - ch_names=ch_names, - ch_types=ch_types, - used_types=["eeg", "ecog", "dbs", "seeg"], - ) - self.logger.info(channels) - sfreq = stream.sfreq - - self.stream: Stream = Stream( - sfreq=sfreq, - line_noise=line_noise, - channels=channels, - sampling_rate_features_hz=sampling_rate_features, - ) - self.logger.info("stream setup") - self.settings: NMSettings = NMSettings(sampling_rate_features=sfreq) - self.logger.info("settings setup") - break - - if channels.shape[0] == 0: - self.logger.error(f"Stream {lsl_stream_name} not found") - 
class DataGeneratorABC(ABC):
    """Abstract base class for the data sources handed to the stream.

    A concrete generator only has to implement ``__next__``. The subclasses
    introduced alongside this class (``RawDataGenerator`` and
    ``MNELSLGenerator``) additionally expose ``sfreq`` and ``channels``
    attributes; this base class itself does not define or enforce them.
    """

    def __init__(self) -> None:
        # ``__init__`` always returns None. The previous annotation
        # (``Tuple[float, "pd.DataFrame"]``) described a value this method
        # never produces and is rejected by static type checkers.
        pass

    @abstractmethod
    def __next__(self) -> Tuple["np.ndarray", "np.ndarray"]:
        """Produce the next batch of data.

        The annotation declares a pair of arrays; the concrete generators in
        this package emit ``(timestamps, data)`` pairs.
        """
        pass
py_neuromodulation import logger +from py_neuromodulation.utils import logger from py_neuromodulation.utils.types import _PathLike from py_neuromodulation.features import FeatureProcessors from py_neuromodulation.utils import io @@ -55,6 +55,9 @@ def __init__( self.sfreq_raw: float = sfreq // 1 self.line_noise: float | None = line_noise self.path_grids: _PathLike | None = path_grids + if path_grids is None: + import py_neuromodulation as nm + path_grids = nm.PYNM_DIR #NOTE: could be optimized self.verbose: bool = verbose self.features_previous = None @@ -229,7 +232,7 @@ def _set_coords( coord_list=coord_list, # type: ignore # None case handled above ) - def process(self, data: np.ndarray) -> dict[str, float]: + def process(self, data: np.ndarray) -> dict[str, np.float64]: """Given a new data batch, calculate and return features. Parameters @@ -315,11 +318,3 @@ def save_settings(self, out_dir: _PathLike, prefix: str = "") -> None: def save_channels(self, out_dir: _PathLike, prefix: str) -> None: io.save_channels(self.channels, out_dir, prefix) - - def save_features( - self, - feature_arr: "pd.DataFrame", - out_dir: _PathLike = "", - prefix: str = "", - ) -> None: - io.save_features(feature_arr, out_dir, prefix) diff --git a/py_neuromodulation/stream/generator.py b/py_neuromodulation/stream/generator.py deleted file mode 100644 index bff9becb..00000000 --- a/py_neuromodulation/stream/generator.py +++ /dev/null @@ -1,53 +0,0 @@ -import numpy as np - - -class RawDataGenerator: - """ - This generator function mimics online data acquisition. 
- The data are iteratively sampled with settings.sampling_rate_features_hz - """ - - def __init__( - self, - data: np.ndarray, - sfreq: float, - sampling_rate_features_hz: float, - segment_length_features_ms: float, - ) -> None: - """ - Arguments - --------- - data (np array): shape (channels, time) - settings (settings.NMSettings): settings object - sfreq (float): sampling frequency of the data - - Returns - ------- - np.array: 1D array of time stamps - np.array: new batch for run function of full segment length shape - """ - self.batch_counter: int = 0 # counter for the batches - - self.data = data - self.sfreq = sfreq - # Width, in data points, of the moving window used to calculate features - self.segment_length = segment_length_features_ms / 1000 * sfreq - # Ratio of the sampling frequency of the input data to the sampling frequency - self.stride = sfreq / sampling_rate_features_hz - - def __iter__(self): - return self - - def __next__(self): - start = self.stride * self.batch_counter - end = start + self.segment_length - - self.batch_counter += 1 - - start_idx = int(start) - end_idx = int(end) - - if end_idx > self.data.shape[1]: - raise StopIteration - - return np.arange(start, end) / self.sfreq, self.data[:, start_idx:end_idx] diff --git a/py_neuromodulation/stream/mnelsl_stream.py b/py_neuromodulation/stream/mnelsl_generator.py similarity index 59% rename from py_neuromodulation/stream/mnelsl_stream.py rename to py_neuromodulation/stream/mnelsl_generator.py index b1e54e77..bedf73c6 100644 --- a/py_neuromodulation/stream/mnelsl_stream.py +++ b/py_neuromodulation/stream/mnelsl_generator.py @@ -1,21 +1,26 @@ from collections.abc import Iterator import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Tuple import numpy as np -from py_neuromodulation import logger +from py_neuromodulation.utils import logger from mne_lsl.lsl import resolve_streams import os +from .data_generator_abc import DataGeneratorABC if TYPE_CHECKING: from 
py_neuromodulation import NMSettings -class LSLStream: +class MNELSLGenerator(DataGeneratorABC): """ Class is used to create and connect to a LSL stream and pull data from it. """ - def __init__(self, settings: "NMSettings", stream_name: str | None = None) -> None: + def __init__(self, + segment_length_features_ms: float, + sampling_rate_features_hz: float, + stream_name: str | None = "example_stream", + ) -> None: """ Initialize the LSL stream. @@ -34,9 +39,8 @@ def __init__(self, settings: "NMSettings", stream_name: str | None = None) -> No from mne_lsl.stream import StreamLSL self.stream: StreamLSL - #self.keyboard_interrupt = False + # self.keyboard_interrupt = False - self.settings = settings self._n_seconds_wait_before_disconnect = 3 try: if stream_name is None: @@ -52,19 +56,33 @@ def __init__(self, settings: "NMSettings", stream_name: str | None = None) -> No if self.stream.sinfo is None: raise RuntimeError("Stream info is None. Check if the stream is running.") - - self.winsize = settings.segment_length_features_ms / self.stream.sinfo.sfreq - self.sampling_interval = 1 / self.settings.sampling_rate_features_hz - - # If not running the generator when the escape key is pressed. 
- self.headless: bool = not os.environ.get("DISPLAY") - #if not self.headless: - #from py_neuromodulation.utils.keyboard import KeyboardListener - - #self.listener = KeyboardListener(("esc", self.set_keyboard_interrupt)) - #self.listener.start() - - def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: + else: + self.sinfo = self.stream.sinfo + + self.winsize = segment_length_features_ms / self.stream.sinfo.sfreq + self.sampling_interval = 1 / sampling_rate_features_hz + self.channels = self.get_LSL_channels() + self.sfreq = self.stream.sinfo.sfreq + + def get_LSL_channels(self) -> "pd.DataFrame": + + from py_neuromodulation.utils import create_channels + ch_names = self.sinfo.get_channel_names() or [ + "ch" + str(i) for i in range(self.sinfo.n_channels) + ] + ch_types = self.sinfo.get_channel_types() or [ + "eeg" for i in range(self.sinfo.n_channels) + ] + return create_channels( + ch_names=ch_names, + ch_types=ch_types, + used_types=["eeg", "ecog", "dbs", "seeg"], + ) + + def __iter__(self): + return self + + def __next__(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: self.last_time = time.time() check_data = None data = None @@ -91,30 +109,6 @@ def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: if stream_start_time is None: stream_start_time = timestamp[0] - for i in range(self._n_seconds_wait_before_disconnect): - if ( - data is not None - and check_data is not None - and np.allclose(data, check_data, atol=1e-7, rtol=1e-7) - ): - logger.warning( - f"No new data incoming. Disconnecting stream in {3-i} seconds." 
- ) - time.sleep(1) - i += 1 - if i == self._n_seconds_wait_before_disconnect: - self.stream.disconnect() - logger.warning("Stream disconnected.") - break - yield timestamp, data logger.info(f"Stream time: {timestamp[-1] - stream_start_time}") - - #if not self.headless and self.keyboard_interrupt: - # logger.info("Keyboard interrupt") - # self.listener.stop() - # self.stream.disconnect() - - #def set_keyboard_interrupt(self): - # self.keyboard_interrupt = True diff --git a/py_neuromodulation/stream/mnelsl_player.py b/py_neuromodulation/stream/mnelsl_player.py index 086e4d60..e84b41e1 100644 --- a/py_neuromodulation/stream/mnelsl_player.py +++ b/py_neuromodulation/stream/mnelsl_player.py @@ -8,7 +8,7 @@ from py_neuromodulation.utils.types import _PathLike from py_neuromodulation.utils.io import read_BIDS_data -from py_neuromodulation import logger +from py_neuromodulation.utils import logger class LSLOfflinePlayer: @@ -17,7 +17,7 @@ class LSLOfflinePlayer: def __init__( self, - stream_name: str | None = "lsl_offline_player", + stream_name: str = "lsl_offline_player", f_name: str | _PathLike = None, raw: mne.io.Raw | None = None, sfreq: int | float | None = None, @@ -78,6 +78,7 @@ def __init__( self._streaming_complete = mp.Event() self._player_process = None self._stop_flag = mp.Event() + self._started_streaming = mp.Event() LSLOfflinePlayer._instances.add(self) # Register instancwe if LSLOfflinePlayer._atexit_registered: @@ -117,9 +118,12 @@ def start_player( self.n_repeat, self._stop_flag, self._streaming_complete, + self._started_streaming, ), ) self._player_process.start() + while not self._started_streaming.is_set(): + time.sleep(0.1) if block: try: @@ -128,7 +132,7 @@ def start_player( print("\nKeyboard interrupt received. 
class RawDataGenerator(DataGeneratorABC):
    """
    Replay a finished recording in batches, mimicking online data acquisition.

    Windows of ``segment_length_features_ms`` are cut from the data; the window
    start advances by ``sfreq / sampling_rate_features_hz`` samples per batch.
    """

    def __init__(
        self,
        data: "np.ndarray | pd.DataFrame | _PathLike | None",
        sampling_rate_features_hz: float,
        segment_length_features_ms: float,
        channels: "pd.DataFrame | None",
        sfreq: "float | None",
    ) -> None:
        """
        Arguments
        ---------
        data : array of shape (channels, time), a DataFrame whose columns are
            channel names (it is transposed to (channels, time)), or a path to
            a file in one of the ``MNE_FORMATS``
        sampling_rate_features_hz (float): rate at which batches are produced
        segment_length_features_ms (float): length of one batch in milliseconds
        channels : channel table, passed through ``load_channels``; if None and
            ``data`` is a file, the table is created from the file's channels
        sfreq (float): sampling frequency of the data; for files the value
            stored in the file is used and this argument is only a fallback

        Raises
        ------
        ValueError
            For unsupported input types or file formats, for a missing
            sampling frequency, or when data and channels do not match.
        """
        # Function-scope import: ``Path`` is needed below but is not among
        # this module's imports.
        from pathlib import Path

        self.batch_counter: int = 0  # counter for the batches
        self.target_idx_initialized: bool = False
        # Channels passed by the caller take precedence. Loading them up front
        # (rather than as the very last statement) means a table derived from
        # a file is no longer overwritten with None.
        self.channels = load_channels(channels) if channels is not None else None
        self.sfreq = sfreq

        if isinstance(data, (np.ndarray, pd.DataFrame)):
            logger.info(f"Loading data from {type(data).__name__}")
            # Arrays are stored untouched, as before. DataFrames are
            # (time, channels) and must be transposed, otherwise the
            # ``self.data[:, a:b]`` slicing in ``__next__`` cannot work.
            self.data = (
                self._handle_data(data) if isinstance(data, pd.DataFrame) else data
            )
        elif isinstance(data, _PathLike):  # was ``self.data``: not assigned yet
            logger.info("Loading data from file")
            filepath = Path(data)  # type: ignore
            ext = filepath.suffix

            if ext not in MNE_FORMATS:
                raise ValueError(f"Unsupported file format: {ext}")
            file_data, file_sfreq, ch_names, ch_types, bads = read_mne_data(filepath)

            if self.channels is None:
                self.channels = create_channels(
                    ch_names=ch_names,
                    ch_types=ch_types,
                    used_types=["eeg", "ecog", "dbs", "seeg"],
                    bads=bads,
                )

            # The file's sampling frequency wins; the ``sfreq`` argument is the
            # fallback the error message below asks the caller to provide.
            if file_sfreq is not None:
                self.sfreq = file_sfreq
            if self.sfreq is None:
                raise ValueError(
                    "Sampling frequency not specified in file, please specify sfreq as a parameters"
                )
            self.data = self._handle_data(file_data)
        else:
            raise ValueError(
                "Data must be either a numpy array, a pandas DataFrame, or a path to an MNE supported file"
            )

        if self.sfreq is None:
            # Fail with a clear message instead of a TypeError in the
            # arithmetic below.
            raise ValueError(
                "sfreq must be specified when data is passed as an array or DataFrame"
            )

        # Width, in data points, of the moving window used to calculate features
        self.segment_length = segment_length_features_ms / 1000 * self.sfreq
        # Ratio of the sampling frequency of the input data to the sampling
        # frequency of the features, i.e. the step between window starts
        self.stride = self.sfreq / sampling_rate_features_hz

    def _handle_data(self, data: "np.ndarray | pd.DataFrame") -> np.ndarray:
        """Check the data against ``self.channels`` and return it as an array.

        Args:
            data (np.ndarray | pd.DataFrame): array of shape (channels, time),
                or DataFrame with one column per channel.

        Raises:
            ValueError: array whose first dimension differs from the number of
                channel names.
            ValueError: DataFrame whose column names differ from the channel
                names (order is ignored).

        Returns:
            np.ndarray: the array itself, or the transposed DataFrame values.
        """
        if self.channels is None:
            # Nothing to validate against; only normalise the orientation.
            if isinstance(data, np.ndarray):
                return data
            return data.to_numpy().transpose()

        names_expected = self.channels["name"].to_list()

        if isinstance(data, np.ndarray):
            if len(names_expected) != data.shape[0]:
                raise ValueError(
                    "If data is passed as an array, the first dimension must"
                    " match the number of channel names in `channels`.\n"
                    f" Number of data channels (data.shape[0]): {data.shape[0]}\n"
                    f' Length of channels["name"]: {len(names_expected)}.'
                )
            return data

        names_data = data.columns.to_list()

        # Equal sorted lists imply equal lengths, so one comparison suffices.
        if sorted(names_expected) != sorted(names_data):
            raise ValueError(
                "If data is passed as a DataFrame, the "
                "column names must match the channel names in `channels`.\n"
                f"Input dataframe column names: {names_data}\n"
                f'Expected (from channels["name"]): {names_expected}.'
            )
        # NOTE(review): columns are kept in their own order, not reordered to
        # the order of ``channels`` - confirm that callers pass them aligned.
        return data.to_numpy().transpose()

    def add_target(
        self, feature_dict: "dict | pd.DataFrame", data_batch: np.ndarray
    ) -> "dict | pd.DataFrame":
        """Add target channels to feature series.

        For every channel flagged ``target == 1`` the last sample of its row
        in ``data_batch`` is stored in ``feature_dict`` under the channel name.

        Parameters
        ----------
        feature_dict : mapping that supports item assignment by channel name
        data_batch : np.ndarray
            current batch; rows are addressed with the index labels of
            ``self.channels`` (assumes a default 0..n-1 index - TODO confirm)

        Returns
        -------
        the same ``feature_dict`` object, with target channels added

        Raises
        ------
        ValueError
            If ``self.channels`` is not a DataFrame.
        """
        if not (isinstance(self.channels, pd.DataFrame)):
            raise ValueError("Channels must be a pandas DataFrame")

        if self.channels["target"].sum() > 0:
            if not self.target_idx_initialized:
                # Resolved once and cached for all following batches.
                self.target_indexes = self.channels[self.channels["target"] == 1].index
                self.target_names = self.channels.loc[
                    self.target_indexes, "name"
                ].to_list()
                self.target_idx_initialized = True

            for target_idx, target_name in zip(self.target_indexes, self.target_names):
                feature_dict[target_name] = data_batch[target_idx, -1]

        return feature_dict

    def __iter__(self):
        """Allow ``for timestamps, batch in generator:``.

        ``__next__`` below is a generator function, so iteration is delegated
        to the generator object it returns.
        """
        return self.__next__()

    def __next__(self) -> "Generator[tuple[np.ndarray, np.ndarray], None, None]":
        """Return a generator over the remaining ``(timestamps, batch)`` pairs.

        NOTE(review): because of the ``yield`` this does not follow the plain
        iterator protocol - ``next(obj)`` returns a generator object, not a
        batch. Left unchanged, since the consumer in ``Stream.run`` is not
        visible from here.
        """
        while True:
            start = self.stride * self.batch_counter
            end = start + self.segment_length

            self.batch_counter += 1

            start_idx = int(start)
            end_idx = int(end)

            if end_idx > self.data.shape[1]:
                # Data exhausted: end the generator (replaces StopIteration).
                break

            yield np.arange(start, end) / self.sfreq, self.data[:, start_idx:end_idx]
-import py_neuromodulation as nm from contextlib import suppress +from py_neuromodulation.features import USE_FREQ_RANGES +from py_neuromodulation.utils.types import _PathLike +from py_neuromodulation.utils import logger +from py_neuromodulation.utils.data_writer import DataWriter +from py_neuromodulation.gui.backend.app_socket import WebSocketManager +from py_neuromodulation.stream.rawdata_generator import RawDataGenerator from py_neuromodulation.stream.data_processor import DataProcessor -from py_neuromodulation.utils.types import _PathLike, FeatureName -from py_neuromodulation.stream.settings import NMSettings if TYPE_CHECKING: import pandas as pd @@ -27,336 +30,94 @@ class Stream: def __init__( self, - sfreq: float, - channels: "pd.DataFrame | _PathLike | None" = None, - data: "np.ndarray | pd.DataFrame | None" = None, - settings: NMSettings | _PathLike | None = None, - line_noise: float | None = 50, - sampling_rate_features_hz: float | None = None, - path_grids: _PathLike | None = None, - coord_names: list | None = None, - stream_name: str - | None = "example_stream", # Timon: do we need those in the nmstream_abc? 
- is_stream_lsl: bool = False, - coord_list: list | None = None, verbose: bool = True, ) -> None: - """Stream initialization - - Parameters - ---------- - sfreq : float - sampling frequency of data in Hertz - channels : pd.DataFrame | _PathLike - parametrization of channels (see define_channels.py for initialization) - data : np.ndarray | pd.DataFrame | None, optional - data to be streamed with shape (n_channels, n_time), by default None - settings : NMSettings | _PathLike | None, optional - Initialized settings.NMSettings object, by default the py_neuromodulation/settings.yaml are read - and passed into a settings object - line_noise : float | None, optional - line noise, by default 50 - sampling_rate_features_hz : float | None, optional - feature sampling rate, by default None - path_grids : _PathLike | None, optional - path to grid_cortex.tsv and/or gird_subcortex.tsv, by default Non - coord_names : list | None, optional - coordinate name in the form [coord_1_name, coord_2_name, etc], by default None - coord_list : list | None, optional - coordinates in the form [[coord_1_x, coord_1_y, coord_1_z], [coord_2_x, coord_2_y, coord_2_z],], by default None - verbose : bool, optional - print out stream computation time information, by default True - """ - self.settings: NMSettings = NMSettings.load(settings) - - if channels is None and data is not None: - channels = nm.utils.channels.get_default_channels_from_data(data) - - if channels is not None: - self.channels = nm.io.load_channels(channels) + self.verbose = verbose + self.is_running = False - if self.channels.query("used == 1 and target == 0").shape[0] == 0: + async def run( + self, + data_processor: DataProcessor | None = None, + data_generator : Iterator | None = None, + data_writer: DataWriter | None = None, + stream_handling_queue: asyncio.Queue | None = None, + websocket_featues: WebSocketManager | None = None, + ): + self.data_processor = data_processor + # Check that at least one channel is selected for 
analysis + if self.data_processor.channels.query("used == 1 and target == 0").shape[0] == 0: raise ValueError( "No channels selected for analysis that have column 'used' = 1 and 'target' = 0. Please check your channels" ) - if channels is None and data is None: - raise ValueError("Either `channels` or `data` must be passed to `Stream`.") - # If features that use frequency ranges are on, test them against nyquist frequency - use_freq_ranges: list[FeatureName] = [ - "bandpass_filter", - "stft", - "fft", - "welch", - "bursts", - "coherence", - "nolds", - "bispectrum", - ] - need_nyquist_check = any( - (f in use_freq_ranges for f in self.settings.features.get_enabled()) + (f in USE_FREQ_RANGES for f in self.data_processor.settings.features.get_enabled()) ) if need_nyquist_check: assert all( - fb.frequency_high_hz < sfreq / 2 - for fb in self.settings.frequency_ranges_hz.values() + fb.frequency_high_hz < self.data_processor.sfreq_raw / 2 + for fb in self.data_processor.settings.frequency_ranges_hz.values() ), ( "If a feature that uses frequency ranges is selected, " "the frequency band ranges need to be smaller than the nyquist frequency.\n" - f"Got sfreq = {sfreq} and fband ranges:\n {self.settings.frequency_ranges_hz}" + f"Got sfreq = {self.data_processor.sfreq_raw} and fband ranges:\n {self.data_processor.settings.frequency_ranges_hz}" ) - if sampling_rate_features_hz is not None: - self.settings.sampling_rate_features_hz = sampling_rate_features_hz - - if path_grids is None: - path_grids = nm.PYNM_DIR - - self.path_grids = path_grids - self.verbose = verbose - self.sfreq = sfreq - self.line_noise = line_noise - self.coord_names = coord_names - self.coord_list = coord_list - self.sess_right = None - self.projection = None - self.model = None - self.is_running = False - - # TODO(toni): is it necessary to initialize the DataProcessor on stream init? - # timon: yes, I think so, because specific feature settings can thus be investigated? 
- self.data_processor = DataProcessor( - sfreq=self.sfreq, - settings=self.settings, - channels=self.channels, - path_grids=self.path_grids, - coord_names=coord_names, - coord_list=coord_list, - line_noise=line_noise, - verbose=self.verbose, - ) - - self.data = data - - self.target_idx_initialized: bool = False - - def _add_target(self, feature_dict: dict, data: np.ndarray) -> None: - """Add target channels to feature series. - - Parameters - ---------- - feature_dict : dict - data : np.ndarray - Raw data with shape (n_channels, n_samples). - Channels not usd for feature computation are also included - - Returns - ------- - dict - feature dict with target channels added - """ - - if self.channels["target"].sum() > 0: - if not self.target_idx_initialized: - self.target_indexes = self.channels[self.channels["target"] == 1].index - self.target_names = self.channels.loc[ - self.target_indexes, "name" - ].to_list() - self.target_idx_initialized = True - - for target_idx, target_name in zip(self.target_indexes, self.target_names): - feature_dict[target_name] = data[target_idx, -1] - - def _handle_data(self, data: "np.ndarray | pd.DataFrame") -> np.ndarray: - names_expected = self.channels["name"].to_list() - - if isinstance(data, np.ndarray): - if not len(names_expected) == data.shape[0]: - raise ValueError( - "If data is passed as an array, the first dimension must" - " match the number of channel names in `channels`.\n" - f" Number of data channels (data.shape[0]): {data.shape[0]}\n" - f' Length of channels["name"]: {len(names_expected)}.' - ) - return data - - names_data = data.columns.to_list() - if not ( - len(names_expected) == len(names_data) - and sorted(names_expected) == sorted(names_data) - ): - raise ValueError( - "If data is passed as a DataFrame, the" - "column names must match the channel names in `channels`.\n" - f"Input dataframe column names: {names_data}\n" - f'Expected (from channels["name"]): : {names_expected}.' 
- ) - return data.to_numpy().transpose() - - async def run( - self, - data: "np.ndarray | pd.DataFrame | None" = None, - out_dir: _PathLike = "", - experiment_name: str = "sub", - is_stream_lsl: bool = False, - stream_lsl_name: str | None = None, - save_csv: bool = False, - save_interval: int = 10, - return_df: bool = True, - # feature_queue: "multiprocessing.Queue | None" = None, - stream_handling_queue: "multiprocessing.Queue | None" = None, - websocket_featues: "WebSocketManager | None" = None, - ): - self.is_stream_lsl = is_stream_lsl - self.stream_lsl_name = stream_lsl_name self.stream_handling_queue = stream_handling_queue - # self.feature_queue = feature_queue - self.save_csv = save_csv - self.save_interval = save_interval - self.return_df = return_df - - # Validate input data - if data is not None: - data = self._handle_data(data) - elif self.data is not None: - data = self._handle_data(self.data) - elif self.data is None and data is None and self.is_stream_lsl is False: - raise ValueError("No data passed to run function.") - - # Generate output dirs - self.out_dir_root = Path.cwd() if not out_dir else Path(out_dir) - self.out_dir = self.out_dir_root / experiment_name - # TONI: Need better default experiment name - self.experiment_name = experiment_name if experiment_name else "sub" - - self.out_dir.mkdir(parents=True, exist_ok=True) - - # Open database connection - # TONI: we should give the user control over the save format - from py_neuromodulation.utils.database import NMDatabase - - self.db = NMDatabase(experiment_name, out_dir) # Create output database - - self.batch_count: int = 0 # Keep track of the number of batches processed - - # Reinitialize the data processor in case the nm_channels or nm_settings changed between runs of the same Stream - # TONI: then I think we can just not initialize the data processor in the init function - self.data_processor = DataProcessor( - sfreq=self.sfreq, - settings=self.settings, - channels=self.channels, - 
path_grids=self.path_grids, - coord_names=self.coord_names, - coord_list=self.coord_list, - line_noise=self.line_noise, - verbose=self.verbose, - ) - - nm.logger.log_to_file(out_dir) - - # Initialize mp.Pool for multiprocessing - self.pool = mp.Pool(processes=self.settings.n_jobs) - # Set up shared memory for multiprocessing - self.shared_memory = mp.Array(ctypes.c_double, self.settings.n_jobs * self.settings.n_jobs) - # Set up multiprocessing semaphores - self.semaphore = mp.Semaphore(self.settings.n_jobs) - - # Initialize generator - self.generator: Iterator - if not is_stream_lsl: - from py_neuromodulation.stream.generator import RawDataGenerator - - self.generator = RawDataGenerator( - data, - self.sfreq, - self.settings.sampling_rate_features_hz, - self.settings.segment_length_features_ms, - ) - nm.logger.info("Initializing RawDataGenerator") - else: - from py_neuromodulation.stream.mnelsl_stream import LSLStream - - self.lsl_stream = LSLStream( - settings=self.settings, stream_name=stream_lsl_name - ) - - if self.sfreq != self.lsl_stream.stream.sinfo.sfreq: - error_msg = ( - f"Sampling frequency of the lsl-stream ({self.lsl_stream.stream.sinfo.sfreq}) " - f"does not match the settings ({self.sfreq})." 
- "The sampling frequency read from the stream will be used" - ) - nm.logger.warning(error_msg) - self.sfreq = self.lsl_stream.stream.sinfo.sfreq - - self.generator = self.lsl_stream.get_next_batch() + self.is_running = False + self.is_lslstream = type(data_generator) != RawDataGenerator prev_batch_end = 0 - for timestamps, data_batch in self.generator: + for timestamps, data_batch in next(data_generator): self.is_running = True if self.stream_handling_queue is not None: + await asyncio.sleep(0.001) if not self.stream_handling_queue.empty(): - value = self.stream_handling_queue.get() - if value == "stop": + stop_signal = await asyncio.wait_for(self.stream_handling_queue.get(), timeout=0.01) + if stop_signal == "stop": break if data_batch is None: break - feature_dict = self.data_processor.process(data_batch) + feature_dict = data_processor.process(data_batch) this_batch_end = timestamps[-1] batch_length = this_batch_end - prev_batch_end - nm.logger.debug( + logger.debug( f"{batch_length:.3f} seconds of new data processed", ) feature_dict["time"] = ( batch_length - if self.is_stream_lsl + if self.is_lslstream else np.ceil(this_batch_end * 1000 + 1) ) prev_batch_end = this_batch_end if self.verbose: - nm.logger.info("Time: %.2f", feature_dict["time"] / 1000) + logger.info("Time: %.2f", feature_dict["time"] / 1000) - self._add_target(feature_dict, data_batch) + if not self.is_lslstream: + feature_dict = data_generator.add_target(feature_dict, data_batch) - # We should ensure that feature output is always either float64 or None and remove this with suppress(TypeError): # Need this because some features output None for key, value in feature_dict.items(): feature_dict[key] = np.float64(value) - self.db.insert_data(feature_dict) + data_writer.write_data(feature_dict) - # if self.feature_queue is not None: - # self.feature_queue.put(feature_dict) - if websocket_featues is not None: - nm.logger.info("Sending message to Websocket") - #nm.logger.info(feature_dict) - await 
websocket_featues.send_message(feature_dict) - self.batch_count += 1 - if self.batch_count % self.save_interval == 0: - self.db.commit() - - self.db.commit() # Save last batches - - # If save_csv is False, still save the first row to get the column names - feature_df: "pd.DataFrame" = ( - self.db.fetch_all() if (self.save_csv or self.return_df) else self.db.head() - ) - - self.db.close() # Close the database connection + await websocket_featues.send_cbor(feature_dict) - self._save_after_stream(feature_arr=feature_df) + feature_df = data_writer.get_features() + data_writer.save_csv_features(feature_df) + self._save_sidecars_after_stream(data_writer.out_dir, data_writer.experiment_name) self.is_running = False - return feature_df # TONI: Not sure if this makes sense anymore + return feature_df def plot_raw_signal( self, @@ -422,33 +183,25 @@ def plot_raw_signal( if plot_psd: raw.compute_psd().plot() - def _save_after_stream( - self, - feature_arr: "pd.DataFrame | None" = None, - ) -> None: - """Save features, settings, nm_channels and sidecar after run""" - self._save_sidecar() - if feature_arr is not None: - self._save_features(feature_arr) - self._save_settings() - self._save_channels() - - def _save_features( + def _save_sidecars_after_stream( self, - feature_arr: "pd.DataFrame", + out_dir: _PathLike, + experiment_name: str = "experiment" ) -> None: - nm.io.save_features(feature_arr, self.out_dir, self.experiment_name) + """Save settings, nm_channels and sidecar after run""" + self._save_sidecar(out_dir, experiment_name) + self._save_settings(out_dir, experiment_name) + self._save_channels(out_dir, experiment_name) - def _save_channels(self) -> None: - self.data_processor.save_channels(self.out_dir, self.experiment_name) + def _save_channels(self, out_dir, experiment_name) -> None: + self.data_processor.save_channels(out_dir, experiment_name) - def _save_settings(self) -> None: - self.data_processor.save_settings(self.out_dir, self.experiment_name) + def 
_save_settings(self, out_dir, experiment_name) -> None: + self.data_processor.save_settings(out_dir, experiment_name) - def _save_sidecar(self) -> None: + def _save_sidecar(self, out_dir, experiment_name) -> None: """Save sidecar incduing fs, coords, sess_right to out_path_root and subfolder 'folder_name'""" - additional_args = {"sess_right": self.sess_right} self.data_processor.save_sidecar( - self.out_dir, self.experiment_name, additional_args + out_dir, experiment_name ) diff --git a/py_neuromodulation/utils/__init__.py b/py_neuromodulation/utils/__init__.py index 98f7a28a..bfaf0bc4 100644 --- a/py_neuromodulation/utils/__init__.py +++ b/py_neuromodulation/utils/__init__.py @@ -1,2 +1,5 @@ from .channels import * +from .io import * from . import types + +from .logging import logger diff --git a/py_neuromodulation/utils/channels.py b/py_neuromodulation/utils/channels.py index c2fffd00..68c0485b 100644 --- a/py_neuromodulation/utils/channels.py +++ b/py_neuromodulation/utils/channels.py @@ -10,7 +10,7 @@ _LFP_TYPES = ["seeg", "dbs", "lfp"] # must be lower-case -def set_channels( +def create_channels( ch_names: list[str], ch_types: list[str], reference: list | str = "default", @@ -250,8 +250,8 @@ def _get_default_references( return df -def get_default_channels_from_data( - data: np.ndarray, +def create_default_channels_from_data( + data: "np.ndarray | pd.DataFrame", car_rereferencing: bool = True, ): """Return default channels dataframe with @@ -278,7 +278,15 @@ def get_default_channels_from_data( """ import pandas as pd - ch_name = [f"ch{idx}" for idx in range(data.shape[0])] + if isinstance(data, np.ndarray): + ch_name = [f"ch{idx}" for idx in range(data.shape[0])] + elif isinstance(data, pd.DataFrame): + ch_name = data.columns.to_list() + else: + raise ValueError( + "get_default_channels_from_data only supports np.ndarray and pd.DataFrame" + ) + status = ["good" for _ in range(data.shape[0])] type_nm = ["ecog" for _ in range(data.shape[0])] diff --git 
a/py_neuromodulation/utils/data_writer.py b/py_neuromodulation/utils/data_writer.py new file mode 100644 index 00000000..7fdb3d3c --- /dev/null +++ b/py_neuromodulation/utils/data_writer.py @@ -0,0 +1,51 @@ +from py_neuromodulation.utils.types import _PathLike +from py_neuromodulation.utils import logger, io +from pathlib import Path + +class DataWriter: + + def __init__(self, out_dir: _PathLike = "", save_csv: bool = False, + save_interval: int = 10, experiment_name: str = "experiment"): + + self.batch_count: int = 0 + self.save_interval: int = save_interval + self.save_csv: bool = save_csv + self.out_dir: _PathLike = out_dir + self.experiment_name: str = experiment_name + + self.out_dir_root = Path.cwd() if not out_dir else Path(out_dir) + self.out_dir = self.out_dir_root / self.experiment_name + self.out_dir.mkdir(parents=True, exist_ok=True) + + from py_neuromodulation.utils.database import NMDatabase + self.db = NMDatabase(self.experiment_name, out_dir) + + logger.log_to_file(out_dir) + + + def write_data(self, feature_dict): + + self.db.insert_data(feature_dict) + self.batch_count += 1 + if self.batch_count % self.save_interval == 0: + self.db.commit() + + def get_features(self, return_df: bool = False): + + self.db.commit() # Save last batches + + # If save_csv is False, still save the first row to get the column names + feature_df = ( + self.db.fetch_all() if (self.save_csv or return_df) else self.db.head() + ) + + self.db.close() + return feature_df + + def save_csv_features( + self, + df_features: "pd.DataFrame" + ) -> None: + filename = f"{self.experiment_name}_FEATURES.csv" if self.experiment_name else "_FEATURES.csv" + io.write_csv(df_features, self.out_dir / filename) + logger.info(f"{filename} saved to {str(self.out_dir)}") diff --git a/py_neuromodulation/utils/database.py b/py_neuromodulation/utils/database.py index 549470a6..cfc49284 100644 --- a/py_neuromodulation/utils/database.py +++ b/py_neuromodulation/utils/database.py @@ -41,7 +41,7 @@ def 
__init__( self.csv_path.parent.mkdir(parents=True, exist_ok=True) - self.conn = sqlite3.connect(self.db_path) + self.conn = sqlite3.connect(self.db_path, check_same_thread=False) self.cursor = self.conn.cursor() # Database config and optimization, prioritize data integrity diff --git a/py_neuromodulation/utils/io.py b/py_neuromodulation/utils/io.py index f2e1e16e..68c2c792 100644 --- a/py_neuromodulation/utils/io.py +++ b/py_neuromodulation/utils/io.py @@ -5,13 +5,17 @@ import numpy as np from py_neuromodulation.utils.types import _PathLike -from py_neuromodulation import logger, PYNM_DIR +from py_neuromodulation.utils.logging import logger +from py_neuromodulation import PYNM_DIR +from mne.io._read_raw import _get_supported if TYPE_CHECKING: from mne_bids import BIDSPath from mne import io as mne_io import pandas as pd +MNE_FORMATS = list(_get_supported().keys()) + def load_channels( channels: "pd.DataFrame | _PathLike", @@ -254,17 +258,6 @@ def save_channels( logger.info(f"{filename} saved to {out_dir}") -def save_features( - df_features: "pd.DataFrame", - out_dir: _PathLike = "", - prefix: str = "", -) -> None: - out_dir = Path.cwd() if not out_dir else Path(out_dir) - filename = f"{prefix}_FEATURES.csv" if prefix else "_FEATURES.csv" - write_csv(df_features, out_dir / filename) - logger.info(f"{filename} saved to {str(out_dir)}") - - def save_sidecar( sidecar: dict, out_dir: _PathLike = "", diff --git a/py_neuromodulation/utils/logging.py b/py_neuromodulation/utils/logging.py index f8d51d06..eaa90e0d 100644 --- a/py_neuromodulation/utils/logging.py +++ b/py_neuromodulation/utils/logging.py @@ -64,3 +64,12 @@ def log_to_file(self, path: _PathLike, mode: str = "w"): self.addHandler(self.info_file_handler) self.addHandler(self.debug_file_handler) + + +###################################### +# Logger initialization and settings # +###################################### + +logger = NMLogger( + "PyNeuromodulation" +) # logger initialization first to prevent circular 
import diff --git a/start_LSL_stream.py b/start_LSL_stream.py index fbd6ed85..fe37a580 100644 --- a/start_LSL_stream.py +++ b/start_LSL_stream.py @@ -5,7 +5,7 @@ import os import asyncio -from py_neuromodulation.utils import set_channels +from py_neuromodulation.utils import create_channels from py_neuromodulation import io @@ -33,7 +33,7 @@ if __name__ == "__main__": # PATH_VHDR = "/Users/Timon/Documents/py-neurmodulation_merge/py_neuromodulation/py_neuromodulation/data/sub-testsub/ses-EphysMedOff/ieeg/sub-testsub_ses-EphysMedOff_task-gripforce_run-0_ieeg.vhdr" - + # data, sfreq, ch_names, ch_types, bads = io.read_mne_data(PATH_VHDR) # channels = set_channels( @@ -44,7 +44,7 @@ # used_types=["eeg", "ecog", "dbs", "seeg"], # target_keywords=None, # ) - + # ( # raw_arr, # data, diff --git a/tests/conftest.py b/tests/conftest.py index 87a1011a..2c89563d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,7 +57,7 @@ def setup_default_stream_fast_compute() -> tuple[np.ndarray, nm.Stream]: coord_names, ) = nm.io.read_BIDS_data(PATH_RUN=PATH_RUN) - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=raw.ch_names, ch_types=raw.get_channel_types(), reference="default", @@ -74,6 +74,7 @@ def setup_default_stream_fast_compute() -> tuple[np.ndarray, nm.Stream]: settings.features.fooof = True stream = nm.Stream( + data=data, settings=settings, channels=channels, path_grids=None, diff --git a/tests/test_all_features.py b/tests/test_all_features.py index ebc2db6f..f187a65c 100644 --- a/tests/test_all_features.py +++ b/tests/test_all_features.py @@ -3,15 +3,24 @@ import py_neuromodulation as nm -def get_example_stream(test_arr: np.ndarray) -> nm.Stream: +def get_example_stream( + test_arr: np.ndarray, experiment_name: str = "test" +) -> nm.Stream: settings = nm.NMSettings.get_default().enable_all_features() settings.features.nolds = False settings.features.mne_connectivity = False settings.features.coherence = False - channels = 
nm.utils.get_default_channels_from_data(test_arr) + channels = nm.utils.create_default_channels_from_data(test_arr) - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) + stream = nm.Stream( + data=test_arr, + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, + experiment_name=experiment_name, + ) return stream @@ -19,11 +28,9 @@ def test_all_features_random_array(): """This test runs's through all enabled features, and check's if they break""" np.random.seed(0) arr = np.random.random([2, 2000]) - stream = get_example_stream(arr) + stream = get_example_stream(arr, experiment_name="test_all_features_random_array") - df = stream.run( - arr, out_dir="./test_data", experiment_name="test_all_features_random_array" - ) + df = stream.run(out_dir="./test_data") assert df.shape[0] != 0 # terrible test @@ -31,12 +38,10 @@ def test_all_features_random_array(): def test_all_features_zero_array(): arr = np.zeros([2, 2000]) - stream = get_example_stream(arr) + stream = get_example_stream(arr, experiment_name="test_all_features_zero_array") stream.settings.features.fooof = False # Can't use fooof with 0s (log(0) undefined) - df = stream.run( - arr, out_dir="./test_data", experiment_name="test_all_features_zero_array" - ) + df = stream.run(out_dir="./test_data") assert df.shape[0] != 0 # terrible test @@ -45,11 +50,9 @@ def test_all_features_NaN_array(): arr = np.empty([2, 2000]) arr[:] = np.nan - stream = get_example_stream(arr) + stream = get_example_stream(arr, experiment_name="test_all_features_NaN_array") stream.settings.features.fooof = False # Can't use fooof nan values - df = stream.run( - arr, out_dir="./test_data", experiment_name="test_all_features_NaN_array" - ) + df = stream.run(out_dir="./test_data") assert df.shape[0] != 0 # terrible test diff --git a/tests/test_bispectra.py b/tests/test_bispectra.py index 97390c38..2afaab0b 100644 --- a/tests/test_bispectra.py +++ b/tests/test_bispectra.py @@ -24,7 +24,7 @@ def 
test_bispectrum(): ch_names = raw.ch_names[4] ch_types = raw.get_channel_types()[4] - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=[ch_names], ch_types=[ch_types], reference="default", @@ -40,6 +40,8 @@ def test_bispectrum(): settings.features.bispectrum = True stream = nm.Stream( + data=np.expand_dims(data[3, :], axis=0), + experiment_name="test_bispectrum", settings=settings, channels=channels, path_grids=None, @@ -51,9 +53,7 @@ def test_bispectrum(): ) features = stream.run( - np.expand_dims(data[3, :], axis=0), out_dir="./test_data", - experiment_name="test_bispectrum", ) assert features["ECOG_RIGHT_1_Bispectrum_phase_mean_whole_fband_range"].sum() != 0 diff --git a/tests/test_bursts.py b/tests/test_bursts.py index 5bb142db..cad11ee2 100644 --- a/tests/test_bursts.py +++ b/tests/test_bursts.py @@ -8,28 +8,28 @@ def test_init_wrong_fband(): settings = NMSettings.get_default() - settings.burst_settings.frequency_bands = ["wrong_band"] + settings.bursts_settings.frequency_bands = ["wrong_band"] with pytest.raises(ValidationError): Bursts(settings, ["ch1", "ch2"], 1000) def test_init_wrong_treshold(): settings = NMSettings.get_default() - settings.burst_settings.threshold = -1 + settings.bursts_settings.threshold = -1 with pytest.raises(ValidationError): Bursts(settings, ["ch1", "ch2"], 1000) def test_init_wrong_timeduration(): settings = NMSettings.get_default() - settings.burst_settings.time_duration_s = -1 + settings.bursts_settings.time_duration_s = -1 with pytest.raises(ValidationError): Bursts(settings, ["ch1", "ch2"], 1000) def test_init_wrong_burst_feature_init(): settings = NMSettings.get_default() - settings.burst_settings.burst_features.duration = -1 + settings.bursts_settings.burst_features.duration = -1 with pytest.raises(ValidationError): Bursts(settings, ["ch1", "ch2"], 1000) diff --git a/tests/test_feature_sampling_rates.py b/tests/test_feature_sampling_rates.py index 89b0b722..f162816b 100644 --- 
a/tests/test_feature_sampling_rates.py +++ b/tests/test_feature_sampling_rates.py @@ -8,7 +8,7 @@ def get_example_settings(test_arr: np.ndarray) -> tuple[nm.NMSettings, pd.DataFrame]: settings = nm.NMSettings.get_fast_compute() - channels = nm.utils.get_default_channels_from_data(test_arr) + channels = nm.utils.create_default_channels_from_data(test_arr) return settings, channels @@ -20,14 +20,17 @@ def test_different_sampling_rate_100Hz(): settings, channels = get_example_settings(arr_test) settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df = stream.run( - arr_test, - out_dir="./test_data", + stream = nm.Stream( + data=arr_test, + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, experiment_name="test_different_sampling_rate_100Hz", ) + df = stream.run(out_dir="./test_data") + # check the difference between time points assert np.diff(df["time"].iloc[:2]) / 1000 == (1 / sampling_rate_features) @@ -39,14 +42,17 @@ def test_different_sampling_rate_10Hz(): settings, channels = get_example_settings(arr_test) settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df = stream.run( - arr_test, - out_dir="./test_data", + stream = nm.Stream( + data=arr_test, experiment_name="test_different_sampling_rate_10Hz", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + df = stream.run(out_dir="./test_data") + # check the difference between time points assert np.diff(df["time"].iloc[:2]) / 1000 == (1 / sampling_rate_features) @@ -59,14 +65,17 @@ def test_different_sampling_rate_1Hz(): settings, channels = get_example_settings(arr_test) settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df = stream.run( - arr_test, - out_dir="./test_data", + stream = 
nm.Stream( + data=arr_test, experiment_name="test_different_sampling_rate_1Hz", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + df = stream.run(out_dir="./test_data") + # check the difference between time points assert np.diff(df["time"].iloc[:2]) / 1000 == (1 / sampling_rate_features) @@ -79,14 +88,17 @@ def test_different_sampling_rate_0DOT1Hz(): settings, channels = get_example_settings(arr_test) settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df = stream.run( - arr_test, - out_dir="./test_data", + stream = nm.Stream( + data=arr_test, experiment_name="test_different_sampling_rate_0DOT1Hz", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + df = stream.run(out_dir="./test_data") + # check the difference between time points assert np.diff(df["time"].iloc[:2]) / 1000 == (1 / sampling_rate_features) @@ -112,14 +124,17 @@ def test_different_segment_lengths(): settings.segment_length_features_ms = segment_length_features_ms settings.fft_settings.windowlength_ms = segment_length_features_ms - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df_seglength_800 = stream.run( - arr_test, - out_dir="./test_data", + stream = nm.Stream( + data=arr_test, experiment_name="test_different_segment_lengths_800", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + df_seglength_800 = stream.run(out_dir="./test_data") + segment_length_features_ms = 1000 arr_test = np.random.random([2, 1200]) @@ -128,13 +143,16 @@ def test_different_segment_lengths(): settings.segment_length_features_ms = segment_length_features_ms settings.fft_settings.windowlength_ms = segment_length_features_ms - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df_seglength_1000 = stream.run( - arr_test, - out_dir="./test_data", + stream = nm.Stream( + data=arr_test, 
experiment_name="test_different_segment_lengths_1000", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + + df_seglength_1000 = stream.run(out_dir="./test_data") # check the difference between time points print(df_seglength_1000.columns) diff --git a/tests/test_lsl_stream.py b/tests/test_lsl_stream.py index 6f4edc81..068b9ee9 100644 --- a/tests/test_lsl_stream.py +++ b/tests/test_lsl_stream.py @@ -2,38 +2,52 @@ import numpy as np import pytest import time +from py_neuromodulation.utils import create_channels +from py_neuromodulation.stream import NMSettings, Stream @pytest.mark.parametrize("setup_lsl_player", ["search"], indirect=True) def test_lsl_stream_search(setup_lsl_player): - from py_neuromodulation.stream import mnelsl_stream + from py_neuromodulation.stream import mnelsl_generator """ Test if the lsl stream search can find any streams after starting a player.""" player = setup_lsl_player player.start_player() - streams = mnelsl_stream.resolve_streams() + streams = mnelsl_generator.resolve_streams() assert len(streams) != 0, "No streams found in search" @pytest.mark.parametrize("setup_lsl_player", ["offline_test"], indirect=True) -def test_offline_lsl( - setup_default_stream_fast_compute, setup_lsl_player, setup_default_data -): - """ " Test the offline lsl player and stream, checking for sfreq, channel types and channel names.""" +def test_offline_lsl(setup_lsl_player, setup_default_data): + """ " Test the offline lsl player and stream, checking for sfreq, channel types and channel names.""" - raw, data, sfreq = setup_default_data + raw, data, _ = setup_default_data player = setup_lsl_player player.start_player() - data, stream = setup_default_stream_fast_compute + channels = create_channels( + ch_names=raw.ch_names, + ch_types=raw.get_channel_types(), + reference="default", + bads=raw.info["bads"], + new_names="default", + used_types=("ecog", "dbs", "seeg"), + target_keywords=("MOV_RIGHT_CLEAN",), + ) - features = stream.run( + 
stream = Stream( + data=data, is_stream_lsl=True, - plot_lsl=False, stream_lsl_name="offline_test", - out_dir="./test_data", experiment_name="test_offline_lsl", + settings=NMSettings().get_fast_compute(), + channels=channels, + path_grids=None, + verbose=True, ) + + features = stream.run(out_dir="./test_data") + # check sfreq if not raw.info["sfreq"] == stream.sfreq: raise ValueError( diff --git a/tests/test_nan_values.py b/tests/test_nan_values.py index 036bf150..8f09a8fb 100644 --- a/tests/test_nan_values.py +++ b/tests/test_nan_values.py @@ -10,12 +10,12 @@ def test_stream_with_none_data(): data = np.random.random([2, 2000]) data[0, :] = None - stream = nm.Stream(sfreq=fs, data=data) - - features = stream.run( - data, out_dir="./test_data", experiment_name="test_stream_with_none_data" + stream = nm.Stream( + sfreq=fs, data=data, experiment_name="test_stream_with_none_data" ) + features = stream.run(out_dir="./test_data") + # assert if all features if name ch0 are None assert len( [f for f in features.columns if "ch0" in f and features[f].isna().all()] diff --git a/tests/test_osc_features.py b/tests/test_osc_features.py index 8cf84111..f9ed800d 100644 --- a/tests/test_osc_features.py +++ b/tests/test_osc_features.py @@ -61,7 +61,12 @@ def test_fft_frequencyband_range_passing_nyquist_range(): settings.frequency_ranges_hz = {"theta": [4, 8], "broadband": [10, 600]} with pytest.raises(AssertionError): - Stream(sfreq=sfreq, data=data, settings=settings) + Stream( + sfreq=sfreq, + experiment_name="test_fft_frequencyband_range_passing_nyquist_range", + data=data, + settings=settings, + ).run(out_dir="./test_data") def test_fft_zero_data(): @@ -176,7 +181,12 @@ def test_stft_wrong_frequencyband_range_init(): settings.frequency_ranges_hz = {"theta": [4, 8], "broadband": [10, 600]} with pytest.raises(AssertionError): - Stream(settings=settings, data=data, sfreq=sfreq) + Stream( + settings=settings, + experiment_name="test_stft_wrong_frequencyband_range_init", + data=data, 
+ sfreq=sfreq, + ).run(out_dir="./test_data") def test_stft_beta_osc(): diff --git a/tests/test_preprocessing_filter.py b/tests/test_preprocessing_filter.py index d06c5e08..10fd0038 100644 --- a/tests/test_preprocessing_filter.py +++ b/tests/test_preprocessing_filter.py @@ -17,12 +17,11 @@ def test_preprocessing_within_pipeline( stream.settings.preprocessing_filter.lowpass_filter = True stream.settings.preprocessing_filter.highpass_filter = True + stream.data = data[:, : int(stream.sfreq * 2)] + stream.experiment_name = "test_preprocessing_within_pipeline" + try: - _ = stream.run( - data[:, : int(stream.sfreq * 2)], - out_dir="./test_data", - experiment_name="test_preprocessing_within_pipeline", - ) + _ = stream.run(out_dir="./test_data") except Exception as e: assert False, f"Error in pipeline including preprocess filtering : {e}" diff --git a/tests/test_rereference.py b/tests/test_rereference.py index 28f3e615..c9fd8b35 100644 --- a/tests/test_rereference.py +++ b/tests/test_rereference.py @@ -9,7 +9,7 @@ def test_rereference_not_used_channels_no_reref(setup_databatch): ch_names, ch_types, bads, data_batch = setup_databatch - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=ch_names, ch_types=ch_types, reference="default", @@ -35,7 +35,7 @@ def test_rereference_not_used_channels_no_reref(setup_databatch): def test_rereference_car(setup_databatch): ch_names, ch_types, bads, data_batch = setup_databatch - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=ch_names, ch_types=ch_types, reference="default", @@ -66,7 +66,7 @@ def test_rereference_car(setup_databatch): def test_rereference_bp(setup_databatch): ch_names, ch_types, bads, data_batch = setup_databatch - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=ch_names, ch_types=ch_types, reference="default", @@ -101,7 +101,7 @@ def test_rereference_bp(setup_databatch): def 
test_rereference_wrong_rererference_column_name(setup_databatch): ch_names, ch_types, bads, data_batch = setup_databatch - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=ch_names, ch_types=ch_types, reference="default", @@ -119,7 +119,7 @@ def test_rereference_wrong_rererference_column_name(setup_databatch): def test_rereference_muliple_channels(setup_databatch): ch_names, ch_types, bads, data_batch = setup_databatch - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=ch_names, ch_types=ch_types, reference="default", @@ -146,7 +146,7 @@ def test_rereference_muliple_channels(setup_databatch): def test_rereference_same_channel(setup_databatch): ch_names, ch_types, bads, data_batch = setup_databatch - channels = nm.utils.set_channels( + channels = nm.utils.create_channels( ch_names=ch_names, ch_types=ch_types, reference="default", diff --git a/tests/test_sampling.py b/tests/test_sampling.py index 58ae14d9..28b8c2bd 100644 --- a/tests/test_sampling.py +++ b/tests/test_sampling.py @@ -24,15 +24,14 @@ def get_features(time_end_ms: int, segment_length_features_ms: int): stream = nm.Stream( sfreq=1000, + experiment_name="test_feature_sampling_rates", data=data, sampling_rate_features_hz=10, verbose=False, settings=settings, ) - features = stream.run( - data, out_dir="./test_data", experiment_name="test_feature_sampling_rates" - ) + features = stream.run(out_dir="./test_data") return features diff --git a/tests/test_settings_change_after_init.py b/tests/test_settings_change_after_init.py index d9bd7977..962943b7 100644 --- a/tests/test_settings_change_after_init.py +++ b/tests/test_settings_change_after_init.py @@ -9,16 +9,16 @@ def test_post_init_channels_change(): data = np.random.random((10, 1000)) fs = 1000 - stream = nm.Stream(sfreq=fs, data=data) + stream = nm.Stream( + sfreq=fs, data=data, experiment_name="test_post_init_nm_channels_change" + ) # default channel names are "ch{i}" # every time the name 
changes, the "new_name" should also changes # this is however only done during initialization stream.channels["new_name"] = [f"new_ch_name_{i}" for i in range(10)] - features = stream.run( - out_dir="./test_data", experiment_name="test_post_init_nm_channels_change" - ) + features = stream.run(out_dir="./test_data") assert len([f for f in features.columns if "new_ch_name_0" in f]) != 0 @@ -29,15 +29,16 @@ def test_post_init_channels_used_channels_change_single_channel(): np.random.seed(0) data = np.random.random((3, 1000)) sfreq = 1000 - stream = nm.Stream(sfreq=sfreq, data=data, sampling_rate_features_hz=11) + stream = nm.Stream( + data=data, + experiment_name="test_post_init_nm_channels_used_channels_change_single_channel", + sfreq=sfreq, + sampling_rate_features_hz=11, + ) stream.channels["used"] = 0 stream.channels.loc[1, "used"] = 1 - features = stream.run( - data, - out_dir="./test_data", - experiment_name="test_post_init_nm_channels_used_channels_change_single_channel", - ) + features = stream.run(out_dir="./test_data") chs_not_used = stream.channels[stream.channels["used"] == 0]["new_name"] @@ -58,15 +59,16 @@ def test_post_init_channels_used_channels_change_multiple_channel(): np.random.seed(0) data = np.random.random((3, 1000)) sfreq = 1000 - stream = nm.Stream(sfreq=sfreq, data=data, sampling_rate_features_hz=11) + stream = nm.Stream( + data=data, + experiment_name="test_post_init_nm_channels_used_channels_change_multiple_channel", + sfreq=sfreq, + sampling_rate_features_hz=11, + ) stream.channels["used"] = 0 stream.channels.loc[[0, 2], "used"] = 1 - features = stream.run( - data, - out_dir="./test_data", - experiment_name="test_post_init_nm_channels_used_channels_change_multiple_channel", - ) + features = stream.run(out_dir="./test_data") chs_not_used = stream.channels[stream.channels["used"] == 0]["new_name"] diff --git a/tests/test_target_channel_add.py b/tests/test_target_channel_add.py index 259b57bd..73919518 100644 --- 
a/tests/test_target_channel_add.py +++ b/tests/test_target_channel_add.py @@ -7,7 +7,7 @@ def get_example_settings(test_arr: np.ndarray) -> tuple[nm.NMSettings, pd.DataFrame]: settings = nm.NMSettings.get_fast_compute() - channels = nm.utils.get_default_channels_from_data(test_arr) + channels = nm.utils.create_default_channels_from_data(test_arr) return settings, channels @@ -26,10 +26,17 @@ def test_label_add_single_target(): settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) + stream = nm.Stream( + data=arr_test, + sfreq=1000, + experiment_name="test_label_add_single_target", + channels=channels, + settings=settings, + verbose=True, + ) df = stream.run( - arr_test, out_dir="./test_data", experiment_name="test_label_add_single_target" + out_dir="./test_data", ) assert df[ @@ -58,14 +65,17 @@ def test_label_add_multidimensional_target(): settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df = stream.run( - arr_test, - out_dir="./test_data", + stream = nm.Stream( + data=arr_test, experiment_name="test_label_add_multidimensional_target", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + df = stream.run(out_dir="./test_data") + for target_ch in ["target_ch_0", "target_ch_1"]: assert df[ target_ch @@ -85,10 +95,15 @@ def test_label_add_no_target(): settings.sampling_rate_features_hz = sampling_rate_features - stream = nm.Stream(sfreq=1000, channels=channels, settings=settings, verbose=True) - - df = stream.run( - arr_test, out_dir="./test_data", experiment_name="test_label_add_no_target" + stream = nm.Stream( + data=arr_test, + experiment_name="test_label_add_no_target", + sfreq=1000, + channels=channels, + settings=settings, + verbose=True, ) + df = stream.run(out_dir="./test_data") + assert all([col.startswith("ch") or col.startswith("time") for col in 
df.columns]) diff --git a/tests/test_timing.py b/tests/test_timing.py index 64504a3d..fe803a52 100644 --- a/tests/test_timing.py +++ b/tests/test_timing.py @@ -18,20 +18,18 @@ def test_setting_computation_time(): settings.features.fft = False settings.features.raw_hjorth = True stream = nm.Stream( + experiment_name="test_setting_computation_time", sfreq=fs, data=data, sampling_rate_features_hz=sampling_rate_features_hz, settings=settings, ) - features = stream.run( - out_dir="./test_data", experiment_name="test_setting_computation_time" - ) + features = stream.run(out_dir="./test_data") # test if features up till the last sample was computed assert features.time.iloc[-1] == data_duration_s * fs - # test that the time difference between two samples is the feature sampling rate assert ( features.time.iloc[1] - features.time.iloc[0] @@ -56,13 +54,14 @@ def test_float_fs(): settings.features.fft = False settings.features.raw_hjorth = True stream = nm.Stream( + experiment_name="test_float_fs", sfreq=fs, data=data, sampling_rate_features_hz=sampling_rate_features_hz, settings=settings, ) - features = stream.run(out_dir="./test_data", experiment_name="test_float_fs") + features = stream.run(out_dir="./test_data") # test that the time difference between two samples is the feature sampling rate assert (