カイワレの大冒険 Third

技術的なことや他愛もないことをたまに書いてます

今まで作ったPython, Ruby, Bashの関数を色々晒してみる

華金なのに飲みもないので、ブログでも書こう。今日は、今までに書いた関数をいくつも晒してみようと思う。RubyとかPythonとかbashとか必要に応じて色々作った関数もそこそこあるので、誰の役に立つかわからないけど、晒してみる。 importとかまでは記してないし、コピペしただけでは動かないのが大半なので、もし利用する場合はググるなり、Twitterでメンション飛ばすなりしてください。

Python

バージョンチェック

最近書くスクリプトではこのチェック必ず入れてる。自分以外の人が使うときに、そこでハマってほしくないので。

def check_python_version():
    u""" 実行されているPythonのバージョンが許可したバージョンかの確認を行います。
    """
    python_version = __get_python_version()
    expected_python_version = Constants.get_python_version()
    from hogehoge import log
    log.output(
        "REAL PYTHON VERSION: %s, EXPECTED PYTHON VERSION: %s"
        % (python_version, expected_python_version)
    )
    if python_version != expected_python_version:
        raise Exception('このPythonバージョンでの実行は許可されていません')


def __get_python_version():
    u""" 実行されているPythonのバージョンを取得します。
    """
    __python_version = '%i.%i.%i' % (
        sys.version_info[0],
        sys.version_info[1],
        sys.version_info[2])
    return __python_version

ユーザ周り

バージョンチェック同様、ユーザのバリデーションも欠かせない。

def __get_exec_user():
    u""" スクリプトの実行ユーザを返します。
    """
    return pwd.getpwuid(os.getuid())[0]


def check_exec_user():
    u""" 実行ユーザのバリデーションをします。
    """
    exec_user = __get_exec_user()
    expected_exec_user = Constants.get_exec_user()
    from hogehoge import log
    log.output(
        "REAL EXEC USER: %s, EXPECTED EXEC USER: %s"
        % (exec_user, expected_exec_user)
    )
    if exec_user != expected_exec_user:
        raise Exception('このユーザでの実行は許可されていません')

ロック

重複実行は避けたいので。コマンドラインツールとかだと使う。

def create_lock_file():
    u""" ロックファイルを作成します。

         既にロックファイルが存在する場合は例外をスローします。
    """
    __lock_file_path = __get_lock_dir_path() + __get_lock_file_name()

    __check_duplicate_execution()

    try:
        # ファイル作成
        log.output('create lock file: ' + __lock_file_path, 'info')
        fp = open(__lock_file_path, "w")

    except Exception as inst:
        print(type(inst))
        log.output('READ ERROR', 'warning')

    finally:
        fp.close


def delete_lock_file():
    u""" ロックファイルを削除します。
    """
    __lock_file_path = __get_lock_dir_path() + __get_lock_file_name()
    log.output('delete lock file: ' + __lock_file_path, 'info')
    if os.path.exists(__lock_file_path):
        os.remove(__lock_file_path)


def __get_lock_file_name():
    u""" ロックファイルの名前を取得します。
    """
    __env = retrieve_env()
    return CONSTANTS[__env]['lock']['FILE_NAME']


def __create_lock_dir():
    u""" ロックファイル用ディレクトリを作成します。

         該当ディレクトリ存在しない場合のみ、ディレクトリを作成します。
    """
    __lock_file_dir = __get_lock_dir_path()
    if not os.path.isdir(__lock_file_dir):
        os.mkdir(__lock_file_dir)
        # os.makedirs(LOCK_FILE_DIR)


def __get_lock_dir_path():
    u""" ロックファイル用ディレクトリのパスを取得します。

         ディレクトリが指定されていない場合は、bin/ディレクトリを返すようにしています。
    """
    __env = retrieve_env()
    __lock_file_dir = CONSTANTS[__env]['lock']['FILE_DIR']
    __bin_dir = CONSTANTS[__env]['apps']['BIN_DIR']

    # 変数が空の場合は、binディレクトリ指定して返す
    if not __lock_file_dir:
        __lock_file_dir = get_home_dir(__file__) + '/' + __bin_dir + '/'
    return __lock_file_dir


