Чисто-рубиновый параллельных хэш


каков наилучший способ реализации хэша, который может быть изменен в нескольких потоках, но с наименьшим количеством блокировок. Для целей этого вопроса Вы можете предположить, что хэш будет прочитан тяжело. Он должен быть потокобезопасным во всех реализациях Ruby, включая те, которые работают по-настоящему одновременно, например JRuby, и он должен быть написан на чистом Ruby (без C или Java).

Не стесняйтесь представить наивное решение, которое всегда блокируется, но это не так вероятно, это лучшее решение. Очки за элегантность, но меньшая вероятность блокировки выигрывает над меньшим кодом.

10 58

10 ответов:

хорошо, теперь, когда вы указали фактическое значение "threadsafe", вот две потенциальные реализации. Следующий код будет работать вечно в MRI и JRuby. Реализация без блокировки следует за конечной моделью согласованности, где каждый поток использует свое собственное представление хэша, если мастер находится в потоке. Существует небольшая хитрость, необходимая для того, чтобы убедиться, что хранение всей информации в потоке не приводит к утечке памяти, но это обрабатывается и тестируется-размер процесса не растет при этом код. Обе реализации потребуют больше работы ,чтобы быть "завершенными", что означает удаление, обновление и т. д. потребуется некоторое мышление, но любая из двух приведенных ниже концепций будет соответствовать вашим требованиям.

это очень важно для людей, читающих эту тему, чтобы понять, что вся проблема является эксклюзивной для JRuby-in MRI встроенного хэша достаточно.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if  == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end

Posting base / наивное решение, просто чтобы увеличить мой стек переполнения cred:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end

Йехуда, я думаю, вы упомянули, что установка Ивара была атомной? А как насчет простой копии и замены?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end

это класс-оболочка вокруг хэша, который позволяет одновременные читатели, но блокирует вещи для всех других типов доступа (включая повторные чтения).

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end

вот код блокировки, который он использует:

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

потоковая блокировка позволяет вам заблокировать класс один раз, а затем вызвать методы, которые обычно блокируются, и не блокировать их. Вам это нужно, потому что вы уступаете блокам внутри некоторых методов, и эти блоки могут вызывать методы блокировки на объект, и вы не хотите взаимоблокировки или ошибки двойной блокировки. Вместо этого вы можете использовать блокировку подсчета.

вот попытка реализовать блокировки чтения и записи на уровне ведра:

class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

я перестал работать над этим, потому что это слишком медленно, поэтому каждый метод небезопасен (позволяет мутации другими потоками во время итерации), и он не поддерживает большинство методов хэша.

и вот тестовый жгут для одновременного хэши:

require 'thread'
class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
    return
  end
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
    basic_threads.times do
      threads.push Thread.new{run_basic_test(hash, writiness, ops)}
    end
    each_threads.times do
      threads.push Thread.new{run_each_test(hash, writiness, loops)}
    end
    threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
    return Time.now - time
  end
  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end
  def each_write_work(hash, key, value)
    hash[key] = rand
  end
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end
  def choose_key
    Keys[rand(Keys.size)]
  end
  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}  
  end
end

цифры: Последние

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

и МРТ

Writiness      0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

цифры МРТ довольно поразительны. Блокировка в МРТ действительно отстой.

Это может быть использование хомяк

хомячок реализует хэш-массив сопоставленных попыток (HAMT), а также некоторые другие постоянные структуры данных, на чистом Руби.

постоянные структуры данных неизменяемы, и вместо изменения (изменения) структуры, например, путем добавления или замены пары ключ-значение в хэше, вы вместо этого возвращаете новую структуру данных, которая содержит изменение. Трюк, с помощью постоянные неизменяемые структуры данных - это то, что вновь возвращенная структура данных повторно использует как можно больше предшественника.

Я думаю, чтобы реализовать с помощью хомяка, вы бы использовали их изменяемую хэш-оболочку, которая передает все чтения к текущему значению постоянного неизменяемого хэша (т. е. должна быть быстрой), при этом защищая все записи с помощью мьютекса и переключаясь на новое значение постоянного неизменяемого хэша после записи.

для пример:

require 'hamster'
require 'hamster/experimental/mutable_hash'    
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)

# reading goes directly to hash
puts hsh[:name] # Simon

# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe

Итак, давайте использовать это для аналогичного типа проблемы, описанной:

(суть здесь)

require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100 
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil

    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]

        Thread.pass

        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end

threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"

Я получаю следующие результаты:

ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s

jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s

что вы думаете об этом?

Это решение больше похоже на то, как можно было бы решить это в Scala или Clojure, хотя в этих языках, скорее всего, будет использоваться программная транзакционная память с поддержкой низкоуровневого процессора для атомарного сравнить и поменять операции, которые реализованы.

Edit: стоит отметить, что одна из причин реализации хомяка быстро заключается в том, что он имеет путь чтения без блокировки. Пожалуйста, ответьте в комментариях, если у вас есть вопросы об этом или как это работает.

этот (видео, pdf) - это хэш-таблица без блокировки, реализованная в Java.

спойлер: использует atomic Compare-and-Swap (CAS) операции, если они недоступны в Ruby, вы можете эмулировать их с помощью блокировок. не уверен, что это даст какое-либо преимущество над простыми защищенными от блокировки хэш-таблицами

не проверено, и наивный удар по оптимизации для чтения. Он предполагает, что большую часть времени значение не будет заблокировано. Если это так, то плотная петля будет пытаться, пока это не будет. Я ставлю Thread.critical там, чтобы гарантировать, что потоки чтения не будут выполняться до завершения записи. Не уверен, что критическая часть необходима, это действительно зависит от того, насколько Вы читаете, поэтому некоторые бенчмаркинги в порядке.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

там может быть несколько других методов чтения, которые необходимо проверить @semaphore lock, я не знаю, все ли остальное реализовано в терминах #[].

Я довольно неясно, что понимать под этим. я думаю, что самая простая реализация-это просто

Hash

то есть встроенный в Ruby хэш и threadsafe если под threadsafe вы подразумеваете не взорвется, если > 1 потоков пытается получить к нему доступ. этот код будет работать безопасно навсегда

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

Я подозреваю, что по потокобезопасности вы действительно имеете в виду кислоту - например, запись, такая как hash[:key]=:val, а затем чтение, если имеет[:key], вернет :val. но никакого количества обман с блокировкой может обеспечить это-последний всегда будет побеждать. например, скажем, у вас есть 42 потока все обновления хэша threadsafe-какое значение должно быть прочитано 43'rd?? конечно, под threasafe вы не имеете в виду какой - то общий порядок при записи-поэтому, если 42 потока активно писали, "правильное" значение любой верно? но встроенный хэш ruby работает именно так...

вы имеете в виду что-то вроде
hash.each do ...

в одном потоке и

hash.delete(key)

не мешали бы друг другу? я могу себе представить, что хочу, чтобы это было потокобезопасно, но это даже не безопасно в один поток с МРТ ruby (очевидно, вы не можете изменить хэш во время итерации по нему)

так что вы можете быть более конкретным о том, что вы подразумеваете под "threadsafe"??

единственный способ дать кислотную семантику - это грубый замок (конечно, это может быть метод, который взял блок - но все же внешний замок.)

планировщик потоков ruby не просто планирует поток в середине какой-то произвольной функции c (например, встроенные методы hash aref aset), поэтому они эффективно потокобезопасны.

к сожалению, я не могу добавить комментарий к ответу Майкла Софаера, где он вводит: класс RWLock и класс LockedHash с @reader_count и т. д. (пока не хватает кармы)

Это решение не работает. Это дает сообщение об ошибке: в 'разблокировать': попытка разблокировать мьютекс, который не заблокирован (ThreadError)

из-за логической ошибки: когда пришло время разблокировать вещи, то разблокировка происходит 1 дополнительное время (из-за отсутствия проверки my_block?(). Вместо этого он разблокирует его, даже если разблокировка не была необходима "is my block") и поэтому 2nd unlock на уже разблокированных mutes вызывает исключение. (Я вставьте полный код о том, как воспроизвести эту ошибку в конце данного поста).

также Майкл упомянул ,что " каждый метод небезопасен (позволяет мутации другими потоками во время итерации)", что было критично для меня, поэтому я получаю это упрощенное решение, которое работает для всех моих случаев использования, и оно просто блокирует мьютекс при любом вызове любого метода хэша при вызове из разных источников thread (вызовы из того же потока, которому принадлежит блокировка, не блокируются, чтобы избежать взаимоблокировок):

#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be 
# able to even read from it)
#
class TrulyThreadSafeHash
  def initialize
    @mutex = Mutex.new
    @hash = Hash.new
  end

  def method_missing(method_sym, *arguments, &block)

    if !@mutex.owned?  # Returns true if this lock is currently held by current thread
        # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
        # Following call will be blocking if mutex locked by other thread:
        @mutex.synchronize{
            return lambda{@hash.send(method_sym,*arguments, &block)}.call
        }
    end

    # We already own the lock (from current thread perspective).
    # We don't even check if @hash.respond_to?(method_sym), let's make Hash
    # respond properly on all calls (including bad calls (example: wrong method names))
    lambda{@hash.send(method_sym,*arguments, &block)}.call
  end

  # since we're tyring to mimic Hash we'll pretend to respond as Hash would
  def self.respond_to?(method_sym, include_private = false)
    Hash.respond_to(method_sym, include_private)
  end

  # override Object's to_s because our method_missing won't be called for to_s
  def to_s(*arguments)
      @mutex.synchronize{
        return @hash.to_s
      }
  end

  # And for those, who want to run extra mile:
  # to make our class json-friendly we shoud require 'json' and uncomment this:
  #def to_json(*options)
  #    @mutex.synchronize{
  #        return @hash.to_json(*options)
  #    }
  #end

end

а теперь полный пример для демонстрации / воспроизведения ошибки двойной разблокировки в решении Майкла Софаера:

#!/usr/bin/env ruby

# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end



class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======


# global hash object, which will be 'shared' across threads
$h = LockedHash.new

# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimeter (capitalized when last hash item read)
def hash_reader(delim)
    loop{
        count = 0
        $h.each{
            count += 1
            if count != $h.size
                $stderr.print delim
            else
                $stderr.puts delim.upcase
            end
        }
    }
end

# fill hash with 10 items
10.times{|i|
    $h[i] = i
}

# create a thread which will read $h hash
t1 = Thread.new(){
    hash_reader("o")
}

t1.join  # will never happen, but for completeness

, что дает следующую ошибку:

./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
        from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
        from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
        from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
        from ./LockedHash_fails_to_unlock.rb:98:in `loop'
        from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
        from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'

поскольку вы упоминаете, что хэш будет считываться тяжело, наличие одного мьютекса, блокирующего чтение и запись, приведет к условиям гонки, которые, скорее всего, выиграют чтения. Если вы не против, то проигнорируйте ответ.

Если вы хотите дать приоритет записи, блокировка чтения-записи поможет. Следующий код основан на некотором старом назначении c++ для класса операционных систем, поэтому может быть не лучшим качеством, но дает общую идею.

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

тогда просто оберните []= и [] в замке.напишите и зафиксируйте.читать. Может иметь влияние на производительность, но гарантирует, что записи будут "проходить" через чтение. Полезность этого зависит от того, насколько тяжело читать на самом деле.