00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156 #ifndef __APPLICATION_CHORD_H__
00157 #define __APPLICATION_CHORD_H__
00158
#include <iostream>
#include <map>

#include "udp.h"
#include "application.h"
#include "node.h"
#include "pdu.h"
#include "common-defs.h"
#include "sha1.h"
#include "object.h"
#include "chord-stats.h"
#include "chord-vis.h"

#include "sys/types.h"
00172
/* Default UDP port to which to bind. */
#define CHORD_STD_PORT          30000

/* Number of times to try to transmit a message. */
#define CHORD_MAX_XMIT_ATTEMPTS 3
00175
00176
00177
00178
// Debug helpers. They are active only when CHORD_DEBUG is defined; otherwise
// both macros expand to nothing, so their arguments are never evaluated.
//
//   CDBG(cmd)   - emits cmd verbatim.
//   CDBGPR(str) - writes Simulator::Now(), ": " and then str to standard
//                 output, followed by a flushing newline.  str is pasted
//                 straight into the stream expression, so it must begin
//                 with "<<", e.g. CDBGPR(<< "joined " << id);
//
// cout/endl are spelled with std:: so that the expansion does not depend on
// a using-directive being in effect wherever the macro is used.
#ifdef CHORD_DEBUG
#define CDBG(cmd) cmd
#define CDBGPR(str) std::cout << Simulator::Now() << ": " str << std::endl
#else
#define CDBG(cmd)
#define CDBGPR(str)
#endif
00186
// Appends one line "<Simulator::Now()> <msg>\n" to *stats when a `stats`
// pointer visible at the point of use is non-null; does nothing otherwise.
// msg is pasted into the stream expression, so it may itself contain "<<".
//
// The body is wrapped in do { ... } while (0) so the macro behaves as a
// single statement: with a bare `if (stats) ...` expansion, an `else` that
// follows the macro at the call site would silently bind to the macro's own
// `if` instead of the caller's.
#define STATSLOG(msg) \
    do { \
        if (stats) \
            *stats << Simulator::Now() << " " << msg << "\n"; \
    } while (0)
00190
// Whether a Chord message is a request or the response to one.
enum ChordMsgType_t { REQUEST, RESPONSE };

// Operation a Chord message refers to.
enum ChordMsg_t {
    FIND_SUCCESSOR,
    NOTIFY,
    GET_PREDECESSOR,
    CHECK_PREDECESSOR
};

// Identifier type for resolvers (see ChordResolver::SetResolverId).
typedef unsigned long ChordResolverId_t;
00194
00195 class Chord;
00196 class ChordPDU;
00197 class ChordJoinResolver;
00198 class ChordFingerResolver;
00199
00200 class ChordEvent : public Event {
00201 public:
00202 typedef enum { FIX_FINGERS = 100, STABILIZE, CREATE, JOIN,
00203 CHECK_PREDECESSOR, TIMEOUT, IGNORE_MSG_ON, IGNORE_MSG_OFF,
00204 DIE } ChordEvent_t;
00205 ChordEvent() { }
00206 ChordEvent(Event_t e) : Event(e) { }
00207
00208 ChordMsg_t msg;
00209 };
00210
00211 class ChordResolverEvent : public Event {
00212 public:
00213 typedef enum { TIMEOUT } ChordResolverEvent_t;
00214 ChordResolverEvent() { }
00215 ChordResolverEvent(Event_t e) : Event(e) { }
00216 };
00217
00220
// Chord identifier, stored as five unsigned words in `id`.
// NOTE(review): presumably a 160-bit SHA-1 style value (sha1.h is included
// by this header) - confirm in the implementation file.
//
// All member functions without a body here are defined out of line, so their
// signatures (by-value parameters, non-const) are left exactly as they were.
class ChordId {
public:
    ChordId();
    ChordId(unsigned b);
    ChordId(unsigned, unsigned, unsigned, unsigned, unsigned);

    void Init();

