-
Notifications
You must be signed in to change notification settings - Fork 0
/
Peer.java
393 lines (342 loc) · 16.7 KB
/
Peer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
import java.io.*;
import java.net.*;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Peer {
public static Peer[] peers; // In a real system, there must be a listing of peer ids to broadcast, here we
// keep all peers in this array
public static Random rand;
private final Integer envStatus; // for peers linux env type because we can check if env and package env is
// adaptive or not
public int id;
private static int countPeers = 0; // to assign an automatic id
private static double probabilityValidate = 0.7;
// Every peer has its own copy of the blockchain
// Possibly can occasionally become a little different from others
// but must eventually become the same as all others
// which is the whole idea of the blockchain
public Blockchain blockchain;
// Every peer has a wallet.
// In fact, a peer can have multiple wallets
// But for this application, to keep it simple, we implement only one
protected Wallet wallet; // This is the wallet of this peer that holds the keys and UTXOs to be used
// later
private BlockingQueue<String> queue; // To keep incoming messages
private ServerSocket serverSocket;
private Socket clientSocket;
public ArrayList<MetaPackage> createdMetaPackages; // for keeping peer's created packages
public ArrayList<MetaPackage> validatedMetaPackages; // for keeping peer's validated meta packages
// Creates a peer, assigns the next available id, initializes random generator
// Initializes transactions list and starts the server socket
public Peer() throws InvalidKeyException, NoSuchAlgorithmException, SignatureException {
id = countPeers++;
queue = new LinkedBlockingQueue<>();
rand = new Random(System.currentTimeMillis());
wallet = new Wallet(this); // Normally a wallet should be anonymous, or may not be, but we add this for
// debugging
blockchain = new Blockchain(this);
this.envStatus = new Random().nextInt(4); // for peers env status
this.createdMetaPackages = new ArrayList<MetaPackage>();
this.validatedMetaPackages = new ArrayList<MetaPackage>();
try {
serverSocket = new ServerSocket(6000 + id);
} catch (IOException e) {
e.printStackTrace();
}
}
public void write(String s) {
for (int i = 0; i < id; i++) {
System.out.print(" ");
}
System.out.println("PEER " + id + ": ");
System.out.println(s);
}
public Integer getEnvStatus() {
return envStatus;
}
public ArrayList<MetaPackage> getCreatedMetaPackages() {
return createdMetaPackages;
}
public void addPackageCreated(MetaPackage metaPackage) {
createdMetaPackages.add(metaPackage);
}
public ArrayList<MetaPackage> getValidatedMetaPackages() {
return validatedMetaPackages;
}
public void addPackageValidated(MetaPackage metaPackage) {
validatedMetaPackages.add(metaPackage);
}
// randomly choosing role for each peer any timestep...
public String flipRandomValidatorCreator() {
int rand = new Random().nextInt(3); // Generate 0, 1, or 2
return switch (rand) {
case 0 -> "creator";
case 1 -> "validator";
default -> "waiting";
};
}
// from fetch package score and difficulties then
// calculate score, reward or penalty
// with respect to packages validator size
// we should keep package validator size so
// if validator is more get more reward or penalty
// at the end peer should think about is it worth the validate or not
public Integer calculateScore(MetaPackage metaPackage) {
Integer validatorSize = metaPackage.getValidators().size();
if (validatorSize > 0)
return metaPackage.getScore() / validatorSize;
else
return metaPackage.getScore();
}
// if the peer creator
// 1-create package
// 2-send package to repo
// 3-add package to users created package list
public void createdPackageProcess(Peer peer) {
System.out.println("Starting Creating Package Process!");
MetaPackage newPackage = MetaPackageUtil.createRandomMetaPackage(peer.wallet);
peer.addPackageCreated(newPackage); // peers created package list
IPFSPackageCenter.addPackage(newPackage); // IPFS central package repo add
System.out.println("Peer " + peer.id + " create " + newPackage.getName() + " and send IPFS repo.");
}
// we need new transaction constructor for got 2 role because each of them got
// reputation score
public void validateSuccessProcess(MetaPackage metaPackage, Peer peer, int reward) throws NoSuchAlgorithmException {
Transaction validatorTx = new Transaction(peer.wallet.publicKey,
Peer.peers[peer.id].wallet.publicKey,
reward,
peer);
metaPackage.addValidator(peer);
peer.addPackageValidated(metaPackage);
Integer creatorPeer = metaPackage.getCreatorId();
Transaction creatorTx = new Transaction(peer.wallet.publicKey,
Peer.peers[creatorPeer].wallet.publicKey,
metaPackage.getScore(),
peer);
System.out.println("Peer " + peer.id + " t:" + validatorTx.timeStamp + ": Transaction broadcasted -> ID:"
+ validatorTx.transactionId + " reputation score:" + validatorTx.value);
System.out.println("Peer " + peer.id + " t:" + creatorTx.timeStamp + ": Transaction broadcasted -> ID:"
+ creatorTx.transactionId + " reputation score:" + validatorTx.value);
peer.gossipPackageProtocolToAllPeers(creatorTx.toString(), metaPackage);
peer.gossipPackageProtocolToAllPeers(validatorTx.toString(), metaPackage);
}
public void validateFailProcess(MetaPackage metaPackage, Peer peer) throws NoSuchAlgorithmException {
int penalty = -calculateScore(metaPackage); // negative result from reputation score
System.out.println("No validate penalty is : " + penalty);
Transaction validatorTx = new Transaction(peer.wallet.publicKey,
Peer.peers[peer.id].wallet.publicKey,
penalty,
peer);
System.out.println("Peer " + peer.id + " t:" + validatorTx.timeStamp + ": Transaction broadcasted -> ID:"
+ validatorTx.transactionId + " reputation score:" + validatorTx.value);
peer.gossipPackageProtocolToAllPeers(validatorTx.toString(), metaPackage);
System.out.println("Fail validate process!");
}
public void validateWithProbability(MetaPackage metaPackage, int score, Peer peer) throws NoSuchAlgorithmException {
// probability for validate but actually we find alternative solution
// and anyway we should change probabilty from the package difficukty score
// or validator reputation score
double prob = new Random().nextDouble();
System.out.println("This current probability: " + prob);
if (prob >= probabilityValidate)
validateSuccessProcess(metaPackage, peer, score);
else
validateFailProcess(metaPackage, peer);
}
public void validateProcessWithReward(MetaPackage metaPackage, int score, Peer peer)
throws NoSuchAlgorithmException {
if (score > 0) {
validateWithProbability(metaPackage, score, peer);
} else {
System.out.println("Reward is not enough for validating!");
}
}
///////////////////////// THE LIVING SIMULATION OF PEER /////////////////////////
// This is a user simulation (it randomly initializes some transactions from
// time to time)
public void startPeerTransactions() {
new Thread(() -> {
while (true) {
int waitForNextTransactionTime = rand.nextInt(10000) + 3000;
try {
// Wait for some random duration
Thread.sleep(waitForNextTransactionTime);
// Make a random transaction
// Randomly choose "someone else" to send some money.
// int receiver = rand.nextInt(peers.length);
// while (receiver == this.id)
// receiver = rand.nextInt(peers.length); // I said "someone else", not myself
// we should use currentlu user because each user should have a role
String selectedRole = flipRandomValidatorCreator();
System.out.println("Peer " + this.id + " role is: " + selectedRole);
if (selectedRole.equals("creator")) {
createdPackageProcess(this);
} else if (selectedRole.equals("validator")) {
// 1-get packages from IPS repo or gossip protocol broadcast
// 2-validate packages by checking env status code
// 3-validate or don't check env dependency
// 4-give score write transactions
System.out.println("Starting Validating Package Process!");
if (IPFSPackageCenter.getSize() > 0) {
MetaPackage gotPackage = IPFSPackageCenter.sendRandomPackage();
assert gotPackage != null;
// if the environment status is equals the we can validate package
if (Objects.equals(gotPackage.getDependencyEnvStatus(), this.getEnvStatus())) {
System.out.println("Validate process with env assign!!");
Integer reward = calculateScore(gotPackage);
System.out.println("Reward is : " + reward);
validateProcessWithReward(gotPackage, reward, this);
} else { // env and dependency not match
validateFailProcess(gotPackage, this);
}
System.out.println("Validator process finish!!!");
} else {
System.out.println("IPFS Package Center has no Package yet!!");
}
} else {
System.out.println("Peer" + this.id + " waiting for next timestep!!");
}
if (blockchain.mempool.size() >= 5) { // fixed mempoolsize for blockmined
try {
// create a new block from the transactions in the mempool
List<Transaction> transactionsToMine = new ArrayList<>(this.blockchain.mempool);
Block newBlock = new Block(blockchain.getLastBlock().calculateHash(), transactionsToMine,
blockchain.blocks.size(), this.blockchain);
newBlock.hash = newBlock.calculateHash(); // because in constructor we forgot the
// calculateHash, bug fixs
System.out.println("newblocks hash: " + newBlock.hash);
blockchain.addBlock(newBlock);
// clear the mempool
blockchain.mempool.clear();
System.out.println("New block added to the blockchain!");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
} catch (InterruptedException | NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
}).start();
}
///////////////////////// BLOCKCHAIN INTERACTION METHODS /////////////////////////
public Blockchain getBlockchain() {
return blockchain;
}
public PublicKey getPublicKey() {
return wallet.publicKey;
}
// While the message queue has new messages, reads them and processes them
// Here, processing is putting it into the transaction list
// Beware that this is not a trivial task, remember that the order of
// Transactions is very important. This may insert a transaction to a specific
// position in the list
public void startConsumer() {
new Thread(() -> {
while (true) {
try {
String message = queue.take(); // blocks if queue is empty
// Turn the string message back into a transaction
// System.out.println("consumer service messages: " + message);
String[] parts = message.split("#"); // for transaction and package info fetching
Transaction receivedTx = Transaction.fromString(parts[0], this);
// Process the message
System.out.println("For package " + parts[1] + " Peer " + this.id + ": Transaction received -> "
+ Wallet.getStringFromPublicKey(receivedTx.sender).substring(0, 5)
+ " ==> Number of txs in my mempool: " + blockchain.mempool.size());
if (receivedTx.processTransaction()) {
blockchain.mempool.add(receivedTx); // For now, just add it to the end of transactions
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
///////////////////////////// COMMUNICATION METHODS /////////////////////////////
// Starts the listening server
public void startServer() {
new Thread(() -> {
while (true) {
try {
Socket socket = serverSocket.accept();
new Thread(() -> receiveMessage(socket)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
// Connects to another peer.
public void connectToPeer(Peer peer) {
try {
this.clientSocket = new Socket("localhost", 6000 + peer.id);
} catch (IOException e) {
e.printStackTrace();
}
}
// When a message arrives, reads it from the input stream reader and puts it
// into the queue
private void receiveMessage(Socket socket) {
try {
BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String message;
while ((message = input.readLine()) != null) {
queue.put(message);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
// Sends a message to a specific peer
public void sendMessage(int peerid, String message) {
this.connectToPeer(Peer.peers[peerid]);
new Thread(() -> {
try {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
out.println(message);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
// Broadcasts a message to all peers
public void broadcastToAllPeers(String message) {
for (int peerid = 0; peerid < peers.length; peerid++) {
// if(peerid!=this.id) { // LET IT ALSO SEND TO ITSELF
this.connectToPeer(Peer.peers[peerid]);
new Thread(() -> {
try {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
out.println(message);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// }
}
}
public void gossipPackageProtocolToAllPeers(String message, MetaPackage metaPackage) {
// Gossip protocol for if creating a new packages, reached to all their peers
// or if validating a new packages from any peer, we can broadcast to others
for (int peerid = 0; peerid < peers.length; peerid++) {
this.connectToPeer(Peer.peers[peerid]);
new Thread(() -> {
try {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
out.println(message + "#" + metaPackage.getName());
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}