-
Notifications
You must be signed in to change notification settings - Fork 1
/
gyps.js
101 lines (89 loc) · 2.68 KB
/
gyps.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/* Create a new observable stream.
 * The stream remembers the last emitted value and replays it to observers
 * that subscribe later. Every operator (map, filter, ...) returns a new,
 * independent stream that observes this one.
 * @return A stream object (created without a prototype) exposing emit,
 *         observe and the operators documented below
 */
const Gyps = _ => {
  const gyps = Object.create(null);
  // Observers in registration order; emit notifies them in this order.
  const observers = [];
  let last;
  // Tracked separately from `last` so that falsy values (0, '', false,
  // null, undefined) are still replayed to late observers.
  let hasEmitted = false;

  /* Emit an event, calling all observers in registration order.
   * @param data Data object to be used on calling observers
   * @return This stream, to allow chaining
   */
  gyps.emit = data => {
    last = data;
    hasEmitted = true;
    // forEach only visits observers that were registered when the emission
    // started; an observer added meanwhile has already been replayed `data`
    // by observe(), so it is not notified twice.
    observers.forEach(observer => observer(data));
    return gyps;
  };

  /* Add a new observer. If an event was emitted before, the observer is
   * called immediately with the last value, whatever that value is.
   * @param observer Observer function to be added
   * @return This stream, to allow chaining
   */
  gyps.observe = observer => {
    // Replay before registering: if the observer emits on this same stream
    // while being replayed, it must not receive its own emission.
    if (hasEmitted) {
      observer(last);
    }
    observers.push(observer);
    return gyps;
  };

  /* Map all values to a constant.
   * @param value Constant that will be emitted
   * @return A new stream emitting `value` each time this stream emits
   */
  gyps.constant = value => gyps.map(_ => value);

  /* Only emit values that pass the predicate.
   * @param predicate Test function, returns a truthy value if pass
   * @return A new stream emitting only the accepted values
   */
  gyps.filter = predicate => {
    const filtered = Gyps();
    gyps.observe(data => {
      if (predicate(data)) {
        filtered.emit(data);
      }
    });
    return filtered;
  };

  /* Transform an observable of observables into an observable of the values
   * emitted by the values of the original observable. Each stream emitted by
   * this one stays observed, so all of them keep feeding the result.
   * @return A new stream emitting the values of the inner streams
   */
  gyps.flatten = _ => {
    const flattened = Gyps();
    gyps.observe(stream => stream.observe(data => flattened.emit(data)));
    return flattened;
  };

  /* Transform each value by a function.
   * @param mapper Function that will be applied to each value
   * @return A new stream emitting the transformed values
   */
  gyps.map = mapper => {
    const mapped = Gyps();
    gyps.observe(data => mapped.emit(mapper(data)));
    return mapped;
  };

  /* Combine multiple observables into one.
   * @param streams Observables to be joined with this one
   * @return A new stream emitting every value emitted by this stream or by
   *         any of `streams`
   */
  gyps.merge = (...streams) => {
    const merged = Gyps();
    [gyps, ...streams]
      .forEach(stream => stream.observe(data => merged.emit(data)));
    return merged;
  };

  /* Accumulate values using a function.
   * @param reducer Function that receives the accumulated value and a new
   *                value, and returns the new accumulated value
   * @param initial Initial accumulated value
   * @return A new stream emitting the accumulated value after each event
   */
  gyps.scan = (reducer, initial) => {
    const scanned = Gyps();
    let accumulated = initial;
    gyps.observe(data => {
      accumulated = reducer(accumulated, data);
      scanned.emit(accumulated);
    });
    return scanned;
  };

  /* Takes an observable of values; each time this observable emits, the
   * returned observable emits the last value emitted by the observable of
   * values (undefined while it has not emitted anything yet).
   * @param value$ Observable of values to be emitted
   * @return A new stream emitting the latest `value$` value on each event
   */
  gyps.trigger = value$ => {
    const triggered = Gyps();
    let latest;
    // Subscribe to value$ first so that a replayed value is already stored
    // when this stream's own replay fires below.
    value$.observe(x => {
      latest = x;
    });
    gyps.observe(_ => triggered.emit(latest));
    return triggered;
  };

  /* Wrap each value into an object with the key provided.
   * @param key Key used to wrap values
   * @return A new stream emitting `{ [key]: value }` objects
   */
  gyps.wrap = key => gyps.map(value => ({ [key]: value }));

  return gyps;
};