John Stoffel
Hi all,
I'm trying to write a Ruby script that does a recursive descent, adding
up the size of each directory and its contents. Obviously, you just
recurse down to the bottom and work your way back up.
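For reference, the single-process version of that descent looks roughly like the sketch below. `dir_size` is a hypothetical name used only for illustration; it is not part of the script further down.

```ruby
# Minimal single-process sketch: sum the sizes of all regular files
# under a directory. dir_size is a hypothetical name.
def dir_size(dir)
  total = 0
  Dir.foreach(dir) do |name|
    next if name == "." || name == ".."
    path = File.join(dir, name)
    stat = File.lstat(path)          # lstat, so symlinks are not followed
    if stat.file?
      total += stat.size
    elsif stat.directory?
      total += dir_size(path)        # recurse, then add the child's total
    end
  end
  total
rescue SystemCallError => e
  # On an error, report it and return whatever was counted so far.
  warn "#{dir}: #{e.message}"
  total || 0
end
```

Calling `dir_size("/some/path")` returns the byte count as an Integer; symlinks match neither branch and are skipped.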
But I need to break this into parallel processes because of the sheer
size of my directories, on the order of 1-5 TB each, with millions of
files. I managed to write a version using the Ruby thread library, but
since threads in Ruby 1.8.6 are green threads rather than native OS
threads, it didn't buy me anything.
So I've tried to use the 'drb' and 'slave' libraries to make my life
simpler. Basically, I want to have N slaves going at any one time.
When I'm processing a directory and find a sub-directory to descend
into, I make a call back using DRb to see if I can fork off another
slave to do the work, or whether I should just recurse and do the work
myself.
Once I'm done in a directory, I wait on any slaves still working below
me before I return my results to the next higher level.
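To make the intended control flow concrete, here is a minimal sketch that uses plain `fork` and pipes in place of DRb and the slave library. It is a stand-in under several assumptions, not the script below: `serial_size` and `forked_size` are hypothetical names, the limit is tracked only within one directory level, children do not fork further, and a slot is not reused after its child finishes.

```ruby
# Sketch only: plain fork + pipes stand in for DRb and the slave library.
# serial_size and forked_size are hypothetical names.

# Serial recursive sum of regular-file sizes; symlinks are skipped.
def serial_size(dir)
  total = 0
  Dir.foreach(dir) do |name|
    next if name == "." || name == ".."
    path = File.join(dir, name)
    stat = File.lstat(path)
    if stat.file?
      total += stat.size
    elsif stat.directory?
      total += serial_size(path)
    end
  end
  total
end

# Sum one directory, handing each sub-directory to a forked child while
# fewer than max_kids children have been started; otherwise recurse inline.
def forked_size(dir, max_kids)
  total = 0
  kids = []                               # [pid, read end of pipe] pairs
  Dir.foreach(dir) do |name|
    next if name == "." || name == ".."
    path = File.join(dir, name)
    stat = File.lstat(path)
    if stat.file?
      total += stat.size
    elsif stat.directory?
      if kids.length < max_kids
        reader, writer = IO.pipe
        pid = fork do
          reader.close
          writer.puts serial_size(path)   # child reports its subtotal
          writer.close
          exit!(0)                        # skip at_exit handlers in the child
        end
        writer.close                      # parent keeps only the read end
        kids << [pid, reader]
      else
        total += serial_size(path)        # no free slot: do the work here
      end
    end
  end
  # Wait on every child before returning the total to the caller.
  kids.each do |pid, reader|
    total += reader.read.to_i
    reader.close
    Process.wait(pid)
  end
  total
end
```

Each child writes a single integer, so the parent can read the pipes in any order without risk of a child blocking on a full pipe. A child that fails simply contributes 0 in this sketch; real code would want to check the exit status.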
I'm obviously doing something completely wrong in the script below, but
I can't see what. Can anyone look at this code and tell me what I'm
doing wrong, or what I'm missing? When I run it, it just barfs (sorta)
all over the place.
#!/usr/bin/ruby
require 'getoptlong'
#require 'thread'
require 'slave'
require 'drb'
$VERSION = "v1.0";
$max_slaves = 1
$count = 50
URI = "drbunix:///tmp/readdir+" + Process.pid.to_s
slave_cnt_mutex = Mutex.new
opts = GetoptLong.new(
  [ "--help", "-h", GetoptLong::NO_ARGUMENT],
  [ "--kids", "-k", GetoptLong::REQUIRED_ARGUMENT]
)
#---------------------------------------------------------------------
class Master
  def initialize(max_slaves=1)
    @slaves = []
    @max_slaves = max_slaves
    @slave_count = 1
    @slave_mutex = Mutex.new
    # Probably not needed...
    @slave_cv = ConditionVariable.new
  end

  def count
    @slave_count
  end

  # Increment the count of slaves, returning 1 if incremented, nil if not.
  def addslave
    ok = nil
    @slave_mutex.synchronize do
      if (@slave_count < @max_slaves) then
        @slave_count += 1
        ok = 1
      end
    end
    ok
  end

  # Decrement the count of slaves, returning the new count.
  def remslave
    @slave_mutex.synchronize do
      if (@slave_count > 1) then
        @slave_count -= 1
      end
    end
    @slave_count
  end
end
#---------------------------------------------------------------------
# Read a directory and add to the database; this function is recursive
# for sub-directories
def readdir(master,dir)
  #puts "readdir(#{dir})"
  size_file = {}
  size_dir = {}
  size_total = 0

  # Slave pool for this level of readdir() recursion.
  pool = []

  # Traverse the directory and collect the size of all files and
  # directories
  begin
    Dir.foreach(dir) do |f|
      #print " #{f},"
      if(f != "." && f != "..") then
        f_full = addpath(dir, f)
        stat = File.lstat(f_full)
        if(!stat.symlink?) then
          if(stat.file?) then
            #puts " File: #{f}"
            size = File.size(f_full)
            size_file[f] = size
            size_total += size
          end
          if(stat.directory?) then
            #puts " DIR: #{f}, thread_count = #{@thread_count}, max_slaves = #{@max_slaves}."
            if ($max_slaves <= 1) then
              #puts " no threads."
              size = readdir(master,f_full)
              if (size > 0) then
                size_dir[f] = size
                size_total += size
              end
            else
              ok = master.addslave
              if (ok)
                puts " Threaded!"
                pool << Slave.object(:async => true) {
                  size = readdir(master,f_full)
                  puts "size = #{size}"
                }
              else
                puts " no free threads, do anyway"
                size = readdir(master,f_full)
                if(size > 0) then
                  size_dir[f] = size
                  size_total += size
                end
              end
            end
          end
        end
      end
    end
  rescue SystemCallError => errmsg
    puts errmsg
  end

  pool.each { |slave|
    size_total += slave.value
    #master.remslave
    slave.shutdown
  }
  #puts "Dir: #{dir} = #{size_total}"
  return size_total
end
#---------------------------------------------------------------------
def usage
  puts
  puts "usage: readdir-drb [--kids NUM] <dir>"
  puts " defaults to #{$max_slaves} children"
  puts
  puts " version: #{$VERSION}"
  puts
end
#---------------------------------------------------------------------
def addpath(a, b)
  return a + b if(a =~ /\/$/)
  return a + "/" + b
end
#---------------------------------------------------------------------
# Main
#---------------------------------------------------------------------
$DEBUG = true

opts.each do |opt,arg|
  case opt
  when "--kids"
    $max_slaves = arg.to_i
  else
    usage
    exit
  end
end

if ARGV.length != 1
  puts "Missing dir argument (try --help)"
  exit 0
end

dir = ARGV.shift

# Start the server which does the counting and coordination between
# slaves.
DRb.start_service URI, Master.new($max_slaves)

# Fire up the first slave process which will do the work of readdir()
master = DRbObject.new_with_uri URI

# Now let's try to do a recursive readdir() algorithm with threads.
size = readdir(master,dir)
sizekb = size / 1024
sizemb = sizekb / 1024
sizegb = sizemb / 1024
puts ""
puts "Total size: #{size} B"
puts "Total size: #{sizekb} KB"
puts "Total size: #{sizemb} MB"
puts "Total size: #{sizegb} GB"
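The chained integer divisions at the end of the script truncate at every step. A small helper that picks the largest fitting unit might look like the sketch below. `human_size` is a hypothetical name and is not part of the script above.

```ruby
# Hypothetical helper: format a byte count using the largest binary unit
# (1024-based) that keeps the value at or above 1.
def human_size(bytes)
  units = %w[B KB MB GB TB]
  value = bytes.to_f
  index = 0
  while value >= 1024 && index < units.length - 1
    value /= 1024
    index += 1
  end
  format("%.1f %s", value, units[index])
end

puts human_size(1536)   # prints "1.5 KB"
```

Dividing a float rather than an integer keeps the fractional part, so a 1.5 KB total is not reported as 1 KB.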