diff --git a/smdebug/_version.py b/smdebug/_version.py
index 8c79b2ef9..65d99eceb 100644
--- a/smdebug/_version.py
+++ b/smdebug/_version.py
@@ -1 +1 @@
-__version__ = "1.0.26"
+__version__ = "1.0.27"
diff --git a/smdebug/profiler/system_metrics_reader.py b/smdebug/profiler/system_metrics_reader.py
index 752070c17..e9176ca60 100644
--- a/smdebug/profiler/system_metrics_reader.py
+++ b/smdebug/profiler/system_metrics_reader.py
@@ -5,6 +5,7 @@
 import os
 import io
 import sys
+import math
 import pickle
 import pandas as pd
 import numpy as np
@@ -75,7 +76,7 @@ def _get_event_files_in_the_range(
         available timestamp
         """

-        if self._startAfter_prefix is not "":
+        if self._startAfter_prefix != "":
             if end_time_microseconds >= get_utctimestamp_us_since_epoch_from_system_profiler_file(
                 self._startAfter_prefix
             ):
@@ -202,13 +203,14 @@ class S3NumpySystemMetricsReader(S3SystemMetricsReader):
     fill_val = 101

     def __init__(self, s3_trial_path, use_in_memory_cache,
-                 col_names, col_dict,
+                 col_names, extra_col_names, col_dict,
                  np_store, store_chunk_size, store_time_file_prefix,
                  group_size, n_nodes, frequency,
                  accumulate_forward = False, accumulate_minutes = 20,
                  n_process=None, logger = None):
         super().__init__(s3_trial_path, use_in_memory_cache)
         self.col_names = col_names # list of names of indicators
+        self.extra_col_names = extra_col_names # extra indicators, kept on disk only, unlike col_names
         self.col_dict = col_dict # mapping from name to position. Possibly will decide not needed
         self.np_store = np_store
         self.np_chunk_size = store_chunk_size
@@ -273,7 +275,9 @@ def init_accumulators(self):

         num_rows = self.accu_mins * 60 * 10 # Max, at highest frequency
         num_rows = num_rows + num_rows//10 # Buffer
-        num_cols = len(self.col_names)
+        # extra_col_names indicators are counted separately here because,
+        # unlike the col_names indicators, they are stored on disk only
+        num_cols = len(self.col_names) + len(self.extra_col_names)

         self.accu_n_rows = num_rows
         for i in range (0, self.n_nodes):
@@ -299,9 +303,10 @@
             self.logger.info("NumpyS3Reader: ALLOCATED ACCU MEM for node {}".format(i))

     def _json_to_numpy(node_ind, start_time, end_time, freq_delta,
-                       num_rows, num_cols, np_chunk_size, event_data_list,
+                       num_rows, num_cols, num_extra_cols,
+                       np_chunk_size, event_data_list,
                        shared_mem_id, col_dict,
-                       accu_val_mem_id, accu_time_mem_id,
+                       accu_val_mem_id, accu_time_mem_id,
                        accu_cnt_mem_id, accu_num_rows,
                        np_store, tprefix,
                        queue, logger):
@@ -329,25 +334,26 @@

         accu_shm_count = shared_memory.SharedMemory(name=accu_cnt_mem_id)
         accu_counts =\
-            np.ndarray((num_cols,), dtype=np.int32, buffer=accu_shm_count.buf)
+            np.ndarray((num_cols+num_extra_cols,),
+                       dtype=np.int32, buffer=accu_shm_count.buf)
         accu_shm_val = shared_memory.SharedMemory(name=accu_val_mem_id)
-        accu_vals = np.ndarray((accu_num_rows, num_cols),
+        accu_vals = np.ndarray((accu_num_rows, num_cols+num_extra_cols),
                                dtype=np.int32, buffer=accu_shm_val.buf)
         accu_shm_time = shared_memory.SharedMemory(name=accu_time_mem_id)
-        accu_times = np.ndarray((accu_num_rows, num_cols),
+        accu_times = np.ndarray((accu_num_rows, num_cols+num_extra_cols),
                                 dtype=np.int64, buffer=accu_shm_time.buf)

         n_extra = np_chunk_size//100
         for i in range (0, num_chunks):
             np_val_chunks[i] =\
-                np.full((np_chunk_size+n_extra, num_cols),
+                np.full((np_chunk_size+n_extra, num_cols+num_extra_cols),
                         -1, dtype = np.int32)
             np_time_chunks[i] =\
-                np.full((np_chunk_size+n_extra, num_cols),
+                np.full((np_chunk_size+n_extra, num_cols+num_extra_cols),
                         -1, dtype = np.int64)
-            np_ragged_sizes[i] = np.zeros((num_cols,), dtype = np.int32)
+            np_ragged_sizes[i] = np.zeros((num_cols+num_extra_cols,), dtype = np.int32)

         separator = S3NumpySystemMetricsReader.separator
@@ -359,23 +365,24 @@
             n_zeros = 0
             n_nonzeros = 0

-        network_used = []
-        cpu_memory_used = []
         for event_data in event_data_list:
             event_string = event_data.decode("utf-8")
             event_items = event_string.split("\n")
             event_items.remove("")
             for item in event_items:
                 event = json.loads(item) #ojson better
-                if event['Dimension'] == "Algorithm":
-                    network_used.append(int(event['Value']))
-                    continue
-                if event['Name'] == "MemoryUsedPercent":
-                    cpu_memory_used.append( float(event['Value']))
-                    continue
-                if event['Name'].startswith("cpu") == False and\
+                if event['Dimension'] != "Algorithm" and\
+                   event['Name'] != "MemoryUsedPercent" and\
+                   event['Name'].startswith("cpu") == False and\
                    event['Name'].startswith("gpu") == False:
                     continue
+
+                if event['Name'] == "MemoryUsedPercent":
+                    # Rounding up is more informative when very little RAM is used
+                    event['Value'] = math.ceil(event['Value'])
+
+                is_extra = (event['Dimension'] == "Algorithm" or\
+                            event['Name'] == "MemoryUsedPercent")

                 col_ind = col_dict[event['Name']+separator+event['Dimension']]
                 cur_time = int(event['Timestamp']*1000000) #fast
@@ -389,11 +396,12 @@
                 chunk_ind = row_ind//np_chunk_size
                 chunk_row_ind = row_ind - chunk_ind*np_chunk_size

-                np_arr[row_ind,col_ind] = event['Value']
-                if event['Value'] == 0:
-                    n_zeros += 1
-                else:
-                    n_nonzeros += 1
+                if is_extra is False:
+                    np_arr[row_ind,col_ind] = event['Value']
+                    if event['Value'] == 0:
+                        n_zeros += 1
+                    else:
+                        n_nonzeros += 1

                 try:
                     if accu_val_mem_id is not None and accu_counts[col_ind] < accu_num_rows:
@@ -435,7 +443,7 @@

             max_entries_in_chunk = 0
             min_time_in_chunk = max_time # This will yield a filename
-            for col_ind in range (0, num_cols):
+            for col_ind in range (0, num_cols+num_extra_cols):
                 n_entries = np_ragged_sizes[chunk_ind][col_ind]
                 max_entries_in_chunk = max(max_entries_in_chunk, n_entries)
@@ -458,8 +466,7 @@
                 continue

             if max_entries_in_chunk > 0:
-
-                shp = (np_chunk_size+n_extra, num_cols)
+                shp = (np_chunk_size+n_extra, num_cols+num_extra_cols)
                 S3NumpySystemMetricsReader.store_vals_times(
                     node_ind, min_time_in_chunk, np_store, tprefix, shp,
                     np_val_chunks[chunk_ind], np_time_chunks[chunk_ind])
@@ -469,12 +476,7 @@
             #print("RAGGED min_time_in_chunk type: {}".format(type(min_time_in_chunk.item())))
             #print("RAGGED np_ragged_sized type: {}".format(type(np_ragged_sizes[chunk_ind])))
-        network_used = np.array(network_used)
-        cpu_memory_used = np.array(cpu_memory_used)
-        S3NumpySystemMetricsReader.store_vals(node_ind, min_time, np_store, (len(network_used),), network_used, val_type="Network")
-        S3NumpySystemMetricsReader.store_vals(node_ind, min_time, np_store, (len(cpu_memory_used),), cpu_memory_used, val_type="CPUmemory", dtype=float)
-
-        logger.info("S3NumpyReader _json_to_numpy FINISHED for node {}".format(node_ind))
+        logger.info("S3NumpyReader _json_to_numpy FINISHED for node {} min_row {}, max_row {}, min_time {}, max_time {}".format(node_ind, min_row, max_row, min_time, max_time))

         queue.put((node_ind, min_row, max_row,
                    min_time, max_time, jagged_metadata))

     def get_events(
@@ -542,6 +544,8 @@ def get_events(

         num_rows = (end_time-start_time)//(self.frequency*1000)
         num_cols = len(self.col_names)
+        # Unlike the col_names based indicators, these are not kept in memory:
+        num_extra_cols = len(self.extra_col_names)
         np_chunk_size = self.np_chunk_size
         self.logger.info("NumpyS3Reader: untrimmed DF shape ({},{})".format(num_rows,len(self.col_names)))
         np_arr = np.full((num_rows, num_cols), np.nan)
@@ -581,7 +585,7 @@
         for i in range (0, n_nodes):
             tasks[i] = Process(target=S3NumpySystemMetricsReader._json_to_numpy,
                         args=(i, start_time, end_time, freq_delta,
-                              num_rows, num_cols, np_chunk_size,
+                              num_rows, num_cols, num_extra_cols, np_chunk_size,
                               event_data_lists[i], shared_mem_ids[i],
                               copy.deepcopy(self.col_dict),
                               self.accu_val_mem_ids[i] if forward else None,
@@ -632,6 +636,16 @@

         # Could multiprocess the backfill as well. Not a bottleneck for now
         post_process = True
+        """
+        Logic for fl below.
+        We want to backfill, but not across a stretch where the user paused
+        profiling. Also, the user may have switched profiling frequencies,
+        the slowest period being 60000 ms and ours possibly 100 ms. So we
+        deem it an "interruption" if we see no signal within
+        fudge_factor * 60000 / "our frequency" rows;
+        the worst case is 3 minutes.
+        """
+        fl = int(max(5, 3*60000/self.frequency))
         for i in range (0, n_nodes):
             shm = shared_memory.SharedMemory(name=shared_mem_ids[i])
             arr_from_sh = np.ndarray(num_rows*num_cols,
@@ -646,8 +660,13 @@
         self.logger.info("S3NumpyReader: {} nans out of {}".format(mask.sum(), np_arr.size))
         st_loc = time.perf_counter_ns()
         temp_df = pd.DataFrame(np_arr)
-        temp_df.fillna(method='ffill', axis=0, inplace=True)
-        temp_df.fillna(method='bfill', axis=0, inplace=True)
+
+        # Fill at most fl missing values; a longer run of NaNs means a gap
+        temp_df.fillna(method='ffill', axis=0, limit = fl, inplace=True)
+        temp_df.fillna(method='bfill', axis=0, limit = fl, inplace=True)
+        # If anything is left to fill, we had a profiling gap. Zero it
+        temp_df.fillna(0, axis=0, inplace=True)
+
         fill_val = S3NumpySystemMetricsReader.fill_val
         temp_df.fillna(fill_val, axis=0, inplace=True)
         np_arr = temp_df.values
@@ -696,7 +716,8 @@ def collect_accu_metadata(self, start_time: int, end_time: int, forward: bool):

         self.logger.info ("S3NumpyReader: writting accumulated data")

-        num_cols = len(self.col_names)
+        # Both col_names based and extra_col_names based indicators go on disk:
+        num_cols = len(self.col_names) + len(self.extra_col_names)
         num_rows = self.accu_n_rows
         np_store = self.np_store
         tprefix = self.tprefix
@@ -807,11 +828,10 @@ def store_vals_times(node_ind, min_time_in_chunk, np_store, tprefix,
         S3NumpySystemMetricsReader.dump_to_disk(np_store, node_name, time_filename, np_time, shp, dtype=np.int64)

     @staticmethod
-    def store_vals(node_ind, min_time, np_store, shp, np_data, val_type="", dtype=np.int64):
+    def store_vals(node_ind, np_store, shp, np_data, val_type="", dtype=np.int32):
         node_name = S3NumpySystemMetricsReader.node_name_from_index(node_ind)
         separator = S3NumpySystemMetricsReader.separator
-        filename = val_type + separator + str(min_time) + separator + str(node_ind+1) + \
-                   ".npy"
+        filename = val_type + separator + str(node_ind+1) + ".npy"
         if np_store.startswith("s3://"):
             S3NumpySystemMetricsReader.dump_to_s3(np_store, node_name, filename, np_data)
         else:
@@ -829,9 +849,12 @@ def dump_to_s3(s3_storage_loc, node_name, filename, np_data):
             s3_client.upload_fileobj(data_stream, bucket, filepath)

     @staticmethod
-    def dump_to_disk(disk_storage_loc, node_name, filename, np_data, shp, dtype=np.int64):
+    def dump_to_disk(disk_storage_loc, node_name, filename, np_data, shp, dtype=np.int32):
+        directory = os.path.join(disk_storage_loc, node_name)
         filepath = os.path.join(disk_storage_loc, node_name, filename)
+        if not os.path.exists(directory):
+            os.makedirs(directory)
         fp_numpy = np.memmap(filepath, dtype=dtype, offset=0, mode='w+',
                              shape = shp)
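
A note on the event routing changed in _json_to_numpy: the ad-hoc network_used /
cpu_memory_used lists are gone, and Algorithm-dimension and MemoryUsedPercent
events now travel through the same column machinery as the cpu/gpu indicators,
merely flagged as "extra" so they are persisted to disk but skipped for the
in-memory array. A minimal sketch of that routing; classify_event and SEP are
illustrative names, not part of the module:

import json
import math

SEP = "#"  # illustrative; the class defines its own separator

def classify_event(item, col_dict):
    """Mirror of the per-event handling added in _json_to_numpy:
    returns (column index, value, is_extra) or None for ignored events."""
    event = json.loads(item)
    wanted = (event['Dimension'] == "Algorithm"
              or event['Name'] == "MemoryUsedPercent"
              or event['Name'].startswith(("cpu", "gpu")))
    if not wanted:
        return None
    if event['Name'] == "MemoryUsedPercent":
        # Rounding up keeps very small RAM usage visible in an int column
        event['Value'] = math.ceil(event['Value'])
    # "Extra" indicators are written to disk but not to the in-memory array
    is_extra = (event['Dimension'] == "Algorithm"
                or event['Name'] == "MemoryUsedPercent")
    col_ind = col_dict[event['Name'] + SEP + event['Dimension']]
    return col_ind, event['Value'], is_extra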
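
The accumulators that _json_to_numpy writes into are multiprocessing
shared_memory blocks viewed as NumPy arrays, and each of them is now
num_cols + num_extra_cols wide. A self-contained sketch of the attach-by-name
pattern the hunks above rely on; all sizes are illustrative:

import numpy as np
from multiprocessing import shared_memory

num_rows = 12 * 60 * 10          # stands in for accu_num_rows
num_cols, num_extra_cols = 8, 2  # e.g. len(col_names), len(extra_col_names)
width = num_cols + num_extra_cols

# Parent allocates one named block, sized for the widened accumulator
shm_val = shared_memory.SharedMemory(
    create=True, size=num_rows * width * np.dtype(np.int32).itemsize)
accu_vals = np.ndarray((num_rows, width), dtype=np.int32, buffer=shm_val.buf)
accu_vals[:] = 0

# A worker process re-attaches by name, exactly as _json_to_numpy does
worker_shm = shared_memory.SharedMemory(name=shm_val.name)
worker_vals = np.ndarray((num_rows, width), dtype=np.int32,
                         buffer=worker_shm.buf)
worker_vals[0, 0] = 42
assert accu_vals[0, 0] == 42   # both views share the same buffer

# Drop the array views before closing, then free the block in the parent
del worker_vals, accu_vals
worker_shm.close()
shm_val.close()
shm_val.unlink()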
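
The fillna change in get_events is the behavioral core of the patch: missing
samples are now interpolated only across short runs, while longer silences are
treated as deliberate profiling pauses and zeroed rather than smeared. A
standalone sketch, assuming frequency is the sampling period in milliseconds
as in the reader; fill_with_gap_detection is an illustrative name:

import pandas as pd

def fill_with_gap_detection(np_arr, frequency_ms):
    # A "gap" is no signal for 3x the slowest profiling period (60000 ms),
    # measured in rows at our own period, and never fewer than 5 rows
    fl = int(max(5, 3 * 60000 / frequency_ms))
    df = pd.DataFrame(np_arr)
    # Bridge short runs of NaNs from both sides, at most fl rows each way
    df.fillna(method='ffill', axis=0, limit=fl, inplace=True)
    df.fillna(method='bfill', axis=0, limit=fl, inplace=True)
    # Whatever survives both limited fills sits inside a profiling gap
    df.fillna(0, axis=0, inplace=True)
    return df.values

At a 100 ms period this gives fl = 1800 rows, i.e. three minutes of samples;
at the slowest 60000 ms period the 5-row floor dominates. Note also that with
the unconditional zero-fill in place, the pre-existing fill_val = 101 pass
that follows it in get_events can no longer trigger.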
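
Finally, dump_to_disk now creates the per-node directory before memmapping
into it, where the original assumed it already existed. A sketch of equivalent
logic; os.makedirs(..., exist_ok=True) is a race-free variant of the diff's
exists-check, and the copy/flush tail is illustrative, since the method shown
above continues beyond the memmap creation:

import os
import numpy as np

def dump_to_disk_sketch(disk_storage_loc, node_name, filename, np_data, shp,
                        dtype=np.int32):
    directory = os.path.join(disk_storage_loc, node_name)
    os.makedirs(directory, exist_ok=True)  # no error if it already exists
    filepath = os.path.join(directory, filename)
    fp_numpy = np.memmap(filepath, dtype=dtype, offset=0, mode='w+', shape=shp)
    fp_numpy[:] = np_data[:]   # copy into the file-backed array
    fp_numpy.flush()           # push the bytes to disk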