Changeset 472

Show
Ignore:
Timestamp:
07/22/07 10:14:20 (1 year ago)
Author:
blackhedd
Message:

added EM::Connection:stream_file_data

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/lib/em/streamer.rb

    r470 r472  
    2424#  
    2525 
     26 
     27module EventMachine 
     28        class FileStreamer 
     29                MappingThreshold = 8192 
     30                BackpressureLevel = 50000 
     31                ChunkSize = 4096 
     32 
     33                include Deferrable 
     34                def initialize connection, filename, args 
     35                        @connection = connection 
     36                        @http_chunks = args[:http_chunks] 
     37 
     38                        if File.exist?(filename) 
     39                                @size = File.size?(filename) 
     40                                if @size <= MappingThreshold 
     41                                        stream_without_mapping filename 
     42                                else 
     43                                        stream_with_mapping filename 
     44                                end 
     45                        else 
     46                                fail "file not found" 
     47                        end 
     48                end 
     49 
     50                def stream_without_mapping filename 
     51                        if @http_chunks 
     52                                @connection.send_data "#{format("%x",@size)}\r\n" 
     53                                @connection.send_file_data filename 
     54                                @connection.send_data "\r\n0\r\n\r\n" 
     55                        else 
     56                                @connection.send_file_data filename 
     57                        end 
     58                        succeed 
     59                end 
     60                private :stream_without_mapping 
     61 
     62                def stream_with_mapping filename 
     63                        ensure_mapping_extension_is_present 
     64 
     65                        @position = 0 
     66                        @mapping = EventMachine::FastFileReader::Mapper.new filename 
     67                        stream_one_chunk 
     68                end 
     69                private :stream_with_mapping 
     70 
     71                def stream_one_chunk 
     72                        loop { 
     73                                if @position < @size 
     74                                        if @connection.get_outbound_data_size > BackpressureLevel 
     75                                                EventMachine::next_tick {stream_one_chunk} 
     76                                        else 
     77                                                len = @size - @position 
     78                                                len = ChunkSize if (len > ChunkSize) 
     79 
     80                                                @connection.send_data( "#{format("%x",len)}\r\n" ) if @http_chunks 
     81                                                @connection.send_data( @mapping.get_chunk( @position, len )) 
     82                                                @connection.send_data("\r\n") if @http_chunks 
     83 
     84                                                @position += len 
     85                                        end 
     86                                else 
     87                                        @connection.send_data "0\r\n\r\n" if @http_chunks 
     88                                        @mapping.close 
     89                                        succeed 
     90                                        break 
     91                                end 
     92                        } 
     93                end 
     94 
     95                #-- 
     96                # We use an outboard extension class to get memory-mapped files. 
     97                # It's outboard to avoid polluting the core distro, but that means 
     98                # there's a "hidden" dependency on it. The first time we get here in 
     99                # any run, try to load up the dependency extension. User code will see 
     100                # a LoadError if it's not available, but code that doesn't require 
     101                # mapped files will work fine without it. This is a somewhat difficult 
     102                # compromise between usability and proper modularization. 
     103                # 
     104                def ensure_mapping_extension_is_present 
     105                        @@fastfilereader ||= (require 'fastfilereaderext') 
     106                end 
     107                private :ensure_mapping_extension_is_present 
     108 
     109        end 
     110end 
     111 
  • version_0/lib/eventmachine.rb

    r433 r472  
    6666require 'em/eventable' 
    6767require 'em/messages' 
     68require 'em/streamer' 
    6869 
    6970require 'shellwords' 
     
    12531254        end 
    12541255 
     1256        # Open a file on the filesystem and send it to the remote peer. This returns an 
     1257        # object of type EventMachine::Deferrable. The object's callbacks will be executed 
     1258        # on the reactor main thread when the file has been completely scheduled for 
     1259        # transmission to the remote peer. Its errbacks will be called in case of an error 
     1260        # (such as file-not-found). #stream_file_data employs various strategems to achieve 
     1261        # the fastest possible performance, balanced against minimum consumption of memory. 
     1262        # 
     1263        # You can control the behavior of #stream_file_data with the optional arguments parameter. 
     1264        # Currently-supported arguments are: 
     1265        # :http_chunks, a boolean flag which defaults false. If true, this flag streams the 
     1266        # file data in a format compatible with the HTTP chunked-transfer encoding. 
     1267        # 
     1268        # 
     1269        def stream_file_data filename, args={} 
     1270                EventMachine::FileStreamer.new( self, filename, args ) 
     1271        end 
    12551272 
    12561273 
  • version_0/tests/test_send_file.rb

    r413 r472  
    4545 
    4646        def teardown 
     47                File.unlink( TestFilename ) if File.exist?( TestFilename ) 
    4748        end 
    4849 
     
    6970        end 
    7071 
     72        def test_send_large_file 
     73                File.open( TestFilename, "w" ) {|f| 
     74                        f << ("A" * 1000000) 
     75                } 
     76 
     77                data = nil 
     78 
     79                EM.run { 
     80                        EM.start_server TestHost, TestPort, TestModule 
     81                        EM.add_timer(2) {EM.stop} # avoid hanging in case of error 
     82                        EM.defer proc { 
     83                                t = TCPSocket.new TestHost, TestPort 
     84                                data = t.read 
     85                        }, proc { 
     86                                EM.stop 
     87                        } 
     88                } 
     89 
     90                assert_equal( "A" * 1000000, data ) 
     91                File.unlink TestFilename 
     92        end 
     93 
     94 
     95        module StreamTestModule 
     96                def post_init 
     97                        EM::Deferrable.future( stream_file_data(TestFilename)) { 
     98                                close_connection_after_writing 
     99                        } 
     100                end 
     101        end 
     102 
     103        module ChunkStreamTestModule 
     104                def post_init 
     105                        EM::Deferrable.future( stream_file_data(TestFilename, :http_chunks=>true)) { 
     106                                close_connection_after_writing 
     107                        } 
     108                end 
     109        end 
     110 
     111        def test_stream_file_data 
     112                File.open( TestFilename, "w" ) {|f| 
     113                        f << ("A" * 1000) 
     114                } 
     115 
     116                data = nil 
     117 
     118                EM.run { 
     119                        EM.start_server TestHost, TestPort, StreamTestModule 
     120                        EM.add_timer(2) {EM.stop} # avoid hanging in case of error 
     121                        EM.defer proc { 
     122                                t = TCPSocket.new TestHost, TestPort 
     123                                data = t.read 
     124                        }, proc { 
     125                                EM.stop 
     126                        } 
     127                } 
     128 
     129                assert_equal( "A" * 1000, data ) 
     130 
     131                File.unlink TestFilename 
     132        end 
     133 
     134        def test_stream_chunked_file_data 
     135                File.open( TestFilename, "w" ) {|f| 
     136                        f << ("A" * 1000) 
     137                } 
     138 
     139                data = nil 
     140 
     141                EM.run { 
     142                        EM.start_server TestHost, TestPort, ChunkStreamTestModule 
     143                        EM.add_timer(2) {EM.stop} # avoid hanging in case of error 
     144                        EM.defer proc { 
     145                                t = TCPSocket.new TestHost, TestPort 
     146                                data = t.read 
     147                        }, proc { 
     148                                EM.stop 
     149                        } 
     150                } 
     151 
     152                assert_equal( "3e8\r\n#{"A" * 1000}\r\n0\r\n\r\n", data ) 
     153 
     154                File.unlink TestFilename 
     155        end 
     156 
     157        module BadFileTestModule 
     158                def post_init 
     159                        de = stream_file_data( TestFilename+"..." ) 
     160                        de.errback {|msg| 
     161                                send_data msg 
     162                                close_connection_after_writing 
     163                        } 
     164                end 
     165        end 
     166        def test_stream_bad_file 
     167                data = nil 
     168                EM.run { 
     169                        EM.start_server TestHost, TestPort, BadFileTestModule 
     170                        EM.add_timer(2) {EM.stop} # avoid hanging in case of error 
     171                        EM.defer proc { 
     172                                t = TCPSocket.new TestHost, TestPort 
     173                                data = t.read 
     174                        }, proc { 
     175                                EM.stop 
     176                        } 
     177                } 
     178 
     179                assert_equal( "file not found", data ) 
     180        end 
     181 
     182        def test_stream_large_file_data 
     183                File.open( TestFilename, "w" ) {|f| 
     184                        f << ("A" * 10000) 
     185                } 
     186 
     187                data = nil 
     188 
     189                EM.run { 
     190                        EM.start_server TestHost, TestPort, StreamTestModule 
     191                        EM.add_timer(2) {EM.stop} # avoid hanging in case of error 
     192                        EM.defer proc { 
     193                                t = TCPSocket.new TestHost, TestPort 
     194                                data = t.read 
     195                        }, proc { 
     196                                EM.stop 
     197                        } 
     198                } 
     199 
     200                assert_equal( "A" * 10000, data ) 
     201 
     202                File.unlink TestFilename 
     203        end 
     204 
     205        def test_stream_large_chunked_file_data 
     206                File.open( TestFilename, "w" ) {|f| 
     207                        f << ("A" * 10000) 
     208                } 
     209 
     210                data = nil 
     211 
     212                EM.run { 
     213                        EM.start_server TestHost, TestPort, ChunkStreamTestModule 
     214                        EM.add_timer(2) {EM.stop} # avoid hanging in case of error 
     215                        EM.defer proc { 
     216                                t = TCPSocket.new TestHost, TestPort 
     217                                data = t.read 
     218                        }, proc { 
     219                                EM.stop 
     220                        } 
     221                } 
     222 
     223                expected = [ 
     224                        "1000\r\n", 
     225                        "A" * 4096, 
     226                        "\r\n", 
     227                        "1000\r\n", 
     228                        "A" * 4096, 
     229                        "\r\n", 
     230                        "710\r\n", 
     231                        "A" * 0x710, 
     232                        "\r\n", 
     233                        "0\r\n", 
     234                        "\r\n" 
     235                ].join 
     236                assert_equal( expected, data ) 
     237 
     238                File.unlink TestFilename 
     239        end 
     240 
    71241end 
    72242