This repository has been archived by the owner on Nov 8, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cosmos_cli.py
executable file
·182 lines (156 loc) · 5.84 KB
/
cosmos_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python
import argparse
from functools import partial
from cmd2 import Cmd
import cmd2.constants
from pydocumentdb.errors import HTTPFailure
from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import TerminalFormatter
from termcolor import colored
import json
import os
import sys
import pydocumentdb.document_client as document_client
cmd2.constants.REDIRECTION_OUTPUT = '->'
cmd2.constants.REDIRECTION_APPEND = '->>'
cmd2.constants.REDIRECTION_CHARS = [
cmd2.constants.REDIRECTION_PIPE, cmd2.constants.REDIRECTION_OUTPUT]
def get_id(x):
return x['id']
def startswith(text, x):
return x.startswith(text)
class CosmosPrompt(Cmd):
def __init__(self):
Cmd.__init__(self, persistent_history_file='~/.cosmos-cli-history')
self.client = self.get_client()
self._databases = None
self._collections = None
self.database = None
self.collection = None
self.result_json = None
self.output_function = self.ppaged
self.update_prompt()
def get_client(self):
try:
client = document_client.DocumentClient(
os.environ['COSMOS_ENDPOINT'],
{'masterKey': os.environ['COSMOS_ACCOUNT_KEY']}
)
except Exception:
self.pfeedback(
'Set COSMOS_ENDPOINT and COSMOS_ACCOUNT_KEY '
'in the environment.')
raise SystemExit
return client
def get_collection_path(self, silent=False):
if not silent:
if self.database is None:
raise ValueError(
'Use "database <database_name>" '
'to select CosmosDB database')
if self.collection is None:
raise ValueError('Use "collection <collection_name>" '
'to select CosmosDB collection')
return '/dbs/{}/colls/{}'.format(self.database, self.collection)
def update_prompt(self):
self.prompt = '{}{}{} '.format(
colored('[', color='white', attrs=['bold']),
colored(self.get_collection_path(silent=True), color='white'),
colored(']', color='white', attrs=['bold']),
)
def do_export(self, args):
"""Export last result (if set) to file in JSON format"""
if not self.result_json:
self.pfeedback('No result to export. Make a SELECT query.')
return
with open(os.path.expanduser(args), 'w') as outfile:
outfile.write(self.result_json)
def do_select(self, args):
"""Query CosmosDB documents using a select statement."""
try:
self.result_json = json.dumps(
list(self.client.QueryDocuments(
self.get_collection_path(),
{'query': 'SELECT {}'.format(args)},
{
'enableScanInQuery': True,
'enableCrossPartitionQuery': True,
},
)),
indent=2,
sort_keys=True,
)
self.output_function(highlight(
self.result_json, JsonLexer(), TerminalFormatter()))
except HTTPFailure as e:
try:
body = str(e).split('\n', 1)[1]
error = json.loads(body)
message = json.loads(
error['message'].split('\r')[0][9:]
)['errors'][0]['message']
self.perror(message, traceback_war=False)
except Exception:
# if payload is unexpected shape, print entire exception
self.perror(e, traceback_war=False)
except ValueError as e:
self.pfeedback(e)
def do_pager(self, args):
"""Set to True (default) to use pager and False to disable.
Color codes are stripped from output during redirection with pager
set to False."""
if args.lower() in ('true', 'on'):
self.output_function = self.ppaged
elif args.lower() in ('false', 'off'):
self.output_function = self.poutput
self.poutput('pager: {}'.format(self.output_function == self.ppaged))
do_SELECT = do_select
def do_exit(self, args):
"""Exit cosmos-cli"""
raise SystemExit
do_EOF = do_exit
def do_database(self, args):
"""Set CosmosDB database."""
self.database = args
self._collections = list(map(
get_id,
self.client.QueryCollections(
'/dbs/{}'.format(self.database),
'SELECT * FROM c'
)
))
self.update_prompt()
def complete_database(self, text, line, begidx, endidx):
if self._databases is None:
self._databases = list(map(
get_id,
self.client.QueryDatabases('SELECT * FROM d')))
return filter(partial(startswith, text), self._databases)
def do_collection(self, args):
"""Set CosmosDB database collection."""
self.collection = args
self.update_prompt()
def complete_collection(self, text, line, begidx, endidx):
if self._collections:
return filter(partial(startswith, text), self._collections)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--database', type=str)
parser.add_argument('-c', '--collection', type=str)
parser.add_argument('commands', nargs='*')
args = parser.parse_args()
prompt = CosmosPrompt()
if args.database:
prompt.do_database(args.database)
if args.collection:
prompt.do_collection(args.collection)
if args.commands:
for command in args.commands:
prompt.onecmd(command)
return
else:
sys.argv = sys.argv[:1]
prompt.cmdloop('Connected to CosmosDB')
if __name__ == '__main__':
main()