Collector Preprocessing Framework

The Machbase collector collects log data, analyzes it, and sends it to the Machbase server. 

In addition to collecting and analyzing data, the Machbase collector provides a Python-based preprocessing framework for further data processing.

Environment Variables for Preprocessing Configuration


The preprocessing framework uses Python 2.6. We recommend installing this version of Python together with the Machbase server.

The bundled Python is located in $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin.
To avoid conflicts with other Python versions on the system, also use the python in this directory when installing additional Python libraries.

To use the Python provided with the Machbase collector by default, you must set the PATH environment variable so that it includes this directory, and you must set USER_PREPROCESS_LIB_PATH. To specify more than one path in USER_PREPROCESS_LIB_PATH, separate the paths with a colon (":").
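To check the USER_PREPROCESS_LIB_PATH setting, the following minimal sketch splits the variable on ":" and reports whether each entry exists. preprocess_lib_paths is a hypothetical diagnostic helper, not part of the collector. It assumes only that the variable holds colon-separated directory or .egg paths, as described above.

```python
import os

def preprocess_lib_paths(environ=None):
    """Return (path, exists) pairs for each entry in USER_PREPROCESS_LIB_PATH.

    Hypothetical diagnostic helper; the collector does not provide it.
    """
    if environ is None:
        environ = os.environ
    value = environ.get("USER_PREPROCESS_LIB_PATH", "")
    # Entries are separated by ":"; empty entries (e.g. from "a::b") are ignored.
    entries = [p for p in value.split(":") if p]
    return [(p, os.path.exists(p)) for p in entries]

if __name__ == "__main__":
    for path, exists in preprocess_lib_paths():
        print("%s %s" % ("OK     " if exists else "MISSING", path))
```

Run it with the bundled interpreter. A MISSING entry usually means a typo in the path or a library version that differs from the one the path names.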




Preprocessing Order


This section describes the order in which preprocessing runs when log data is converted and manipulated.

1. Message Preprocessing

When data is written to the source log file, the collector splits it into individual log messages.

Each of these messages is called origin_msg, and each origin_msg is processed in turn.

  • For example, if the first message received is "Aug 19 15:37:12 localhost NetworkManager [1340]: (eth1): bringing up device.",
    the collector splits origin_msg into tokens using a regular expression. This is called message parsing.
  • You can preprocess origin_msg before it is parsed.
  • If the preprocessing script changes origin_msg, the modified message must still be parseable.
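The following standalone sketch illustrates the last point. A message preprocessor removes the space before the process ID, turning "NetworkManager [1340]:" into "NetworkManager[1340]:". It then checks that the result is still parseable before returning it. SYSLOG_RGX is a hypothetical stand-in for the regular expression in syslog.rgx, and the real file may differ. The group names tm, host, and msg mirror the field names used in the examples later in this document. __init__ and __del__ are omitted because they have nothing to do here.

```python
import re

PRS_SUCCESS = (0, None)
PRS_FAILURE = (-1, "Error Msg")

# Hypothetical stand-in for the pattern in syslog.rgx; the real rgx file may differ.
SYSLOG_RGX = re.compile(
    r"^(?P<tm>\w{3} +\d+ \d\d:\d\d:\d\d) (?P<host>\S+) (?P<msg>\S+\[\d+\]: .*)$")

class mach_preprocess:
    def mach_msg_preprocess(self, dict):
        # Remove the space before the PID: "NetworkManager [1340]:" -> "NetworkManager[1340]:"
        msg = re.sub(r" \[(\d+)\]:", r"[\1]:", dict['origin_msg'], count=1)
        # Do not hand the collector a message that the parser cannot handle.
        if SYSLOG_RGX.match(msg) is None:
            return (PRS_FAILURE[0], "unparseable after preprocessing: " + msg)
        dict['origin_msg'] = msg
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS
```

The original origin_msg is changed only after the check succeeds, so a failed message is left exactly as it was received.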


2. Column Preprocessing

Parsing a log message produces token values. If no preprocessing is applied, these values are stored in the database as-is.

The second preprocessing stage runs before the parsed tokens are passed to the database.
At this stage, tokens are accessed by the field names defined in the rgx file, and their values can be read or changed. Changing a token to a type different from the data type defined in the rgx file may cause an error.
For more information, refer to the example scripts below.
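Here is a minimal sketch of a column preprocessor that respects the type rule. It assumes that tokens arrive as strings keyed by the rgx field names, as in the ODBC example later in this document. code_type is a hypothetical integer field. The script lower-cases the host token, which stays a string. It also checks that code_type can still be read as an integer and returns a failure result instead of passing on a value that would not match the column type.

```python
PRS_SUCCESS = (0, None)
PRS_FAILURE = (-1, "Error Msg")

class mach_preprocess:
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        # Replacing a string token with another string keeps its type.
        if 'host' in dict:
            dict['host'] = dict['host'].lower()
        # code_type is assumed to be declared as an integer column in the rgx file.
        try:
            int(dict.get('code_type', '0'))
        except ValueError:
            return (PRS_FAILURE[0], "code_type is not an integer: %r" % dict.get('code_type'))
        return PRS_SUCCESS
```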


Preprocessing Script


The preprocessing script must be written in Python. For convenience, we recommend starting from the "custom.py" file and modifying it as needed.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg")
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS;
    def __del__(self):
        return


Defining Result Value


The return value passes the execution result of the preprocessing script back to the collector. Each result value is a tuple of (code, message), which the collector inspects after the script runs.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg")
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

PRS_SUCCESS, PRS_SUCCESS_INFO, PRS_SUCCESS_SKIP, and PRS_FAILURE are the results that the collector recognizes.

  • If the result is PRS_SUCCESS, the collector inserts the data normally.
  • If the result is PRS_SUCCESS_INFO, the data is processed normally and the returned message is written to the trc file.
  • If the result is PRS_SUCCESS_SKIP, the data is discarded and processing continues with the next data.
  • If the result is PRS_FAILURE, the error message is written to the trc file and the next message is processed.

PRS_SUCCESS and PRS_SUCCESS_SKIP control how the data is processed, while PRS_SUCCESS_INFO and PRS_FAILURE are used to leave a message in the trc file.
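To make the list of results concrete, the sketch below mimics, outside the collector, how a caller could act on each result tuple. handle_result and the trace list are hypothetical illustrations of the behavior described above, not the collector's actual code. Two points are assumptions: that data returned with PRS_FAILURE is not stored, and that unknown codes are treated as failures.

```python
PRS_SUCCESS      = (0, None)
PRS_SUCCESS_INFO = (1, "Info Msg")
PRS_SUCCESS_SKIP = (2, None)
PRS_FAILURE      = (-1, "Error Msg")

def handle_result(result, trace):
    """Return True if the data should be stored; append trc messages to trace.

    Hypothetical simulation of how the collector treats result tuples.
    """
    code, message = result
    if code == PRS_SUCCESS[0]:
        return True                      # store normally
    if code == PRS_SUCCESS_INFO[0]:
        trace.append("INFO: %s" % message)
        return True                      # store, and log the message
    if code == PRS_SUCCESS_SKIP[0]:
        return False                     # discard, move on to the next data
    # PRS_FAILURE (unknown codes are assumed to be failures as well)
    trace.append("ERROR: %s" % message)
    return False
```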


Class Definition


The Machbase collector performs preprocessing by calling the methods of a predefined class written in Python.

The following example shows each method in the class, along with its "dict" parameter and return value.

The script will not run if the class name or a method name is changed, so keep these names exactly as shown.



class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS;
    def __del__(self):
        return

The predefined class name is "mach_preprocess".

  • As with any Python method, the first parameter, "self", is the instance on which the method is called.
  • __init__ and __del__ are the standard Python constructor and destructor.
    __init__ is called when the collector process is created, and __del__ is called when the collector finishes.
  • You can initialize variables in __init__ and release allocated resources in __del__.
    Neither method returns a value.
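Because __init__ runs once when the collector process starts, it is a natural place for state shared across calls. The sketch below keeps a message counter on the instance. Every REPORT_EVERY messages it returns PRS_SUCCESS_INFO, so progress appears in the trc file while the data is still processed normally. REPORT_EVERY and the message text are illustrative choices, not collector settings.

```python
PRS_SUCCESS      = (0, None)
PRS_SUCCESS_INFO = (1, "Info Msg")

REPORT_EVERY = 1000  # hypothetical reporting interval

class mach_preprocess:
    def __init__(self):
        # Runs once when the collector process creates the object.
        self.count = 0

    def mach_msg_preprocess(self, dict):
        self.count += 1
        if self.count % REPORT_EVERY == 0:
            # The data is still processed normally; the message goes to the trc file.
            return (PRS_SUCCESS_INFO[0], "%d messages preprocessed" % self.count)
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS

    def __del__(self):
        # Runs when the collector finishes; nothing to release in this sketch.
        return
```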

The methods called for data preprocessing are "mach_msg_preprocess" and "mach_column_preprocess". 
Each method is described below.


mach_msg_preprocess

This method is called before the input message is split into tokens.

Because this method runs before parsing, it receives collector metadata and the original message, "origin_msg".
The collector metadata includes the table name, the collector type, and the name and offset of the currently running collector. This information is provided for reference only; changing it has no effect on the collector.
Changes to "origin_msg", however, are applied by the collector.
If you change the message so that it no longer matches the regular expression set in the rgx file, errors may occur during parsing.

Key              Description                                Change reflected
table_name       Table name                                 No
collect_type     Collector type                             No
collector_name   Name of the currently running collector    No
data_source      Path of the source file being collected    No
origin_msg       Raw message from the source file           Yes

Returning PRS_SUCCESS_SKIP for an unnecessary message lets the collector skip the later parsing step, so processing is faster.

If you can identify an unwanted message at this stage, skip it here first.
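Following that advice, the sketch below filters messages in mach_msg_preprocess using regular expressions compiled once in __init__, so each call only runs the matches. SKIP_PATTERNS is a hypothetical list of unwanted messages. Blank lines are skipped as well. Adjust both to your own logs.

```python
import re

PRS_SUCCESS      = (0, None)
PRS_SUCCESS_SKIP = (2, None)

# Hypothetical patterns for messages that are never needed.
SKIP_PATTERNS = [r"\bCRON\[\d+\]:", r"no dictionary update necessary"]

class mach_preprocess:
    def __init__(self):
        # Compile once at collector start-up rather than on every message.
        self.skip_rgx = [re.compile(p) for p in SKIP_PATTERNS]

    def mach_msg_preprocess(self, dict):
        msg = dict['origin_msg']
        # Blank lines and matching messages are dropped before parsing.
        if not msg.strip():
            return PRS_SUCCESS_SKIP
        for rgx in self.skip_rgx:
            if rgx.search(msg):
                return PRS_SUCCESS_SKIP
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS
```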


mach_column_preprocess

This method is called after the input message has been parsed into tokens and before the token values are inserted into the database.

As with "mach_msg_preprocess", changes to the metadata passed to this method are not reflected in the collector.

Key              Description                                Change reflected
table_name       Table name                                 No
collect_type     Collector type                             No
collect_name     Name of the currently running collector    No
data_source      Path of the source file being collected    No
origin_msg       Raw message from the source file           No
<column name>    Token value of each column, keyed by the   Yes
                 field name defined in the rgx file
                 (for example, msg)
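Because the dict passed to mach_column_preprocess mixes read-only metadata with column tokens, a script that processes every column should skip the metadata keys. The sketch below trims surrounding whitespace from each string column token. METADATA_KEYS is built from the key tables above, and both collector_name and collect_name are included because the tables spell that key differently. The assumption that tokens are plain strings comes from the examples in this document.

```python
PRS_SUCCESS = (0, None)

# Metadata keys from the tables above; changes to these are not reflected anyway.
METADATA_KEYS = frozenset(["table_name", "collect_type", "collector_name",
                           "collect_name", "data_source", "origin_msg"])

class mach_preprocess:
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        for key in list(dict.keys()):
            if key in METADATA_KEYS:
                continue
            value = dict[key]
            # Only string tokens are trimmed; other values are left untouched.
            if isinstance(value, str):
                dict[key] = value.strip()
        return PRS_SUCCESS
```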

Example Script


The default example targets syslog files and is located in the $MACH_COLLECTOR_HOME/collector directory.

Let's look at how preprocessing is configured in the example template, syslog.tpl.

###############################################################################
# Copyright of this product 2013-2023,
# Machbase Inc. or its subsidiaries.
# All Rights reserved
###############################################################################
#
# This file is for Machbase collector template file.
#
###################################################################
# Collect setting
###################################################################

COLLECT_TYPE=FILE
LOG_SOURCE=/var/log/syslog

###################################################################
# Process setting
###################################################################

REGEX_PATH=syslog.rgx
PREPROCESS_PATH=script_path

###################################################################
# Output setting
###################################################################
DB_TABLE_NAME = "syslogtable"
DB_ADDR = "127.0.0.1"
DB_PORT = 5656
DB_USER = "SYS"
DB_PASS = "MANAGER"
#
# 0: Direct insert
# 1: Prepared insert
# 2: Append
APPEND_MODE=2
#
# 0: None, just append.
# 1: Truncate.
# 2: Try to create table. If table already exists, warn it and proceed.
# 3: Drop and create.
CREATE_TABLE_MODE=2

To specify the location of the preprocessing script file, set PREPROCESS_PATH in the tpl file. Specify either an absolute path (a path starting with /) or only a file name. If only a file name is given, the file is looked up in the default directory, $MACH_COLLECTOR_HOME/collector/preprocess.
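The lookup rule can be expressed as a small function. resolve_preprocess_path is a hypothetical helper that mirrors the rule as stated: an absolute path is used as-is, and otherwise the value is placed under $MACH_COLLECTOR_HOME/collector/preprocess. The rule does not say how the collector treats relative paths that contain a directory, so the sketch simply joins them to the default directory as well.

```python
import os

def resolve_preprocess_path(value, collector_home):
    """Return the script path a PREPROCESS_PATH value refers to (hypothetical helper)."""
    value = value.strip()
    if value.startswith("/"):
        # Absolute path: used as given.
        return value
    # File name only: looked up in the default preprocess directory.
    return os.path.join(collector_home, "collector", "preprocess", value)
```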


SKIP

This script examines each input message and skips it if it contains a specific word.

Set PREPROCESS_PATH=skip.py in the collector template file.
Because no directory is specified, create the file in the $MACH_COLLECTOR_HOME/collector/preprocess/ directory.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        if "CMD" in dict['origin_msg']:  # Search for the string "CMD"
            return PRS_SUCCESS_SKIP      # Skip the message if it contains "CMD"
        else:
            return PRS_SUCCESS
    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS
    def __del__(self):
        return

# Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"origin_msg": "Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                          "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    print(pre_obj.mach_msg_preprocess(dict))

    dict = {"origin_msg": "Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print(pre_obj.mach_msg_preprocess(dict))

    dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print pre_obj.mach_msg_preprocess(dict)

In this example, the "mach_msg_preprocess" method checks the unparsed "origin_msg" for the string "CMD" and skips the message if the string is found.

The code following `if __name__ == "__main__":` tests whether the script works correctly.
For more information, see 'Preprocessing Script Test' below.


REPLACE

This example converts the string "CRON" to "cron-execute" in the msg column after the message has been parsed.

As before, specify the replace.py file in the $MACH_COLLECTOR_HOME/collector/preprocess/ directory in the tpl file:

sample.tpl:

PREPROCESS_PATH=replace.py

replace.py:

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS
    def mach_column_preprocess(self, dict):
        # Replace "CRON" with "cron-execute" in the msg column
        dict['msg'] = dict['msg'].replace("CRON", "cron-execute")
        return PRS_SUCCESS
    def __del__(self):
        return

# Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"tm": "Jul 16 07:39:01", "host": "mach-Precision-T1700",
            "msg": "CRON[1377]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                   "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    (code, msg) = pre_obj.mach_column_preprocess(dict)
    if code >= 0:
        print(dict)
    else:
        print(msg)

The original message is parsed and split into tokens, which the mach_column_preprocess method can then process.

The example above converts the string "CRON" to "cron-execute". The code following `if __name__ == "__main__":` is for testing the script.
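str.replace changes every occurrence of "CRON" in the msg token, including any inside the command text. If only the program name at the start of msg should change, a regular expression anchored to the start is more precise. The sketch below is a variant of replace.py under that assumption. It also returns a PRS_FAILURE-coded result with a descriptive message when the msg token is missing, instead of raising KeyError.

```python
import re

PRS_SUCCESS = (0, None)
PRS_FAILURE = (-1, "Error Msg")

# Matches "CRON" only as the program name at the start of msg, e.g. "CRON[1377]:".
CRON_PREFIX = re.compile(r"^CRON(?=\[\d+\]:)")

class mach_preprocess:
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        if 'msg' not in dict:
            return (PRS_FAILURE[0], "msg column not found")
        # Replace at most one occurrence, at the start of the token.
        dict['msg'] = CRON_PREFIX.sub("cron-execute", dict['msg'], 1)
        return PRS_SUCCESS
```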


TRACE

The TRACE script writes the data passed to the "mach_msg_preprocess" and "mach_column_preprocess" methods to files.

To use it, add PREPROCESS_PATH=trace.py to the tpl file and save the script in the $MACH_COLLECTOR_HOME/collector/preprocess directory.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        # Open the trace files when the collector starts
        self.msg_file = open("/tmp/msg.log", 'a')
        self.column_file = open("/tmp/column.log", 'a')
        return
    def mach_msg_preprocess(self, dict):
        # Write the argument values to the file
        self.msg_file.write(str(dict) + "\n")
        self.msg_file.write("\n")
        return PRS_SUCCESS
    def mach_column_preprocess(self, dict):
        self.column_file.write(str(dict) + "\n")
        self.column_file.write("\n")
        return PRS_SUCCESS
    def __del__(self):
        # Close the files when the collector finishes
        self.msg_file.close()
        self.column_file.close()
        return

# Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"origin_msg": "Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                          "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    pre_obj.mach_msg_preprocess(dict)
    dict = {"origin_msg": "Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                          "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))",
            "tm": "Jul 16 06:39:01", "host": "mach-Precision-T1700",
            "msg": "CRON[1149]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && "
                   "[ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    pre_obj.mach_column_preprocess(dict)

The msg_file and column_file objects are opened in __init__, which runs when the collector starts, and closed in __del__, which runs when the collector ends.

Because they are stored as attributes of the object, the other methods can access them. The code following `if __name__ == "__main__":` tests whether the script works correctly.
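str(dict) is easy to read, but turning it back into a test dict means parsing Python repr text. A variant that writes one JSON object per line can be reloaded directly, which helps when captured values are reused as test input. The sketch assumes the dict values are JSON-serializable, such as the strings in the examples. The default path /tmp/msg.jsonl and the load_trace helper are hypothetical. If tracing fails, the sketch returns PRS_SUCCESS_INFO, so the data is still processed and only the problem is logged.

```python
import json

PRS_SUCCESS      = (0, None)
PRS_SUCCESS_INFO = (1, "Info Msg")

class mach_preprocess:
    def __init__(self, msg_path="/tmp/msg.jsonl"):
        # Opened once when the collector starts; mode "a" keeps earlier traces.
        self.msg_file = open(msg_path, "a")

    def mach_msg_preprocess(self, dict):
        try:
            # One JSON object per line, so each line can be reloaded as a test dict.
            self.msg_file.write(json.dumps(dict) + "\n")
            self.msg_file.flush()
        except (IOError, TypeError, ValueError) as e:
            # The data is still processed; only the trace problem goes to the trc file.
            return (PRS_SUCCESS_INFO[0], "trace failed: %s" % e)
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS

    def __del__(self):
        self.msg_file.close()

def load_trace(path):
    """Read traced dicts back, one per non-empty line (hypothetical helper)."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]
```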


ODBC

The following ODBC script uses a token from the input (code_type) as a search key, looks it up in a database table, and replaces the token with the value found.

Run this example the same way as the previous ones, by setting PREPROCESS_PATH in the template file.
This example uses pypyodbc. To use ODBC in a collector script, you must use pypyodbc.

pypyodbc is not a standard module, so it must be installed in advance.

Install pypyodbc using $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin/python.
The module is installed in $MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg.

Next, set the path from which the module is imported. There are two ways to do this.

  • The first way is to modify the module search path of the sys module (sys.path) in the script, or otherwise provide the module path to the collector.
  • The second way is to set the environment variable USER_PREPROCESS_LIB_PATH:
    export USER_PREPROCESS_LIB_PATH=$MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg
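For the first way, a script can extend sys.path itself before importing the module. The minimal sketch below adds the egg path given above, taking MACH_COLLECTOR_HOME from the environment. It assumes the installation layout and the pypyodbc-1.3.3 egg name shown above; adjust them to the version actually installed. add_lib_path is a hypothetical helper. The import is wrapped so that a missing module leaves pypyodbc set to None instead of stopping the script.

```python
import os
import sys

def add_lib_path(path):
    """Prepend path to sys.path once, so later imports can find modules there."""
    if path and path not in sys.path:
        sys.path.insert(0, path)
    return path

# Assumed layout, taken from the installation path described above.
home = os.environ.get("MACH_COLLECTOR_HOME")
if home:
    add_lib_path(os.path.join(home, "webadmin", "flask", "Python", "lib", "python2.7",
                              "site-packages", "pypyodbc-1.3.3-py2.7.egg"))

try:
    import pypyodbc
except ImportError:
    pypyodbc = None  # the script can then report the problem through a PRS_FAILURE result
```

These lines would go at the top of the preprocessing script, before any code that uses pypyodbc, such as the ODBC example script that follows.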


import pypyodbc

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        # Connect using the pre-declared MySQL DSN
        self.con = pypyodbc.connect("DSN=MYSQL")
        self.cursor = self.con.cursor()
        self.table_name = "error_msg"
        # Create the lookup table and fill it with sample data
        self.test_data_make()
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS
    def mach_column_preprocess(self, dict):
        # Look up the row whose code matches the code_type token
        self.cursor.execute("select code, msg from %s where code = ?" % self.table_name,
                            (int(dict['code_type']),))
        row = self.cursor.fetchone()
        if row is not None:
            # Replace the code with the related message
            dict['code_type'] = row[1]
        else:
            print("failure " + str(dict))
        return PRS_SUCCESS
    def __del__(self):
        self.cursor.close()
        self.con.close()
        return

    # For testing
    def test_data_make(self):
        self.table_check("create table %s (code integer, msg varchar(255))")
        return
    def table_check(self, query):
        self.tables = self.cursor.tables().fetchall()
        self.table_list = []
        for (db, user, table, info, none) in self.tables:
            self.table_list.append(table.upper())
        # If the table already exists, drop it and create a new one
        if self.table_name.upper() in self.table_list:
            self.cursor.execute("drop table %s" % self.table_name)
        self.cursor.execute(query % self.table_name)
        self.insert_error_msg()
        self.con.commit()
        return
    def insert_error_msg(self):
        # Insert the code/message pairs used for the lookup
        error = ((0, "SUCCESS"), (1, "SUCCESS_WITH_INFO"), (-1, "FAILURE"))
        for (code, msg) in error:
            self.cursor.execute("insert into %s values (?, ?)" % self.table_name, (code, msg))
        return

# Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()  # __init__ already creates and fills the test table
    dict = {"tm": "Jul 16 07:39:01", "host": "mach-Precision-T1700",
            "msg": "CRON[1377]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                   "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))",
            "code_type": "-1"}
    pre_obj.mach_column_preprocess(dict)
    print(dict)
    dict = {"tm": "Jul 15 11:31:54", "host": "mach-Precision-T1700",
            "msg": "NetworkManager[1340]: <error> [1405391514.205040] [nm-system.c:768] "
                   "nm_system_iface_get_flags(): (unknown): failed to get interface link object",
            "code_type": "0"}
    pre_obj.mach_column_preprocess(dict)
    print(dict)
    dict = {"tm": "Jul 15 11:31:54", "host": "mach-Precision-T1700",
            "msg": "NetworkManager[1340]: <warn> sysctl: failed to "
                   "open '/proc/sys/net/ipv6/conf/eth1/use_tempaddr': (2) No file in directory",
            "code_type": "1"}
    pre_obj.mach_column_preprocess(dict)
    print(dict)
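Running the ODBC example requires a configured DSN. To exercise the same lookup logic without one, the sketch below swaps in Python's built-in sqlite3 module with an in-memory database. This substitution is for local testing only, since the collector itself requires pypyodbc for ODBC. The sketch uses a parameterized query with the "?" placeholder, which sqlite3 supports. When no row matches, it keeps the original code_type and returns a PRS_SUCCESS_INFO result to note the miss.

```python
import sqlite3

PRS_SUCCESS      = (0, None)
PRS_SUCCESS_INFO = (1, "Info Msg")

class mach_preprocess:
    def __init__(self):
        # In-memory stand-in for the "error_msg" table used in the ODBC example.
        self.con = sqlite3.connect(":memory:")
        self.cursor = self.con.cursor()
        self.cursor.execute("create table error_msg (code integer, msg varchar(255))")
        self.cursor.executemany("insert into error_msg values (?, ?)",
                                [(0, "SUCCESS"), (1, "SUCCESS_WITH_INFO"), (-1, "FAILURE")])
        self.con.commit()

    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        self.cursor.execute("select msg from error_msg where code = ?",
                            (int(dict['code_type']),))
        row = self.cursor.fetchone()
        if row is None:
            # Keep the original value and note the miss in the trc file.
            return (PRS_SUCCESS_INFO[0], "no error_msg row for code %s" % dict['code_type'])
        dict['code_type'] = row[0]
        return PRS_SUCCESS

    def __del__(self):
        self.cursor.close()
        self.con.close()
```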

Preprocessing Script Test


The pypyodbc module used in the ODBC example above is loaded from the path set in the environment variable. After creating a new script, verify that it works correctly.

There are two ways to test a script: direct execution and indirect execution.
In direct execution, the script runs its own test code. In indirect execution, another script imports the script and tests it.

Direct Execution

The test code at the bottom of each preprocessing script example checks the results by calling the preprocessing class directly. The example below is the test code for the skip.py script.

if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"origin_msg": "Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                          "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    print(pre_obj.mach_msg_preprocess(dict))

    dict = {"origin_msg": "Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print(pre_obj.mach_msg_preprocess(dict))

The test code starts with `if __name__ == "__main__":`.

When you run the script directly with the Python interpreter, the __name__ variable is set to "__main__", so the test code runs.
When the collector loads the script, the test code is not executed.
The test code first creates the preprocessing object pre_obj by calling mach_preprocess().
At this point, the __init__ method is called. The test then sets up dict, the parameter passed to mach_msg_preprocess, and calls the method.
If you need to set up dict but do not know what values the collector actually passes, run the "trace.py" script to capture the values passed to each method and use them in your tests.
The data captured by the trace.py script is written to /tmp/msg.log and /tmp/column.log.
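Printing results is fine for a quick look, but assertions make a test fail loudly when a change breaks the script. The sketch below shows assertion-style test code for the skip.py logic. The class is repeated inline so that the block runs on its own; in a real script, only run_tests and the block under `if __name__ == "__main__":` would be added. run_tests is a hypothetical name.

```python
PRS_SUCCESS      = (0, None)
PRS_SUCCESS_SKIP = (2, None)

class mach_preprocess:
    def mach_msg_preprocess(self, dict):
        # Same rule as skip.py: skip messages that contain "CMD".
        if "CMD" in dict['origin_msg']:
            return PRS_SUCCESS_SKIP
        return PRS_SUCCESS

    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS

def run_tests():
    pre_obj = mach_preprocess()
    cmd_msg = {"origin_msg": "Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (/usr/lib/php5/sessionclean)"}
    other_msg = {"origin_msg": "Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    assert pre_obj.mach_msg_preprocess(cmd_msg) == PRS_SUCCESS_SKIP
    assert pre_obj.mach_msg_preprocess(other_msg) == PRS_SUCCESS
    print("skip.py tests passed")

if __name__ == "__main__":
    run_tests()
```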


Indirect Execution

In this method, an existing preprocessing script is imported into another script and run from there.

import skip  # Import the script that has already been created (skip.py)

if __name__ == "__main__":
    pre_obj = skip.mach_preprocess()  # Create an instance of the mach_preprocess class defined in skip.py
    dict = {"origin_msg": "Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && "
                          "[ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    print(pre_obj.mach_msg_preprocess(dict))

    dict = {"origin_msg": "Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print(pre_obj.mach_msg_preprocess(dict))
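The indirect example assumes that skip.py can be imported, for instance because the test runs from the same directory. To test a script where it is deployed, one option is to add that directory to sys.path before importing. The sketch below does this with a hypothetical load_script helper. The directory is passed as an argument; for a deployed script it would be $MACH_COLLECTOR_HOME/collector/preprocess. The sketch uses __import__ so that the module name can be a variable.

```python
import os
import sys

def load_script(directory, module_name):
    """Import a preprocessing script (e.g. "skip") from directory and return an instance."""
    directory = os.path.abspath(directory)
    if directory not in sys.path:
        sys.path.insert(0, directory)
    module = __import__(module_name)
    return module.mach_preprocess()
```

For example, load_script(os.path.join(os.environ["MACH_COLLECTOR_HOME"], "collector", "preprocess"), "skip") returns an instance of the class in skip.py. If a module with the same name was already imported in the same process, __import__ returns the cached module rather than re-reading the file.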