package routing; import java.util.*; import core.*; /** * This class overrides ActiveRouter in order to inject calls to a * DecisionEngine object where needed add extract as much code from the update() * method as possible. * * Forwarding Logic: * * A DecisionEngineRouter maintains a List of Tuple in * support of a call to ActiveRouter.tryMessagesForConnected() in * DecisionEngineRouter.update(). Since update() is called so frequently, we'd * like as little computation done in it as possible; hence the List that gets * updated when events happen. Four events cause the List to be updated: a new * message from this host, a new received message, a connection goes up, or a * connection goes down. On a new message (either from this host or received * from a peer), the collection of open connections is examined to see if the * message should be forwarded along them. If so, a new Tuple is added to the * List. When a connection goes up, the collection of messages is examined to * determine to determine if any should be sent to this new peer, adding a Tuple * to the list if so. When a connection goes down, any Tuple in the list * associated with that connection is removed from the List. * * Decision Engines * * Most (if not all) routing decision making is provided by a * RoutingDecisionEngine object. The DecisionEngine Interface defines methods * that enact computation and return decisions as follows: * * * * Tombstones * * The ONE has the the deleteDelivered option that lets a host delete a message * if it comes in contact with the message's destination. More aggressive * approach lets a host remember that a given message was already delivered by * storing the message ID in a list of delivered messages (which is called the * tombstone list here). Whenever any node tries to send a message to a host * that has a tombstone for the message, the sending node receives the * tombstone. * * @author PJ Dillon, University of Pittsburgh */ public class DecisionEngineRouter extends ActiveRouter { public static final String PUBSUB_NS = "DecisionEngineRouter"; public static final String ENGINE_SETTING = "decisionEngine"; public static final String TOMBSTONE_SETTING = "tombstones"; public static final String CONNECTION_STATE_SETTING = ""; protected boolean tombstoning; protected RoutingDecisionEngine decider; protected List> outgoingMessages; protected Set tombstones; /** * Used to save state machine when new connections are made. See comment in * changedConnection() */ protected Map conStates; public DecisionEngineRouter(Settings s) { super(s); Settings routeSettings = new Settings(PUBSUB_NS); outgoingMessages = new LinkedList>(); decider = (RoutingDecisionEngine)routeSettings.createIntializedObject( "routing." + routeSettings.getSetting(ENGINE_SETTING)); if(routeSettings.contains(TOMBSTONE_SETTING)) tombstoning = routeSettings.getBoolean(TOMBSTONE_SETTING); else tombstoning = false; if(tombstoning) tombstones = new HashSet(10); conStates = new HashMap(4); } public DecisionEngineRouter(DecisionEngineRouter r) { super(r); outgoingMessages = new LinkedList>(); decider = r.decider.replicate(); tombstoning = r.tombstoning; if(this.tombstoning) tombstones = new HashSet(10); conStates = new HashMap(4); } @Override public MessageRouter replicate() { return new DecisionEngineRouter(this); } @Override public boolean createNewMessage(Message m) { if(decider.newMessage(m)) { //if(m.getId().equals("M7")) //System.out.println("Host: " + getHost() + " Creating M7"); makeRoomForNewMessage(m.getSize()); addToMessages(m, true); findConnectionsForNewMessage(m, getHost()); return true; } return false; } @Override public void changedConnection(Connection con) { DTNHost myHost = getHost(); DTNHost otherNode = con.getOtherNode(myHost); DecisionEngineRouter otherRouter = (DecisionEngineRouter)otherNode.getRouter(); if(con.isUp()) { decider.connectionUp(myHost, otherNode); /* * This part is a little confusing because there's a problem we have to * avoid. When a connection comes up, we're assuming here that the two * hosts who are now connected will exchange some routing information and * update their own based on what the get from the peer. So host A updates * its routing table with info from host B, and vice versa. In the real * world, A would send its *old* routing information to B and compute new * routing information later after receiving B's *old* routing information. * In ONE, changedConnection() is called twice, once for each host A and * B, in a serial fashion. If it's called for A first, A uses B's old info * to compute its new info, but B later uses A's *new* info to compute its * new info.... and this can lead to some nasty problems. * * To combat this, whichever host calls changedConnection() first calls * doExchange() once. doExchange() interacts with the DecisionEngine to * initiate the exchange of information, and it's assumed that this code * will update the information on both peers simultaneously using the old * information from both peers. */ if(shouldNotifyPeer(con)) { this.doExchange(con, otherNode); otherRouter.didExchange(con); } /* * Once we have new information computed for the peer, we figure out if * there are any messages that should get sent to this peer. */ Collection msgs = getMessageCollection(); for(Message m : msgs) { if(decider.shouldSendMessageToHost(m, otherNode)) outgoingMessages.add(new Tuple(m, con)); } } else { decider.connectionDown(myHost, otherNode); conStates.remove(con); /* * If we were trying to send message to this peer, we need to remove them * from the outgoing List. */ for(Iterator> i = outgoingMessages.iterator(); i.hasNext();) { Tuple t = i.next(); if(t.getValue() == con) i.remove(); } } } protected void doExchange(Connection con, DTNHost otherHost) { conStates.put(con, 1); decider.doExchangeForNewConnection(con, otherHost); } /** * Called by a peer DecisionEngineRouter to indicated that it already * performed an information exchange for the given connection. * * @param con Connection on which the exchange was performed */ protected void didExchange(Connection con) { conStates.put(con, 1); } @Override protected int startTransfer(Message m, Connection con) { int retVal; if (!con.isReadyForTransfer()) { return TRY_LATER_BUSY; } retVal = con.startTransfer(getHost(), m); if (retVal == RCV_OK) { // started transfer addToSendingConnections(con); } else if(tombstoning && retVal == DENIED_DELIVERED) { this.deleteMessage(m.getId(), false); tombstones.add(m.getId()); } else if (deleteDelivered && (retVal == DENIED_OLD || retVal == DENIED_DELIVERED) && decider.shouldDeleteOldMessage(m, con.getOtherNode(getHost()))) { /* final recipient has already received the msg -> delete it */ //if(m.getId().equals("M7")) //System.out.println("Host: " + getHost() + " told to delete M7"); this.deleteMessage(m.getId(), false); } return retVal; } @Override public int receiveMessage(Message m, DTNHost from) { if(isDeliveredMessage(m) || (tombstoning && tombstones.contains(m.getId()))) return DENIED_DELIVERED; return super.receiveMessage(m, from); } @Override public Message messageTransferred(String id, DTNHost from) { Message incoming = removeFromIncomingBuffer(id, from); if (incoming == null) { throw new SimError("No message with ID " + id + " in the incoming "+ "buffer of " + getHost()); } incoming.setReceiveTime(SimClock.getTime()); Message outgoing = incoming; for (Application app : getApplications(incoming.getAppID())) { // Note that the order of applications is significant // since the next one gets the output of the previous. outgoing = app.handle(outgoing, getHost()); if (outgoing == null) break; // Some app wanted to drop the message } Message aMessage = (outgoing==null)?(incoming):(outgoing); boolean isFinalRecipient = decider.isFinalDest(aMessage, getHost()); boolean isFirstDelivery = isFinalRecipient && !isDeliveredMessage(aMessage); if (outgoing!=null && decider.shouldSaveReceivedMessage(aMessage, getHost())) { // not the final recipient and app doesn't want to drop the message // -> put to buffer addToMessages(aMessage, false); // Determine any other connections to which to forward a message findConnectionsForNewMessage(aMessage, from); } if (isFirstDelivery) { this.deliveredMessages.put(id, aMessage); } for (MessageListener ml : this.mListeners) { ml.messageTransferred(aMessage, from, getHost(), isFirstDelivery); } return aMessage; } @Override protected void transferDone(Connection con) { Message transferred = this.getMessage(con.getMessage().getId()); for(Iterator> i = outgoingMessages.iterator(); i.hasNext();) { Tuple t = i.next(); if(t.getKey().getId().equals(transferred.getId()) && t.getValue().equals(con)) { i.remove(); break; } } if(decider.shouldDeleteSentMessage(transferred, con.getOtherNode(getHost()))) { //if(transferred.getId().equals("M7")) //System.out.println("Host: " + getHost() + " deleting M7 after transfer"); this.deleteMessage(transferred.getId(), false); } } @Override public void update() { super.update(); if (!canStartTransfer() || isTransferring()) { return; // nothing to transfer or is currently transferring } tryMessagesForConnected(outgoingMessages); for(Iterator> i = outgoingMessages.iterator(); i.hasNext();) { Tuple t = i.next(); if(!this.hasMessage(t.getKey().getId())) { i.remove(); } } } @Override public void deleteMessage(String id, boolean drop) { super.deleteMessage(id, drop); for(Iterator> i = outgoingMessages.iterator(); i.hasNext();) { Tuple t = i.next(); if(t.getKey().getId().equals(id)) { i.remove(); } } } public RoutingDecisionEngine getDecisionEngine() { return this.decider; } protected boolean shouldNotifyPeer(Connection con) { Integer i = conStates.get(con); return i == null || i < 1; } protected void findConnectionsForNewMessage(Message m, DTNHost from) { for(Connection c : getHost()) //for(Connection c : getConnections()) { DTNHost other = c.getOtherNode(getHost()); if(other != from && decider.shouldSendMessageToHost(m, other)) { //if(m.getId().equals("M7")) //System.out.println("Adding attempt for M7 from: " + getHost() + " to: " + other); outgoingMessages.add(new Tuple(m, c)); } } } }