Skip to content

Instantly share code, notes, and snippets.

@tonyta
Created June 24, 2024 20:52
Show Gist options
  • Save tonyta/81a1d72ffee914f8c6785b7643881dad to your computer and use it in GitHub Desktop.
Save tonyta/81a1d72ffee914f8c6785b7643881dad to your computer and use it in GitHub Desktop.
CSV Filter
#!/usr/bin/env ruby
require "csv"
require "pathname"
require "fileutils"
# Copies a directory tree into an "output" directory under the current working
# directory, keeping only the CSV rows whose target column equals the target
# value.
#
# Files whose extension is not ".csv" are copied unchanged. A CSV with at least
# one matching row is written with its header row and all fields quoted; a CSV
# with no matching row produces no output file. Directories left empty in the
# output tree are removed at the end of the run.
class CSVFilter
  # Options passed to CSV.open for every filtered file.
  CSV_WRITE_OPTIONS = {
    write_headers: true,
    force_quotes: true,
  }.freeze

  # Progress mark printed for each CSV, keyed by the outcome stored in #csv_hits.
  STATUS_MARKS = {
    true => "\e[1;32m✔\e[m",
    false => "⋅",
    nil => "\e[1;31m✘\e[m",
  }.freeze

  # csv_hits holds one entry per CSV file processed:
  # true (at least one row matched), false (no row matched) or nil (error).
  attr_reader :input, :output, :target_column, :target_value, :csv_hits

  # @param input [String, Pathname] root directory of the tree to filter
  # @param target_column [String] header name of the column to test
  # @param target_value [String] value a row must hold in that column to be kept
  def initialize(input, target_column, target_value)
    @input = Pathname(input)
    @output = Pathname.pwd / "output"
    @target_column = target_column
    @target_value = target_value
    @csv_hits = []
  end

  # Wipes the output directory, filters the input tree into it, prunes empty
  # output directories and prints a summary.
  #
  # @raise [ArgumentError] if the input is the output directory or inside it
  def run!
    ensure_input_outside_output

    puts <<~MSG
      Filtering CSVs...
      Input: #{input}
      Output: #{output}
      Filter: #{target_column.inspect}: #{target_value.inspect}
    MSG

    reset_output!
    walk_tree input, &method(:filter_copy)
    walk_tree output, &method(:delete_empty)

    puts
    puts <<~MSG
      Done!
      Total: #{csv_hits.count}
      Passed: #{csv_hits.count(&:itself)}
      Misses: #{csv_hits.count(false)}
      Errors: #{csv_hits.count(&:nil?)}
    MSG
  end

  # Deletes the output directory (if any) and recreates it empty.
  def reset_output!
    FileUtils.rm_r(output, force: true)
    FileUtils.mkdir_p(output)
  end

  # Yields every entry below +dir+, children before their parent directory, so
  # a block that removes empty directories also sees parents it just emptied.
  #
  # The output directory itself is never yielded or descended into; otherwise
  # an input tree that contains it (e.g. input ".") would copy already-written
  # output back into itself. The comparison is lexical (no symlink resolution).
  def walk_tree(dir, &block)
    dir.children.each do |child|
      next if child.expand_path == output

      walk_tree(child, &block) if child.directory?
      yield child
    end
  end

  # Mirrors one input file into the output tree: non-CSV files are copied
  # as-is, CSV files are filtered and their outcome is recorded in #csv_hits
  # and printed as a progress mark. Directories are ignored.
  def filter_copy(input_path)
    return if input_path.directory?

    output_path = output / input_path.relative_path_from(input)
    FileUtils.mkdir_p(output_path.parent)

    unless input_path.extname == ".csv"
      FileUtils.cp(input_path, output_path)
      return
    end

    outcome = filter_csv(input_path, output_path)
    csv_hits << outcome
    print STATUS_MARKS.fetch(outcome)
  end

  # Removes +path+ when it is an empty directory.
  def delete_empty(path)
    FileUtils.rm_r(path) if path.directory? && path.empty?
  end

  private

  # Refuses to run when the input is the output directory or lives inside it,
  # because #reset_output! would delete the input before it is read.
  def ensure_input_outside_output
    input.expand_path.ascend do |ancestor|
      next unless ancestor == output

      raise ArgumentError,
            "input #{input} is inside the output directory #{output}, which is wiped on every run"
    end
  end

  # Writes the rows of +input_path+ whose target column equals the target value
  # to +output_path+.
  #
  # @return [true] at least one row matched; the output file is kept
  # @return [false] no row matched; the output file is removed
  # @return [nil] the file is empty, lacks the target column, or is malformed;
  #   no output file is left behind
  def filter_csv(input_path, output_path)
    # An empty file has no first row, so headers is nil here.
    headers = CSV.foreach(input_path).first
    return nil unless headers&.include?(target_column)

    matched = false
    CSV.open(output_path, "w", headers: headers, **CSV_WRITE_OPTIONS) do |csv|
      CSV.foreach(input_path, headers: true) do |row|
        next unless row[target_column] == target_value

        matched = true
        csv << row
      end
    end

    # The header row is written even when nothing matched, so drop that file.
    FileUtils.rm(output_path) unless matched
    matched
  rescue CSV::MalformedCSVError
    # Count the file as an error and discard any partially written output.
    FileUtils.rm_f(output_path)
    nil
  end
end
# Script entry point. The three command-line arguments map, in order, onto the
# positional parameters of CSVFilter#initialize. A wrong argument count prints
# a usage line and exits non-zero instead of surfacing a raw ArgumentError.
unless ARGV.size == 3
  abort "Usage: #{File.basename($PROGRAM_NAME)} INPUT_DIR TARGET_COLUMN TARGET_VALUE"
end

CSVFilter.new(*ARGV).run!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment