2013年6月20日木曜日

rubyでFTPからCSV(?)をゲットして、DBに突っ込む

ftp.yml

remote_path: pub/test
retry: 3
host: 192.168.1.70
username: anonymous
password: anonymous


get_ftp.rb
Sample usage: # ruby get_ftp.rb index_hist
This downloads 'index_hist.gz' from the FTP server and gunzips it to 'index_hist'.
require 'net/ftp'
require 'yaml'
# Download "<name>.gz" from the FTP server described in ftp.yml and gunzip it
# in the current directory.
#
# Usage: ruby get_ftp.rb <name>
#
# ftp.yml keys read here: host, username, password, remote_path and retry.
# "retry" is treated as the total number of attempts (default 3, minimum 1);
# failed attempts are repeated after a 5 second pause. When every attempt
# fails the error is reported on stderr and re-raised.
cnt_retry = 0
puts "start process..."

fn = ARGV[0]
abort "usage: ruby get_ftp.rb <file name without .gz>" if fn.nil? || fn.empty?

# The configuration is loaded once, outside the retried section: a missing or
# broken ftp.yml will not fix itself by waiting.
ftp_cfg = YAML.load_file("ftp.yml")
max_attempts = [(ftp_cfg["retry"] || 3).to_i, 1].max

begin
  # The block form of Net::FTP.open closes the connection even when an
  # exception is raised part-way through the transfer.
  Net::FTP.open(ftp_cfg["host"]) do |ftp|
    ftp.login(ftp_cfg["username"], ftp_cfg["password"])
    ftp.chdir(ftp_cfg["remote_path"])
    puts "getting #{fn}.gz from FTP for attempt #{cnt_retry}..."
    ftp.getbinaryfile("#{fn}.gz")
  end

  # Multi-argument system bypasses the shell, so the file name from ARGV can
  # never be interpreted as shell syntax.
  raise "gunzip failed for #{fn}.gz" unless system("gunzip", "-f", "#{fn}.gz")
rescue => err
  if cnt_retry + 1 < max_attempts
    cnt_retry += 1
    sleep 5
    retry
  else
    # Report before re-raising; anything placed after the raise never runs.
    warn "There was an error: #{err.message}"
    raise
  end
else
  puts "Job done."
end
Import to a MySQL database

db.yml

host: 192.168.1.70
username: kagen
password: kagen
database: db_development
import_indices.rb
sample usage1: # ruby import_indices.rb master index_master
will import data from index_master into table 'indices'
sample usage2: # ruby import_indices.rb hist index_hist
will import data from index_hist into table 'index_value_hist'
require 'yaml'
require 'kconv'
require 'mysql'
# Import a colon-separated index file into MySQL inside a single transaction.
#
# Usage: ruby import_indices.rb <master|hist> <data file>
#   master - every row is inserted into `indices`
#   hist   - rows flagged UPD are inserted into, and rows flagged DEL are
#            deleted from, `index_value_hist`. Only cycles "D" (daily) and
#            "M" (monthly) are handled; rows with any other cycle are skipped.
#
# Connection settings (host, username, password, database) come from db.yml.
# Any error rolls the transaction back, is reported on stderr and re-raised.

# Builds the SQL statement for one parsed row.
#
# mode  - "master" or "hist"
# rows  - the fields of one input line, already split on ":"
# tbl   - target table name
# fld   - parenthesised column list used for INSERT
# quote - callable that escapes a value for use inside a single-quoted SQL
#         string literal
#
# Returns the SQL string, or nil when the row must be skipped.
# Raises ArgumentError for a hist row that has a cycle but no date field.
def build_import_sql(mode, rows, tbl, fld, quote)
  if mode == "master"
    # NOTE(review): the column list has no `jpunitname` although the table
    # declares it NOT NULL, and rows[4] is converted with toutf8 yet stored in
    # `engunitname` - confirm the intended field mapping.
    values = [rows[0], rows[1].toutf8, rows[2], rows[3].toutf8,
              rows[4].toutf8, rows[5], rows[6], rows[7]]
    quoted = values.map { |v| "'#{quote.call(v)}'" }.join(", ")
    return "INSERT INTO #{tbl} #{fld} VALUES(#{quoted}, CURRENT_TIMESTAMP)"
  end
  return nil unless mode == "hist"

  cycle = rows[3]
  return nil unless %w[D M].include?(cycle)

  action = rows[0].strip
  return nil unless %w[DEL UPD].include?(action)
  raise ArgumentError, "malformed row (no date field): #{rows.inspect}" if rows[4].nil?

  itemcode = quote.call(rows[1].strip)
  subcode  = quote.call(rows[2].strip)
  yyyy     = quote.call(rows[4][0..3])
  mm       = quote.call(rows[4][4..5])
  # Monthly rows carry no day; they are stored with an empty dd.
  dd       = cycle == "D" ? quote.call(rows[4][6..7]) : ""

  if action == "DEL"
    # The cycle in the WHERE clause must match the row's own cycle; a monthly
    # DEL that filters on 'D' would never remove the monthly record.
    sql = "DELETE FROM #{tbl} WHERE itemcode = '#{itemcode}' AND subcode = '#{subcode}'" \
          " AND cycle = '#{cycle}' AND yyyy = '#{yyyy}' AND mm = '#{mm}'"
    sql += " AND dd = '#{dd}'" if cycle == "D"
    sql
  else
    "INSERT INTO #{tbl} #{fld} VALUES('#{itemcode}', '#{subcode}', '#{cycle}'," \
      " '#{yyyy}', '#{mm}', '#{dd}', '#{quote.call(rows[5])}', CURRENT_TIMESTAMP)"
  end
