Stream_refactor #362

Open · wants to merge 7 commits into gui
Conversation

toni-neurosc
Copy link
Collaborator

I made some changes to the Stream class with the idea of clarifying where different tasks should be handled, for example:

  • Data is now passed to the Stream constructor, along with the experiment name, LSL stream name, etc.
  • run now only takes parameters related to the specific run, such as out_dir, save_csv, save_interval and return_df.
  • I have tried to execute certain tasks and validations only at the latest moment they are needed. For example, since we might still change settings before running the stream, the Nyquist frequency check has been moved to the run function (see the sketch after this list).
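
As a rough illustration of that split, a minimal sketch could look like the following (all names, signatures and the exact validation below are assumptions for the sake of the example, not the actual implementation):

class Stream:
    def __init__(self, data, sfreq, settings, experiment_name="experiment", verbose=True):
        # Everything that describes *what* is streamed lives in the constructor.
        self.data = data
        self.sfreq = sfreq
        self.settings = settings
        self.experiment_name = experiment_name
        self.verbose = verbose

    def run(self, out_dir=".", save_csv=False, save_interval=10, return_df=True):
        # Run-specific parameters and late validations live here; by this point
        # the settings can no longer change, so e.g. a Nyquist-style check runs
        # now (the exact condition checked in the package may differ).
        if self.settings.sampling_rate_features_hz > self.sfreq / 2:
            raise ValueError("Feature sampling rate exceeds the Nyquist frequency.")
        # ... iterate over data batches, compute features, optionally save ...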

More changes I'd like to add:

  • Move instantiation of DataGenerator and DataProcessor to the run function
  • Move Stream control logic to a separate class that we can pass to the constructor
  • Move output writing (CSV, SQLite, etc.) to a separate class so that output can be written in different formats at the request of the user (a rough sketch follows this list)
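
For the last point, a pluggable writer could follow a small common interface along these lines (the class and method names here are only illustrative assumptions, not the final API):

import csv
from abc import ABC, abstractmethod

class OutputWriter(ABC):
    """Common interface so CSV, SQLite, etc. writers are interchangeable."""

    @abstractmethod
    def write(self, features: dict) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

class CSVOutputWriter(OutputWriter):
    def __init__(self, path: str):
        self._file = open(path, "w", newline="")
        self._writer = None

    def write(self, features: dict) -> None:
        # Build the header lazily from the first batch of features.
        if self._writer is None:
            self._writer = csv.DictWriter(self._file, fieldnames=list(features))
            self._writer.writeheader()
        self._writer.writerow(features)

    def close(self) -> None:
        self._file.close()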

@timonmerk
Copy link
Contributor

@toni-neurosc I can take today and tomorrow to work on this branch and merge it into main. You mentioned that we could further refactor these points:

  • Move instantiation of DataGenerator and DataProcessor to the run function
  • Move Stream control logic to a separate class that we can pass to the constructor
  • Move output writing (CSV, SQLite, etc.) to a separate class so that output can be written in different formats at the request of the user

I agree with the changes; they make handling of the stream easier.

@timonmerk
Copy link
Contributor

I have now finished a first version of the changes above. An example of how to use it now looks as follows:

import asyncio

import py_neuromodulation as nm
from py_neuromodulation.stream.data_processor import DataProcessor
from py_neuromodulation.stream.rawdata_generator import RawDataGenerator
from py_neuromodulation.stream.mnelsl_generator import MNELSLGenerator

async def main():
    (
        RUN_NAME,
        PATH_RUN,
        PATH_BIDS,
        PATH_OUT,
        datatype,
    ) = nm.io.get_paths_example_data()

    (
        raw,
        data,
        sfreq,
        line_noise,
        coord_list,
        coord_names,
    ) = nm.io.read_BIDS_data(PATH_RUN=PATH_RUN)

    channels = nm.utils.create_channels(
        ch_names=raw.ch_names,
        ch_types=raw.get_channel_types(),
        reference="default",
        bads=raw.info["bads"],
        new_names="default",
        used_types=("ecog", "dbs", "seeg"),
        target_keywords=["MOV_RIGHT"],
    )

    settings = nm.NMSettings.get_fast_compute()

    data_generator = RawDataGenerator(
        data,
        settings.sampling_rate_features_hz,
        settings.segment_length_features_ms,
        channels,
        sfreq,
    )

    output_writer = nm.utils.data_writer.DataWriter(
        out_dir=PATH_OUT, save_csv=True, save_interval=10, experiment_name=RUN_NAME
    )

    data_processor = DataProcessor(
        sfreq=sfreq,
        settings=settings,
        channels=channels,
        coord_names=coord_names,
        coord_list=coord_list,
        line_noise=line_noise,
        verbose=True,
    )

    # The Stream itself now only holds run-independent options such as verbose.
    stream = nm.Stream(verbose=True)

    df_features = await stream.run(
        data_processor,
        data_generator,
        output_writer,
    )

    ######### Definition of LSL stream:

    lsl_generator = MNELSLGenerator(
        segment_length_features_ms=settings.segment_length_features_ms,
        sampling_rate_features_hz=settings.sampling_rate_features_hz,
        stream_name="example_stream"
    )
    channels = lsl_generator.get_LSL_channels()

    lsl_generator = lsl_generator.get_next_batch()

    stream = nm.Stream(verbose=True)

    df_features = await stream.run(
        data_processor,
        lsl_generator,
        output_writer,
    )

if __name__ == "__main__":
    asyncio.run(main())

Because of the await calls, the whole run function now needs to be invoked with asyncio every time. I am not sure that is optimal, but I cannot think of another option.

There is no "stream control logic" class yet; I would also need some time to think about how such a class could be included. At the moment, the is_running variable, the stream_handling_queue and the websocket_features could be moved into that class; one possible shape is sketched below.
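
Purely as a sketch (only the attribute names come from the current code; the methods and command strings are assumptions), such a class could look like this:

import asyncio

class StreamControl:
    def __init__(self, websocket_features=None):
        self.is_running = False
        self.stream_handling_queue = asyncio.Queue()  # receives e.g. "stop" commands from the GUI
        self.websocket_features = websocket_features  # optional sink for live feature output

    def should_stop(self) -> bool:
        # Non-blocking check for a stop command queued by the GUI/websocket side.
        try:
            command = self.stream_handling_queue.get_nowait()
        except asyncio.QueueEmpty:
            return False
        return command == "stop"

    async def publish(self, features: dict) -> None:
        # Forward computed features to the websocket, if one is attached.
        if self.websocket_features is not None:
            await self.websocket_features.send(features)

The run loop could then call control.should_stop() once per batch and control.publish(features) after each feature computation.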

@timonmerk
Copy link
Contributor

A couple of additional points:

  • The stream currently doesn't have an sfreq parameter; in fact, its init function only takes verbose.
  • The settings and channels are now part of the processing/data_processor. I tried to change it in such a way that there is "one and only one" place where those objects are stored.
  • I've tried to play around with "hiding" the fact that the run function is now asynchronous by placing the asyncio.run() call in a wrapper, but it didn't really work. For now it might be necessary to await it inside a now-async main function, or to call it via asyncio.get_event_loop().run_until_complete(stream.run()) (see the sketch after this list).
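
For reference, a wrapper along these lines (run_sync here is a hypothetical helper, not part of the package) only works when no event loop is already running in the current thread, which may be why hiding the coroutine does not work in every environment:

import asyncio

def run_sync(coro):
    """Run an async Stream.run() coroutine from synchronous code.

    Works only in a thread without a running event loop (e.g. plain scripts);
    inside Jupyter or a GUI event loop the caller still has to await directly.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: create one and block until the run finishes.
        return asyncio.run(coro)
    raise RuntimeError("An event loop is already running; use 'await stream.run(...)' instead.")

# usage: df_features = run_sync(stream.run(data_processor, data_generator, output_writer))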

toni-neurosc self-assigned this on Nov 6, 2024