10
10
import java .rmi .RemoteException ;
11
11
import java .rmi .server .UnicastRemoteObject ;
12
12
import java .util .*;
13
+ import java .util .concurrent .atomic .AtomicInteger ;
13
14
import java .util .stream .Collector ;
14
15
import java .util .stream .Collectors ;
15
16
@@ -27,7 +28,7 @@ public class Node extends UnicastRemoteObject implements IComponent {
27
28
private Link bestEdge ; //the adjacent edge leading towards the best candidate for the MOE it knows about
28
29
private Link testEdge ;
29
30
private Link inBranch ; //the adjacent edge leading to the core of the fragment
30
- public int findCount ; // maybe atomic implementation
31
+ public AtomicInteger findCount ; // maybe atomic implementation
31
32
32
33
public Node (int id , Queue <Link > links ) throws RemoteException {
33
34
super ();
@@ -43,12 +44,13 @@ public void wakeUp() throws RemoteException {
43
44
System .out .println (id + ":Wakeup" );
44
45
45
46
Link edge = links .peek ();
47
+ System .out .println (id + ": changing link to " + edge .getReceiver (id ) + " to IN_MST" );
46
48
edge .setState (LinkState .IN_MST );
47
49
level = 0 ;
48
50
state = NodeState .FOUND ;
49
- findCount = 0 ;
51
+ findCount = new AtomicInteger ( 0 ) ;
50
52
Message msg = new Message (MessageType .CONNECT , 0 );
51
- System .out .println (id + ":Sending Connect to " + edge .getReceiver (id ));
53
+ System .out .println (id + ":Sending Connect to " + edge .getReceiver (id ));
52
54
new java .util .Timer ().schedule (
53
55
new java .util .TimerTask () {
54
56
@ Override
@@ -60,34 +62,34 @@ public void run() {
60
62
}
61
63
}
62
64
},
63
- 0
65
+ 10
64
66
);
65
67
}
66
68
67
69
@ Override
68
70
public void receive (Message message , Link link ) throws RemoteException {
69
- updateLinks (link );
71
+ Link myLink = getMyLink (link );
70
72
switch (message .getType ()) {
71
73
case TEST :
72
- receiveTest (message , link );
74
+ receiveTest (message , myLink );
73
75
break ;
74
76
case ACCEPT :
75
- receiveAccept (message , link );
77
+ receiveAccept (message , myLink );
76
78
break ;
77
79
case CONNECT :
78
- receiveConnect (message , link );
80
+ receiveConnect (message , myLink );
79
81
break ;
80
82
case INITIATE :
81
- receiveInitiate (message , link );
83
+ receiveInitiate (message , myLink );
82
84
break ;
83
85
case REPORT :
84
- receiveReport (message , link );
86
+ receiveReport (message , myLink );
85
87
break ;
86
88
case REJECT :
87
- receiveReject (message , link );
89
+ receiveReject (message , myLink );
88
90
break ;
89
91
case CHANGE_ROOT :
90
- receiveChangeRoot (message , link );
92
+ receiveChangeRoot (message , myLink );
91
93
break ;
92
94
}
93
95
}
@@ -109,13 +111,25 @@ public void send(Message message, Link link) throws RemoteException {
109
111
110
112
}
111
113
114
+ private Link getMyLink (Link link ) {
115
+ List <Link > linkLists = new ArrayList <>(links );
116
+ Link linkToUpdate = new Link ();
117
+ for (Link linkEntry : linkLists ) {
118
+ if (linkEntry .compareTo (link ) == 0 ) {
119
+ linkToUpdate = linkEntry ;
120
+ }
121
+ }
122
+ return linkToUpdate ;
123
+ }
124
+
112
125
@ Override
113
126
public void receiveConnect (Message message , Link link ) throws RemoteException {
114
127
System .out .println (id + ":Receive Connect from " + link .getReceiver (id ));
115
128
if (state == NodeState .SLEEPING )
116
129
wakeUp ();
117
130
// the case that l < l' and fragment F is absorbed by F'
118
131
if (message .getLevel () < this .level ) {
132
+ System .out .println (id + ": changing link to " + link .getReceiver (id ) + " to IN_MST" );
119
133
link .setState (LinkState .IN_MST ); // TODO: to be checked
120
134
Message msg = new Message (MessageType .INITIATE , level , fragmentName , state );
121
135
new java .util .Timer ().schedule (
@@ -132,8 +146,9 @@ public void run() {
132
146
},
133
147
10
134
148
);
135
- if (state == NodeState .FIND )
136
- findCount ++;
149
+ if (state == NodeState .FIND ) {
150
+ findCount .getAndIncrement ();
151
+ }
137
152
} else {
138
153
if (link .getState () == LinkState .CANDIDATE_IN_MST ) {
139
154
System .out .println (id + "Connect Appended to queue" );
@@ -165,7 +180,7 @@ public void run() {
165
180
}
166
181
}
167
182
},
168
- 0 );
183
+ 10 );
169
184
}
170
185
}
171
186
}
@@ -180,7 +195,7 @@ private void receiveInitiate(Message message, Link link) throws RemoteException
180
195
weightBestAdjacent = Double .POSITIVE_INFINITY ;
181
196
182
197
for (Link adjescentLink : this .links ) {//TODO: Not sure if this is the correct one list of links
183
- if (adjescentLink != link && adjescentLink .getState () == LinkState .IN_MST ) {
198
+ if (adjescentLink . compareTo ( link ) != 0 && adjescentLink .getState () == LinkState .IN_MST ) {
184
199
Message msg = new Message (MessageType .INITIATE , level , fragmentName , state );
185
200
System .out .println (id + ": Sending Initiate to " + adjescentLink .getReceiver (id ));
186
201
Link copy = new Link (adjescentLink );
@@ -195,10 +210,11 @@ public void run() {
195
210
}
196
211
}
197
212
},
198
- 0
213
+ 10
199
214
);
200
215
if (state == NodeState .FIND ) {
201
- findCount = findCount + 1 ; // the messages sent
216
+ findCount .getAndIncrement ();
217
+ // the messages sent
202
218
}
203
219
}
204
220
}
@@ -213,22 +229,26 @@ private void test() throws RemoteException {
213
229
testEdge = links .stream ().filter (p -> p .getState () == LinkState .CANDIDATE_IN_MST ).min (new LinkComparator ()).get ();
214
230
Message msg = new Message (MessageType .TEST , level , fragmentName );
215
231
Link copy = new Link (testEdge );
232
+ System .out .println (id + ":Sending Receive test to " + testEdge .getReceiver (id ));
216
233
new java .util .Timer ().schedule (
217
234
new java .util .TimerTask () {
218
235
@ Override
219
236
public void run () {
220
237
try {
221
- System .out .println (id + ":Sending Receive test to " + copy .getReceiver (id ));
222
- nodes [copy .getReceiver (id )].receive (msg , copy );
238
+ nodes [copy .getReceiver (id )].receive (msg , testEdge );
223
239
} catch (RemoteException e ) {
224
240
e .printStackTrace ();
225
241
}
226
242
}
227
243
},
228
- 0
244
+ 10
229
245
);
230
246
} else {
231
- System .out .println (id + "Changing test edge " );
247
+ if (testEdge != null ) {
248
+ System .out .println (id + "Changing test edge " + testEdge .getReceiver (id ) + "to null" );
249
+ } else {
250
+ System .out .println (id + "Changing test edge to null" );
251
+ }
232
252
testEdge = null ;
233
253
report ();
234
254
}
@@ -276,11 +296,12 @@ public void run() {
276
296
);
277
297
} else {
278
298
if (link .getState () == LinkState .CANDIDATE_IN_MST ) {
299
+ System .out .println (id + ": changing link to " + link .getReceiver (id ) + " to NOT_IN_MST" );
279
300
link .setState (LinkState .NOT_IN_MST ); //TODO: think about local copies
280
301
}
281
302
// if the node hasn't set this edge as testEdge then it sends a reject
282
303
// because they are in the same fragment with the sender
283
- if (link .compareTo (testEdge ) = = 0 ) {
304
+ if (link .compareTo (testEdge ) ! = 0 ) {
284
305
System .out .println (id + ":Sending Reject to " + link .getReceiver (id ));
285
306
Message msg = new Message (MessageType .REJECT );
286
307
new java .util .Timer ().schedule (
@@ -294,7 +315,7 @@ public void run() {
294
315
}
295
316
}
296
317
},
297
- 0
318
+ 10
298
319
);
299
320
} else {
300
321
test ();
@@ -307,6 +328,7 @@ public void run() {
307
328
private void receiveReject (Message message , Link link ) throws RemoteException {
308
329
System .out .println (id + ":Receive Reject from " + link .getReceiver (id ));
309
330
if (link .getState () == LinkState .CANDIDATE_IN_MST ) {
331
+ System .out .println (id + ": changing link to " + link .getReceiver (id ) + " to NOT_IN_MST" );
310
332
link .setState (LinkState .NOT_IN_MST );
311
333
}
312
334
test (); // to find another possible MOE
@@ -323,10 +345,11 @@ private void receiveAccept(Message message, Link link) throws RemoteException {
323
345
}
324
346
325
347
private void report () throws RemoteException {
326
- System .out .println (id + ":Report find_count:" + findCount + " testEdge:" + testEdge );
327
- if (findCount == 0 && testEdge == null ) {
348
+ System .out .println (id + ":Report find_count:" + findCount . get () + " testEdge:" + testEdge );
349
+ if (findCount . get () == 0 && testEdge == null ) {
328
350
this .state = NodeState .FOUND ;
329
351
Message msg = new Message (MessageType .REPORT , weightBestAdjacent );
352
+ System .out .println (id + ":Sending Report to " + inBranch .getReceiver (id ));
330
353
new java .util .Timer ().schedule (
331
354
new java .util .TimerTask () {
332
355
@ Override
@@ -338,16 +361,16 @@ public void run() {
338
361
}
339
362
}
340
363
},
341
- 0
364
+ 10
342
365
);
343
366
}
344
367
}
345
368
346
369
@ Override
347
370
public void receiveReport (Message message , Link link ) throws RemoteException {
348
- System .out .println (id + ":Receive Report" );
349
- if (! link .equals (inBranch )) {
350
- findCount -= 1 ;
371
+ System .out .println (id + ":Receive Report from " + link . getReceiver ( id ) );
372
+ if (link .compareTo (inBranch ) != 0 ) {
373
+ findCount . getAndDecrement () ;
351
374
if (message .getWeight () < weightBestAdjacent ) {
352
375
weightBestAdjacent = message .getWeight ();
353
376
bestEdge = link ;
@@ -374,7 +397,7 @@ public void run() {
374
397
else {
375
398
if (message .getWeight () == weightBestAdjacent &&
376
399
weightBestAdjacent == Double .POSITIVE_INFINITY ) {
377
- System .out .println (" HAAAAAAAAAAAAAAAAAAAAAAAAAALT" );
400
+ System .out .println (id + ": HAAAAAAAAAAAAAAAAAAAAAAAAAALT" );
378
401
// TODO: HALT
379
402
}
380
403
}
@@ -398,13 +421,14 @@ private void changeRoot() throws RemoteException {
398
421
public void run () {
399
422
try {
400
423
nodes [bestEdge .getReceiver (id )].receiveChangeRoot (msg , bestEdge );
424
+ System .out .println (id + ": changing bestEdge to " + bestEdge .getReceiver (id ) + " to IN_MST" );
401
425
bestEdge .setState (LinkState .IN_MST );
402
426
} catch (RemoteException e ) {
403
427
e .printStackTrace ();
404
428
}
405
429
}
406
430
},
407
- 0
431
+ 10
408
432
);
409
433
410
434
}
0 commit comments