end

puts "start process of data import..."

mode, data_file = ARGV
unless %w[master hist].include?(mode) && data_file
  abort "usage: ruby import_indices.rb <master|hist> <data file>"
end

my = nil
connected = false
begin
  my = Mysql::init()
  db_cfg = YAML.load_file("db.yml")
  if mode == "master"
    tbl = "indices"
    fld = "(itemcode, jpname, engname, jpsourcename, engunitname, unit, decimalpoint, startmonth, updated_at)"
  else
    tbl = "index_value_hist"
    fld = "(itemcode, subcode, cycle, yyyy, mm, dd, val, updated_at)"
  end
  puts "Connecting to host #{db_cfg["host"]} with user #{db_cfg["username"]} using database #{db_cfg["database"]}..."
  my.real_connect(db_cfg["host"], db_cfg["username"], db_cfg["password"], db_cfg["database"])
  connected = true
  puts "Connected Successfully"
  my.query("SET AUTOCOMMIT=0")
  puts "Start transaction..."
  my.query("START TRANSACTION")
  puts(mode == "master" ? "Processing Index master" : "Processing Index values history")

  # Escape through the connection so quotes or backslashes in the data cannot
  # break or alter the statement. nil fields become empty strings.
  quote = ->(value) { my.escape_string(value.to_s) }

  begin
    File.open(data_file) do |f|
      f.each_line do |line|
        # Drop the line terminator so it does not end up in the last field.
        line = line.chomp
        next if line.empty?

        sql = build_import_sql(mode, line.split(":"), tbl, fld, quote)
        my.query(sql) if sql
      end
    end
    my.query("COMMIT")
  rescue => err
    puts "rollback changes..."
    my.query("ROLLBACK")
    # Report before re-raising; anything placed after the raise never runs.
    warn "There was an error while insert db: #{err.message}"
    raise
  end
rescue => err
  warn "There was an error: #{err.message}"
  raise
else
  puts "Job done."
ensure
  # Release the connection on every exit path, but only once it was opened.
  my.close if connected
end
-- create tables

-- Master table of indices, one row per itemcode.
-- NOTE(review): `jpunitname` is NOT NULL without a default, but the column
-- list used by import_indices.rb does not include it - confirm the mapping.
CREATE TABLE `indices` (
`itemcode` varchar(15) NOT NULL ,
`jpname` varchar(30) NOT NULL ,
`engname` varchar(28) NOT NULL ,
`jpsourcename` varchar(32) NOT NULL ,
`jpunitname` varchar(30) NOT NULL ,
`engunitname` varchar(30) NOT NULL ,
`unit` int(8) NOT NULL ,
`decimalpoint` int(8) NOT NULL ,
`startmonth` int(8) NOT NULL ,
`updated_at` datetime NOT NULL ,
PRIMARY KEY (`itemcode`)
-- The TYPE table option was removed in MySQL 5.5; ENGINE is its replacement.
) ENGINE=InnoDB;
-- History of index values, keyed by item, sub code, cycle and date parts.
-- Monthly rows are written by import_indices.rb with an empty `dd`.
-- NOTE(review): `updated_at` is a DATE here but DATETIME in `indices`, and the
-- import script stores CURRENT_TIMESTAMP in it - confirm which is intended.
CREATE TABLE `index_value_hist` (
`itemcode` varchar(15) NOT NULL ,
`subcode` varchar(5) NOT NULL ,
`cycle` varchar(1) NOT NULL ,
`yyyy` varchar(4) NOT NULL ,
`mm` varchar(2) NOT NULL ,
`dd` varchar(2) NOT NULL ,
`val` double(24,7) NOT NULL ,
`updated_at` date NOT NULL ,
PRIMARY KEY (`itemcode`, `subcode`, `cycle`, `yyyy`, `mm`, `dd`)
-- The TYPE table option was removed in MySQL 5.5; ENGINE is its replacement.
) ENGINE=InnoDB;
Sample data to import (EUC, LF; no title row) ☆index_value_hist only☆

UPD :AREGEN :A :M:201212:150463.7400000:20121218
UPD :AREGEN :C :M:201212:158030.9800000:20121218
UPD :AREGEN :C :W:2012124:158030.9800000:20121218
UPD :AREGEN :H :M:201212:158030.9800000:20121218
...

0 件のコメント:

コメントを投稿