Skip to content

Commit

Permalink
giving bag_stream more modularity by cleaning Namespace.
Browse files Browse the repository at this point in the history
  • Loading branch information
1hada committed Jul 29, 2022
1 parent 256e4d5 commit aacaa41
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 21 deletions.
35 changes: 21 additions & 14 deletions src/rosbag_merge/bag_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
CSVS = rosbag_to_csv.CsvStreams

# inspired from : https://github.com/Kautenja/rosbag-tools/blob/master/merge.py , which has no LICENSE information and is not maintained
def stream(input_bag: Bag, output_bag: Bag, topics: list = ["*"], args: Namespace = None, input_bag_filename: str = None) -> None:
def stream(input_bag: Bag, topics: list = ["*"], output_bag: Bag=None, input_bag_filename: str=None, write_csvs : bool=False, global_output_path : str="") -> None:
"""Stream data from an input bag to an output bag.
:param input_bag: the input bag to stream from
:param topics: a list of the topics to include
:param output_bag: the output bag to write data to
:param input_bag_filename: used to write the csv name
:param write_csvs: bool to determine whether to write csvs.
:param topics: a list of the topics to include
:param global_output_path: The global path to write the csv with.
:return: None
"""
Expand All @@ -35,9 +39,9 @@ def stream(input_bag: Bag, output_bag: Bag, topics: list = ["*"], args: Namespac
if (output_bag):
# write this message to the output bag
merge_bags.write_to_bag(output_bag, topic, msg, time)
if (args.write_csvs):
if (write_csvs):
# write this message to the output csv
CSVS.write_data(topic,msg,time,input_bag_filename,args.global_output_path)
CSVS.write_data(topic,msg,time,input_bag_filename,global_output_path)
# increment the counter of included messages
msg_count += 1
if ( 0 == msg_count % prog_bar_update_interval):
Expand All @@ -47,31 +51,34 @@ def stream(input_bag: Bag, output_bag: Bag, topics: list = ["*"], args: Namespac
prog.set_postfix(msg_count=msg_count, prog_bar_update_interval=prog_bar_update_interval)
prog.close()

def stream_iter(args : Namespace , output_bag: Bag=None) -> None:
def stream_iter(input_bags : 'list[str]', topics : 'list[str]'=["*"], output_bag: Bag=None, write_csvs : bool=False, global_output_path : str=False) -> None:
"""Iterate over the input files
:param args: args which are used for path data.
:param input_bags: a list of bag names to write data to.
:param topics: a list of the topics to include
:param output_bag: the output bag to write data to
:param write_csvs: bool to determine whether to write csvs.
:param global_output_path: The global path to write the csv with.
:return: None
"""
for filename in tqdm(args.input_bags, unit='bag'):
for filename in tqdm(input_bags, unit='bag'):
# open the input bag with an automatically closing context
with Bag(filename, 'r') as input_bag:
# stream the input bag to the output bag
stream(input_bag, output_bag = output_bag, topics=args.topics, args=args, input_bag_filename=filename)
stream(input_bag, topics=topics, output_bag=output_bag, input_bag_filename=filename, write_csvs=write_csvs, global_output_path=global_output_path)

def main(args : Namespace ):
def main(input_bags : 'list[str]', topics : 'list[str]', write_bag : bool, write_csvs : bool, global_output_path : str, outbag_name : str):
try:
if(args.write_bag):
full_bag_path = os.path.join(args.global_output_path,args.outbag_name+".bag")
if(write_bag):
full_bag_path = os.path.join(global_output_path,outbag_name+".bag")
# open the output bag in an automatically closing context
with Bag(full_bag_path, 'w') as output_bag:
stream_iter(args, output_bag)
stream_iter(input_bags, topics = topics, output_bag = output_bag, write_csvs=write_csvs, global_output_path=global_output_path)
else:
stream_iter(args)
stream_iter(input_bags=input_bags, topics = topics, write_csvs=write_csvs, global_output_path=global_output_path)
except KeyboardInterrupt:
pass
finally:
if(args.write_csvs):
CSVS.close_streams()
if(write_csvs):
CSVS.close_streams()
2 changes: 0 additions & 2 deletions src/rosbag_merge/csv_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from icecream import ic
ic.configureOutput(includeContext=True)

ic (sys.argv[1:])

def show_data_on_timeline(df:dd):
df[['cmd_vel_final.linear.x', 'cmd_vel_final.linear.y']].resample('1m').mean().compute().plot()
plt.show()
Expand Down
28 changes: 25 additions & 3 deletions src/rosbag_merge/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def parse_args(args)-> argparse.Namespace:
type=str,
nargs='*',
help='A sequence of topics to include from the input bags.',
default=None,#['*'],
default=None, # or list of strings ['*'],
required=False,
)
parser.add_argument('--topic-file', '-f',
Expand Down Expand Up @@ -118,10 +118,32 @@ def main():
args.input_csvs.extend(glob.glob(os.path.join(args.global_output_path,"*-single-topic.csv")))
csv_merge.merge_csvs_using_dask(args.input_csvs)
else:
bag_stream.main(args)
bag_stream.main(input_bags=args.input_bags
,write_bag=args.write_bag
,write_csvs=args.write_csvs
,global_output_path=args.global_output_path
,outbag_name=args.outbag_name
,topics=args.topics
)

if __name__ == "__main__":
main(sys.argv[1:])
args = sys.argv[1:]
args = parse_args(args)
if(args.merge_csvs):
# TODO , see why can't make csvs and merge in the same call to main
expecting_to_merge_newly_written_csvs = args.write_csvs and (not len(args.input_csvs))
if(expecting_to_merge_newly_written_csvs):
# gather newly made csvs from the output path
args.input_csvs.extend(glob.glob(os.path.join(args.global_output_path,"*-single-topic.csv")))
csv_merge.merge_csvs_using_dask(args.input_csvs)
else:
bag_stream.main(input_bags=args.input_bags
,write_bag=args.write_bag
,write_csvs=args.write_csvs
,global_output_path=args.global_output_path
,outbag_name=args.outbag_name
,topics=args.topics
)

# explicitly define the outward facing API of this module
__all__ = [ main.__name__ ]
4 changes: 2 additions & 2 deletions src/rosbag_merge/rosbag_to_csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3

from argparse import Namespace
from datetime import datetime
import io
import os
Expand Down Expand Up @@ -211,7 +210,8 @@ def format_csv_filename(self, form, full_file_path, topic):
"""A helper to get a formmated filename.
:param form: the format which defines the saved csv
:param msg: a ros msg
:param full_file_path: The global path to write the csv with.
:param topic: the topic name
:return: a file name which is formatted to let users distinguish the csv files.
"""
Expand Down

0 comments on commit aacaa41

Please sign in to comment.