-
Notifications
You must be signed in to change notification settings - Fork 0
/
vsd.py
163 lines (119 loc) · 4.11 KB
/
vsd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#
# Copyright (C) 2018, Jaguar Land Rover
# This program is licensed under the terms and conditions of the
# Mozilla Public License, version 2.0. The full text of the
# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
#
# Author: Magnus Feuer ([email protected])
#
# Simple library to integrate Python with DSTC
#
import dstc
import ctypes
# Handle to the SWIG-generated extension module; stays None until
# load_vss_shared_object() imports it.
vsd_swig = None

# User callback registered through set_callback(); None until one is set.
_callback = None
# Copied in from vehicle_signal_distribution.h
# Could not get exported enums to work.
class data_type_e:
    """Integer codes identifying the data type of a signal.

    The values are compared against the result of
    vsd_swig.swig_vsd_data_type() by get() and set(), so they must stay
    in sync with the native header they were copied from.
    """

    # Integer types.
    vsd_int8 = 0
    vsd_uint8 = 1
    vsd_int16 = 2
    vsd_uint16 = 3
    vsd_int32 = 4
    vsd_uint32 = 5

    # Floating point types.
    vsd_double = 6
    vsd_float = 7

    # Remaining value-carrying types.
    vsd_boolean = 8
    vsd_string = 9

    # Types for which get() and set() return None.
    vsd_stream = 10
    vsd_na = 11
def load_vss_shared_object(so_file):
    """Load the native libraries and bind the vsd_swig module.

    Opens "librmc.so", "libdstc.so" and then so_file with ctypes, each
    with RTLD_GLOBAL, and finally imports the "vsd_swig" module and
    stores it in the module-level vsd_swig global.

    Args:
        so_file: Name or path of the shared object passed to ctypes.CDLL.

    Raises:
        OSError: Propagated from ctypes.CDLL if a library cannot be loaded.
        ImportError: Propagated if the vsd_swig module cannot be imported.
    """
    global vsd_swig

    # The load order is significant and must not be rearranged: the two
    # fixed libraries come first, the caller-supplied object last.
    # NOTE(review): RTLD_GLOBAL presumably makes their symbols available
    # to the vsd_swig extension imported below -- confirm.
    for library in ("librmc.so", "libdstc.so", so_file):
        ctypes.CDLL(library, mode=ctypes.RTLD_GLOBAL)

    vsd_swig = __import__("vsd_swig")
def publish(sig):
    """Publish a signal through vsd_swig.vsd_publish().

    Emits a debug log line first, then returns whatever the native
    publish call returns.

    Args:
        sig: Signal handle forwarded unchanged to vsd_swig.vsd_publish().
    """
    vsd_swig.log_debug("VSD Publish")
    result = vsd_swig.vsd_publish(sig)
    return result
def subscribe(sig):
    """Subscribe to a signal through vsd_swig.swig_vsd_subscribe().

    Emits a debug log line first, then returns whatever the native
    subscribe call returns.

    Args:
        sig: Signal handle forwarded unchanged to the native call.
    """
    vsd_swig.log_debug("VSD Subscribe")
    result = vsd_swig.swig_vsd_subscribe(sig)
    return result
def signal_by_id(sig_id):
    """Look up a signal by index.

    Args:
        sig_id: Value forwarded to vsd_swig.swig_vss_get_signal_by_index().

    Returns:
        The result of vsd_swig.swig_vss_get_signal_by_index(sig_id).
    """
    found = vsd_swig.swig_vss_get_signal_by_index(sig_id)
    return found
def signal(path):
    """Look up a signal by its path.

    Args:
        path: Value forwarded to vsd_swig.swig_vss_find_signal_by_path().

    Returns:
        The result of vsd_swig.swig_vss_find_signal_by_path(path).
    """
    found = vsd_swig.swig_vss_find_signal_by_path(path)
    return found
def path(sig):
    """Return the path of a signal.

    Args:
        sig: Signal handle forwarded to vsd_swig.swig_vss_get_signal_path().

    Returns:
        The result of vsd_swig.swig_vss_get_signal_path(sig).
    """
    signal_path = vsd_swig.swig_vss_get_signal_path(sig)
    return signal_path
def type(sig):
    """Return the data type code of a signal.

    NOTE: this function shadows the builtin of the same name inside this
    module; the name is part of the module's public interface.

    Args:
        sig: Signal handle forwarded to vsd_swig.swig_vsd_data_type().

    Returns:
        The result of vsd_swig.swig_vsd_data_type(sig); get() and set()
        compare that result against the data_type_e constants.
    """
    data_type = vsd_swig.swig_vsd_data_type(sig)
    return data_type
