package routing;
import java.util.*;
import core.*;
/**
* This class overrides ActiveRouter in order to inject calls to a
* DecisionEngine object where needed add extract as much code from the update()
* method as possible.
*
* Forwarding Logic:
*
* A DecisionEngineRouter maintains a List of Tuple in
* support of a call to ActiveRouter.tryMessagesForConnected() in
* DecisionEngineRouter.update(). Since update() is called so frequently, we'd
* like as little computation done in it as possible; hence the List that gets
* updated when events happen. Four events cause the List to be updated: a new
* message from this host, a new received message, a connection goes up, or a
* connection goes down. On a new message (either from this host or received
* from a peer), the collection of open connections is examined to see if the
* message should be forwarded along them. If so, a new Tuple is added to the
* List. When a connection goes up, the collection of messages is examined to
* determine to determine if any should be sent to this new peer, adding a Tuple
* to the list if so. When a connection goes down, any Tuple in the list
* associated with that connection is removed from the List.
*
* Decision Engines
*
* Most (if not all) routing decision making is provided by a
* RoutingDecisionEngine object. The DecisionEngine Interface defines methods
* that enact computation and return decisions as follows:
*
*
* - In createNewMessage(), a call to RoutingDecisionEngine.newMessage() is
* made. A return value of true indicates that the message should be added to
* the message store for routing. A false value indicates the message should
* be discarded.
*
* - changedConnection() indicates either a connection went up or down. The
* appropriate connectionUp() or connectionDown() method is called on the
* RoutingDecisionEngine object. Also, on connection up events, this first
* peer to call changedConnection() will also call
* RoutingDecisionEngine.doExchangeForNewConnection() so that the two
* decision engine objects can simultaneously exchange information and update
* their routing tables (without fear of this method being called a second
* time).
*
* - Starting a Message transfer, a protocol first asks the neighboring peer
* if it's okay to send the Message. If the peer indicates that the Message is
* OLD or DELIVERED, call to RoutingDecisionEngine.shouldDeleteOldMessage() is
* made to determine if the Message should be removed from the message store.
* Note: if tombstones are enabled or deleteDelivered is disabled, the
* Message will be deleted and no call to this method will be made.
*
* - When a message is received (in messageTransferred), a call to
* RoutingDecisionEngine.isFinalDest() to determine if the receiving (this)
* host is an intended recipient of the Message. Next, a call to
* RoutingDecisionEngine.shouldSaveReceivedMessage() is made to determine if
* the new message should be stored and attempts to forward it on should be
* made. If so, the set of Connections is examined for transfer opportunities
* as described above.
*
* - When a message is sent (in transferDone()), a call to
* RoutingDecisionEngine.shouldDeleteSentMessage() is made to ask if the
* departed Message now residing on a peer should be removed from the message
* store.
*
*
*
* Tombstones
*
* The ONE has the the deleteDelivered option that lets a host delete a message
* if it comes in contact with the message's destination. More aggressive
* approach lets a host remember that a given message was already delivered by
* storing the message ID in a list of delivered messages (which is called the
* tombstone list here). Whenever any node tries to send a message to a host
* that has a tombstone for the message, the sending node receives the
* tombstone.
*
* @author PJ Dillon, University of Pittsburgh
*/
public class DecisionEngineRouter extends ActiveRouter
{
public static final String PUBSUB_NS = "DecisionEngineRouter";
public static final String ENGINE_SETTING = "decisionEngine";
public static final String TOMBSTONE_SETTING = "tombstones";
public static final String CONNECTION_STATE_SETTING = "";
protected boolean tombstoning;
protected RoutingDecisionEngine decider;
protected List> outgoingMessages;
protected Set tombstones;
/**
* Used to save state machine when new connections are made. See comment in
* changedConnection()
*/
protected Map conStates;
public DecisionEngineRouter(Settings s)
{
super(s);
Settings routeSettings = new Settings(PUBSUB_NS);
outgoingMessages = new LinkedList>();
decider = (RoutingDecisionEngine)routeSettings.createIntializedObject(
"routing." + routeSettings.getSetting(ENGINE_SETTING));
if(routeSettings.contains(TOMBSTONE_SETTING))
tombstoning = routeSettings.getBoolean(TOMBSTONE_SETTING);
else
tombstoning = false;
if(tombstoning)
tombstones = new HashSet(10);
conStates = new HashMap(4);
}
public DecisionEngineRouter(DecisionEngineRouter r)
{
super(r);
outgoingMessages = new LinkedList>();
decider = r.decider.replicate();
tombstoning = r.tombstoning;
if(this.tombstoning)
tombstones = new HashSet(10);
conStates = new HashMap(4);
}
@Override
public MessageRouter replicate()
{
return new DecisionEngineRouter(this);
}
@Override
public boolean createNewMessage(Message m)
{
if(decider.newMessage(m))
{
//if(m.getId().equals("M7"))
//System.out.println("Host: " + getHost() + " Creating M7");
makeRoomForNewMessage(m.getSize());
addToMessages(m, true);
findConnectionsForNewMessage(m, getHost());
return true;
}
return false;
}
@Override
public void changedConnection(Connection con)
{
DTNHost myHost = getHost();
DTNHost otherNode = con.getOtherNode(myHost);
DecisionEngineRouter otherRouter = (DecisionEngineRouter)otherNode.getRouter();
if(con.isUp())
{
decider.connectionUp(myHost, otherNode);
/*
* This part is a little confusing because there's a problem we have to
* avoid. When a connection comes up, we're assuming here that the two
* hosts who are now connected will exchange some routing information and
* update their own based on what the get from the peer. So host A updates
* its routing table with info from host B, and vice versa. In the real
* world, A would send its *old* routing information to B and compute new
* routing information later after receiving B's *old* routing information.
* In ONE, changedConnection() is called twice, once for each host A and
* B, in a serial fashion. If it's called for A first, A uses B's old info
* to compute its new info, but B later uses A's *new* info to compute its
* new info.... and this can lead to some nasty problems.
*
* To combat this, whichever host calls changedConnection() first calls
* doExchange() once. doExchange() interacts with the DecisionEngine to
* initiate the exchange of information, and it's assumed that this code
* will update the information on both peers simultaneously using the old
* information from both peers.
*/
if(shouldNotifyPeer(con))
{
this.doExchange(con, otherNode);
otherRouter.didExchange(con);
}
/*
* Once we have new information computed for the peer, we figure out if
* there are any messages that should get sent to this peer.
*/
Collection msgs = getMessageCollection();
for(Message m : msgs)
{
if(decider.shouldSendMessageToHost(m, otherNode))
outgoingMessages.add(new Tuple(m, con));
}
}
else
{
decider.connectionDown(myHost, otherNode);
conStates.remove(con);
/*
* If we were trying to send message to this peer, we need to remove them
* from the outgoing List.
*/
for(Iterator> i = outgoingMessages.iterator();
i.hasNext();)
{
Tuple t = i.next();
if(t.getValue() == con)
i.remove();
}
}
}
protected void doExchange(Connection con, DTNHost otherHost)
{
conStates.put(con, 1);
decider.doExchangeForNewConnection(con, otherHost);
}
/**
* Called by a peer DecisionEngineRouter to indicated that it already
* performed an information exchange for the given connection.
*
* @param con Connection on which the exchange was performed
*/
protected void didExchange(Connection con)
{
conStates.put(con, 1);
}
@Override
protected int startTransfer(Message m, Connection con)
{
int retVal;
if (!con.isReadyForTransfer()) {
return TRY_LATER_BUSY;
}
retVal = con.startTransfer(getHost(), m);
if (retVal == RCV_OK) { // started transfer
addToSendingConnections(con);
}
else if(tombstoning && retVal == DENIED_DELIVERED)
{
this.deleteMessage(m.getId(), false);
tombstones.add(m.getId());
}
else if (deleteDelivered && (retVal == DENIED_OLD || retVal == DENIED_DELIVERED) &&
decider.shouldDeleteOldMessage(m, con.getOtherNode(getHost()))) {
/* final recipient has already received the msg -> delete it */
//if(m.getId().equals("M7"))
//System.out.println("Host: " + getHost() + " told to delete M7");
this.deleteMessage(m.getId(), false);
}
return retVal;
}
@Override
public int receiveMessage(Message m, DTNHost from)
{
if(isDeliveredMessage(m) || (tombstoning && tombstones.contains(m.getId())))
return DENIED_DELIVERED;
return super.receiveMessage(m, from);
}
@Override
public Message messageTransferred(String id, DTNHost from)
{
Message incoming = removeFromIncomingBuffer(id, from);
if (incoming == null) {
throw new SimError("No message with ID " + id + " in the incoming "+
"buffer of " + getHost());
}
incoming.setReceiveTime(SimClock.getTime());
Message outgoing = incoming;
for (Application app : getApplications(incoming.getAppID())) {
// Note that the order of applications is significant
// since the next one gets the output of the previous.
outgoing = app.handle(outgoing, getHost());
if (outgoing == null) break; // Some app wanted to drop the message
}
Message aMessage = (outgoing==null)?(incoming):(outgoing);
boolean isFinalRecipient = decider.isFinalDest(aMessage, getHost());
boolean isFirstDelivery = isFinalRecipient &&
!isDeliveredMessage(aMessage);
if (outgoing!=null && decider.shouldSaveReceivedMessage(aMessage, getHost()))
{
// not the final recipient and app doesn't want to drop the message
// -> put to buffer
addToMessages(aMessage, false);
// Determine any other connections to which to forward a message
findConnectionsForNewMessage(aMessage, from);
}
if (isFirstDelivery)
{
this.deliveredMessages.put(id, aMessage);
}
for (MessageListener ml : this.mListeners) {
ml.messageTransferred(aMessage, from, getHost(),
isFirstDelivery);
}
return aMessage;
}
@Override
protected void transferDone(Connection con)
{
Message transferred = this.getMessage(con.getMessage().getId());
for(Iterator> i = outgoingMessages.iterator();
i.hasNext();)
{
Tuple t = i.next();
if(t.getKey().getId().equals(transferred.getId()) &&
t.getValue().equals(con))
{
i.remove();
break;
}
}
if(decider.shouldDeleteSentMessage(transferred, con.getOtherNode(getHost())))
{
//if(transferred.getId().equals("M7"))
//System.out.println("Host: " + getHost() + " deleting M7 after transfer");
this.deleteMessage(transferred.getId(), false);
}
}
@Override
public void update()
{
super.update();
if (!canStartTransfer() || isTransferring()) {
return; // nothing to transfer or is currently transferring
}
tryMessagesForConnected(outgoingMessages);
for(Iterator> i = outgoingMessages.iterator();
i.hasNext();)
{
Tuple t = i.next();
if(!this.hasMessage(t.getKey().getId()))
{
i.remove();
}
}
}
@Override
public void deleteMessage(String id, boolean drop)
{
super.deleteMessage(id, drop);
for(Iterator> i = outgoingMessages.iterator();
i.hasNext();)
{
Tuple t = i.next();
if(t.getKey().getId().equals(id))
{
i.remove();
}
}
}
public RoutingDecisionEngine getDecisionEngine()
{
return this.decider;
}
protected boolean shouldNotifyPeer(Connection con)
{
Integer i = conStates.get(con);
return i == null || i < 1;
}
protected void findConnectionsForNewMessage(Message m, DTNHost from)
{
for(Connection c : getHost())
//for(Connection c : getConnections())
{
DTNHost other = c.getOtherNode(getHost());
if(other != from && decider.shouldSendMessageToHost(m, other))
{
//if(m.getId().equals("M7"))
//System.out.println("Adding attempt for M7 from: " + getHost() + " to: " + other);
outgoingMessages.add(new Tuple(m, c));
}
}
}
}