def __check_duplicate_execution():
    u""" 二重実行されていないかのチェックを行います。

         ロックファイルが既に作成されている場合は、二重実行されていると判断し、例外をスローします。
    """
    __lock_file_path = __get_lock_dir_path() + __get_lock_file_name()

    # 存在確認
    log.output('check lock file existence: ' + __lock_file_path, 'info')
    if os.path.exists(__lock_file_path):
        raise Exception('すでにロックファイルが存在します')

ホームディレクトリを返す

テストでアプリのコードをimportする際にパス通すときに使う。それ以外では特に使わない。

def get_home_dir(file):
    u""" アプリケーションのホームディレクトリを返します。
    """
    return os.path.split(os.path.dirname(os.path.abspath(file)))[0]

環境変数取得

環境変数に応じて設定値を変えたいときなどに使う。Rubyだとsettingslogicとかあるのに、Pythonではどうも見つからないので、これが欠かせない。

def retrieve_env():
    u""" 環境変数を参照し、スクリプト実行環境を判別します。

         実行時デフォルトはdefaultを返すようになっています
         「SCRIPT_ENV=production hogehoge.py」のように実行することで、
         環境に応じたコンフィグを得ることができます。
    """
    __env = 'DEFAULT'
    if 'SCRIPT_ENV' in os.environ:
        __env = os.environ['SCRIPT_ENV']
    return __env

ファイル・ディレクトリ周り

ラップしただけだけど。

def is_exist_file(filename):
    u""" ファイルが存在するかの確認をします。
    """
    return os.path.exists(filename)


def is_exist_dir(path):
    u""" ディレクトリが存在するかの確認をします。
    """
    return os.path.isdir(path)


def create_dir(path):
    u""" ディレクトリを作成します。
    """
    return os.makedirs(path)


def delete_recursive_dir(path):
    u""" ディレクトリを再帰的に削除します。
    """
    return shutil.rmtree(path)

def find_latest_created_dir(target_path):
    u""" 対象ディレクトリのなかで、
         最も最近に作られたディレクトリをフルパスで返します。
    """
    import glob
    __files = glob.glob(target_path)
    __files.sort(key=os.path.getmtime, reverse=True)
    return __files[0]

OSコマンド

こういうのはあったほうが便利ですな。ちょくちょく使う。

def exec_command(cmd='hostname'):
    u""" OSのコマンドを実行します。
         例外はキャッチしていないため、必要に応じて上位でキャッチしてください。
    """
    subproc_args = {'stdin': None,
                    'stdout': subprocess.PIPE,
                    'stderr': subprocess.PIPE,
                    'close_fds': True}
    p = subprocess.Popen(cmd.strip().split(" "), **subproc_args)
    stdout_data, stderr_data = p.communicate()
    return p.returncode, stdout_data, stderr_data

SSHクライアント

昔書いた。便利で、状況に応じて使う。

class Ssh:
    def __init__(self, user_name, remote_host, port, private_key, timeout):
        self.user = user_name
        self.host = remote_host
        self.port = port
        self.private_key = private_key
        self.timeout = timeout

    def _command_execute(self, command):
        u""" リモートホストに対し、コマンドを実行します。

             paramikoモジュール内でエラーハンドリングができなかったため、
             exec_command時に標準エラー出力があれば、例外をスローしています。
        """
        from hogehoge import log
        log.output('Command Execute: ' + command, 'debug')

        try:
            self.__create_connection()
            stdin, stdout, stderr = self.client.exec_command(command)

            stdout_msg = stdout.read()
            stderr_msg = stderr.read()

            # エラー出力があれば、例外スロー
            if stderr_msg:
                stderr_decoded = stderr_msg.decode(sys.stdout.encoding)
                log.output(stderr_decoded, 'warning')
                raise Exception

            stdout_decoded = stdout_msg.decode(sys.stdout.encoding)
            return stdout_decoded

        except paramiko.SSHException as e:
            log.output("Password is invalid:" + str(e))
        except paramiko.AuthenticationException as e:
            log.output("We had an authentication exception!" + str(e))
        except Exception as e:
            log.output("### SSH Client Exception Catched! ###" + str(e))

    def __create_connection(self):
        u""" コネクションを作成します。
        """
        self.conn = self.client.connect(
            hostname=self.host,
            port=self.port,
            username=self.user,
            key_filename=self.private_key,
            timeout=self.timeout
        )
        return self.conn

    def __enter__(self):
        u""" オブジェクト生成時にSSHクライアントを生成します。
        """
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        return self

    def __exit__(self, exec_type, exec_value, traceback):
        self.client.close()

