01-01-2013, 12:09 PM
Simulate the Distributed Mutual Exclusion
DISTRIBUTED MUTUAL EXCLUSION
Systems involving multiple processes are often most easily programmed using critical regions. When a process has to read or update certain shared data structures, it first enters a critical region to achieve mutual exclusion and ensures that no other process will use the shared data structures at the same time.
The mutual exclusion problem states that only a single process can be allowed access to a protected resource, also termed as a Critical Section (CS), at any time. Mutual exclusion is a form of synchronization and is one of the most fundamental paradigms in computing systems.
Distributed mutual exclusion algorithms are divided in three basic categories:
1) Token based Algorithms
In the token-based approach, a unique token (also known as the PRIVILEGE message) is shared among the sites. A site is allowed to enter its CS if it possesses the token and it continues to hold the token until the execution of the CS is over. Mutual exclusion is ensured because the token is unique. The algorithms based on this approach essentially differ in the way a site carries out the search for the token.
2) Non token based Algorithms
In the non-token-based approach, two or more successive rounds of messages are exchanged among the sites to determine which site will enter the CS next. A site enters the critical section (CS) when an assertion, defined on its local variables, becomes true. Mutual exclusion is enforced because the assertion becomes true only at one site at any given time.
3) Quorum based Algorithms
In the quorum-based approach, each site requests permission to execute the CS from a subset of sites (called a quorum). The quorums are formed in such a way that when two sites concurrently request access to the CS, at least one site receives both the requests and this site is responsible to make sure that only one request executes the CS at any time.
JAVA CODE
import Utilities.*;
import Synchronization.*;
class Message { public int number, id;
public Message(int number, int id)
{ this.number = number; this.id = id;
}
}
class Node extends MyObject implements Runnable
{ private static final int MAIN = 0, REQUESTS = 1, REPLIES = 2;
private int whichOne = 0;
private int id = -1;
private int numNodes = -1;
// Both are in milliseconds
private int napOutsideCS = 0;
private int napInsideCS = 0;
private MessagePassing[] requestChannel = null;
private MessagePassing[] replyChannel = null;
private MessagePassing requestsToMe = null;
private MessagePassing repliesToMe = null;
private int number = 0;
private int highNumber = 0;
private boolean requesting = false;
private int replyCount = 0;
private BinarySemaphore s = new BinarySemaphore(1);
private BinarySemaphore wakeUp = new BinarySemaphore(0);
private boolean[] deferred = null;
public Node (String name, int id, int numNodes, int napOutsideCS, int napInsideCS,
MessagePassing[] requestChannel, MessagePassing replyChannel[],
MessagePassing requestsToMe, MessagePassing repliesToMe)
{ super(name + " " + id);
this.id = id;
this.numNodes = numNodes;
this.napOutsideCS = napOutsideCS;
this.napInsideCS = napInsideCS;
this.requestChannel = requestChannel;
this.replyChannel = replyChannel;
this.requestsToMe = requestsToMe;
this.repliesToMe = repliesToMe;
deferred = new boolean[numNodes];
for (int i = 0; i < numNodes; i++) deferred[i] = false;
System.out.println(getName() + " is alive, napOutsideCS=" + napOutsideCS +
", napInsideCS=" + napInsideCS);
new Thread(this).start();
}
public void run()
{ // Start three different threads in the same object
int meDo = whichOne++;
if (meDo == MAIN) { new Thread(this).start();
main();
}
else if (meDo == REQUESTS) { new Thread(this).start();
handleRequests();
}
else if (meDo == REPLIES) handleReplies();
}
private void chooseNumber() { P(s);
requesting = true;
number = highNumber + 1;
V(s);
}
private void sendRequest() { replyCount = 0;
for (int j = 0; j < numNodes; j++)
{ if (j != id)
send(requestChannel[j],
new Message(number, id));
}
}
private void waitForReply() { P(wakeUp);
}
private void replyToDeferredNodes() { P(s);
requesting = false;
V(s);
for (int j = 0; j < numNodes; j++)
{ if (deferred[j])
{
deferred[j] = false;
send(replyChannel[j], id);
}
}
}
private void outsideCS() { int napping;
napping = ((int) random(napOutsideCS)) + 1;
System.out.println("age()=" + age() + ", " +
getName() + " napping outside CS for " + napping + " ms");
nap(napping);
}
private void insideCS() { int napping;
napping = ((int) random(napInsideCS)) + 1;
System.out.println("age()=" + age() + ", " +
getName() + " napping inside
CS for " + napping + " ms");
nap(napping);
}
private void main() { while (true)
{ outsideCS();
System.out.println("age()=" + age() + ", node " + id
+ " wants to enter its critical
section");
chooseNumber(); // PRE-PROTOCOL
sendRequest(); // PRE-PROTOCOL
waitForReply(); // PRE-PROTOCOL
insideCS();
System.out.println("age()=" + age() + ", node " + id
+ " has now left its critical
section");
replyToDeferredNodes(); // POST-PROTOCOL
}
}
private void handleRequests()
{ while (true)
{ Message m = (Message) receive(requestsToMe);
int receivedNumber = m.number;
int receivedID = m.id;
highNumber=Math.max(highNumber,receivedNumber);
P(s);
boolean decideToDefer = requesting && (number <
receivedNumber | (number == receivedNumber && id <
receivedID));
if (decideToDefer) deferred[receivedID] = true;
else send(replyChannel[receivedID], id);
V(s);
}
}
private void handleReplies()
{ while (true)
{ int receivedID = receiveInt(repliesToMe);
replyCount++;
if (replyCount == numNodes - 1) V(wakeUp);
}
}
}
class DistributedMutualExclusion extends MyObject
{ public static void main(String[] args)
{ // Parse command line options, if any, to override defaults
GetOpt go = new GetOpt(args, "Un:R:");
String usage = "Usage: -n numNodes -R runTime" + " napOutsideCS[i]
napInsideCS[i] i=0,1,...";
go.optErr = true;
int ch = -1;
int numNodes = 5;
int runTime = 60; // seconds
while ((ch = go.getopt()) != go.optEOF)
{ if((char)ch == 'U') { System.out.println(usage);
System.exit(0);
}
else if ((char)ch == 'n') numNodes=go.processArg (go.optArgGet(),
numNodes);
else if ((char)ch == 'R') runTime = go.processArg (go.optArgGet(),
runTime);
else { System.err.println(usage);
System.exit(1);
}
}
System.out.println("DistributedMutualExclusion: numNodes=" + numNodes +
", runTime=" + runTime);
// process non-option command line arguments
int[] napOutsideCS = new int[numNodes];
int[] napInsideCS = new int[numNodes];
int argNum = go.optIndexGet();
for (int i = 0; i < numNodes; i++)
{ napOutsideCS[i]=go.tryArg(argNum++, 8);
napInsideCS[i] = go.tryArg(argNum++, 2);
}
// create communication channels
MessagePassing[] requestChannel = null, replyChannel = null,
requestChannelS = null, requestChannelR = null,
replyChannelS = null, replyChannelR = null;
requestChannel = new MessagePassing[numNodes];
replyChannel = new MessagePassing[numNodes];
requestChannelS = new MessagePassing[numNodes];
replyChannelS = new MessagePassing[numNodes];
requestChannelR = new MessagePassing[numNodes];
replyChannelR = new MessagePassing[numNodes];
for (int i = 0; i < numNodes; i++)
{ requestChannel[i]=new AsyncMessagePassing();
replyChannel[i] = new AsyncMessagePassing();
requestChannelS[i] = new MessagePassingSendOnly
(requestChannel[i]);
replyChannelS[i] = new MessagePassingSendOnly(replyChannel[i]);
requestChannelR[i] = new MessagePassingReceiveOnly
(requestChannel[i]);
replyChannelR[i] = new MessagePassingReceiveOnly(replyChannel[i]);
}
for (int i = 0; i < numNodes; i++)
{ new Node("Node", i, numNodes, napOutsideCS[i]*1000,
napInsideCS[i]*1000, requestChannelS, replyChannelS,
requestChannelR[i], replyChannelR[i]);
}
System.out.println("All Nodes created");
nap(runTime*1000);
System.out.println("age()=" + age() + ", time to stop the threads & exit");
System.exit(0);
}
}
OUTPUT
D:\Java\Advanced>javac dimu.java
D:\Java\Advanced >java DistributedMutualExclusion -R20
DistributedMutualExclusion: numNodes=5, runTime=20
Node 0 is alive, napOutsideCS=8000, napInsideCS=2000
Node 1 is alive, napOutsideCS=8000, napInsideCS=2000
Node 2 is alive, napOutsideCS=8000, napInsideCS=2000
Node 3 is alive, napOutsideCS=8000, napInsideCS=2000
Node 4 is alive, napOutsideCS=8000, napInsideCS=2000
age()=170, Node 1 napping outside CS for 2719 ms
age()=170, Node 2 napping outside CS for 279 ms
All Nodes created
age()=170, Node 3 napping outside CS for 2355 ms
age()=220, Node 0 napping outside CS for 2393 ms
age()=220, Node 4 napping outside CS for 8 ms
age()=220, node 4 wants to enter its critical section
age()=330, Node 4 napping inside CS for 911 ms
age()=440, node 2 wants to enter its critical section
age()=1260, node 4 has now left its critical section
age()=1260, Node 4 napping outside CS for 4042 ms
age()=1260, Node 2 napping inside CS for 183 ms
age()=1480, node 2 has now left its critical section
age()=1480, Node 2 napping outside CS for 7335 ms
age()=2530, node 3 wants to enter its critical section
age()=2530, Node 3 napping inside CS for 741 ms
age()=2580, node 0 wants to enter its critical section
age()=2860, node 1 wants to enter its critical section
age()=3300, node 3 has now left its critical section
age()=3300, Node 3 napping outside CS for 6849 ms
age()=3300, Node 0 napping inside CS for 1710 ms
age()=5000, node 0 has now left its critical section
age()=5000, Node 0 napping outside CS for 5253 ms
age()=5000, Node 1 napping inside CS for 1694 ms
age()=5330, node 4 wants to enter its critical section
age()=6700, node 1 has now left its critical section
age()=6700, Node 1 napping outside CS for 3063 ms
age()=6700, Node 4 napping inside CS for 397 ms
age()=7140, node 4 has now left its critical section
age()=7140, Node 4 napping outside CS for 3687 ms
age()=8790, node 2 wants to enter its critical section
age()=8790, Node 2 napping inside CS for 102 ms
age()=8900, node 2 has now left its critical section
age()=8900, Node 2 napping outside CS for 1174 ms
age()=9780, node 1 wants to enter its critical section
age()=9780, Node 1 napping inside CS for 1617 ms
age()=10110, node 2 wants to enter its critical section
age()=10160, node 3 wants to enter its critical section
age()=10270, node 0 wants to enter its critical section
age()=10820, node 4 wants to enter its critical section
age()=11430, node 1 has now left its critical section
age()=11430, Node 1 napping outside CS for 5326 ms
age()=11430, Node 2 napping inside CS for 628 ms
age()=12090, node 2 has now left its critical section
age()=12090, Node 2 napping outside CS for 4970 ms
age()=12090, Node 3 napping inside CS for 545 ms
age()=12630, node 3 has now left its critical section
age()=12630, Node 3 napping outside CS for 7989 ms
age()=12630, Node 0 napping inside CS for 904 ms
age()=13510, node 0 has now left its critical section
age()=13510, Node 0 napping outside CS for 4162 ms
age()=13510, Node 4 napping inside CS for 1440 ms
age()=15000, node 4 has now left its critical section
age()=15000, Node 4 napping outside CS for 2578 ms
age()=16750, node 1 wants to enter its critical section
age()=16750, Node 1 napping inside CS for 123 ms
age()=16860, node 1 has now left its critical section
age()=16860, Node 1 napping outside CS for 3709 ms
age()=17030, node 2 wants to enter its critical section
age()=17030, Node 2 napping inside CS for 97 ms
age()=17140, node 2 has now left its critical section
age()=17140, Node 2 napping outside CS for 7901 ms
age()=17580, node 4 wants to enter its critical section
age()=17580, Node 4 napping inside CS for 1695 ms
age()=17690, node 0 wants to enter its critical section
age()=19280, node 4 has now left its critical section
age()=19280, Node 4 napping outside CS for 3751 ms
age()=19280, Node 0 napping inside CS for 869 ms
age()=20160, node 0 has now left its critical section
age()=20160, Node 0 napping outside CS for 6489 ms
age()=20160, time to stop the threads and exit