마크베이스 collector는 로그 데이터를 수집하여 분석하고 마크베이스 서버에 전송한다. 데이터 수집 및 분석 기능 외에 추가적인 데이터 처리를 위해 마크베이스 collector는 python을 이용한 데이터 전처리 프레임워크를 제공한다.
Preprocessing framework으로 python 2.6 버전을 사용한다. 이 버전의 python은 마크베이스 서버와 같이 설치된 것을 사용하는 것을 추천한다. 설치된 python은 $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin 경로에 있다. Python 라이브러리 추가 설치를 위한 python 실행도 위 디렉토리에서 실행해야 기존에 설치되어 있는 다른 버전의 python과 충돌을 방지할 수 있다. 마크베이스 collector와 같이 제공된 python을 기본으로 사용하려면 PATH 환경변수를 정확히 설정하고, USER_PREPROCESS_LIB_PATH를 설정하여야 한다. USER_PREPROCESS_LIB_PATH에 경로를 추가로 등록할 때 경로 값의 분리를 위해 경로들 사이에 ":" 문자를 넣어줘야 한다.
로그 데이터를 변환 및 조작하기 위한 preprocessor 실행 순서를 기술한다.
원본 로그 데이터 파일에 데이터가 입력되면 각 로그 데이터는 로그 단위들로 분리된다. 가령 로그 데이터를 origin_msg라고 하자. 각 origin_msg는 한번에 한 단계씩 처리 과정을 거친다. 예를 들어 입력된 첫번째 메시지가 "Aug 19 15:37:12 localhost NetworkManager[1340]: (eth1): bringing up device." 라고 하면, 입력된 origin_msg는 정규 표현식에 의해 토큰들로 분리된다. 이를 메시지 파싱이라고 한다. 메시지 파싱 이전에 origin_msg를 전처리 할 수 있다. 전처리 스크립트를 이용해 origin_msg를 변경하는 경우에는 변경된 결과 메시지가 파싱 가능한 메시지여야 한다.
로그 메시지를 파싱한 이후, 결과 토큰값들이 생성된다. 아무런 처리 과정이 없다면 이 값이 데이터베이스에 저장된다. 파싱된 토큰들을 데이터베이스에 전달하기 전에 두번째 단계의 전처리 과정을 실행할 수 있다. 이때 rgx파일에 기술된 필드명을 이용하여 변경하거나 이를 이용할 수 있다. rgx파일에 기술된 데이터형과 다른 타입으로 토큰을 변경하면 에러가 발생할 수 있다.
자세한 내용은 샘플 스크립트를 참조하라.
전처리 스크립트는 Python으로 작성하여야 한다. 쉽게 사용하려면 "custom.py"파일을 원하는 형태로 변경하는 것을 추천한다.
PRS_SUCCESS = ( 0, None ) PRS_SUCCESS_INFO = ( 1, "Info Msg") PRS_SUCCESS_SKIP = ( 2, None ) PRS_FAILURE = (-1, "Error Msg" ) class mach_preprocess: def __init__(self): return def mach_msg_preprocess(self, dict): return PRS_SUCCESS; def mach_column_preprocess(self, dict): return PRS_SUCCESS; def __del__(self): return |
전처리 스크립트의 실행 결과를 collector에 전달하기 위해서 리턴값을 이용한다. collector가 전처리 스크립트를 실행한 이후에 참조하는 결과값은 (code, message)의 튜플 타입이다.
PRS_SUCCESS = ( 0, None ) PRS_SUCCESS_INFO = ( 1, "Info Msg") PRS_SUCCESS_SKIP = ( 2, None ) PRS_FAILURE = (-1, "Error Msg" ) |
PRS_SUCCESS, PRS_SUCCESS_INFO, PRS_SUCCESS_SKIP, PRS_FAILURE는 collector가 참조하는 결과값이다. 리턴값이 PRS_SUCCESS인 경우에는 collector는 정상적으로 데이터를 입력한다. 리턴값이 PRS_SUCCSS_INFO인 경우에는 데이터를 정상적으로 처리하고 전달된 메시지를 trc 파일에 기록한다. PRS_SUCCESS_SKIP인 경우에는 해당 데이터를 버리고 새로운 데이터 처리를 시작하고, PRS_FAILURE인 경우에는 에러 메시지를 trc 파일에 기록하고 다음 메시지를 처리한다.
데이터 처리의 제어를 위해서는 PRS_SUCCESS, PRS_SUCCESS_SKIP을 이용하고, 메시지를 trc에 남기기 위해서는 PRS_SUCCESS_INFO나 PRS_FAILURE를 이용하면 된다.
마크베이스 collector는 python언어로 작성된 사전 정의된 클래스의 함수를 호출하여 전처리를 수행한다. 아래의 예제를 보면 클래스의 각 함수와 "dict" 매개변수와 리턴값에 대해서 알 수 있다. 클래스 명이나 함수 이름을 바꾸면 실행되지 않는다. 따라서 작성시 유의하여야 한다.
class mach_preprocess: def __init__(self): return def mach_msg_preprocess(self, dict): return PRS_SUCCESS; def mach_column_preprocess(self, dict): return PRS_SUCCESS; def __del__(self): return |
사전 정의된 클래스명은 "mach_preprocess"이다. 매개변수는 메서드를 호출할 때 "self" 인스턴스로 전달된다. __init__과 __del__은 python언어의 기본 객체 생성자/제거자이다. 따라서 __init__은 collector의 프로세스가 생성될 때 호출되고, __del__은 collector가 종료될 때 호출된다. __init__에서 변수들을 초기화하고, __del__에서 할당한 자원을 해제할 수 있다. 이 두 메서드는 리턴값이 없다.
데이터 전처리에 호출되는 메서드는 "mach_msg_preprocess"와 "mach_column_preprocess"이다. 각 메서드의 설명은 아래와 같다.
이 메서드는 입력 메시지가 토큰으로 분리되기 전에 호출된다. 메시지를 파싱하기 전에 실행되므로, 전달되는 값은 컬렉터 관련 메타 데이터와 원본 메시지인 "origin_msg"이다. 컬렉터 메타 데이터는 테이블명, 컬렉터 타입, 현재 실행중인 컬렉터의 이름과 옵셋이다. 이 정보들은 참조 정보로 제공되어 변경하더라도 컬렉터에 반영되지는 않는다. "origin_msg"는 변경되면 컬렉터에 변경사항이 반영된다. rgx파일에 설정된 정규표현식을 통과하지 못하도록 메시지를 변경하면, 이후 파싱 과정에서 오류가 발생할 수 있다.
Table 1. Dictionary
Key | Value | Changes applied or not |
---|---|---|
table_name | Name of table | X |
collect_type | Types of data collection | X |
collector_name | Name of currently operating collector | X |
data_source | Path of data collection | X |
origin_msg | Log of raw data | O |
필요없는 메시지는 PRS_SUCCESS_SKIP을 리턴함으로써 이후 파싱 과정을 생략하여 처리를 빠르게 할 수 있다. 필요없는 메시지를 이 단계에서 파악할 수 있다면 먼저 PRS_SUCCESS_SKIP으로 처리하는 것이 좋다.
이 메서드는 입력 메시지를 파싱한 이후 토큰으로 분해된 값을 데이터베이스에 입력하기 전에 호출된다. "mach_msg_preprocess"와 같이 전달된 메타데이터는 컬렉터에 반영되지 않는다.
Table 2. Dictionary
Key | Value | Changes applied or not |
---|---|---|
table_name | Name of table | X |
collect_type | Types of data collection | X |
collect_name | Name of currently operating collector | X |
data_source | Path of data collection | X |
origin_msg | Log of raw data | X |
column_name | nth of column tokens | O |
기본으로 제공되는 샘플은 syslog파일에 대한 것이며 $MACH_COLLECTOR_HOME/collector 디렉토리에 있다. 샘플 템플릿인 syslog.tpl에서 전처리를 수행하는 방법을 살펴보자.
############################################################################### Copyright of this product 2013-2023, Machbase Inc. or its subsidiaries. All Rights reserved ############################################################################### # This file is for Machbase collector template file. # ################################################################### Collect setting ################################################################### COLLECT_TYPE=FILE LOG_SOURCE=/var/log/syslog ################################################################### Process setting ################################################################### REGEX_PATH=syslog.rgx PREPROCESS_PATH=script_path ################################################################### Output setting ################################################################### DB_TABLE_NAME = "syslogtable" DB_ADDR = "127.0.0.1" DB_PORT = 5656 DB_USER = "SYS" DB_PASS = "MANAGER" # 0: Direct insert 1: Prepared insert 2: Append APPEND_MODE=2 # 0: None, just append. 1: Truncate. 2: Try to create table. If table already exists, warn it and proceed. 3: Drop and create. CREATE_TABLE_MODE=2 |
전처리 스크립트 파일의 위치를 지정하기 위해서, PREPROCESS_PATH를 tpl 파일에 설정한다. 경로명은 절대 경로 (/로 시작되는 경로)를 지정하거나 $MACH_COLLECTOR_HOME/collector/preprocess의 기본 경로(파일명만 지정한 경우)가 된다.
입력 메시지를 검사하여 특정한 단어가 있는 경우에 이를 입력하지 않는 스크립트이다. 컬렉터 템플릿 파일에 PREPROCESS_PATH=skip.py를 설정하면 된다. 경로명을 지정하지 않았으므로, $MACH_COLLECTOR_HOME/collector/preprocess/디렉토리에 그 파일을 작성해야 한다.
PRS_SUCCESS = ( 0, None ) PRS_SUCCESS_INFO = ( 1, "Info Msg" ) PRS_SUCCESS_SKIP = ( 2, None ) PRS_FAILURE = (-1, "Error Msg" ) class mach_preprocess: def __init__(self): return def mach_msg_preprocess(self, dict): if dict['origin_msg'].find("CMD") is not -1: <== Search "CMD" return PRS_SUCCESS_SKIP <== Skip if "CMD" is included else: return PRS_SUCCESS; def mach_column_preprocess(self, dict): return PRS_SUCCESS; def __del__(self): return #Test code if __name__ == "__main__": pre_obj = mach_preprocess() dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"} print pre_obj.mach_msg_preprocess(dict) dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."} print pre_obj.mach_msg_preprocess(dict) |
파싱과정을 실행하지 않은 "mach_msg_preprocess" 메서드에서 "origin_msg"매개변수에 "CMD" 문자열이 있는지 검사하여 있다면 그 메시지를 스킵하도록 설정한 예제이다. "if __name__ == "__main__"이후의 소스라인은 스크립트가 정상적으로 동작하는지 테스트하기 위해서 작성한 코드이다. 관련 내용은 Script Test 부분을 참고하라.
파싱을 거친 이후에 msg 칼럼에 "CRON"이라는 문자열이 있는 경우, 그것을 "cron-exectue"문자열로 변환하는 예제이다. 이 또한 $MACH_COLLECTOR_HOME/collector/preprocess/ 디렉토리의 replace.py 파일을 tpl 파일에서 다음과 같이 지정하면 실행된다.
PREPROCESS_PATH=replace.py
PRS_SUCCESS = ( 0, None ) PRS_SUCCESS_INFO = ( 1, "Info Msg" ) PRS_SUCCESS_SKIP = ( 2, None ) PRS_FAILURE = (-1, "Error Msg" ) class mach_preprocess: def __init__(self): return def mach_msg_preprocess(self, dict): return PRS_SUCCESS; def mach_column_preprocess(self, dict): dict['msg'] = dict['msg'].replace("CRON", "cron-execute") <== Replace sentence return PRS_SUCCESS; def __del__(self): return #Test code if __name__ == "__main__": pre_obj = mach_preprocess() dict = {"tm":"Jul 16 07:39:01", "host":"mach-Precision-T1700", "msg":"CRON[1377]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"} (code, msg) = pre_obj.mach_column_preprocess(dict); if code >= 0: print dict else: print msg |
입력된 원본 메시지는 파싱 과정을 거쳐 토큰으로 분리된다. 이 토큰은 mach_column_ preprocess메서드에서 처리할 수 있다. 위의 예제는 "CRON"문자열을 "cron-execute"로 변환하는 예제이다. "if name == "__main__"이하의 코드는 스크립트 실행을 디버깅하기 위한 것이다.
TRACE 스크립트는 입력 데이터를 "mach_msg_preprocess" 및 "mach_column_preprocess"메서드에서 파일에 기록하는 것이다. PREPROCESS_PATH=trace.py를 tpl 파일에 추가하고, 해당 스크립트 파일을 $MACH_COLLECTOR_HOME/collector/preprocess 디렉토리에 작성해 두면 동작한다.
PRS_SUCCESS = ( 0, None ) PRS_SUCCESS_INFO = ( 1, "Info Msg" ) PRS_SUCCESS_SKIP = ( 2, None ) PRS_FAILURE = (-1, "Error Msg" ) class mach_preprocess: def __init__(self): self.msg_file = open("/tmp/msg.log", 'a') <== Define file variable self.column_file = open("/tmp/column.log", 'a') return def mach_msg_preprocess(self, dict): self.msg_file.write(str(dict)+"\n"); <== Write input argument in a file self.msg_file.write("\n"); return PRS_SUCCESS; def mach_column_preprocess(self, dict): self.column_file.write(str(dict)+"\n"); self.column_file.write("\n"); return PRS_SUCCESS; def __del__(self): self.msg_file.close() <== Free file variable self.column_file.close() return #Test code if __name__ == "__main__": pre_obj = mach_preprocess() dict = {"origin_msg":"Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && ] [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"} pre_obj.mach_msg_preprocess(dict) dict = {"origin_msg":"Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))", "tm":"Jul 16 06:39:01", "host":"mach-Precision-T1700", "msg":"CRON[1149]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"} pre_obj.mach_column_preprocess(dict) |
init 및 del 이 시작 및 종료시에 실행되는 점을 이용하여 collector실행시 msg_file, column_file 객체를 생성 및 개방하며, 종료시 각 파일을 닫도록 할 수 있다. 이 두개의 변수는 객체의 다른 메서드에서 접근할 수 있다.
if __name__ == "__main__" 이하의 코드는 스크립트가 정상적으로 동작하는지를 테스트하는 코드이다.
아래의 ODBC 스크립트는 검색 키를 데이터베이스에서 검색하여 검색 키가 입력 메시지에 존재한다면, 그 값을 지정된 테이블에 입력하는 예제이다.
이 예제를 동작시키는 방법은 PREPROCESS_PATH값을 템플릿 파일에 지정하는 것으로 기존과 동일하다.
이 예제에서 pypyodbc가 사용되었다. 컬렉터 스크립트에서 ODBC를 사용하려면 pypyodbc를 사용하여야 한다.
import pypyodbc PRS_SUCCESS = ( 0, None ) PRS_SUCCESS_INFO = ( 1, "Info Msg" ) PRS_SUCCESS_SKIP = ( 2, None ) PRS_FAILURE = (-1, "Error Msg" ) class mach_preprocess: def __init__(self): self.con = pypyodbc.connect("DSN=MYSQL") <== Predefined MySQL DSN. self.cursor = self.con.cursor() self.table_name = "error_msg" self.test_data_make(); <== Create random data. return def mach_msg_preprocess(self, dict): return PRS_SUCCESS; def mach_column_preprocess(self, dict): result = self.cursor.execute("select code, msg from %s where code = %d"%(self.table_name, int(dict['code_type']))) <== Search related data from a table. if result is not None: <== When search result exists. dict['code_type'] = result.fetchall()[0][1] <== Replace numbers with strings. else: print "failure "+str(dict) return PRS_SUCCESS; def __del__(self): self.cursor.close() self.con.close() return #for test def test_data_make(self): self.table_check("create table %s (code integer, msg varchar(255))"); return def table_check(self, query): self.tables = self.cursor.tables().fetchall() self.table_list = [] for (db, user, table, info, none) in self.tables: self.table_list.append(table.upper()) if self.table_name.upper() in self.table_list: <== If a table exists, delete it and create a table again. self.cursor.execute("drop table %s"%self.table_name) self.cursor.execute(query%self.table_name); self.insert_error_msg() self.cursor.commit() return def insert_error_msg(self): <== Store numeric code and message in a table. error = ((0, "SUCCESS"), (1, "SUCCESS_WITH_INFO"), (-1, "FAILURE")) for (code, msg) in error: self.cursor.execute("insert into %s values ( %d, '%s')"%(self.table_name, code, msg)) return # Test code if __name__ == "__main__": pre_obj = mach_preprocess() pre_obj.test_data_make() dict = {"tm":"Jul 16 07:39:01","host":"mach-Precision-T1700","msg":"CRON[1377]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))","code_type":"-1"} pre_obj.mach_column_preprocess(dict) print dict dict = {"tm":"Jul 15 11:31:54","host":"mach-Precision-T1700","msg":"NetworkManager[1340]: <error> [1405391514.205040] [nm-system.c:768] nm_system_iface_get_flags(): (unknown): failed to get interface link object","code_type":"0"} pre_obj.mach_column_preprocess(dict) print dict dict = {"tm":"Jul 15 11:31:54","host":"mach-Precision-T1700","msg":"NetworkManager[1340]: <warn> sysctl: failed to open '/proc/sys/net/ipv6/conf/eth1/use_tempaddr': (2) No file in directory","code_type":"1"} |
임포트된 pypyodbc는 기본 모듈이 아니므로 미리 설치할 필요가 있다. $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin/python경로에서 pypyodbc를 설치하여야 한다. 설치 경로는 "$MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg"이다. 그 다음 임포트 모듈의 경로를 설정하여야 한다. 경로 설정은 두가지 방법이 있다. 첫번째 방법은 sys module의 경로를 수정하는 것이며 다른 방법은 collector에게 모듈 경로를 제공하는 것이다. 두번째 방법을 설명하면, 환경변수 USER_PREPROCESS_LIB_PATH 를 설정하는 것이다. 여러개의 PATH를 추가하려면 ":"문자로 경로들을 연결하면 된다. pypyodbc모듈을 사용하려면 다음의 환경변수 설정을 이용하라.
export USER_PREPROCESS_LIB_PATH=$MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg |
위 ODBC샘플에서 사용할 pypyodbc 모듈을 지정된 환경변수에서 로딩할 수 있다.
새로운 스크립트를 작성한 이후, 스크립트가 정확히 동작하는지 확인해야 한다. 스크립트 테스트를 위한 방법으로, 직접 실행 방법과 간접 실행 방법이 있다. 직접 실행 방법은 스크립트가 테스트를 직접 실행하는 것이며 간접실행방법은 그 스크립트를 임포트하여 테스트 하는 것이다.
샘플 전처리 스크립트 하단에 추가되어 있는 테스트 코드들은 preprocessing 클래스 객체를 직접 호출하여 결과를 확인한다. 아래 예제는 skip.py 스크립트의 테스트 코드이다.
if __name__ == "__main__": pre_obj = mach_preprocess() dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"} print pre_obj.mach_msg_preprocess(dict) dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."} print pre_obj.mach_msg_preprocess(dict) |
테스트 스크립트는 "__name__ == "__main__":" 으로 시작한다. Python 인터프리터로 이 스크립트를 바로 실행할 경우 __name__변수를 __main__으로 설정하므로 테스트 코드를 실행한다. collector에서 호출될 경우에는 위 테스트 코드가 실행되지 않는다.
테스트 코드의 실행 과정을 살펴보면 먼저 mach_process() 함수의 호출로 프리프로세스 객체인 pre_obj를 생성한다. 이 때, __init__메서드가 호출된다. mach_msg_preprocess에 전달되는 매개변수인 dict를 설정한 후, 메서드를 호출하여 테스트를 수행한다. dict 값을 개발자가 설정해야 하지만, 실제 스크립트에 전달되는 값을 알기 어려운 경우, "trace.py" 스크립트를 이용하여 메서드에 전달되는 값을 얻어서 테스트 할 수 있다. trace.py 스크립트가 생성하는 데이터는 /tmp/msg.log 및 /tmp/column.log에 기록된다.
이미 생성한 전처리 스크립트를 import를 통해서 읽어들여서 실행하는 방법이다.
import skip <== Import ready-made scripts if __name__ == "__main__": pre_obj = skip.mach_preprocess() <== When creating a class, call mach_preprocess in the script. dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"} print pre_obj.mach_msg_preprocess(dict) dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."} print pre_obj.mach_msg_preprocess(dict) |