-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyOSCfunctions.py
158 lines (129 loc) · 4.02 KB
/
pyOSCfunctions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import math, string, struct
######
#
# OSCMessage decoding functions
#
######
def _readString(data):
"""Reads the next (null-terminated) block of data
"""
length = string.find(data,"\0")
nextData = int(math.ceil((length+1) / 4.0) * 4)
return (data[0:length], data[nextData:])
def _readBlob(data):
"""Reads the next (numbered) block of data
"""
length = struct.unpack(">i", data[0:4])[0]
nextData = int(math.ceil((length) / 4.0) * 4) + 4
return (data[4:length+4], data[nextData:])
def _readInt(data):
"""Tries to interpret the next 4 bytes of the data
as a 32-bit integer. """
if(len(data)<4):
print "Error: too few bytes for int", data, len(data)
rest = data
integer = 0
else:
integer = struct.unpack(">i", data[0:4])[0]
rest = data[4:]
return (integer, rest)
def _readLong(data):
"""Tries to interpret the next 8 bytes of the data
as a 64-bit signed integer.
"""
high, low = struct.unpack(">ll", data[0:8])
big = (long(high) << 32) + low
rest = data[8:]
return (big, rest)
def _readTimeTag(data):
"""Tries to interpret the next 8 bytes of the data
as a TimeTag.
"""
high, low = struct.unpack(">ll", data[0:8])
if (high == 0) and (low <= 1):
time = 0.0
else:
time = int(high) + float(low / 1e9)
rest = data[8:]
return (time, rest)
def _readFloat(data):
"""Tries to interpret the next 4 bytes of the data
as a 32-bit float.
"""
if(len(data)<4):
print "Error: too few bytes for float", data, len(data)
rest = data
float = 0
else:
float = struct.unpack(">f", data[0:4])[0]
rest = data[4:]
return (float, rest)
def decodeOSC(data):
"""Converts a binary OSC message to a Python list.
"""
table = {"i":_readInt, "f":_readFloat, "s":_readString, "b":_readBlob}
decoded = []
address, rest = _readString(data)
if address.startswith(","):
typetags = address
address = ""
else:
typetags = ""
if address == "#bundle":
time, rest = _readTimeTag(rest)
decoded.append(address)
decoded.append(time)
while len(rest)>0:
length, rest = _readInt(rest)
decoded.append(decodeOSC(rest[:length]))
rest = rest[length:]
elif len(rest)>0:
if not len(typetags):
typetags, rest = _readString(rest)
decoded.append(address)
decoded.append(typetags)
if typetags.startswith(","):
for tag in typetags[1:]:
value, rest = table[tag](rest)
decoded.append(value)
else:
raise OSCError("OSCMessage's typetag-string lacks the magic ','")
return decoded
######
#
# OSCError classes
#
######
class OSCError(Exception):
"""Base Class for all OSC-related errors
"""
def __init__(self, message):
self.message = message
def __str__(self):
return self.message
class OSCClientError(OSCError):
"""Class for all OSCClient errors
"""
pass
class OSCServerError(OSCError):
"""Class for all OSCServer errors
"""
pass
class NoCallbackError(OSCServerError):
"""This error is raised (by an OSCServer) when an OSCMessage with an 'unmatched' address-pattern
is received, and no 'default' handler is registered.
"""
def __init__(self, pattern):
"""The specified 'pattern' should be the OSC-address of the 'unmatched' message causing the error to be raised.
"""
self.message = "No callback registered to handle OSC-address '%s'" % pattern
class NotSubscribedError(OSCClientError):
"""This error is raised (by an OSCMultiClient) when an attempt is made to unsubscribe a host
that isn't subscribed.
"""
def __init__(self, addr, prefix=None):
if prefix:
url = getUrlStr(addr, prefix)
else:
url = getUrlStr(addr, '')
self.message = "Target osc://%s is not subscribed" % url