-
Notifications
You must be signed in to change notification settings - Fork 0
/
srpabstract.py
135 lines (120 loc) · 4.8 KB
/
srpabstract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import numpy as np
from scipy.special import logsumexp
class SRPAbstract:
"""
The abstract class for Thompas sampling SRP statistics
"""
def __init__(self, p, c, k, M, nsensors, Ks, L=-1, chart = 'srp',mode = 'T2',selectmode='indi',decisionchart=1,overflow=False):
"""
srp is the main class of the library
Input:
- p: Number of dimensions
- c: scale vector, Target meanshift is c * M
- k: Number of failuer Mode
- M: Failure Mode Mean Matrix of k failure modes: p * k
- nsensors: number of selected sensors
- Ks: Number of selected failure mode
- L: control limit, set to -1 if not initialized yet.
"""
self.p = p
self.c = c
self.k = k
self.M = M
self.nsensors = nsensors
self.Ks = Ks
self.chart = chart
self.mode = mode
self.selectmode = selectmode
self.decisionchart = decisionchart
self.overflow = overflow
def compute_log_LRT(self,a,x):
"""
Compute the log liklihood ratio of
Input:
- a: sensing vectors
- x: sensing data, must be in format of p * 1
"""
pass
def log1pexp(r):
zr = r * 0
for i,ir in enumerate(zr):
if ir <100:
zr[i] = np.log1p(np.exp(ir))
else:
zr[i] = ir
return zr
def compute_index(self,failureModeTopIdx,r=1,mode='T2'):
"""
Compute the index function to decide the best sensing allocation
Input:
- x_sample: sampled version of x, must be in format of p * 1 or p * k
- failureModeTopIdx: The most important failure index
- mode: Types of monitoring statistics
"""
pass
def compute_monitoring_statistics(self,x,T0,L):
"""
Compute monitoring statistics
Input:
- x: input data
- T0: time of change
- L: control limit
"""
Tmax = x.shape[0]
k = self.k
Ks = self.Ks
M = self.M
c = self.c
p = self.p
nsensors = self.nsensors
sequential_statistics = np.zeros((Tmax,k))
sequential_statistics_topRsum = np.zeros((Tmax))
individualS = np.random.randn(k)
failureModeTopIdx = np.argsort(-individualS)[:Ks]
failure_mode_history = np.zeros((Tmax,Ks))
if self.selectmode == 'indi':
a = np.zeros(p)
sensor_selection_history = np.zeros((Tmax,nsensors));
elif self.selectmode == 'cs':
a = np.zeros(p)
sensor_selection_history = np.zeros((Tmax,nsensors,p));
for i in range(Tmax):
if self.chart == 'srp':
if self.overflow:
sequential_statistics[i,:] = SRPAbstract.log1pexp(sequential_statistics[i-1,:]) + self.compute_log_LRT(a,x[[i],:].T)
else:
sequential_statistics[i,:] = np.log1p(np.exp(sequential_statistics[i-1,:])) + self.compute_log_LRT(a,x[[i],:].T)
elif self.chart == 'cusum':
sequential_statistics[i,:] = np.maximum(sequential_statistics[i-1,:] + self.compute_log_LRT(a,x[[i],:].T),0)
failureModeTopIdx = np.argsort(-sequential_statistics[i,:])[:Ks]
sensingSel = self.compute_index(failureModeTopIdx,r=sequential_statistics[i,:])
if self.selectmode == 'indi':
a = np.zeros(p)
a[sensingSel] = 1
elif self.selectmode == 'cs':
a = sensingSel
sensor_selection_history[i] = sensingSel
failure_mode_history[i] = failureModeTopIdx
failureModeTopIdxDec = np.argsort(-sequential_statistics[i,:])[:self.decisionchart]
sequential_statistics_topRsum[i] = np.sum(sequential_statistics[i,failureModeTopIdxDec])
if L != -1:
if sequential_statistics_topRsum[i]>L and i>T0:
break
return sequential_statistics_topRsum, sensor_selection_history, failure_mode_history, i, sequential_statistics
def compute_monitor_batch(self,x, T0, L):
"""
Compute monitoring statistics for batch of samples
Input:
- x: Batched input data
- T0: time of change
- L: control limit
"""
nbatch = x.shape[0]
Tmax = x.shape[1]
Tmonit_stat = np.zeros((nbatch,Tmax))
Tout_all = np.zeros(nbatch)
for i,idata in enumerate(x):
srp,a,b,Tout,c= self.compute_monitoring_statistics(idata,T0, L)
Tmonit_stat[i] = srp
Tout_all[i] = Tout
return Tmonit_stat, Tout_all