eaiovnaovbqoebvqoeavibavo PKX|iZ rinda.rbnu[require 'drb/drb' require 'thread' ## # A module to implement the Linda distributed computing paradigm in Ruby. # # Rinda is part of DRb (dRuby). # # == Example(s) # # See the sample/drb/ directory in the Ruby distribution, from 1.8.2 onwards. # #-- # TODO # == Introduction to Linda/rinda? # # == Why is this library separate from DRb? module Rinda ## # Rinda error base class class RindaError < RuntimeError; end ## # Raised when a hash-based tuple has an invalid key. class InvalidHashTupleKey < RindaError; end ## # Raised when trying to use a canceled tuple. class RequestCanceledError < ThreadError; end ## # Raised when trying to use an expired tuple. class RequestExpiredError < ThreadError; end ## # A tuple is the elementary object in Rinda programming. # Tuples may be matched against templates if the tuple and # the template are the same size. class Tuple ## # Creates a new Tuple from +ary_or_hash+ which must be an Array or Hash. def initialize(ary_or_hash) if hash?(ary_or_hash) init_with_hash(ary_or_hash) else init_with_ary(ary_or_hash) end end ## # The number of elements in the tuple. def size @tuple.size end ## # Accessor method for elements of the tuple. def [](k) @tuple[k] end ## # Fetches item +k+ from the tuple. def fetch(k) @tuple.fetch(k) end ## # Iterate through the tuple, yielding the index or key, and the # value, thus ensuring arrays are iterated similarly to hashes. def each # FIXME if Hash === @tuple @tuple.each { |k, v| yield(k, v) } else @tuple.each_with_index { |v, k| yield(k, v) } end end ## # Return the tuple itself def value @tuple end private def hash?(ary_or_hash) ary_or_hash.respond_to?(:keys) end ## # Munges +ary+ into a valid Tuple. def init_with_ary(ary) @tuple = Array.new(ary.size) @tuple.size.times do |i| @tuple[i] = ary[i] end end ## # Ensures +hash+ is a valid Tuple. def init_with_hash(hash) @tuple = Hash.new hash.each do |k, v| raise InvalidHashTupleKey unless String === k @tuple[k] = v end end end ## # Templates are used to match tuples in Rinda. class Template < Tuple ## # Matches this template against +tuple+. The +tuple+ must be the same # size as the template. An element with a +nil+ value in a template acts # as a wildcard, matching any value in the corresponding position in the # tuple. Elements of the template match the +tuple+ if the are #== or # #===. # # Template.new([:foo, 5]).match Tuple.new([:foo, 5]) # => true # Template.new([:foo, nil]).match Tuple.new([:foo, 5]) # => true # Template.new([String]).match Tuple.new(['hello']) # => true # # Template.new([:foo]).match Tuple.new([:foo, 5]) # => false # Template.new([:foo, 6]).match Tuple.new([:foo, 5]) # => false # Template.new([:foo, nil]).match Tuple.new([:foo]) # => false # Template.new([:foo, 6]).match Tuple.new([:foo]) # => false def match(tuple) return false unless tuple.respond_to?(:size) return false unless tuple.respond_to?(:fetch) return false unless self.size == tuple.size each do |k, v| begin it = tuple.fetch(k) rescue return false end next if v.nil? next if v == it next if v === it return false end return true end ## # Alias for #match. def ===(tuple) match(tuple) end end ## # Documentation? class DRbObjectTemplate ## # Creates a new DRbObjectTemplate that will match against +uri+ and +ref+. def initialize(uri=nil, ref=nil) @drb_uri = uri @drb_ref = ref end ## # This DRbObjectTemplate matches +ro+ if the remote object's drburi and # drbref are the same. +nil+ is used as a wildcard. def ===(ro) return true if super(ro) unless @drb_uri.nil? return false unless (@drb_uri === ro.__drburi rescue false) end unless @drb_ref.nil? return false unless (@drb_ref === ro.__drbref rescue false) end true end end ## # TupleSpaceProxy allows a remote Tuplespace to appear as local. class TupleSpaceProxy ## # Creates a new TupleSpaceProxy to wrap +ts+. def initialize(ts) @ts = ts end ## # Adds +tuple+ to the proxied TupleSpace. See TupleSpace#write. def write(tuple, sec=nil) @ts.write(tuple, sec) end ## # Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take. def take(tuple, sec=nil, &block) port = [] @ts.move(DRbObject.new(port), tuple, sec, &block) port[0] end ## # Reads +tuple+ from the proxied TupleSpace. See TupleSpace#read. def read(tuple, sec=nil, &block) @ts.read(tuple, sec, &block) end ## # Reads all tuples matching +tuple+ from the proxied TupleSpace. See # TupleSpace#read_all. def read_all(tuple) @ts.read_all(tuple) end ## # Registers for notifications of event +ev+ on the proxied TupleSpace. # See TupleSpace#notify def notify(ev, tuple, sec=nil) @ts.notify(ev, tuple, sec) end end ## # An SimpleRenewer allows a TupleSpace to check if a TupleEntry is still # alive. class SimpleRenewer include DRbUndumped ## # Creates a new SimpleRenewer that keeps an object alive for another +sec+ # seconds. def initialize(sec=180) @sec = sec end ## # Called by the TupleSpace to check if the object is still alive. def renew @sec end end end PKX|iZz7ring.rbnu[# # Note: Rinda::Ring API is unstable. # require 'drb/drb' require 'rinda/rinda' require 'thread' module Rinda ## # The default port Ring discovery will use. Ring_PORT = 7647 ## # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. # Service location uses the following steps: # # 1. A RingServer begins listening on the broadcast UDP address. # 2. A RingFinger sends a UDP packet containing the DRb URI where it will # listen for a reply. # 3. The RingServer receives the UDP packet and connects back to the # provided DRb URI with the DRb service. class RingServer include DRbUndumped ## # Advertises +ts+ on the UDP broadcast address at +port+. def initialize(ts, port=Ring_PORT) @ts = ts @soc = UDPSocket.open @soc.bind('', port) @w_service = write_service @r_service = reply_service end ## # Creates a thread that picks up UDP packets and passes them to do_write # for decoding. def write_service Thread.new do loop do msg = @soc.recv(1024) do_write(msg) end end end ## # Extracts the response URI from +msg+ and adds it to TupleSpace where it # will be picked up by +reply_service+ for notification. def do_write(msg) Thread.new do begin tuple, sec = Marshal.load(msg) @ts.write(tuple, sec) rescue end end end ## # Creates a thread that notifies waiting clients from the TupleSpace. def reply_service Thread.new do loop do do_reply end end end ## # Pulls lookup tuples out of the TupleSpace and sends their DRb object the # address of the local TupleSpace. def do_reply tuple = @ts.take([:lookup_ring, DRbObject]) Thread.new { tuple[1].call(@ts) rescue nil} rescue end end ## # RingFinger is used by RingServer clients to discover the RingServer's # TupleSpace. Typically, all a client needs to do is call # RingFinger.primary to retrieve the remote TupleSpace, which it can then # begin using. class RingFinger @@broadcast_list = ['', 'localhost'] @@finger = nil ## # Creates a singleton RingFinger and looks for a RingServer. Returns the # created RingFinger. def self.finger unless @@finger @@finger = self.new @@finger.lookup_ring_any end @@finger end ## # Returns the first advertised TupleSpace. def self.primary finger.primary end ## # Contains all discovered TupleSpaces except for the primary. def self.to_a finger.to_a end ## # The list of addresses where RingFinger will send query packets. attr_accessor :broadcast_list ## # The port that RingFinger will send query packets to. attr_accessor :port ## # Contain the first advertised TupleSpace after lookup_ring_any is called. attr_accessor :primary ## # Creates a new RingFinger that will look for RingServers at +port+ on # the addresses in +broadcast_list+. def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT) @broadcast_list = broadcast_list || ['localhost'] @port = port @primary = nil @rings = [] end ## # Contains all discovered TupleSpaces except for the primary. def to_a @rings end ## # Iterates over all discovered TupleSpaces starting with the primary. def each lookup_ring_any unless @primary return unless @primary yield(@primary) @rings.each { |x| yield(x) } end ## # Looks up RingServers waiting +timeout+ seconds. RingServers will be # given +block+ as a callback, which will be called with the remote # TupleSpace. def lookup_ring(timeout=5, &block) return lookup_ring_any(timeout) unless block_given? msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) @broadcast_list.each do |it| soc = UDPSocket.open begin soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) soc.send(msg, 0, it, @port) rescue nil ensure soc.close end end sleep(timeout) end ## # Returns the first found remote TupleSpace. Any further recovered # TupleSpaces can be found by calling +to_a+. def lookup_ring_any(timeout=5) queue = Queue.new Thread.new do self.lookup_ring(timeout) do |ts| queue.push(ts) end queue.push(nil) end @primary = queue.pop raise('RingNotFound') if @primary.nil? Thread.new do while it = queue.pop @rings.push(it) end end @primary end end ## # RingProvider uses a RingServer advertised TupleSpace as a name service. # TupleSpace clients can register themselves with the remote TupleSpace and # look up other provided services via the remote TupleSpace. # # Services are registered with a tuple of the format [:name, klass, # DRbObject, description]. class RingProvider ## # Creates a RingProvider that will provide a +klass+ service running on # +front+, with a +description+. +renewer+ is optional. def initialize(klass, front, desc, renewer = nil) @tuple = [:name, klass, front, desc] @renewer = renewer || Rinda::SimpleRenewer.new end ## # Advertises this service on the primary remote TupleSpace. def provide ts = Rinda::RingFinger.primary ts.write(@tuple, @renewer) end end end if __FILE__ == $0 DRb.start_service case ARGV.shift when 's' require 'rinda/tuplespace' ts = Rinda::TupleSpace.new Rinda::RingServer.new(ts) $stdin.gets when 'w' finger = Rinda::RingFinger.new(nil) finger.lookup_ring do |ts2| p ts2 ts2.write([:hello, :world]) end when 'r' finger = Rinda::RingFinger.new(nil) finger.lookup_ring do |ts2| p ts2 p ts2.take([nil, nil]) end end end PKX|iZI:j7j7 tuplespace.rbnu[require 'monitor' require 'thread' require 'drb/drb' require 'rinda/rinda' require 'enumerator' require 'forwardable' module Rinda ## # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace) # together with expiry and cancellation data. class TupleEntry include DRbUndumped attr_accessor :expires ## # Creates a TupleEntry based on +ary+ with an optional renewer or expiry # time +sec+. # # A renewer must implement the +renew+ method which returns a Numeric, # nil, or true to indicate when the tuple has expired. def initialize(ary, sec=nil) @cancel = false @expires = nil @tuple = make_tuple(ary) @renewer = nil renew(sec) end ## # Marks this TupleEntry as canceled. def cancel @cancel = true end ## # A TupleEntry is dead when it is canceled or expired. def alive? !canceled? && !expired? end ## # Return the object which makes up the tuple itself: the Array # or Hash. def value; @tuple.value; end ## # Returns the canceled status. def canceled?; @cancel; end ## # Has this tuple expired? (true/false). # # A tuple has expired when its expiry timer based on the +sec+ argument to # #initialize runs out. def expired? return true unless @expires return false if @expires > Time.now return true if @renewer.nil? renew(@renewer) return true unless @expires return @expires < Time.now end ## # Reset the expiry time according to +sec_or_renewer+. # # +nil+:: it is set to expire in the far future. # +true+:: it has expired. # Numeric:: it will expire in that many seconds. # # Otherwise the argument refers to some kind of renewer object # which will reset its expiry time. def renew(sec_or_renewer) sec, @renewer = get_renewer(sec_or_renewer) @expires = make_expires(sec) end ## # Returns an expiry Time based on +sec+ which can be one of: # Numeric:: +sec+ seconds into the future # +true+:: the expiry time is the start of 1970 (i.e. expired) # +nil+:: it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when # UNIX clocks will die) def make_expires(sec=nil) case sec when Numeric Time.now + sec when true Time.at(1) when nil Time.at(2**31-1) end end ## # Retrieves +key+ from the tuple. def [](key) @tuple[key] end ## # Fetches +key+ from the tuple. def fetch(key) @tuple.fetch(key) end ## # The size of the tuple. def size @tuple.size end ## # Creates a Rinda::Tuple for +ary+. def make_tuple(ary) Rinda::Tuple.new(ary) end private ## # Returns a valid argument to make_expires and the renewer or nil. # # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual # renewer). Otherwise it returns an expiry value from calling +it.renew+ # and the renewer. def get_renewer(it) case it when Numeric, true, nil return it, nil else begin return it.renew, it rescue Exception return it, nil end end end end ## # A TemplateEntry is a Template together with expiry and cancellation data. class TemplateEntry < TupleEntry ## # Matches this TemplateEntry against +tuple+. See Template#match for # details on how a Template matches a Tuple. def match(tuple) @tuple.match(tuple) end alias === match def make_tuple(ary) # :nodoc: Rinda::Template.new(ary) end end ## # Documentation? class WaitTemplateEntry < TemplateEntry attr_reader :found def initialize(place, ary, expires=nil) super(ary, expires) @place = place @cond = place.new_cond @found = nil end def cancel super signal end def wait @cond.wait end def read(tuple) @found = tuple signal end def signal @place.synchronize do @cond.signal end end end ## # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of # TupleSpace changes. You may receive either your subscribed event or the # 'close' event when iterating over notifications. # # See TupleSpace#notify_event for valid notification types. # # == Example # # ts = Rinda::TupleSpace.new # observer = ts.notify 'write', [nil] # # Thread.start do # observer.each { |t| p t } # end # # 3.times { |i| ts.write [i] } # # Outputs: # # ['write', [0]] # ['write', [1]] # ['write', [2]] class NotifyTemplateEntry < TemplateEntry ## # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that # match +tuple+. def initialize(place, event, tuple, expires=nil) ary = [event, Rinda::Template.new(tuple)] super(ary, expires) @queue = Queue.new @done = false end ## # Called by TupleSpace to notify this NotifyTemplateEntry of a new event. def notify(ev) @queue.push(ev) end ## # Retrieves a notification. Raises RequestExpiredError when this # NotifyTemplateEntry expires. def pop raise RequestExpiredError if @done it = @queue.pop @done = true if it[0] == 'close' return it end ## # Yields event/tuple pairs until this NotifyTemplateEntry expires. def each # :yields: event, tuple while !@done it = pop yield(it) end rescue ensure cancel end end ## # TupleBag is an unordered collection of tuples. It is the basis # of Tuplespace. class TupleBag class TupleBin extend Forwardable def_delegators '@bin', :find_all, :delete_if, :each, :empty? def initialize @bin = [] end def add(tuple) @bin.push(tuple) end def delete(tuple) idx = @bin.rindex(tuple) @bin.delete_at(idx) if idx end def find @bin.reverse_each do |x| return x if yield(x) end nil end end def initialize # :nodoc: @hash = {} @enum = enum_for(:each_entry) end ## # +true+ if the TupleBag to see if it has any expired entries. def has_expires? @enum.find do |tuple| tuple.expires end end ## # Add +tuple+ to the TupleBag. def push(tuple) key = bin_key(tuple) @hash[key] ||= TupleBin.new @hash[key].add(tuple) end ## # Removes +tuple+ from the TupleBag. def delete(tuple) key = bin_key(tuple) bin = @hash[key] return nil unless bin bin.delete(tuple) @hash.delete(key) if bin.empty? tuple end ## # Finds all live tuples that match +template+. def find_all(template) bin_for_find(template).find_all do |tuple| tuple.alive? && template.match(tuple) end end ## # Finds a live tuple that matches +template+. def find(template) bin_for_find(template).find do |tuple| tuple.alive? && template.match(tuple) end end ## # Finds all tuples in the TupleBag which when treated as templates, match # +tuple+ and are alive. def find_all_template(tuple) @enum.find_all do |template| template.alive? && template.match(tuple) end end ## # Delete tuples which dead tuples from the TupleBag, returning the deleted # tuples. def delete_unless_alive deleted = [] @hash.each do |key, bin| bin.delete_if do |tuple| if tuple.alive? false else deleted.push(tuple) true end end end deleted end private def each_entry(&blk) @hash.each do |k, v| v.each(&blk) end end def bin_key(tuple) head = tuple[0] if head.class == Symbol return head else false end end def bin_for_find(template) key = bin_key(template) key ? @hash.fetch(key, []) : @enum end end ## # The Tuplespace manages access to the tuples it contains, # ensuring mutual exclusion requirements are met. # # The +sec+ option for the write, take, move, read and notify methods may # either be a number of seconds or a Renewer object. class TupleSpace include DRbUndumped include MonitorMixin ## # Creates a new TupleSpace. +period+ is used to control how often to look # for dead tuples after modifications to the TupleSpace. # # If no dead tuples are found +period+ seconds after the last # modification, the TupleSpace will stop looking for dead tuples. def initialize(period=60) super() @bag = TupleBag.new @read_waiter = TupleBag.new @take_waiter = TupleBag.new @notify_waiter = TupleBag.new @period = period @keeper = nil end ## # Adds +tuple+ def write(tuple, sec=nil) entry = create_entry(tuple, sec) synchronize do if entry.expired? @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end notify_event('write', entry.value) notify_event('delete', entry.value) else @bag.push(entry) start_keeper if entry.expires @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end @take_waiter.find_all_template(entry).each do |template| template.signal end notify_event('write', entry.value) end end entry end ## # Removes +tuple+ def take(tuple, sec=nil, &block) move(nil, tuple, sec, &block) end ## # Moves +tuple+ to +port+. def move(port, tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? synchronize do entry = @bag.find(template) if entry port.push(entry.value) if port @bag.delete(entry) notify_event('take', entry.value) return entry.value end raise RequestExpiredError if template.expired? begin @take_waiter.push(template) start_keeper if template.expires while true raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? entry = @bag.find(template) if entry port.push(entry.value) if port @bag.delete(entry) notify_event('take', entry.value) return entry.value end template.wait end ensure @take_waiter.delete(template) end end end ## # Reads +tuple+, but does not remove it. def read(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? synchronize do entry = @bag.find(template) return entry.value if entry raise RequestExpiredError if template.expired? begin @read_waiter.push(template) start_keeper if template.expires template.wait raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? return template.found ensure @read_waiter.delete(template) end end end ## # Returns all tuples matching +tuple+. Does not remove the found tuples. def read_all(tuple) template = WaitTemplateEntry.new(self, tuple, nil) synchronize do entry = @bag.find_all(template) entry.collect do |e| e.value end end end ## # Registers for notifications of +event+. Returns a NotifyTemplateEntry. # See NotifyTemplateEntry for examples of how to listen for notifications. # # +event+ can be: # 'write':: A tuple was added # 'take':: A tuple was taken or moved # 'delete':: A tuple was lost after being overwritten or expiring # # The TupleSpace will also notify you of the 'close' event when the # NotifyTemplateEntry has expired. def notify(event, tuple, sec=nil) template = NotifyTemplateEntry.new(self, event, tuple, sec) synchronize do @notify_waiter.push(template) end template end private def create_entry(tuple, sec) TupleEntry.new(tuple, sec) end ## # Removes dead tuples. def keep_clean synchronize do @read_waiter.delete_unless_alive.each do |e| e.signal end @take_waiter.delete_unless_alive.each do |e| e.signal end @notify_waiter.delete_unless_alive.each do |e| e.notify(['close']) end @bag.delete_unless_alive.each do |e| notify_event('delete', e.value) end end end ## # Notifies all registered listeners for +event+ of a status change of # +tuple+. def notify_event(event, tuple) ev = [event, tuple] @notify_waiter.find_all_template(ev).each do |template| template.notify(ev) end end ## # Creates a thread that scans the tuplespace for expired tuples. def start_keeper return if @keeper && @keeper.alive? @keeper = Thread.new do while true sleep(@period) synchronize do break unless need_keeper? keep_clean end end end end ## # Checks the tuplespace to see if it needs cleaning. def need_keeper? return true if @bag.has_expires? return true if @read_waiter.has_expires? return true if @take_waiter.has_expires? return true if @notify_waiter.has_expires? end end end PKX|iZ rinda.rbnu[PKX|iZz7Uring.rbnu[PKX|iZI:j7j7 /tuplespace.rbnu[PKOg