    // Relational operators and addition on identifiers.
    bool operator==(ChordId cid);
    bool operator!=(ChordId cid);
    bool operator>=(ChordId cid);
    bool operator<=(ChordId cid);
    bool operator>(ChordId cid);
    bool operator<(ChordId cid);
    ChordId operator+(ChordId cid);

    // Stream output.  Declared with std::ostream so that this header does
    // not rely on a using-directive leaking in from another include.
    friend std::ostream& operator<<(std::ostream &os, ChordId cid);

    // Interval membership tests.
    // NOTE(review): the suffixes presumably mean open-closed (a, b] and
    // open-open (a, b) on the identifier ring - confirm in the definitions.
    bool InIntervalOC(ChordId a, ChordId b);
    bool InIntervalOO(ChordId a, ChordId b);

    void SetId(unsigned, unsigned, unsigned, unsigned, unsigned);

    // Accessors for the class-wide bit length `m`.
    static void SetBitLength(unsigned char mb) { m = mb; }
    static unsigned char GetBitLength() { return m; }

    // MaskBits(mb) is defined out of line; the no-argument overload forwards
    // the class-wide bit length to it.
    ChordId MaskBits(unsigned char mb);
    ChordId MaskBits() { return MaskBits(m); }

    // Raw identifier words.
    unsigned id[5];

protected:
    // Bit length shared by all ChordId objects (see SetBitLength).
    static unsigned char m;
};
00270
00271
00273
00274 class ChordFinger {
00275 public:
00276 ChordFinger() : ip(0) {}
00277 ChordFinger(ChordId aid, IPAddr_t aip, PortId_t ap) { id = aid; ip = aip; port = ap; }
00278
00279 ChordId GetId() { return id; }
00280 IPAddr_t GetIP() { return ip; }
00281 PortId_t GetPort() { return port; }
00282 bool IsUnset() { return ip == 0; }
00283 bool IsSet() { return ip != 0; }
00284 void Clear() { ip = 0; }
00285
00286 friend ostream& operator<<(ostream &os, ChordFinger f);
00287
00288 ChordId id;
00289 IPAddr_t ip;
00290 PortId_t port;
00291 };
00292
00293
00294 class ChordFingerTable : public Object {
00295 public:
00296 ChordFingerTable();
00297 ChordFingerTable(unsigned int size);
00298 ~ChordFingerTable();
00299
00301
00303 ChordFinger GetFinger(unsigned int index) { return table[index - 1]; }
00305 void SetFinger(unsigned int index, ChordFinger finger) {
00306 CDBGPR(<< "changing finger " << index << " in table " << Name()
00307 << " to " << finger);
00308 STATSLOG("finger " << index << " " << cid << " " << finger.GetId());
00309 table[index - 1] = finger;
00310 }
00311
00312
00313 ChordFinger GetSuccessor() { return table[0]; }
00314 void SetSuccessor(ChordFinger finger) {
00315 CDBGPR(<< "changing successor in table " << Name()
00316 << " to " << finger);
00317 STATSLOG("finger 1 " << cid << " " << finger.GetId());
00318 table[0] = finger;
00319 }
00320
00322 void SetSize(unsigned int);
00323
00325 void Print();
00326
00328 void SetId(ChordId i) { cid = i; }
00329
00330 protected:
00332 void AllocTable();
00333
00335 ChordFinger *table;
00336 unsigned int table_size;
00337
00339 ChordId cid;
00340
00341 ChordStats *stats;
00342 };
00343
00344
00346
00347 class ChordResolver : public Handler {
00348 public:
00349 ChordResolver();
00350
00351 virtual void FindSuccessor(ChordId);
00352 virtual void HandleFindSuccessorResponse(ChordPDU*);
00353 virtual void HandleFoundSuccessor(ChordPDU*);
00354 virtual void HandleFoundSuccessor(ChordFinger);
00355 virtual void HandleFailed();
00356 virtual void Failed(ChordFinger);
00358 virtual void FoundSuccessor(ChordFinger);
00360 virtual void AttachChord(Chord* ach) { ch = ach; }
00362 virtual void SetResolverId(ChordResolverId_t crid) { resolver_id = crid; }
00364 virtual bool IsBusy() { return busy; }
00365 virtual ChordResolverId_t GetResolverId() { return resolver_id; }
00366 virtual void Handle(Event*, Time_t);
00367 virtual void CancelTimeout();
00368
00369 protected:
00370 Chord *ch;
00371 ChordStats *stats;
00372 ChordResolverId_t resolver_id;
00373
00374 unsigned long next_seq_no;
00375 unsigned long hi_seq_no;
00376
00377 bool busy;
00378 ChordId resolving_cid;
00379 ChordFinger resolving_peer;
00380 ChordResolverEvent *timeout_event;
00381 unsigned short timeout_count;
00382 };
00383
00384
00386
00387 class Chord : public Application {
00388 public:
00389 Chord();
00390 Chord(const Chord&);
00391 Chord(unsigned short idbitlength);
00392 ~Chord();
00393
00394 void Init();
00395
00397
00401 virtual void RegisterResolver(ChordResolver*);
00403 virtual void UnregisterResolver(ChordResolver*);
00404
00406 virtual void Create();
00408 virtual void Create(Time_t);
00410 virtual void Join(IPAddr_t, PortId_t);
00412 virtual void SetJoinNode(IPAddr_t, PortId_t);
00414 virtual void Join();
00416 virtual void Join(Time_t);
00418 virtual void Join(Time_t, IPAddr_t, PortId_t);
00420 virtual void Joined(ChordFinger);
00422 virtual ChordFinger ClosestPrecedingNode(ChordId);
00424 virtual void SetPort(PortId_t p);
00426 virtual void PrintFingerTable();
00427 virtual void SendFindSuccessorResponse(IPAddr_t, PortId_t, ChordId, unsigned long, ChordResolverId_t);
00428 virtual void SendFindSuccessorRequest(IPAddr_t, PortId_t, ChordId, unsigned long, ChordResolverId_t);
00429 virtual void SendNotifyResponse(IPAddr_t, PortId_t, unsigned long);
00430 virtual void SendNotifyRequest(IPAddr_t, PortId_t, unsigned long);
00431 virtual void SendGetPredecessorResponse(IPAddr_t, PortId_t, unsigned long);
00432 virtual void SendGetPredecessorRequest(IPAddr_t, PortId_t, unsigned long);
00433 virtual void SendCheckPredecessorResponse(IPAddr_t, PortId_t, unsigned long);
00434 virtual void SendCheckPredecessorRequest(IPAddr_t, PortId_t, unsigned long);
00435
00436
00437 virtual void IgnoreMessagesOn() { ignore_messages = true; STATSLOG("ignore_msg on " << my_id); CDBGPR( << my_id << ": Ignoring messages ON."); }
00438 virtual void IgnoreMessagesOn(Time_t);
00439 virtual void IgnoreMessagesOff() { ignore_messages = false; STATSLOG("ignore_msg off " << my_id); CDBGPR( << my_id << ": Ignoring messages OFF."); }
00440 virtual void IgnoreMessagesOff(Time_t);
00441 virtual void IgnoreMessages(Time_t, Time_t);
00442
00444 virtual void Die();
00446 virtual void Die(Time_t);
00447
00448 virtual ChordId GetMyId();
00449 virtual ChordFinger GetMyFinger() { return ChordFinger(GetMyId(), GetMyIP(), port); }
00450
00451 virtual IPAddr_t GetMyIP() { return localNode->GetIPAddr(); }
00452 virtual UDP* GetUDP() { return udp; }
00453
00454
00455 virtual void Receive(Packet*, L4Protocol*, Seq_t = 0);
00456 virtual void StartApp();
00457 virtual void StopApp();
00458 virtual void AttachNode(Node*);
00459 virtual Application* Copy() const { return new Chord(*this); }
00460 virtual void Handle(Event*, Time_t);
00461
00462 static ChordStats* GetStats();
00463 static void UseVis(bool b) { use_vis = b; }
00464
00465 unsigned short m;
00466
00467 Time_t stabilize_interval;
00468 Time_t fix_fingers_interval;
00469 Time_t predecessor_ping_interval;
00470 ChordFingerTable finger_table;
00471 ChordFinger predecessor;
00472 SHA1 sha1;
00473 ChordId my_id;
00474 ChordFinger my_finger;
00475 Time_t rpc_timeout;
00476
00477 protected:
00479 virtual void Stabilize();
00481 virtual void HandleGetPredecessorResponse(ChordFinger);
00483 virtual void Notify();
00484 virtual void Notify(ChordFinger);
00486 virtual void NotifyTimeout();
00488 virtual void CheckPredecessor();
00490 virtual void CheckPredecessorTimeout();
00492 virtual void PredecessorDied();
00494 virtual void FixFinger();
00496 virtual void FixFinger(unsigned short index);
00497 virtual void GetPredecessor();
00498 virtual void GetPredecessorTimeout();
00500 virtual void InitEvents();
00501
00502 UDP *udp;
00503 Node *localNode;
00504 PortId_t port;
00505 ChordResolverId_t next_resolver_id;
00506 map<ChordResolverId_t, ChordResolver*> resolver_demux;
00507 bool running;
00508 ChordFinger result_finger;
00509 ChordJoinResolver *join_resolver;
00510 ChordFingerResolver *finger_resolver;
00511 bool joined;
00512 unsigned short next_finger_to_fix;
00513 unsigned long next_notify_seq_no;
00514 unsigned long next_get_predecessor_seq_no;
00515 unsigned long next_check_predecessor_seq_no;
00516 unsigned long hi_notify_seq_no;
00517 unsigned long hi_get_predecessor_seq_no;
00518 unsigned long hi_check_predecessor_seq_no;
00519 IPAddr_t join_ip;
00520 PortId_t join_port;
00521 ChordEvent *stabilize_event;
00522 ChordEvent *fix_fingers_event;
00523 ChordEvent *check_predecessor_event;
00524 ChordEvent *check_predecessor_timeout_event;
00525 ChordEvent *get_predecessor_timeout_event;
00526 ChordEvent *notify_timeout_event;
00527 ChordEvent *join_event;
00528 ChordEvent *create_event;
00529 ChordEvent *die_event;
00530
00531
00532 unsigned short check_predecessor_timeout;
00533 unsigned short get_predecessor_timeout;
00534 unsigned short notify_timeout;
00535
00536
00537
00538
00539 ChordFinger succ_notify;
00540
00541
00542 ChordFinger succ_get_pred;
00543
00545 bool ignore_messages;
00546
00547 static ChordStats *stats;
00548 friend class ChordStats;
00549 static bool use_vis;
00550 };
00551
00552 class ChordJoinResolver : public ChordResolver {
00553 public:
00554 ChordJoinResolver() : ChordResolver() { }
00555 virtual void FoundSuccessor(ChordFinger finger) { ch->Joined(finger); }
00556 };
00557
00558
00559 class ChordFingerResolver : public ChordResolver {
00560 public:
00561 ChordFingerResolver() : ChordResolver(), finger_index(1) { }
00562 ChordFingerResolver(unsigned int index) : ChordResolver() { finger_index = index; }
00563 virtual void SetIndex(unsigned int index) { finger_index = index; }
00564 virtual void FoundSuccessor(ChordFinger finger);
00565 void Failed(ChordFinger failed_peer);
00566 protected:
00567 unsigned int finger_index;
00568 };
00569
00570 class ChordPDU : public Data {
00571 public:
00572 ChordPDU();
00573 ChordPDU(Size_t);
00574
00575 Size_t Size() const { return size;}
00576 PDU* Copy() const { return new ChordPDU(*this); }
00577
00579 ChordMsgType_t type;
00581 ChordMsg_t msg;
00582 ChordId id;
00583 ChordFinger finger;
00585 unsigned long seq_no;
00587 ChordResolverId_t resolver;
00589 bool redirect;
00590
00591 Size_t size;
00592
00593 friend ostream& operator<<(ostream &os, ChordPDU &cp);
00594 };
00595
00596 #endif