Collector Preprocessing Framework
Preprocessing Order
This section describes the order in which the preprocessing steps run when log data is converted and manipulated.
1. Message Preprocessing
When data is appended to the source log file, the collector splits it into individual log entries.
Call one such entry origin_msg. Each origin_msg is processed one step at a time.
- For example, if the first message entered is "Aug 19 15:37:12 localhost NetworkManager [1340]: (eth1): bringing up device.", that origin_msg is separated into tokens by a regular expression. This is called message parsing.
- You can preprocess origin_msg before message parsing.
- If you change origin_msg in the preprocessing script, the modified message must still be parseable.
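As a rough illustration of message parsing, the sketch below splits the sample message into named tokens. The pattern and the token names (tm, host, msg) are assumptions made for this example; the real expression is whatever the collector's rgx file defines.

```python
import re

# Hypothetical syslog pattern; the real one lives in the collector's rgx file.
SYSLOG_PATTERN = re.compile(
    r"^(?P<tm>[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<host>\S+) "
    r"(?P<msg>.*)$"
)

def parse_message(origin_msg):
    """Split one log entry into named tokens, or return None if it does not match."""
    match = SYSLOG_PATTERN.match(origin_msg)
    return match.groupdict() if match else None

origin_msg = ("Aug 19 15:37:12 localhost NetworkManager [1340]: "
              "(eth1): bringing up device.")
tokens = parse_message(origin_msg)
print(tokens)
```

A message that was changed so that it no longer matches the pattern yields None here, which is why a preprocessed origin_msg must remain parseable.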
2. Column Preprocessing
Parsing the log message produces token values. If no further processing is done, these values are stored in the database as they are.
The second preprocessing stage runs before the parsed tokens are passed to the database.
At this stage, each token can be read or changed through the field name defined in the rgx file. Changing a token to a data type different from the one declared in the rgx file may cause an error.
For more information, refer to the example scripts below.
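The two stages can be pictured with a small simulation. This is only a sketch of the order described above, not the collector's actual implementation: simulate_collector, PATTERN, and the host and msg column names are hypothetical.

```python
import re

PRS_SUCCESS      = ( 0, None )
PRS_SUCCESS_SKIP = ( 2, None )

# Hypothetical stand-in for the pattern defined in the rgx file.
PATTERN = re.compile(r"^(?P<host>\S+) (?P<msg>.*)$")

class mach_preprocess:
    def mach_msg_preprocess(self, dict):
        # Stage 1: runs on the raw message, before parsing.
        dict["origin_msg"] = dict["origin_msg"].strip()
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        # Stage 2: runs on the parsed tokens, before they are stored.
        dict["host"] = dict["host"].lower()
        return PRS_SUCCESS

def simulate_collector(script, origin_msg):
    """Hypothetical driver: preprocess the message, parse it, preprocess the columns."""
    data = {"origin_msg": origin_msg}
    code, _ = script.mach_msg_preprocess(data)
    if code not in (0, 1):
        return None  # skipped or failed: nothing is parsed or stored
    match = PATTERN.match(data["origin_msg"])
    if match is None:
        return None  # the (possibly modified) message could not be parsed
    data.update(match.groupdict())
    script.mach_column_preprocess(data)
    # Only the parsed columns would be stored.
    return {"host": data["host"], "msg": data["msg"]}

row = simulate_collector(mach_preprocess(), "  LocalHost link is up \n")
print(row)
```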
Preprocessing Script
The preprocessing script must be written in Python. For ease of use, we recommend starting from the supplied "custom.py" file and modifying it as needed.
    PRS_SUCCESS      = ( 0, None )
    PRS_SUCCESS_INFO = ( 1, "Info Msg" )
    PRS_SUCCESS_SKIP = ( 2, None )
    PRS_FAILURE      = (-1, "Error Msg" )

    class mach_preprocess:
        def __init__(self):
            return

        def mach_msg_preprocess(self, dict):
            return PRS_SUCCESS

        def mach_column_preprocess(self, dict):
            return PRS_SUCCESS

        def __del__(self):
            return
Defining Result Value
The return value passes the execution result of the preprocessing script to the collector. The result values that the collector reads after running the script are tuples of the form (code, message).

    PRS_SUCCESS      = ( 0, None )
    PRS_SUCCESS_INFO = ( 1, "Info Msg" )
    PRS_SUCCESS_SKIP = ( 2, None )
    PRS_FAILURE      = (-1, "Error Msg" )

Here PRS_SUCCESS, PRS_SUCCESS_INFO, PRS_SUCCESS_SKIP, and PRS_FAILURE are the results that the collector refers to.
- If the result is PRS_SUCCESS, the collector enters the data normally.
- If the result is PRS_SUCCESS_INFO, the data is processed normally and the returned message is written to the trc file.
- If the result is PRS_SUCCESS_SKIP, the data is discarded and processing moves on to the next data.
- If the result is PRS_FAILURE, an error message is written to the trc file and the next message is processed.
PRS_SUCCESS and PRS_SUCCESS_SKIP control data processing, while PRS_SUCCESS_INFO and PRS_FAILURE leave a message in the trc file.
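The sketch below restates the four outcomes as code. handle_result and the trc_lines list are hypothetical names standing in for the collector's internal handling and its trc file, and treating every negative code as a failure is an assumption; the sketch only mirrors the behavior listed above.

```python
PRS_SUCCESS      = ( 0, None )
PRS_SUCCESS_INFO = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP = ( 2, None )
PRS_FAILURE      = (-1, "Error Msg" )

def handle_result(result, trc_lines):
    """Hypothetical dispatcher: return True if the data should be entered."""
    code, message = result
    if code == 0:
        return True                      # enter the data
    if code == 1:
        trc_lines.append("INFO: %s" % message)
        return True                      # enter the data and log the message
    if code == 2:
        return False                     # discard silently
    trc_lines.append("ERROR: %s" % message)
    return False                         # failure: log and move on

trc = []
decisions = [handle_result(result, trc)
             for result in (PRS_SUCCESS, PRS_SUCCESS_INFO,
                            PRS_SUCCESS_SKIP, PRS_FAILURE)]
print(decisions)
print(trc)
```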
Class Definition
The Machbase collector performs preprocessing by calling the methods of a predefined class written in Python.
The following example shows each method in the class, the "dict" parameter, and the return value.
The script will not run if the class name or a method name is changed, so write it with care.

    class mach_preprocess:
        def __init__(self):
            return

        def mach_msg_preprocess(self, dict):
            return PRS_SUCCESS

        def mach_column_preprocess(self, dict):
            return PRS_SUCCESS

        def __del__(self):
            return

The predefined class name is "mach_preprocess".
- Each method receives the instance as its first parameter, "self".
- __init__ and __del__ are Python's default object constructor and destructor. __init__ is called when the collector process is created, and __del__ is called when the collector finishes.
- You can initialize variables in __init__ and release the allocated resources in __del__.
These two methods have no return value.
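A minimal sketch of this lifecycle is shown here. The seen counter is a hypothetical piece of state added only to show that values set in __init__ remain available on every later call.

```python
PRS_SUCCESS = ( 0, None )

class mach_preprocess:
    def __init__(self):
        # Runs once when the object is created: set up state here.
        self.seen = 0

    def mach_msg_preprocess(self, dict):
        # State created in __init__ is available on every call.
        self.seen += 1
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS

    def __del__(self):
        # Nothing to release in this sketch; a real script would
        # close files or connections here.
        return

pre_obj = mach_preprocess()
for line in ("first message", "second message"):
    pre_obj.mach_msg_preprocess({"origin_msg": line})
print(pre_obj.seen)
```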
The methods called for data preprocessing are "mach_msg_preprocess" and "mach_column_preprocess".
Each method is described below.
mach_msg_preprocess
This method is called before the input message is separated into tokens.
Because this method runs before parsing, the values passed to it are the collector-related metadata and the original message, "origin_msg".
The collector metadata is the table name, the collector type, and the name and offset of the currently running collector. This information is provided for reference only; changing it has no effect on the collector.
When "origin_msg" is changed, the changes are reflected in the collector.
If you change the message so that it does not pass the regular expression set in the rgx file, errors may occur during parsing.
Key | Description | Changes Reflected |
---|---|---|
table_name | Table name | No |
collect_type | Collector type | No |
collector_name | Name of currently running collector | No |
data_source | Path of the source file used as the data source | No |
origin_msg | Raw data message from the source file | Yes |
Returning PRS_SUCCESS_SKIP for an unnecessary message omits the later parsing step, so the message is handled faster.
If you can identify an unwanted message at this stage, return PRS_SUCCESS_SKIP here rather than filtering it out later.
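As an illustration, the sketch below skips blank messages and normalizes whitespace in the rest. The normalization rule is an assumption chosen for the example; in a real script, make sure the rewritten message still matches the expression in the rgx file.

```python
PRS_SUCCESS      = ( 0, None )
PRS_SUCCESS_SKIP = ( 2, None )

class mach_preprocess:
    def __init__(self):
        return

    def mach_msg_preprocess(self, dict):
        # Decide on skipping first, so unwanted messages are never parsed.
        if not dict["origin_msg"].strip():
            return PRS_SUCCESS_SKIP
        # Collapse runs of whitespace; changes to origin_msg are reflected.
        dict["origin_msg"] = " ".join(dict["origin_msg"].split())
        # Changing metadata such as dict["table_name"] would have no effect.
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS

    def __del__(self):
        return

pre_obj = mach_preprocess()
data = {"table_name": "syslogtable",
        "origin_msg": "Aug 19 15:37:12   localhost  up\n"}
kept = pre_obj.mach_msg_preprocess(data)
skipped = pre_obj.mach_msg_preprocess({"origin_msg": "   \n"})
print(kept, data["origin_msg"])
print(skipped)
```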
mach_column_preprocess
This method is called after the input message has been parsed and before the resulting token values are entered into the database.
As with "mach_msg_preprocess", changes to the transferred metadata are not reflected in the collector.
Key | Description | Changes Reflected |
---|---|---|
table_name | Table name | No |
collect_type | Collector type | No |
collect_name | Name of currently running collector | No |
data_source | Path of the source file used as the data source | No |
origin_msg | Raw data message from the source file | No |
column_name | Token of the nth column, keyed by its column name | Yes |
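The sketch below changes one column token while keeping its type. The host column name is hypothetical (use the field names from your own rgx file), and returning a failure tuple with a custom message is an assumption based on the (code, message) form described earlier.

```python
PRS_SUCCESS = ( 0, None )
PRS_FAILURE = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return

    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        # 'host' is a hypothetical column name taken from the rgx file.
        host = dict.get("host")
        if host is None:
            return (PRS_FAILURE[0], "host column is missing")
        # Keep the token a string so it still matches the declared column type.
        dict["host"] = str(host).lower()
        return PRS_SUCCESS

    def __del__(self):
        return

pre_obj = mach_preprocess()
data = {"origin_msg": "ignored here",
        "host": "Mach-Precision-T1700",
        "msg": "cracklib: no dictionary update necessary."}
ok = pre_obj.mach_column_preprocess(data)
missing = pre_obj.mach_column_preprocess({"msg": "no host token"})
print(ok, data["host"])
print(missing)
```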
Example Script
The default example is for syslog files and is located in the $MACH_COLLECTOR_HOME/collector directory.
Let's look at how preprocessing is set up in the example template, syslog.tpl.
    ###############################################################################
    # Copyright of this product 2013-2023,
    # Machbase Inc. or its subsidiaries.
    # All Rights reserved
    ###############################################################################

    #
    # This file is for Machbase collector template file.
    #

    ###################################################################
    # Collect setting
    ###################################################################
    COLLECT_TYPE=FILE
    LOG_SOURCE=/var/log/syslog

    ###################################################################
    # Process setting
    ###################################################################
    REGEX_PATH=syslog.rgx
    PREPROCESS_PATH=script_path

    ###################################################################
    # Output setting
    ###################################################################
    DB_TABLE_NAME = "syslogtable"
    DB_ADDR       = "127.0.0.1"
    DB_PORT       = 5656
    DB_USER       = "SYS"
    DB_PASS       = "MANAGER"

    # 0: Direct insert
    # 1: Prepared insert
    # 2: Append
    APPEND_MODE=2

    # 0: None, just append.
    # 1: Truncate.
    # 2: Try to create table. If table already exists, warn it and proceed.
    # 3: Drop and create.
    CREATE_TABLE_MODE=2
To specify the location of the preprocessing script file, set PREPROCESS_PATH in the tpl file. The value is either an absolute path (a path starting with /) or a bare file name, in which case the file is looked up in the default directory, $MACH_COLLECTOR_HOME/collector/preprocess.
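The lookup rule can be expressed as a short helper. resolve_preprocess_path is a hypothetical function written for this illustration, not part of the collector, and /opt/machbase_collector is a made-up value for $MACH_COLLECTOR_HOME.

```python
import posixpath

def resolve_preprocess_path(value, collector_home):
    """Hypothetical helper mirroring the documented PREPROCESS_PATH rule."""
    if value.startswith("/"):
        # Absolute path: used exactly as written.
        return value
    # Bare file name: looked up in the default preprocess directory.
    return posixpath.join(collector_home, "collector", "preprocess", value)

home = "/opt/machbase_collector"  # hypothetical $MACH_COLLECTOR_HOME
default_path = resolve_preprocess_path("skip.py", home)
absolute_path = resolve_preprocess_path("/home/mach/scripts/skip.py", home)
print(default_path)
print(absolute_path)
```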
SKIP
This script examines each input message and skips the message, so that it is not entered, if it contains a specific word.
Set PREPROCESS_PATH = skip.py in the collector template file.
Because no directory is specified, the file must be created in the $MACH_COLLECTOR_HOME/collector/preprocess/ directory.
    PRS_SUCCESS      = ( 0, None )
    PRS_SUCCESS_INFO = ( 1, "Info Msg" )
    PRS_SUCCESS_SKIP = ( 2, None )
    PRS_FAILURE      = (-1, "Error Msg" )

    class mach_preprocess:
        def __init__(self):
            return

        def mach_msg_preprocess(self, dict):
            # Search the raw message for the string "CMD".
            if dict['origin_msg'].find("CMD") != -1:
                # Skip the message if it contains "CMD".
                return PRS_SUCCESS_SKIP
            else:
                return PRS_SUCCESS

        def mach_column_preprocess(self, dict):
            return PRS_SUCCESS

        def __del__(self):
            return

    # Test code
    if __name__ == "__main__":
        pre_obj = mach_preprocess()
        dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
        print(pre_obj.mach_msg_preprocess(dict))
        dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
        print(pre_obj.mach_msg_preprocess(dict))
This example checks the unparsed "origin_msg" value passed to the "mach_msg_preprocess" method and skips the message if it contains the string "CMD".
The code after 'if __name__ == "__main__":' is written to test whether the script works properly.
For more information, see 'Preprocessing Script Test' below.
REPLACE
This example converts the string "CRON" to the string "cron-execute" in the msg column after the message has been parsed.
As before, place replace.py in the $MACH_COLLECTOR_HOME/collector/preprocess/ directory and specify it in the tpl file as:
PREPROCESS_PATH=replace.py
    PRS_SUCCESS      = ( 0, None )
    PRS_SUCCESS_INFO = ( 1, "Info Msg" )
    PRS_SUCCESS_SKIP = ( 2, None )
    PRS_FAILURE      = (-1, "Error Msg" )

    class mach_preprocess:
        def __init__(self):
            return

        def mach_msg_preprocess(self, dict):
            return PRS_SUCCESS

        def mach_column_preprocess(self, dict):
            # Replace "CRON" with "cron-execute" in the msg column.
            dict['msg'] = dict['msg'].replace("CRON", "cron-execute")
            return PRS_SUCCESS

        def __del__(self):
            return

    # Test code
    if __name__ == "__main__":
        pre_obj = mach_preprocess()
        dict = {"tm":"Jul 16 07:39:01", "host":"mach-Precision-T1700", "msg":"CRON[1377]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
        (code, msg) = pre_obj.mach_column_preprocess(dict)
        if code >= 0:
            print(dict)
        else:
            print(msg)
The original message is parsed and separated into tokens, which can then be processed by the mach_column_preprocess method.
The example above converts the string "CRON" to "cron-execute". The code following 'if __name__ == "__main__":' is for debugging the script.
TRACE
The TRACE script writes the input data passed to the "mach_msg_preprocess" and "mach_column_preprocess" methods to files.
To use it, add PREPROCESS_PATH = trace.py to the tpl file and place the script file in the $MACH_COLLECTOR_HOME/collector/preprocess directory.
    PRS_SUCCESS      = ( 0, None )
    PRS_SUCCESS_INFO = ( 1, "Info Msg" )
    PRS_SUCCESS_SKIP = ( 2, None )
    PRS_FAILURE      = (-1, "Error Msg" )

    class mach_preprocess:
        def __init__(self):
            # Specify the output files.
            self.msg_file = open("/tmp/msg.log", 'a')
            self.column_file = open("/tmp/column.log", 'a')
            return

        def mach_msg_preprocess(self, dict):
            # Write the argument value to the file.
            self.msg_file.write(str(dict)+"\n")
            self.msg_file.write("\n")
            return PRS_SUCCESS

        def mach_column_preprocess(self, dict):
            self.column_file.write(str(dict)+"\n")
            self.column_file.write("\n")
            return PRS_SUCCESS

        def __del__(self):
            # Close the files.
            self.msg_file.close()
            self.column_file.close()
            return

    # Test code
    if __name__ == "__main__":
        pre_obj = mach_preprocess()
        dict = {"origin_msg":"Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
        pre_obj.mach_msg_preprocess(dict)
        dict = {"origin_msg":"Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))", "tm":"Jul 16 06:39:01", "host":"mach-Precision-T1700", "msg":"CRON[1149]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
        pre_obj.mach_column_preprocess(dict)
Because __init__ and __del__ run at the start and end of the collector, the msg_file and column_file objects are opened when the collector starts and closed when it finishes.
These two variables can be accessed from the other methods of the object. The code following 'if __name__ == "__main__":' tests whether the script works properly.
ODBC
The following ODBC script looks up a search key from the input message in the database and, if the key is found, replaces it with the retrieved value before the data is entered into the specified table.
This example is run the same way as the previous ones, by setting PREPROCESS_PATH in the template file.
This example uses pypyodbc, which is required for using ODBC in a collector script.
pypyodbc is not a standard module and must be installed in advance.
Install pypyodbc for the Python interpreter at $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin/python.
The installation path is $MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg.
Then you must set the import path of the module. There are two ways to do this.
- The first way is to modify sys.path in the script or provide the module path to the collector.
- The second way is to set the environment variable USER_PREPROCESS_LIB_PATH.
export USER_PREPROCESS_LIB_PATH=$MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg
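For the first approach, a script could extend sys.path itself before importing the module. The following is a minimal sketch: add_module_path is a hypothetical helper, and /opt/machbase_collector is a made-up fallback used only when MACH_COLLECTOR_HOME is not set.

```python
import os
import sys

def add_module_path(path):
    """Append path to sys.path once; return True if it was added."""
    if path in sys.path:
        return False
    sys.path.append(path)
    return True

# Hypothetical fallback location, used only if the variable is unset.
collector_home = os.environ.get("MACH_COLLECTOR_HOME", "/opt/machbase_collector")
egg_path = os.path.join(collector_home, "webadmin", "flask", "Python", "lib",
                        "python2.7", "site-packages", "pypyodbc-1.3.3-py2.7.egg")
added = add_module_path(egg_path)
# From here on, "import pypyodbc" can find the module if the egg exists there.
print(added)
```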
    import pypyodbc

    PRS_SUCCESS      = ( 0, None )
    PRS_SUCCESS_INFO = ( 1, "Info Msg" )
    PRS_SUCCESS_SKIP = ( 2, None )
    PRS_FAILURE      = (-1, "Error Msg" )

    class mach_preprocess:
        def __init__(self):
            # Connect using the pre-declared MySQL DSN.
            self.con = pypyodbc.connect("DSN=MYSQL")
            self.cursor = self.con.cursor()
            self.table_name = "error_msg"
            # Generate the sample lookup data.
            self.test_data_make()
            return

        def mach_msg_preprocess(self, dict):
            return PRS_SUCCESS

        def mach_column_preprocess(self, dict):
            # Acquire the relevant data from the table.
            result = self.cursor.execute("select code, msg from %s where code = %d"%(self.table_name, int(dict['code_type'])))
            rows = result.fetchall() if result is not None else []
            if rows:
                # A matching row exists: replace the code with its message.
                dict['code_type'] = rows[0][1]
            else:
                print("failure "+str(dict))
            return PRS_SUCCESS

        def __del__(self):
            self.cursor.close()
            self.con.close()
            return

        # For test
        def test_data_make(self):
            self.table_check("create table %s (code integer, msg varchar(255))")
            return

        def table_check(self, query):
            self.tables = self.cursor.tables().fetchall()
            self.table_list = []
            for (db, user, table, info, none) in self.tables:
                self.table_list.append(table.upper())
            # Delete the table and create a new one if it already exists.
            if self.table_name.upper() in self.table_list:
                self.cursor.execute("drop table %s"%self.table_name)
            self.cursor.execute(query%self.table_name)
            self.insert_error_msg()
            self.cursor.commit()
            return

        def insert_error_msg(self):
            # Insert the code and message pairs used for replacement.
            error = ((0, "SUCCESS"), (1, "SUCCESS_WITH_INFO"), (-1, "FAILURE"))
            for (code, msg) in error:
                self.cursor.execute("insert into %s values ( %d, '%s')"%(self.table_name, code, msg))
            return

    # Test code
    if __name__ == "__main__":
        pre_obj = mach_preprocess()
        pre_obj.test_data_make()
        dict = {"tm":"Jul 16 07:39:01","host":"mach-Precision-T1700","msg":"CRON[1377]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))","code_type":"-1"}
        pre_obj.mach_column_preprocess(dict)
        print(dict)
        dict = {"tm":"Jul 15 11:31:54","host":"mach-Precision-T1700","msg":"NetworkManager[1340]: <error> [1405391514.205040] [nm-system.c:768] nm_system_iface_get_flags(): (unknown): failed to get interface link object","code_type":"0"}
        pre_obj.mach_column_preprocess(dict)
        print(dict)
        dict = {"tm":"Jul 15 11:31:54","host":"mach-Precision-T1700","msg":"NetworkManager[1340]: <warn> sysctl: failed to open '/proc/sys/net/ipv6/conf/eth1/use_tempaddr': (2) No file in directory","code_type":"1"}
        pre_obj.mach_column_preprocess(dict)
        print(dict)
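To try the same lookup idea without an ODBC driver or a DSN, the sketch below swaps in Python's built-in sqlite3 module with an in-memory database. This is a stand-in for experimenting only, not the pypyodbc setup described here; the table contents are copied from the ODBC script, and parameter binding is used in place of string formatting.

```python
import sqlite3

PRS_SUCCESS = ( 0, None )

class mach_preprocess:
    def __init__(self):
        # In-memory sqlite3 database used as a stand-in for the ODBC source.
        self.con = sqlite3.connect(":memory:")
        self.cursor = self.con.cursor()
        self.cursor.execute(
            "create table error_msg (code integer, msg varchar(255))")
        self.cursor.executemany(
            "insert into error_msg values (?, ?)",
            [(0, "SUCCESS"), (1, "SUCCESS_WITH_INFO"), (-1, "FAILURE")])
        self.con.commit()

    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        # Bind the code as a parameter instead of formatting it into the SQL.
        self.cursor.execute("select msg from error_msg where code = ?",
                            (int(dict["code_type"]),))
        row = self.cursor.fetchone()
        if row is not None:
            # A matching row exists: replace the code with its message.
            dict["code_type"] = row[0]
        return PRS_SUCCESS

    def __del__(self):
        self.cursor.close()
        self.con.close()

pre_obj = mach_preprocess()
known = {"code_type": "-1"}
unknown = {"code_type": "42"}
pre_obj.mach_column_preprocess(known)
pre_obj.mach_column_preprocess(unknown)
print(known)
print(unknown)
```

A code with no matching row is left unchanged rather than raising an error.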
Preprocessing Script Test
The pypyodbc module used in the ODBC example above can be loaded from the path given in the environment variable. After you create a new script, make sure that it works correctly.
There are two methods for testing a script: direct execution and indirect execution.
In the direct method, the script itself runs the test; in the indirect method, a separate program imports the script and tests it.
Direct Execution
The test code added at the bottom of the preprocessing script example checks the result by calling the preprocessing class object directly. The example below is the test code for the skip.py script.
    if __name__ == "__main__":
        pre_obj = mach_preprocess()
        dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
        print(pre_obj.mach_msg_preprocess(dict))
        dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
        print(pre_obj.mach_msg_preprocess(dict))
The test code starts with 'if __name__ == "__main__":'.
When you run this script directly with the Python interpreter, the interpreter sets the __name__ variable to "__main__" and the test code runs.
When the script is called from the collector, the test code is not executed.
The test code first creates the preprocessing object pre_obj by calling mach_preprocess().
At this point, the __init__ method is called. After setting dict, the parameter passed to mach_msg_preprocess, the code calls the method to perform the test.
If you do not know what values the collector actually passes in dict, you can use the "trace.py" script to capture the values passed to each method and use them in your test.
The data generated by the trace.py script is written to /tmp/msg.log and /tmp/column.log.
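One way to reuse the traced values is to read them back into dictionaries, as sketched below. load_traced_dicts is a hypothetical helper; it assumes each record was written with str(dict) on its own line, as trace.py does, and that the values are plain literals such as strings and numbers. A temporary file stands in for /tmp/msg.log so the sketch runs anywhere.

```python
import ast
import os
import tempfile

def load_traced_dicts(path):
    """Read a trace log and return the dict recorded on each non-blank line."""
    records = []
    with open(path) as trace_file:
        for line in trace_file:
            line = line.strip()
            if line:
                # Each record was written with str(dict), so
                # ast.literal_eval can rebuild it safely.
                records.append(ast.literal_eval(line))
    return records

# Stand-in for /tmp/msg.log, written the same way trace.py writes it.
sample = {"table_name": "syslogtable",
          "origin_msg": "Jul 16 07:39:31 mach-Precision-T1700 cracklib: "
                        "no dictionary update necessary."}
handle, sample_path = tempfile.mkstemp(suffix=".log")
with os.fdopen(handle, "w") as trace_file:
    trace_file.write(str(sample) + "\n")
    trace_file.write("\n")

records = load_traced_dicts(sample_path)
os.remove(sample_path)
print(records[0]["origin_msg"])
```

Each recovered dictionary can then be passed to mach_msg_preprocess or mach_column_preprocess in the test code.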
Indirect Execution
In this method, a separate program imports a preprocessing script that has already been created and runs it.
    # Import the script that was already created.
    import skip

    if __name__ == "__main__":
        # Create the object through the imported module's mach_preprocess class.
        pre_obj = skip.mach_preprocess()
        dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD ( [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
        print(pre_obj.mach_msg_preprocess(dict))
        dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
        print(pre_obj.mach_msg_preprocess(dict))