Last active
January 27, 2017 03:42
-
-
Save sj26/2095122 to your computer and use it in GitHub Desktop.
Process an enumerator using a simple thread pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "etc" | |
require "thread" | |
Enumerator.class_eval do | |
# Process an enumerator using a simple thread pool | |
# | |
# `of:` is pool size, defaults to detecting number of CPUs | |
# | |
# Caveat: Enumerator#map may yield results out of order. Work is processed in | |
# order, but apparently #map concatenates the results however it likes. This | |
# might be okay, depending on use case. Enumerator#each works great, though! | |
# | |
# 1.upto(10).in_thread_pool.map { |i| puts "#{Thread.current}: #{i}"; i * i } | |
# #<Thread:0x007fbed45fa218>: 1 | |
# #<Thread:0x007fbed45fa038>: 2 | |
# #<Thread:0x007fbed45f9f20>: 3 | |
# #<Thread:0x007fbed45f9e08>: 4 | |
# #<Thread:0x007fbed45fa218>: 5 | |
# #<Thread:0x007fbed45f9f20>: 6 | |
# #<Thread:0x007fbed45f9e08>: 7 | |
# #<Thread:0x007fbed45fa038>: 8 | |
# #<Thread:0x007fbed45f9f20>: 9 | |
# #<Thread:0x007fbed45fa038>: 10 | |
# => [1, 9, 16, 4, 36, 64, 25, 49, 81] | |
# | |
def in_thread_pool(of: Etc.nprocessors) | |
return enum_for(:in_thread_pool, of: of) unless block_given? | |
queue = SizedQueue.new(1) | |
threads = Array.new(of) do | |
Thread.new do | |
# pop blocks, returns nil when queue closed | |
while values = queue.pop | |
yield(*values) | |
end | |
end | |
end | |
while true | |
begin | |
queue.push next_values | |
rescue StopIteration => stop | |
queue.close | |
threads.each(&:join) | |
return stop.result | |
end | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "etc" | |
require "enumerator_thread_poolinging" | |
RSpec.describe "Enumerator#in_thread_pool" do | |
it "processes each element" do | |
a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] | |
b = a.to_enum.in_thread_pool.map { |i| i * i } | |
expect(b).to match_array([1, 4, 9, 16, 25, 36, 49, 64, 81, 100]) | |
end | |
it "uses specified number of threads" do | |
threads = 1.upto(100).to_enum.in_thread_pool(of: 1).map { Thread.current } | |
expect(threads.uniq.size).to eql(1) | |
threads = 1.upto(100).to_enum.in_thread_pool(of: 2).map { Thread.current } | |
expect(threads.uniq.size).to eql(2) | |
threads = 1.upto(100).to_enum.in_thread_pool(of: 3).map { Thread.current } | |
expect(threads.uniq.size).to eql(3) | |
end | |
it "uses the number of processors by default" do | |
threads = 1.upto(100).to_enum.in_thread_pool.map { Thread.current } | |
expect(threads.uniq.size).to eql(Etc.nprocessors) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment