Changeset 460

Show
Ignore:
Timestamp:
07/21/07 09:36:27 (1 year ago)
Author:
blackhedd
Message:

dev, datagram support

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/java/src/com/rubyeventmachine/EmReactor.java

    r450 r460  
    123123                                                        sn.configureBlocking(false); 
    124124                                                        String b = createBinding(); 
    125                                                         EventableChannel ec = new EventableChannel (sn, b, mySelector); 
     125                                                        EventableSocketChannel ec = new EventableSocketChannel (sn, b, mySelector); 
    126126                                                        Connections.put(b, ec); 
    127127                                                        eventCallback ((String)k.attachment(), EM_CONNECTION_ACCEPTED, ByteBuffer.wrap(b.getBytes())); 
     
    130130 
    131131                                        if (k.isReadable()) { 
     132                                                EventableChannel ec = (EventableChannel)k.attachment(); 
     133                                                myReadBuffer.clear(); 
     134                                                ec.readInboundData (myReadBuffer); 
     135                                                myReadBuffer.flip(); 
     136                                                String b = ec.getBinding(); 
     137                                                if (myReadBuffer.limit() > 0) { 
     138                                                        eventCallback (b, EM_CONNECTION_READ, myReadBuffer); 
     139                                                } 
     140                                                else { 
     141                                                        eventCallback (b, EM_CONNECTION_UNBOUND, EmptyByteBuffer); 
     142                                                        Connections.remove(b); 
     143                                                        k.channel().close();                                                     
     144                                                } 
     145                                                /* 
     146                                                System.out.println ("READABLE"); 
    132147                                                SocketChannel sn = (SocketChannel) k.channel(); 
    133148                                                //ByteBuffer bb = ByteBuffer.allocate(16 * 1024); 
     
    152167                                                        sn.close(); 
    153168                                                } 
     169                                                */ 
    154170                                        } 
    155171                                 
     
    165181                                         
    166182                                        if (k.isConnectable()) { 
    167                                                 EventableChannel ec = (EventableChannel)k.attachment(); 
     183                                                EventableSocketChannel ec = (EventableSocketChannel)k.attachment(); 
    168184                                                if (ec.finishConnecting()) { 
    169185                                                        eventCallback (ec.getBinding(), EM_CONNECTION_COMPLETED, EmptyByteBuffer); 
     
    190206                mySelector.close(); 
    191207                mySelector = null; 
     208 
     209                // run down open connections and sockets. 
     210                Iterator<ServerSocketChannel> i = Acceptors.values().iterator(); 
     211                while (i.hasNext()) { 
     212                        i.next().close(); 
     213                } 
     214                 
     215                Iterator<EventableChannel> i2 = Connections.values().iterator(); 
     216                while (i2.hasNext()) 
     217                        i2.next().close(); 
    192218        } 
    193219         
     
    218244                BindingIndex++; 
    219245                String s = createBinding(); 
    220                 //System.out.println (new Date().getTime()+ milliseconds); 
    221246                Timers.put(new Date().getTime() + milliseconds, s); 
    222247                return s; 
     
    241266        } 
    242267         
     268 
     269        /** 
     270         *  
     271         * @param address 
     272         * @param port 
     273         * @return 
     274         * @throws IOException 
     275         */ 
     276        public String openUdpSocket (String address, int port) throws IOException { 
     277                DatagramChannel dg = DatagramChannel.open(); 
     278                dg.configureBlocking(false); 
     279                dg.socket().bind( new InetSocketAddress(address,port)); 
     280                String b = createBinding(); 
     281                EventableChannel ec = new EventableDatagramChannel (dg, b, mySelector); 
     282                dg.register(mySelector, SelectionKey.OP_READ, ec); 
     283                Connections.put(b, ec); 
     284                return b; 
     285        } 
     286         
    243287        public void sendData (String sig, ByteBuffer bb) throws IOException { 
    244288                (Connections.get(sig)).scheduleOutboundData( bb ); 
     
    249293        } 
    250294         
     295        /** 
     296         *  
     297         * @param sig 
     298         * @param data 
     299         * @param length 
     300         * @param recipAddress 
     301         * @param recipPort 
     302         */ 
     303        public void sendDatagram (String sig, String data, int length, String recipAddress, int recipPort) { 
     304                sendDatagram (sig, ByteBuffer.wrap(data.getBytes()), recipAddress, recipPort); 
     305        } 
     306         
     307        /** 
     308         *  
     309         * @param sig 
     310         * @param bb 
     311         * @param recipAddress 
     312         * @param recipPort 
     313         */ 
     314        public void sendDatagram (String sig, ByteBuffer bb, String recipAddress, int recipPort) { 
     315                (Connections.get(sig)).scheduleOutboundDatagram( bb, recipAddress, recipPort); 
     316        } 
     317 
     318         
     319        /** 
     320         *  
     321         * @param address 
     322         * @param port 
     323         * @return 
     324         * @throws ClosedChannelException 
     325         */ 
    251326        public String connectTcpServer (String address, int port) throws ClosedChannelException { 
    252327                String b = createBinding(); 
     
    255330                        SocketChannel sc = SocketChannel.open(); 
    256331                        sc.configureBlocking(false); 
    257                         EventableChannel ec = new EventableChannel (sc, b, mySelector); 
     332                        EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector); 
    258333 
    259334                        if (sc.connect (new InetSocketAddress (address, port))) { 
     
    285360 
    286361        public void closeConnection (String sig, boolean afterWriting) throws ClosedChannelException { 
    287                 //System.out.println ("???"+Connections.get(sig)); 
    288362                Connections.get(sig).scheduleClose (afterWriting); 
    289363        }