-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
amcp.py
170 lines (152 loc) · 5.09 KB
/
amcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#
# This file is part of Mediary's Caspar Client.
# Copyright (C) 2018 Mediary Limited. All rights reserved.
#
'''
Python (2) interface to v2.1 of the CasparCG AMCP protocol
Note that not all parts of the protocol are implemented - only what I needed.
'''
import socket
import configparser
import json
import globalwidget
def code_lookup(c):
    '''
    Return a short human-readable description of AMCP status code c.

    Codes with a dedicated message are looked up directly; any other
    4xx or 5xx value gets a generic client/server message, and anything
    outside those ranges is reported as an unknown error code.
    '''
    specific = {
        400: 'Unspecified client error',
        401: 'Illegal video channel',
        402: 'Parameter missing',
        403: 'Illegal parameter',
        404: 'Media file not found',
        500: 'Unspecified internal server error',
        501: 'Internal server error',
        502: 'Media file unreadable',
        503: 'Access error',
    }
    if c in specific:
        return specific[c]
    # No dedicated message: fall back on the class of the code.
    if 400 <= c < 500:
        return 'Unknown client error code'
    if 500 <= c < 600:
        return 'Unknown server error code'
    return 'Unknown error code'
class AMCPException(Exception):
    '''
    Error reported by the server in an AMCP status line.

    Attributes:
        code: the numeric status code from the response.
        info: the text that followed the status code on that line.
    '''

    def __init__(self, code, info):
        # Hand the values to Exception as well, so that args and repr()
        # carry them instead of being empty.
        super(AMCPException, self).__init__(code, info)
        self.code = code
        self.info = info

    def __str__(self):
        ''' Format as "<code> (<description of code>): <info>". '''
        return '%s (%s): %s' % (self.code, code_lookup(self.code), self.info)
class ClientError(AMCPException):
    ''' AMCP failure whose status code is in the 400-499 range. '''
class ServerError(AMCPException):
    ''' AMCP failure whose status code is in the 500-599 range. '''
class Connection(object):
    '''
    Lazily-connected AMCP client.

    The socket is opened by the first transaction, and re-opened when the
    configured server or port has changed or the connection was lost.
    '''

    def __init__(self, config, reporter=None):
        '''
        Initialise a connection wrapping object.
        Doesn't actually connect until we need to.
        If reporter is not None, its status() method may be called
        at any time with a human-readable text string (followed by a
        call to its update() method).
        '''
        self.socket = None  # we'll connect on demand
        self.server = None
        self.port = None
        self.reporter = reporter
        self.config = config

    def connect(self):
        ''' Open a socket to the server and port currently configured. '''
        self.server = globalwidget.get_server(self.config)
        self.port = globalwidget.get_port(self.config)
        # Second argument is the timeout, in seconds, which also applies
        # to later sends and receives on the socket.
        self.socket = socket.create_connection((self.server, self.port), 5)

    def server_changed(self):
        ''' True if the configured server or port differs from the one in use. '''
        return (self.server != globalwidget.get_server(self.config)
                or self.port != globalwidget.get_port(self.config))

    def info(self, what=''):
        ''' INFO command/subcommand '''
        return self.transact('INFO ' + what)

    def version(self, what=''):
        ''' VERSION command/subcommand '''
        return self.transact('VERSION ' + what)

    def report(self, msg):
        ''' Pass msg to the reporter, if there is one. '''
        if self.reporter is not None:
            self.reporter.status(msg)
            self.reporter.update()

    def _drop_socket(self):
        ''' Close and forget the current socket so the next transaction reconnects. '''
        sock, self.socket = self.socket, None
        if sock is None:
            return
        try:
            sock.close()
        except (socket.error, OSError):
            # Best effort: the socket is being discarded anyway.
            pass

    def _recv_until(self, buf, complete):
        '''
        Append received bytes to buf until complete(buf) is true.

        Raises Exception (after dropping the socket) if the server closes
        the connection first.
        '''
        while not complete(buf):
            chunk = self.socket.recv(4096)
            if not chunk:  # socket closed
                self._drop_socket()
                raise Exception('server connection lost')
            buf += chunk
        return buf

    @staticmethod
    def _split_lines(buf):
        ''' Decode a complete response buffer and split it into lines. '''
        return buf.decode('utf-8').strip().split('\r\n')

    def transact(self, command):
        '''
        Error-checking transaction

        Sends command (a CRLF is appended) and reads the reply.

        Returns:
            status 202: the text after the status code on the status line;
            status 201: the single data line following the status line;
            status 200: the list of data lines following the status line.

        Raises:
            ClientError: status 400-499.
            ServerError: status 500-599.
            AMCPException: any other status.
            Exception: the connection was lost while sending or receiving.
        '''
        if self.socket is not None and self.server_changed():
            self._drop_socket()
        if self.socket is None:
            self.report('Connecting to server...')
            self.connect()

        payload = (command + '\r\n').encode('utf-8')
        try:
            # sendall, because send() may transmit only part of the data.
            self.socket.sendall(payload)
        except (socket.error, OSError) as e:
            self._drop_socket()
            raise Exception('server connection lost (%s)' % e)

        # Work on raw bytes and decode only buffers that end in CRLF, so a
        # multi-byte character split across two recv() calls cannot break
        # the decode.
        buf = self._recv_until(b'', lambda b: b.endswith(b'\r\n'))
        status_text, _, info = self._split_lines(buf)[0].partition(' ')
        status = int(status_text)

        if status == 202:
            # OK, no data beyond the status line
            return info
        if status == 201:
            # OK, one line data; it may arrive after the status line
            buf = self._recv_until(
                buf,
                lambda b: b.endswith(b'\r\n') and b.lstrip().count(b'\r\n') >= 2)
            lines = self._split_lines(buf)
            return lines[1] if len(lines) > 1 else ''
        if status == 200:
            # OK, multiline data, make sure we've got it all
            buf = self._recv_until(buf, lambda b: b.endswith(b'\r\n\r\n'))
            return self._split_lines(buf)[1:]
        if 400 <= status < 500:
            raise ClientError(status, info)
        if 500 <= status < 600:
            raise ServerError(status, info)
        # That's interesting, we didn't recognise the status code.
        raise AMCPException(status, info)
