Changeset 785

Show
Ignore:
Timestamp:
09/15/08 05:46:23 (10 months ago)
Author:
francis
Message:

Applied a patch from Aman Gupta (tmm1) with contributions from
Riham Aldakkak, adds file-descriptor attach/detach and two new
event notifications, to support file descriptors not generated
internally by the reactor.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/ChangeLog

    r783 r785  
    134134        the bugs in buftok. 
    13513513Sep08: Improved the password handling in the Postgres protocol handler. 
     13615Sep08: Added attach/detach, contributed by Aman Gupta (tmm1) and Riham Aldakkak, 
     137        to support working with file descriptors not created in the reactor. 
    136138 
    137  
  • trunk/ext/cmain.cpp

    r679 r785  
    104104} 
    105105 
     106/************** 
     107evma_attach_fd 
     108**************/ 
     109 
     110extern "C" const char *evma_attach_fd (int file_descriptor, int notify_readable, int notify_writable) 
     111{ 
     112        if (!EventMachine) 
     113                throw std::runtime_error ("not initialized"); 
     114        return EventMachine->AttachFD (file_descriptor, (notify_readable ? true : false), (notify_writable ? true : false)); 
     115} 
     116 
     117/************** 
     118evma_detach_fd 
     119**************/ 
     120 
     121extern "C" int evma_detach_fd (const char *binding) 
     122{ 
     123        if (!EventMachine) 
     124                throw std::runtime_error ("not initialized"); 
     125 
     126        EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); 
     127        if (ed) 
     128                return EventMachine->DetachFD (ed); 
     129        else 
     130                throw std::runtime_error ("invalid binding to detach"); 
     131} 
    106132 
    107133/********************** 
  • trunk/ext/ed.cpp

    r687 r785  
    175175        EventableDescriptor (sd, em), 
    176176        bConnectPending (false), 
     177        bNotifyReadable (false), 
     178        bNotifyWritable (false), 
    177179        bReadAttemptedAfterClose (false), 
    178180        bWriteAttemptedAfterClose (false), 
     
    369371   */ 
    370372 
    371   if (bConnectPending
     373  if (bConnectPending || bNotifyWritable
    372374    return true; 
    373375  else { 
     
    411413                assert (!bReadAttemptedAfterClose); 
    412414                bReadAttemptedAfterClose = true; 
     415                return; 
     416        } 
     417 
     418        if (bNotifyReadable) { 
     419                if (EventCallback) 
     420                        (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_READABLE, NULL, 0); 
    413421                return; 
    414422        } 
     
    550558        } 
    551559        else { 
     560 
     561                if (bNotifyWritable) { 
     562                        if (EventCallback) 
     563                                (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_WRITABLE, NULL, 0); 
     564                        return; 
     565                } 
     566 
    552567                _WriteOutboundData(); 
    553568        } 
  • trunk/ext/ed.h

    r687 r785  
    4141 
    4242                int GetSocket() {return MySocket;} 
     43                void SetSocketInvalid() { MySocket = INVALID_SOCKET; } 
    4344                void Close(); 
    4445 
     
    140141 
    141142                void SetConnectPending (bool f) { bConnectPending = f; } 
     143 
     144                void SetNotifyReadable (bool readable) { bNotifyReadable = readable; } 
     145                void SetNotifyWritable (bool writable) { bNotifyWritable = writable; } 
    142146 
    143147                virtual void Read(); 
     
    173177        protected: 
    174178                bool bConnectPending; 
     179 
     180                bool bNotifyReadable; 
     181                bool bNotifyWritable; 
     182 
    175183                bool bReadAttemptedAfterClose; 
    176184                bool bWriteAttemptedAfterClose; 
  • trunk/ext/em.cpp

    r764 r785  
    11661166} 
    11671167 
     1168/************************ 
     1169EventMachine_t::AttachFD 
     1170************************/ 
     1171 
     1172const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable) 
     1173{ 
     1174        #ifdef OS_UNIX 
     1175        if (fcntl(fd, F_GETFL, 0) < 0) 
     1176                throw std::runtime_error ("invalid file descriptor"); 
     1177        #endif 
     1178 
     1179        #ifdef OS_WIN32 
     1180        // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt) 
     1181        if (fd == INVALID_SOCKET) 
     1182                throw std::runtime_error ("invalid file descriptor"); 
     1183        #endif 
     1184 
     1185        {// Check for duplicate descriptors 
     1186                for (size_t i = 0; i < Descriptors.size(); i++) { 
     1187                        EventableDescriptor *ed = Descriptors[i]; 
     1188                        assert (ed); 
     1189                        if (ed->GetSocket() == fd) 
     1190                                throw std::runtime_error ("adding existing descriptor"); 
     1191                } 
     1192 
     1193                for (size_t i = 0; i < NewDescriptors.size(); i++) { 
     1194                        EventableDescriptor *ed = NewDescriptors[i]; 
     1195                        assert (ed); 
     1196                        if (ed->GetSocket() == fd) 
     1197                                throw std::runtime_error ("adding existing new descriptor"); 
     1198                } 
     1199        } 
     1200 
     1201        ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); 
     1202        if (!cd) 
     1203                throw std::runtime_error ("no connection allocated"); 
     1204 
     1205        cd->SetConnectPending (true); 
     1206        cd->SetNotifyReadable (notify_readable); 
     1207        cd->SetNotifyWritable (notify_writable); 
     1208 
     1209        Add (cd); 
     1210 
     1211        const char *out = NULL; 
     1212        out = cd->GetBinding().c_str(); 
     1213        if (out == NULL) 
     1214                closesocket (fd); 
     1215        return out; 
     1216} 
     1217 
     1218/************************ 
     1219EventMachine_t::DetachFD 
     1220************************/ 
     1221 
     1222int EventMachine_t::DetachFD (EventableDescriptor *ed) 
     1223{ 
     1224        if (!ed) 
     1225                throw std::runtime_error ("detaching bad descriptor"); 
     1226 
     1227        #ifdef HAVE_EPOLL 
     1228        if (bEpoll) { 
     1229                if (ed->GetSocket() != INVALID_SOCKET) { 
     1230                        assert (bEpoll); // wouldn't be in this method otherwise. 
     1231                        assert (epfd != -1); 
     1232                        int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent()); 
     1233                        // ENOENT or EBADF are not errors because the socket may be already closed when we get here. 
     1234                        if (e && (errno != ENOENT) && (errno != EBADF)) { 
     1235                                char buf [200]; 
     1236                                snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno)); 
     1237                                throw std::runtime_error (buf); 
     1238                        } 
     1239                } 
     1240        } 
     1241        #endif 
     1242 
     1243        #ifdef HAVE_KQUEUE 
     1244        if (bKqueue) { 
     1245                struct kevent k; 
     1246                EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed); 
     1247                int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 
     1248                assert (t == 0); 
     1249        } 
     1250        #endif 
     1251 
     1252        { // remove descriptor from lists 
     1253                int i, j; 
     1254                int nSockets = Descriptors.size(); 
     1255                for (i=0, j=0; i < nSockets; i++) { 
     1256                        EventableDescriptor *ted = Descriptors[i]; 
     1257                        assert (ted); 
     1258                        if (ted != ed) 
     1259                                Descriptors [j++] = ted; 
     1260                } 
     1261                while ((size_t)j < Descriptors.size()) 
     1262                        Descriptors.pop_back(); 
     1263 
     1264                ModifiedDescriptors.erase (ed); 
     1265        } 
     1266 
     1267        int fd = ed->GetSocket(); 
     1268 
     1269        // We depend on ~EventableDescriptor not calling close() if the socket is invalid 
     1270        ed->SetSocketInvalid(); 
     1271        delete ed; 
     1272 
     1273        return fd; 
     1274} 
    11681275 
    11691276/************ 
  • trunk/ext/em.h

    r668 r785  
    7070                const char *ConnectToServer (const char *, int); 
    7171                const char *ConnectToUnixServer (const char *); 
     72                const char *AttachFD (int, bool, bool); 
     73 
    7274                const char *CreateTcpServer (const char *, int); 
    7375                const char *OpenDatagramSocket (const char *, int); 
     
    8082                void Add (EventableDescriptor*); 
    8183                void Modify (EventableDescriptor*); 
     84                int DetachFD (EventableDescriptor*); 
    8285                void ArmKqueueWriter (EventableDescriptor*); 
    8386                void ArmKqueueReader (EventableDescriptor*); 
  • trunk/ext/eventmachine.h

    r679 r785  
    3131                EM_CONNECTION_ACCEPTED = 103, 
    3232                EM_CONNECTION_COMPLETED = 104, 
    33                 EM_LOOPBREAK_SIGNAL = 105 
     33                EM_LOOPBREAK_SIGNAL = 105, 
     34                EM_CONNECTION_NOTIFY_READABLE = 106, 
     35                EM_CONNECTION_NOTIFY_WRITABLE = 107 
     36 
    3437        }; 
    3538 
     
    4043        const char *evma_connect_to_server (const char *server, int port); 
    4144        const char *evma_connect_to_unix_server (const char *server); 
     45 
     46        const char *evma_attach_fd (int file_descriptor, int read_mode, int write_mode); 
     47        int evma_detach_fd (const char *binding); 
     48 
    4249        void evma_stop_tcp_server (const char *signature); 
    4350        const char *evma_create_tcp_server (const char *address, int port); 
  • trunk/ext/rubymain.cpp

    r679 r785  
    4040static VALUE Intern_receive_data; 
    4141 
     42static VALUE Intern_notify_readable; 
     43static VALUE Intern_notify_writable; 
    4244 
    4345/**************** 
     
    5456                rb_funcall (q, Intern_receive_data, 1, rb_str_new (a3, a4)); 
    5557        } 
     58        else if (a2 == EM_CONNECTION_NOTIFY_READABLE) { 
     59                VALUE t = rb_ivar_get (EmModule, Intern_at_conns); 
     60                VALUE q = rb_hash_aref (t, rb_str_new2(a1)); 
     61                if (q == Qnil) 
     62                        rb_raise (rb_eRuntimeError, "no connection"); 
     63                rb_funcall (q, Intern_notify_readable, 0); 
     64        } 
     65        else if (a2 == EM_CONNECTION_NOTIFY_WRITABLE) { 
     66                VALUE t = rb_ivar_get (EmModule, Intern_at_conns); 
     67                VALUE q = rb_hash_aref (t, rb_str_new2(a1)); 
     68                if (q == Qnil) 
     69                        rb_raise (rb_eRuntimeError, "no connection"); 
     70                rb_funcall (q, Intern_notify_writable, 0); 
     71        } 
    5672        else if (a2 == EM_LOOPBREAK_SIGNAL) { 
    5773                rb_funcall (EmModule, Intern_run_deferred_callbacks, 0); 
     
    319335                rb_raise (rb_eRuntimeError, "no connection"); 
    320336        return rb_str_new2 (f); 
     337} 
     338 
     339/*********** 
     340t_attach_fd 
     341***********/ 
     342 
     343static VALUE t_attach_fd (VALUE self, VALUE file_descriptor, VALUE read_mode, VALUE write_mode) 
     344{ 
     345        const char *f = evma_attach_fd (NUM2INT(file_descriptor), (read_mode == Qtrue) ? 1 : 0, (write_mode == Qtrue) ? 1 : 0); 
     346        if (!f || !*f) 
     347                rb_raise (rb_eRuntimeError, "no connection"); 
     348        return rb_str_new2 (f); 
     349} 
     350 
     351/*********** 
     352t_detach_fd 
     353***********/ 
     354 
     355static VALUE t_detach_fd (VALUE self,  VALUE signature) 
     356{ 
     357        return INT2NUM(evma_detach_fd (StringValuePtr(signature))); 
    321358} 
    322359 
     
    565602        Intern_call = rb_intern ("call"); 
    566603        Intern_receive_data = rb_intern ("receive_data"); 
     604 
     605        Intern_notify_readable = rb_intern ("notify_readable"); 
     606        Intern_notify_writable = rb_intern ("notify_writable"); 
    567607 
    568608        // INCOMPLETE, we need to define class Connections inside module EventMachine 
     
    591631        rb_define_module_function (EmModule, "connect_server", (VALUE(*)(...))t_connect_server, 2); 
    592632        rb_define_module_function (EmModule, "connect_unix_server", (VALUE(*)(...))t_connect_unix_server, 1); 
     633 
     634        rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 3); 
     635        rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1); 
     636 
    593637        rb_define_module_function (EmModule, "open_udp_socket", (VALUE(*)(...))t_open_udp_socket, 2); 
    594638        rb_define_module_function (EmModule, "read_keyboard", (VALUE(*)(...))t_read_keyboard, 0); 
     
    627671        rb_define_const (EmModule, "ConnectionCompleted", INT2NUM(104)); 
    628672        rb_define_const (EmModule, "LoopbreakSignalled", INT2NUM(105)); 
    629 
    630  
     673 
     674        rb_define_const (EmModule, "ConnectionNotifyReadable", INT2NUM(106)); 
     675        rb_define_const (EmModule, "ConnectionNotifyWritable", INT2NUM(107)); 
     676 
     677
     678 
  • trunk/lib/eventmachine.rb

    r780 r785  
    679679  end 
    680680 
     681  # EventMachine::attach registers a given file descriptor or IO object with the eventloop 
     682  # 
     683  # If the handler provided has the functions notify_readable or notify_writable defined, 
     684  # EventMachine will not read or write from the socket, and instead fire the corresponding 
     685  # callback on the handler. 
     686  # 
     687  # To detach the file descriptor, use EventMachine::Connection#detach 
     688  # 
     689  # === Usage Example 
     690  # 
     691  #   module SimpleHttpClient 
     692  #     def initialize sock 
     693  #       @sock = sock 
     694  #     end 
     695  # 
     696  #     def notify_readable 
     697  #       header = @sock.readline 
     698  # 
     699  #       if header == "\r\n" 
     700  #         # detach returns the file descriptor number (fd == @sock.fileno) 
     701  #         fd = detach 
     702  #       end 
     703  #     rescue EOFError 
     704  #       detach 
     705  #     end 
     706  # 
     707  #     def unbind 
     708  #       EM.next_tick do 
     709  #         # socket is detached from the eventloop, but still open 
     710  #         data = @sock.read 
     711  #       end 
     712  #     end 
     713  #   end 
     714  # 
     715  #   EM.run{ 
     716  #     $sock = TCPSocket.new('site.com', 80) 
     717  #     $sock.write("GET / HTTP/1.0\r\n\r\n") 
     718  #     EM.attach $sock, SimpleHttpClient, $sock 
     719  #   } 
     720  # 
     721  #-- 
     722  # Thanks to Riham Aldakkak (eSpace Technologies) for the initial patch 
     723  def  EventMachine::attach io, handler=nil, *args 
     724    klass = if (handler and handler.is_a?(Class)) 
     725      handler 
     726    else 
     727      Class.new( Connection ) {handler and include handler} 
     728    end 
     729 
     730    arity = klass.instance_method(:initialize).arity 
     731    expected = arity >= 0 ? arity : -(arity + 1) 
     732    if (arity >= 0 and args.size != expected) or (arity < 0 and args.size < expected) 
     733      raise ArgumentError, "wrong number of arguments for #{klass}#initialize (#{args.size} for #{expected})" 
     734    end 
     735 
     736    readmode  = klass.public_instance_methods.any?{|m| m.to_sym == :notify_readable } 
     737    writemode = klass.public_instance_methods.any?{|m| m.to_sym == :notify_writable } 
     738 
     739    s = attach_fd io.respond_to?(:fileno) ? io.fileno : io, readmode, writemode 
     740 
     741    c = klass.new s, *args 
     742    @conns[s] = c 
     743    block_given? and yield c 
     744    c 
     745  end 
    681746 
    682747    #-- 
     
    11301195                elsif opcode == LoopbreakSignalled 
    11311196                        run_deferred_callbacks 
     1197                elsif opcode == ConnectionNotifyReadable 
     1198                        c = @conns[conn_binding] or raise ConnectionNotBound 
     1199                        c.notify_readable 
     1200                elsif opcode == ConnectionNotifyWritable 
     1201                        c = @conns[conn_binding] or raise ConnectionNotBound 
     1202                        c.notify_writable 
    11321203                end 
    11331204        end 
     
    13721443        end 
    13731444 
     1445        # EventMachine::Connection#detach will remove the given connection from the event loop. 
     1446        # The connection's socket remains open and its file descriptor number is returned 
     1447        def detach 
     1448                EventMachine::detach_fd @signature 
     1449        end 
     1450 
    13741451        # EventMachine::Connection#close_connection_after_writing is a variant of close_connection. 
    13751452        # All of the descriptive comments given for close_connection also apply to 
  • trunk/lib/eventmachine_version.rb

    r686 r785  
    2626module EventMachine 
    2727 
    28   VERSION = "0.12.0
     28  VERSION = "0.12.2
    2929 
    3030end 
  • trunk/Rakefile

    r714 r785  
    440440end 
    441441 
     442desc "Test Attach" 
     443task :test_attach do |t| 
     444  run_tests t, :extension, "test_attach*.rb" 
     445end 
     446 
    442447 
    443448desc "Build everything"