rsync

paramikoが再帰的にSCPしてくれなかったので、書いた。

def upload_dir(host, local_dir, remote_dir):
    u""" 対象ホストにディレクトリをアップロードします。
    """
    remote = '{}:{}'.format(host, remote_dir)
    (__exit_status, __stdout, __stderr) = exec_command('rsync -av {} {}'.format(local_dir, remote))
    return __exit_status, __stdout.decode('utf-8'), __stderr.decode('utf-8')

時刻周り

あったほうが便利かな。

def get_date():
    u""" 日時を取得します。
    """
    d = datetime.datetime.today()
    return d.strftime("%Y%m%d")


def get_time():
    u""" 時刻を取得します。
    """
    d = datetime.datetime.today()
    return d.strftime("%H%M%S")

HTTP

これもたまに使う。

def send_get_request(server, path, header, timeout):
    url = PROTOCOL_PREFIX + server + path
    r = requests.get(url, headers=header, timeout=timeout)
    status_code = r.status_code
    return status_code


def send_put_request(server, path, header, timeout, data):
    url = PROTOCOL_PREFIX + server + path
    r = requests.put(url, data=data, headers=header, timeout=timeout)
    status_code = r.status_code
    return status_code


def check_status_code(method, code):
    if method == 'get':
        flg = True if code == 200 else False
    elif method == 'put':
        flg = True if (code in [201, 204]) else False
    elif method == 'post':  # For "private api test"
        flg = True if (code in [400]) else False
    else:
        sys.exit(1)
    return flg

メール

これはごくまれに使う。今は通知部分はJenkinsとかでやるし、そこまで使わないかも。

class Mail:
    def _create_connection(self):
        __conn = smtplib.SMTP(MAIL_HOST)
        #__conn.set_debuglevel(True)
        return __conn


    def __create_message(self, to, mail_subject, mail_body):
        __message = {}
        __message = MIMEText(mail_body)
        __message['From'] = MAIL_FROM
        __message['To'] = to
        __message['Subject'] = mail_subject

        return __message


    def send_mail(self, to, mail_subject, mail_body):
        __conn = self._create_connection()
        __message = self.__create_message(to, mail_subject, mail_body)

        __conn.send_message(__message)
        __conn.close()

SVN周り

昔必要になり書いた。SVNのコミットデータをうまく抽出するって地味に面倒だったりする。

def get_commit_file_name(commit_file_line):
    u"""
    """
    __commit_file_info = __extract_commit_file_name(commit_file_line)
    return __commit_file_info


def __extract_commit_file_name(commit_file_line):
    u"""
    """
    __pattern = r'(Adding|Sending)\s+(.*)'
    regex = re.compile(__pattern, re.MULTILINE)
    for match in regex.finditer(commit_file_line):
        return match.group(2)


def collect_commited_files(log_file_path):
    __commited_files = []
    #print('this is log path: %s' % log_file_path)

    for line in open(log_file_path, 'r'):
        __commited_file_name = get_commit_file_name(line)

        # マッチしたもののみ出力
        if __commited_file_name:
            __commited_files.append(__commited_file_name)

    return __commited_files

bash

logging機能とかもう少し優秀だったらいいのにと思いつつ。

ログ