def quote(s):
    r'''
    Quotes a parameter for transit over AMCP
    Rules:
        \ -> \\
        Newline -> \n   (CRLF and LFCR pairs are first folded to one newline)
        " -> \"
    Then, if there's a space, put double quotes around the whole thing.
    A value containing a double quote is wrapped as well; earlier versions
    of this function did that, so the output for such values is unchanged.
    '''
    # Backslashes go first, so the ones added by the later escapes are not
    # themselves doubled.
    s = s.replace('\\', '\\\\')
    s = s.replace('\r\n', '\n').replace('\n\r', '\n')
    s = s.replace('\n', '\\n')
    s = s.replace('"', '\\"')
    if ' ' in s or '"' in s:
        s = '"%s"' % s
    return s
def unquote(s):
    r'''
    Reverse of quote()

    Strips one pair of enclosing double quotes, if present, then decodes
    the escapes \\ , \n and \" in a single left-to-right pass. Any other
    backslash sequence, or a trailing lone backslash, is left as it is.
    An empty string is returned unchanged.
    '''
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        s = s[1:-1]
    escapes = {'\\': '\\', 'n': '\n', '"': '"'}
    decoded = []
    chars = iter(s)
    # One pass, consuming each escape as a unit: chained replace() calls
    # would read the second half of an escaped backslash followed by "n"
    # as a newline escape.
    for ch in chars:
        if ch != '\\':
            decoded.append(ch)
            continue
        following = next(chars, None)
        if following is None:
            decoded.append(ch)
        elif following in escapes:
            decoded.append(escapes[following])
        else:
            decoded.append(ch + following)
    return ''.join(decoded)
def jsondata(obj):
    '''
    Turns an object (dictionary) into JSON text and passes that text
    through quote() so it can be sent as a Caspar/AMCP parameter.
    '''
    serialised = json.dumps(obj)
    return quote(serialised)
if __name__ == '__main__':
    # Self-test when run as a script: every sample must survive a
    # quote() / unquote() round trip unchanged.
    samples = ['abc', 'abc"', 'a b', 'a "b c"', 'a\\b', 'a\nb', 'a\n"b \\c"']
    for s in samples:
        round_tripped = unquote(quote(s))
        assert round_tripped == s, 'quote unquote test: %s'%s