def get(sig):
    """Return the current value of a signal, read with its typed accessor.

    The signal's data type is queried with vsd_swig.swig_vsd_data_type()
    and the matching vsd_swig.swig_vsd_value_* function is called.

    Args:
        sig: Signal handle forwarded unchanged to the vsd_swig calls.

    Returns:
        The result of the type-specific accessor, or None when the data
        type is vsd_stream, vsd_na or any code not listed in the table.
    """
    # Accessor names are looked up on vsd_swig at call time because the
    # module is only bound after load_vss_shared_object() has run.
    accessor_names = {
        data_type_e.vsd_int8: "swig_vsd_value_i8",
        data_type_e.vsd_uint8: "swig_vsd_value_u8",
        data_type_e.vsd_int16: "swig_vsd_value_i16",
        data_type_e.vsd_uint16: "swig_vsd_value_u16",
        data_type_e.vsd_int32: "swig_vsd_value_i32",
        data_type_e.vsd_uint32: "swig_vsd_value_u32",
        data_type_e.vsd_double: "swig_vsd_value_d",
        data_type_e.vsd_float: "swig_vsd_value_f",
        # Previously called with an undefined name, raising NameError for
        # every boolean signal; it now receives sig like the others.
        data_type_e.vsd_boolean: "swig_vsd_value_b",
        data_type_e.vsd_string: "swig_vsd_value_s",
    }

    dt = vsd_swig.swig_vsd_data_type(sig)
    accessor_name = accessor_names.get(dt)
    if accessor_name is None:
        # vsd_stream, vsd_na and unknown codes have no accessor here.
        return None
    return getattr(vsd_swig, accessor_name)(sig)
def set(sig, val):
    """Assign a value to a signal using its typed setter.

    The signal's data type is queried with vsd_swig.swig_vsd_data_type()
    and the matching vsd_swig.swig_vsd_set_* function is called.

    NOTE: this function shadows the builtin of the same name inside this
    module; the name is part of the module's public interface.

    Args:
        sig: Signal handle forwarded unchanged to the vsd_swig calls.
        val: New value, forwarded unchanged to the type-specific setter.

    Returns:
        The result of the type-specific setter, or None when the data
        type is vsd_stream, vsd_na or any code not listed in the table.
    """
    # Setter names are looked up on vsd_swig at call time because the
    # module is only bound after load_vss_shared_object() has run.
    setter_names = {
        data_type_e.vsd_int8: "swig_vsd_set_i8",
        data_type_e.vsd_uint8: "swig_vsd_set_u8",
        data_type_e.vsd_int16: "swig_vsd_set_i16",
        data_type_e.vsd_uint16: "swig_vsd_set_u16",
        data_type_e.vsd_int32: "swig_vsd_set_i32",
        data_type_e.vsd_uint32: "swig_vsd_set_u32",
        data_type_e.vsd_double: "swig_vsd_set_d",
        data_type_e.vsd_float: "swig_vsd_set_f",
        data_type_e.vsd_boolean: "swig_vsd_set_b",
        data_type_e.vsd_string: "swig_vsd_set_s",
    }

    dt = vsd_swig.swig_vsd_data_type(sig)
    setter_name = setter_names.get(dt)
    if setter_name is None:
        # vsd_stream, vsd_na and unknown codes have no setter here.
        return None
    return getattr(vsd_swig, setter_name)(sig, val)
def _intermediate_callback(*arg):
    """Forward a native callback invocation to the registered callback.

    This is the function handed to the native layer by set_callback().
    It expects exactly three positional arguments, logs the first one
    and calls the registered callback with all three.

    Args:
        *arg: Exactly three values: (path, signal type, value).

    Raises:
        ValueError: If arg does not contain exactly three items.
    """
    # Split the packed tuple so the user callback gets native arguments.
    signal_path, signal_type, signal_value = arg
    vsd_swig.log_debug("Intermediate callback called for path: " + str(signal_path))
    _callback(signal_path, signal_type, signal_value)
def set_callback(cb):
    """Register the callback invoked for incoming signal updates.

    Stores cb in the module-level _callback global and registers
    _intermediate_callback with the native layer, which in turn calls cb
    with three arguments (path, signal type, value).

    Args:
        cb: Callable accepting (path, sig_type, value).

    Returns:
        The result of vsd_swig.swig_vsd_set_python_callback().
    """
    global _callback
    _callback = cb

    registration = vsd_swig.swig_vsd_set_python_callback(_intermediate_callback)
    return registration
def process_events(timeout_msec):
    """Delegate event processing to dstc.process_events().

    Args:
        timeout_msec: Timeout forwarded unchanged to dstc.process_events();
            presumably in milliseconds, as the name suggests.

    Returns:
        The result of dstc.process_events(timeout_msec).
    """
    outcome = dstc.process_events(timeout_msec)
    return outcome
def process_pending_events(timeout_msec):
    """Delegate pending-event processing to dstc.process_pending_events().

    Args:
        timeout_msec: Value forwarded unchanged to
            dstc.process_pending_events(); presumably in milliseconds,
            as the name suggests.

    Returns:
        The result of dstc.process_pending_events(timeout_msec).
    """
    outcome = dstc.process_pending_events(timeout_msec)
    return outcome
def activate():
    """Activate DSTC by delegating to dstc.activate().

    The result of the underlying call is discarded; this function
    always returns None.
    """
    dstc.activate()