function append_log()
{
    [[ "$#" -ne 1 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1
    local msg="${1}"
    local now_time=$( date '+%Y%m%d_%H%M%S' )
    eval 'echo "[${now_time}] ${msg}" 2>&1' | tee -a "${LOG_FILE}"

}

ロック

Pythonのやつと同様、コマンドラインツールとかだと使うかな。こちらで想定した使い方しないケースもあったりするので、事前に重複実行は避けておきたい。

function create_lock_file()
{
    touch "${LOCK_FILE}"
    local cmd_ret="${?}"
    if [ $? -eq 0 ]; then
        return "${cmd_ret}"
    else
        err_msg '[ERROR] create lock file FAILED!!'
        exit 1
    fi
}


function delete_lock_file()
{
    rm -rf "${LOCK_FILE}"
    local cmd_ret="${?}"
    if [ $? -eq 0 ]; then
        return "${cmd_ret}"
    else
        err_msg '[ERROR] delete lock file FAILED!!'
        exit 1
    fi
}


function check_lock_file()
{
    if [ -f "${LOCK_FILE}" ]; then
        err_msg "[ERROR] lock file FOUND. Dupilicate exec FAILED!![file_path: ${LOCK_FILE}]"
        exit 1
    fi
}

引数

コマンドライン引数チェックは自前ですな。

function check_opt_num()
{
    [[ "$#" -ne 2 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1
    local opt_num="${1}"
    local required_num="${2}"
    if [ "${opt_num}" -ne "${required_num}" ]; then
        echo '[ERROR] Insufficiency options.'
        usage
        exit 1
    fi
}

ユーザ

Python同様ユーザチェック。

function is_allow_exec_user()
{
    [[ "$#" -ne 2 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1
    local exec_user="${1}"
    local allow_user="${2}"
    if [ "${exec_user}" != "${allow_user}" ]; then
        err_msg "[ERROR] this user is not ALLOWED!!! [exec_user: ${exec_user}]"
        exit 1
    else
        logger "[Success]: this user is allowed. [exec_user: ${exec_user}]"
    fi
}

SSH

これも必要に応じて使う感じ。例外処理めんどいからあまりbashではやりたくないのだけれど、しょうがなかった。

function exec_cmd_with_ssh()
{
    [[ "$#" -ne 3 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1
    local user="${1}"
    local host="${2}"
    local cmd="${3}"

    ssh -n "${user}"@"${host}" "${cmd}"

    # Below if statemet is worked for 'shebang is [/bin/bash] only'
    # if you use -eu option, it does not work.
    local cmd_ret="${?}"
    if [ $cmd_ret -eq 0 ]; then
        return 0
    else
        err_msg '[ERROR] "Invalid cmd return" was called...'
        return 1
    fi
}

メール

これも昔必要があって、作った。

function send_message
{
    [[ "$#" -ne 3 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1
    local mail_to="${1}"
    local mail_subject="${2}"
    local mail_content="${3}"

    eval 'echo "${mail_content}" | /bin/mail -s "${mail_subject}" "${mail_to}"'
    if [ "$?" -ne 0 ]; then
        echo '[ERROR] send mail FAILED!!'
        exit 1
    fi
}

Ruby

Jenkinsはchangelogから差分抽出できるのでよいですね。

Jenkins

  def extract_commited_files_from_changelog()
    commit_files_list = []
    changelog = "#{ENV['JENKINS_HOME']}/jobs/#{ENV['JOB_NAME']}/builds/#{ENV['BUILD_NUMBER']}/changelog.xml"
    begin
      fh = open(changelog,'r')
      fh.each do | line |
        if /^:/ =~ line
          # extract commited file_path
          commit_info = line.strip.split(/\s+/)
          if commit_info[5].nil?
            raise 'changelog parse error. cannot get filename correctly'
          else
            commit_files_list.push commit_info[5]
          end
        end
      end
    rescue Errno::ENOENT => e
      raise "[critical] cannot open: #{changelog}"
      exit(1)
    ensure
      fh.close
    end
    commit_files_list.sort.uniq
  end

HTTP

リトライ付き。あると便利。

class HttpClient
  def get_url(base_url, request_path)
    begin
      failed ||= 0
      timeout(10) do
        response_status = Net::HTTP.get_response(base_url, request_path)
        validate_status_code(response_status.code, request_path)
      end
      rescue Timeout::Error
        puts "Timeout Occur! URL: #{base_url}#{request_path}"
      rescue RuntimeError
        failed += 1
        puts "status code error: retry is started"
        retry if failed < 3
    end
  end

  def validate_status_code(code, request_path)
    if code.to_i == 200 || code.to_i == 404 || code.to_i == 400
      #puts "#{request_path}, #{code}"
    else
      raise "status code is NOT 200, but #{code}, #{request_path}"
    end
  end
end

終わりに

こうやってみると、スタイルもバラバラだし、コマンドラインに特化したものばかりがどうしても多いなという印象。 例外とかコメントがもう少し綺麗に書けるといいなぁ。あとから読むとどうも分かりづらいので、がんばろう。