Changeset 464

Show
Ignore:
Timestamp:
07/21/07 20:42:43 (1 year ago)
Author:
blackhedd
Message:

refactoring

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/java/src/com/rubyeventmachine/Application.java

    r451 r464  
    77import java.nio.channels.*; 
    88import java.util.*; 
     9import java.io.*; 
     10import java.net.SocketAddress; 
    911 
    1012/** 
     
    1214 * 
    1315 */ 
    14 public class Application extends EmReactor { 
    15  
    16         private TreeMap<String, Timer> timers; 
    17         private TreeMap<String, Connection> connections; 
    18         /** 
    19          *  
    20          */ 
    21         public Application() { 
    22                 timers = new TreeMap<String, Timer>(); 
    23                 connections = new TreeMap<String, Connection>(); 
    24         } 
     16public class Application { 
    2517         
    2618         
    27         public void eventCallback (String sig, int eventType, ByteBuffer data) { 
    28                 if (eventType == EM_TIMER_FIRED) { 
    29                         String timersig = new String (data.array()); 
    30                         //System.out.println ("EVSIG "+sig + "..."+new String(data.array())); 
    31                         Timer r = timers.remove(timersig); 
    32                         if (r != null) 
    33                                 r._fire(); 
    34                         else 
    35                                 throw new RuntimeException ("unable to run unknown timer"); 
     19        public class Reactor extends EmReactor { 
     20 
     21                private Application application; 
     22                private TreeMap<String, Timer> timers; 
     23                private TreeMap<String, Connection> connections; 
     24                private TreeMap<String, ConnectionFactory> acceptors; 
     25                /** 
     26                 *  
     27                 */ 
     28                public Reactor (Application app) { 
     29                        application = app; 
     30                        timers = new TreeMap<String, Timer>(); 
     31                        connections = new TreeMap<String, Connection>(); 
     32                        acceptors = new TreeMap<String, ConnectionFactory>(); 
    3633                } 
    37                 else if (eventType == EM_CONNECTION_COMPLETED) { 
    38                         Connection c = connections.get(sig); 
    39                         if (c != null) { 
    40                                 c.connectionCompleted(); 
     34 
     35 
     36                public void eventCallback (String sig, int eventType, ByteBuffer data) { 
     37                        if (eventType == EM_TIMER_FIRED) { 
     38                                String timersig = new String (data.array()); 
     39                                //System.out.println ("EVSIG "+sig + "..."+new String(data.array())); 
     40                                Timer r = timers.remove(timersig); 
     41                                if (r != null) 
     42                                        r._fire(); 
     43                                else 
     44                                        throw new RuntimeException ("unable to run unknown timer"); 
    4145                        } 
    42                         else 
    43                                 throw new RuntimeException ("connection completed to unknown object"); 
     46                        else if (eventType == EM_CONNECTION_COMPLETED) { 
     47                                Connection c = connections.get(sig); 
     48                                if (c != null) { 
     49                                        c.connectionCompleted(); 
     50                                } 
     51                                else 
     52                                        throw new RuntimeException ("connection completed to unknown object"); 
    4453 
    45                 } 
    46                 else if (eventType == EM_CONNECTION_UNBOUND) { 
    47                         Connection c = connections.get(sig); 
    48                         if (c != null) { 
    49                                 c.unbind(); 
    5054                        } 
    51                         else 
    52                                 throw new RuntimeException ("unbind received on unknown object"); 
    53                 } 
    54                 else if (eventType == EM_CONNECTION_READ) { 
    55                         Connection c = connections.get(sig); 
    56                         if (c != null) { 
    57                                 c.receiveData(data); 
     55                        else if (eventType == EM_CONNECTION_UNBOUND) { 
     56                                Connection c = connections.get(sig); 
     57                               if (c != null) { 
     58                                       c.unbind(); 
     59                               } 
     60                               else 
     61                                       throw new RuntimeException ("unbind received on unknown object"); 
    5862                        } 
    59                         else throw new RuntimeException ("received data on unknown object"); 
    60                 } 
    61                 else { 
    62                         System.out.println ("unknown event type: " + eventType); 
     63                        else if (eventType == EM_CONNECTION_ACCEPTED) { 
     64                                ConnectionFactory f = acceptors.get(sig); 
     65                                if (f != null) { 
     66                                        Connection c = f.connection(); 
     67                                        c.signature = new String (data.array()); 
     68                                        c.application = application; 
     69                                        connections.put(c.signature, c); 
     70                                        c.postInit(); 
     71                                        //System.out.println (sig+"..."+new String(data.array())); 
     72                                } 
     73                                else 
     74                                        throw new RuntimeException ("received connection on unknown acceptor"); 
     75                        } 
     76                        else if (eventType == EM_CONNECTION_READ) { 
     77                                Connection c = connections.get(sig); 
     78                                if (c != null) { 
     79                                        c.receiveData(data); 
     80                                } 
     81                                else throw new RuntimeException ("received data on unknown object"); 
     82                        } 
     83                        else { 
     84                                System.out.println ("unknown event type: " + eventType); 
     85                        } 
    6386                } 
    6487        } 
     88 
     89 
     90        Reactor reactor; 
    6591         
    66  
    67          
     92        public Application() { 
     93                reactor = new Reactor (this); 
     94        } 
    6895        public void addTimer (double seconds, Timer t) { 
    6996                t.application = this; 
    7097                t.interval = seconds; 
    71                 String s = installOneshotTimer ((int)(seconds * 1000)); 
    72                 timers.put(s, t); 
     98                String s = reactor.installOneshotTimer ((int)(seconds * 1000)); 
     99                reactor.timers.put(s, t); 
    73100                 
    74101        } 
    75102         
    76         public void connect (String host, int port, Connection c) throws ClosedChannelException{ 
    77                 String s = connectTcpServer(host, port); 
    78                 c.application = this; 
    79                 c.signature = s; 
    80                 connections.put(s, c); 
     103        public void connect (String host, int port, Connection c) { 
     104                try { 
     105                        String s = reactor.connectTcpServer(host, port); 
     106                        c.application = this; 
     107                        c.signature = s; 
     108                        reactor.connections.put(s, c); 
     109                        c.postInit(); 
     110                } catch (ClosedChannelException e) {} 
     111        } 
     112         
     113        public void startServer (SocketAddress sa, ConnectionFactory f) throws EmReactorException { 
     114                String s = reactor.startTcpServer(sa); 
     115                reactor.acceptors.put(s, f); 
     116        } 
     117         
     118        public void stop() { 
     119                reactor.stop(); 
     120        } 
     121        public void run() { 
     122                try { 
     123                        reactor.run(); 
     124                } catch (IOException e) {} 
     125        } 
     126        public void run (final Runnable r) { 
     127                addTimer(0, new Timer() { 
     128                        public void fire() { 
     129                                r.run(); 
     130                        } 
     131                }); 
     132                run(); 
     133        } 
     134         
     135        public void sendData (String sig, ByteBuffer bb) { 
     136                try { 
     137                        reactor.sendData(sig, bb); 
     138                } catch (IOException e) {} 
     139        } 
     140        public void closeConnection (String sig, boolean afterWriting) { 
     141                try { 
     142                        reactor.closeConnection(sig, afterWriting); 
     143                } catch (ClosedChannelException e) {} 
    81144        } 
    82145} 
  • version_0/java/src/com/rubyeventmachine/Connection.java

    r451 r464  
    11package com.rubyeventmachine; 
    22 
    3 import java.io.*; 
     3//import java.io.*; 
    44import java.nio.*; 
    5 import java.nio.channels.*; 
     5//import java.nio.channels.*; 
    66 
    77public class Connection { 
     
    1010        public String signature; 
    1111         
     12        public void postInit() {} 
    1213        public void connectionCompleted() {} 
    1314        public void unbind() {} 
     
    1920         * @param bytebuffer 
    2021         */ 
    21         public void sendData (ByteBuffer b) throws IOException
     22        public void sendData (ByteBuffer b)
    2223                application.sendData(signature, b); 
    2324        } 
     
    2728         * TODO: don't expose the exception here. 
    2829         */ 
    29         public void close() throws ClosedChannelException
     30        public void close()
    3031                application.closeConnection(signature, false); 
    3132        } 
     
    3334         * This is called by user code/ 
    3435         */ 
    35         public void closeAfterWriting() throws ClosedChannelException
     36        public void closeAfterWriting()
    3637                application.closeConnection(signature, true); 
    3738        } 
  • version_0/java/src/com/rubyeventmachine/EmReactor.java

    r460 r464  
    248248        } 
    249249         
    250         public String startTcpServer (String address, int port) throws IOException { 
     250        public String startTcpServer (SocketAddress sa) throws EmReactorException { 
     251                try { 
     252                        ServerSocketChannel server = ServerSocketChannel.open(); 
     253                        server.configureBlocking(false); 
     254                        server.socket().bind (sa); 
     255                        String s = createBinding(); 
     256                        Acceptors.put(s, server); 
     257                        server.register(mySelector, SelectionKey.OP_ACCEPT, s); 
     258                        return s; 
     259                } catch (IOException e) { 
     260                        // TODO, should parameterize this exception better. 
     261                        throw new EmReactorException ("unable to open socket acceptor"); 
     262                } 
     263        } 
     264         
     265        public String startTcpServer (String address, int port) throws EmReactorException { 
     266                return startTcpServer (new InetSocketAddress (address, port)); 
     267                /* 
    251268                ServerSocketChannel server = ServerSocketChannel.open(); 
    252269                server.configureBlocking(false); 
     
    256273                server.register(mySelector, SelectionKey.OP_ACCEPT, s); 
    257274                return s; 
     275                */ 
    258276        } 
    259277 
  • version_0/java/src/com/rubyeventmachine/EventableSocketChannel.java

    r463 r464  
    2121         
    2222        // TODO, must refactor this to permit channels that aren't sockets. 
    23         SocketChannel myChannel; 
    24         String myBinding; 
    25         Selector mySelector; 
    26         LinkedList<ByteBuffer> OutboundQ; 
     23        SocketChannel channel; 
     24        String binding; 
     25        Selector selector; 
     26        LinkedList<ByteBuffer> outboundQ; 
    2727        boolean bCloseScheduled; 
    2828         
     
    3333 
    3434 
    35         public EventableSocketChannel (SocketChannel sc, String binding, Selector sel) throws ClosedChannelException { 
    36                 myChannel = sc; 
    37                 myBinding = binding; 
    38                 mySelector = sel; 
     35        public EventableSocketChannel (SocketChannel sc, String _binding, Selector sel) throws ClosedChannelException { 
     36                channel = sc; 
     37                binding = _binding; 
     38                selector = sel; 
    3939                bCloseScheduled = false; 
    40                 OutboundQ = new LinkedList<ByteBuffer>(); 
     40                outboundQ = new LinkedList<ByteBuffer>(); 
    4141                 
    42                 sc.register(mySelector, SelectionKey.OP_READ, this); 
     42                sc.register(selector, SelectionKey.OP_READ, this); 
    4343        } 
    4444         
    4545        public String getBinding() { 
    46                 return myBinding; 
     46                return binding; 
    4747        } 
    4848         
     
    5353        public void close() { 
    5454                try { 
    55                         myChannel.close(); 
     55                        channel.close(); 
    5656                } catch (IOException e) { 
    5757                } 
     
    6565                                        sslEngine.wrap(bb, b); 
    6666                                        b.flip(); 
    67                                         OutboundQ.addLast(b); 
     67                                        outboundQ.addLast(b); 
    6868                                } 
    6969                                else { 
    70                                         OutboundQ.addLast(bb); 
     70                                        outboundQ.addLast(bb); 
    7171                                } 
    72                                 myChannel.register(mySelector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this); 
     72                                channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this); 
    7373                        } 
    7474                } catch (ClosedChannelException e) { 
     
    8888        public void readInboundData (ByteBuffer bb) { 
    8989                try { 
    90                         myChannel.read(bb); 
     90                        channel.read(bb); 
    9191                } catch (IOException e) { 
    9292                        throw new RuntimeException ("i/o error"); 
     
    107107         */ 
    108108        public boolean writeOutboundData(){ 
    109                 while (!OutboundQ.isEmpty()) { 
    110                         ByteBuffer b = OutboundQ.getFirst(); 
     109                while (!outboundQ.isEmpty()) { 
     110                        ByteBuffer b = outboundQ.getFirst(); 
    111111                        try { 
    112112                                if (b.remaining() > 0) 
    113                                         myChannel.write(b); 
     113                                        channel.write(b); 
    114114                        } 
    115115                        catch (IOException e) { 
     
    121121                        // buffers are full, so break out of here. 
    122122                        if (b.remaining() == 0) 
    123                                 OutboundQ.removeFirst(); 
     123                                outboundQ.removeFirst(); 
    124124                        else 
    125125                                break; 
    126126                } 
    127127 
    128                 if (OutboundQ.isEmpty()) { 
     128                if (outboundQ.isEmpty()) { 
    129129                        try { 
    130                                 myChannel.register(mySelector, SelectionKey.OP_READ, this); 
     130                                channel.register(selector, SelectionKey.OP_READ, this); 
    131131                        } catch (ClosedChannelException e) { 
    132132                        } 
     
    136136                // If anyone wants to close immediately, they're responsible for clearing 
    137137                // the outbound queue. 
    138                 return (bCloseScheduled && OutboundQ.isEmpty()) ? false : true; 
     138                return (bCloseScheduled && outboundQ.isEmpty()) ? false : true; 
    139139        } 
    140140         
    141141        public void setConnectPending() throws ClosedChannelException { 
    142                 myChannel.register(mySelector, SelectionKey.OP_CONNECT, this); 
     142                channel.register(selector, SelectionKey.OP_CONNECT, this); 
    143143        } 
    144144         
     
    150150        public boolean finishConnecting() throws ClosedChannelException { 
    151151                try { 
    152                         myChannel.finishConnect(); 
     152                        channel.finishConnect(); 
    153153                } 
    154154                catch (IOException e) { 
    155155                        return false; 
    156156                } 
    157                 myChannel.register(mySelector, SelectionKey.OP_READ, this); 
     157                channel.register(selector, SelectionKey.OP_READ, this); 
    158158                return true; 
    159159        } 
     
    161161        public void scheduleClose (boolean afterWriting) { 
    162162                if (!afterWriting) 
    163                         OutboundQ.clear(); 
    164                 try { 
    165                         myChannel.register(mySelector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, this); 
     163                        outboundQ.clear(); 
     164                try { 
     165                        channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, this); 
    166166                } catch (ClosedChannelException e) { 
    167167                        throw new RuntimeException ("unable to schedule close"); // TODO, get rid of this. 
  • version_0/java/src/com/rubyeventmachine/tests/ConnectTest.java

    r452 r464  
    3939                a.addTimer(0, new Timer() { 
    4040                        public void fire() { 
    41                                 try { 
    42                                         application.connect("www.bayshorenetworks.com", 80, new Connection() { 
    43                                                 public void connectionCompleted() { 
    44                                                         try { 
    45                                                                 close(); 
    46                                                         } catch (ClosedChannelException e) { 
    47                                                         } 
    48                                                 } 
    49                                                 public void unbind() { 
    50                                                         application.stop(); 
    51                                                 } 
    52                                         }); 
    53                                 } catch (ClosedChannelException e) { 
    54                                         // TODO, must refactor this exception handler out of here. 
    55                                 } 
     41                                application.connect("www.bayshorenetworks.com", 80, new Connection() { 
     42                                        public void connectionCompleted() { 
     43                                                close(); 
     44                                        } 
     45                                        public void unbind() { 
     46                                                application.stop(); 
     47                                        } 
     48                                }); 
    5649                        } 
    5750                }); 
     
    6356                class Bays extends Connection { 
    6457                        public void connectionCompleted() { 
    65                                 try { 
    66                                         sendData (ByteBuffer.wrap( new String ("GET / HTTP/1.1\r\nHost: _\r\n\r\n").getBytes())); 
    67                                 } catch (IOException e) { 
    68                                 } 
     58                                sendData (ByteBuffer.wrap( new String ("GET / HTTP/1.1\r\nHost: _\r\n\r\n").getBytes())); 
    6959                        } 
    7060                        public void receiveData (ByteBuffer b) { 
     
    7767                a.addTimer(0, new Timer() { 
    7868                        public void fire() { 
    79                                 try { 
    80                                         application.connect("www.bayshorenetworks.com", 80, new Bays()); 
    81                                 } catch (ClosedChannelException e) { 
    82                                 } 
     69                                application.connect("www.bayshorenetworks.com", 80, new Bays()); 
    8370                        } 
    8471                }); 
    8572                a.run(); 
    8673        } 
     74         
     75         
     76         
     77        class C1 extends Connection { 
     78                Application application; 
     79                public C1 (Application a) { 
     80                        application = a; 
     81                } 
     82                public void postInit() { 
     83                        application.stop(); 
     84                } 
     85        } 
     86        @Test 
     87        public final void test3() { 
     88                final Application a = new Application(); 
     89                C1 c = new C1 (a); 
     90                a.run (new Runnable() { 
     91                        public void run() { 
     92                                a.connect("www.bayshorenetworks.com", 80, new C1(a)); 
     93                        } 
     94                }); 
     95        } 
     96         
     97         
     98 
    8799}