-
Notifications
You must be signed in to change notification settings - Fork 0
/
hist
executable file
·208 lines (195 loc) · 8.82 KB
/
hist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python3
"""Cinelog Search CLI
Usage:
hist.py [-a] <term>...
hist.py -h | --help
hist.py -v | --version
Options:
-a Also search in *.cast files.
<term>... Search terms to filter commands.
-h --help Show this screen.
-v --version Show version.
"""
from docopt import docopt
import glob
from itertools import chain
from pathlib import Path
import json
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import FuzzyWordCompleter
from prompt_toolkit.key_binding import KeyBindings
import shutil
import subprocess
import os
import re
import ast
kb = KeyBindings()
def combine_glob_patterns(*patterns):
return chain.from_iterable(glob.iglob(pattern) for pattern in patterns)
def strip_ansi_escape_sequences(text):
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
def read_events(file_path, has_header=False):
events = []
with open(file_path, 'r') as f:
if has_header:
header_line = f.readline()
try:
header = json.loads(header_line)
except json.JSONDecodeError:
header = None
else:
header = None
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
events.append(event)
except json.JSONDecodeError:
continue
return header, events
if __name__ == '__main__':
arguments = docopt(__doc__, version='Cinelog Search CLI 1.0', options_first=True)
if arguments['<term>']:
home = str(Path.home())
script_dir = os.path.dirname(os.path.abspath(__file__))
search_term = ' '.join(arguments['<term>']).casefold()
results = {}
# Command search base... input and output or just the commands
if arguments['-a']:
patterns = [
os.path.join(home, f"logs/*/*.cast.commands.log"),
os.path.join(home, 'logs/*.cast.commands.log'),
os.path.join(home, f"logs/*/*.cast"),
os.path.join(home, 'logs/*.cast'),
]
else:
patterns = [
os.path.join(home, f"logs/*/*.cast.commands.log"),
os.path.join(home, 'logs/*.cast.commands.log'),
]
combined_files = chain.from_iterable(glob.iglob(pattern) for pattern in patterns)
for filepath in combined_files:
with open(filepath) as logfile:
file_matched = False
results[logfile.name] = []
previous_command = ""
file_extension = os.path.splitext(filepath)[1]
if file_extension == '.log':
for line in logfile:
if '"m"' in line:
# Extract the command from the line
try:
data = ast.literal_eval(line.replace(r'\+', r'\\+'))
timestamp = data[0]
previous_command = data[2]
# Check if the search term is in the command
if search_term in previous_command.casefold():
file_matched = True
results[logfile.name].append(f"{timestamp},{previous_command}")
except (SyntaxError, SyntaxWarning, ValueError) as e:
print(f"Error parsing line in {filepath}: {e}")
continue
if not file_matched:
del results[logfile.name]
elif file_extension == '.cast':
# Skip the header line
header_line = logfile.readline()
try:
header = json.loads(header_line)
except json.JSONDecodeError:
continue
for line in logfile:
try:
data = json.loads(line)
except json.JSONDecodeError:
continue
if len(data) >= 3:
timestamp = data[0]
event_type = data[1]
content = data[2]
if event_type == 'o':
if "hist" not in content: # don't pollute search results with search
# Remove ANSI escape sequences
clean_content = strip_ansi_escape_sequences(content)
# Stop at the first backslash to shorten the output
clean_content = clean_content.split('\\', 1)[0]
# Check if the search term is in the cleaned content
if search_term in clean_content.casefold():
file_matched = True
# Shorten the output for display
short_content = clean_content.strip()[:80]
results[logfile.name].append(f"{timestamp},{short_content}")
if not file_matched:
del results[logfile.name]
else:
continue
if not results:
print(f"No matches found for: '{search_term}'")
exit()
complete = []
for file in results.keys():
for timestamp_command in results[file]:
file_short = re.sub(f"{home}/logs/", "", file)
complete.append(f"{file_short},{timestamp_command}")
result_completer = FuzzyWordCompleter(complete)
session = PromptSession(
"Select your match:\n>",
completer=result_completer,
key_bindings=kb,
)
webfolder = os.environ.get('SUDO_USER','')
while True:
try:
text = session.prompt(
pre_run=session.default_buffer.start_completion,
complete_while_typing=True,
mouse_support=True
)
selected_file = text.split(",")[0]
timestamp = text.split(",")[1]
if os.path.splitext(selected_file)[1] == ".log":
asciinema_log = os.path.join(home, "logs", selected_file.split(".")[0] + ".cast")
asciinema_command_log = os.path.join(home, "logs", selected_file)
elif os.path.splitext(selected_file)[1] == ".cast":
asciinema_log = os.path.join(home, "logs", selected_file)
asciinema_command_log = os.path.join(home, "logs", selected_file + ".commands.log")
filename = os.path.basename(asciinema_log)
# Merge terminal log and command log to script-relative tmp directory
tmp_dir = os.path.join(script_dir, "asciinema-player", "tmp")
os.makedirs(tmp_dir, exist_ok=True)
os.makedirs(os.path.join(tmp_dir, webfolder), exist_ok=True)
output_file_path = os.path.join(tmp_dir, webfolder, filename)
print(output_file_path)
# Read events from both files
header1, events1 = read_events(asciinema_log, has_header=True)
_, events2 = read_events(asciinema_command_log)
# Combine events and sort
all_events = events1 + events2
all_events.sort(key=lambda event: event[0])
# Write combined events to output file
with open(output_file_path, 'w') as f:
# Write the header from the .cast file
if header1:
json.dump(header1, f)
f.write('\n')
for event in all_events:
json.dump(event, f)
f.write('\n')
cinelog_viewer_port = os.environ.get('CINELOG_VIEWER_PORT','10001')
print(cinelog_viewer_port)
url = f"http://127.0.0.1:{cinelog_viewer_port}/play.html?filename=tmp/{webfolder}/{filename}×tamp={timestamp}"
print("URL to Event: " + url)
subprocess.Popen(['xdg-open', url], stdout=open(os.devnull, 'wb'))
except Exception as e:
print(e)
except KeyboardInterrupt:
# Clean up tmp files per user
files = glob.glob(os.path.join(script_dir, 'asciinema-player', 'tmp', webfolder, '*'))
for f in files:
os.remove(f)
break
else:
print(__doc__)