-
Notifications
You must be signed in to change notification settings - Fork 0
/
ExtendedTSSRP.py
87 lines (77 loc) · 3.09 KB
/
ExtendedTSSRP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import numpy as np
from scipy.special import logsumexp
from model.srpabstract import SRPAbstract
class ExtendedTSSRP(SRPAbstract):
"""
Extended the SRPAbstract class
"""
def __init__(self, p, c, k, M, nsensors, Ks, L=-1, chart = 'srp',mode = 'T2',sample_mode = 'sample', selectmode = 'indi',decisionchart=1,overflow=False):
"""
srp is the main class of the library
Input:
- p: Number of dimensions
- c: scale vector, Target meanshift is c * M
- k: Number of failuer Mode
- M: Failure Mode Mean Matrix of k failure modes: p * k
- nsensors: number of selected sensors
- Ks: Number of selected failure mode
- L: control limit, set to -1 if not initialized yet.
"""
super().__init__(p, c, k, M, nsensors, Ks, L, chart, mode, selectmode, decisionchart,overflow)
self.sample_mode = sample_mode
def compute_log_LRT(self,a,x):
"""
Compute the log liklihood ratio of
Input:
- a: sensing vectors
- x: sensing data, must be in format of p * 1
"""
c = self.c
M = self.M
E = 2*c*M*x - c**2*M**2
if self.selectmode == 'indi':
result = a@E
elif self.selectmode == 'cs':
E = 1
return result
def compute_index(self,failureModeTopIdx,r=1):
"""
Compute the index function to decide the best sensing allocation
Input:
- x_sample: sampled version of x, must be in format of p * 1 or p * k
- failureModeTopIdx: The most important failure index
- mode: Types of monitoring statistics
"""
M = self.M
c = self.c
p = self.p
k = self.k
mode = self.mode
nsensors = self.nsensors
x_sample = np.zeros((p,k))
if self.sample_mode == 'sample':
for ik in range(k):
x_sample[:,ik] = np.random.randn(1,p) + M[:,ik]*c
elif self.sample_mode == 'mean':
for ik in range(k):
x_sample[:,ik] = M[:,ik]*c
E_sample = 2*c*M*x_sample - c**2*M**2
if mode == 'T2' or mode == 'T1':
individualS = np.sum(E_sample[:,failureModeTopIdx],1)
sensingIdx = np.argsort(-individualS)[:nsensors]
elif mode == 'random':
sensingIdx = np.random.choice(np.arange(p),nsensors)
elif mode == 'T1_Max':
E_sample = 2*c*M*x_sample - c**2*M**2
S = np.log1p(np.exp(r))[np.newaxis,:]*E_sample
S_sort= -np.sort(-S, axis = 0)
kmax = np.argmax(np.sum(S_sort[:nsensors,:] ,0))
sensingIdx = np.argsort(-S[:,kmax])[:nsensors]
elif mode == 'T1_App':
E_sample = 2*c*M*x_sample - c**2*M**2
S = np.log1p(np.exp(r))[np.newaxis,failureModeTopIdx]*E_sample[:,failureModeTopIdx]
p = S.shape[0]
f= lambda a: logsumexp(S.T@a)
sensingIdx = greedy(f,p,nsensors)
sensingIdx = sensingIdx.astype(int)
return sensingIdx