Rule Structure

A rule is represented in JSON format across APIs, Config, and UI. The rule is divided into JSON nodes, such as config, schedule, filter, and actions.

Figure 1. Rule Structure


See Table 1, Rule parameter descriptions, for a description of each parameter.

Alert Rule - Perpetual

Example of an alert rule scheduled for a perpetual run:

{
    "name": "testRule",
    "type": "ALERT_RULE",
    "status": "ENABLED",
    "schedule": {
        "perpetual": true
    },
    "filterType": "PARAMETERIZED",
    "filter": {
        "select": [
            {
                "param": "operationType"
            },
            {
                "param": "CAST(resultCode AS String)",
                "as": "resultCode"
            }
        ],
        "from": [
            {
                "stream": "IMPACT_RESPONSE_STREAM"
            }
        ],
        "where": {
            "and": [
                {
                    "parameter": "resultCode",
                    "condition": "=",
                    "value": "200"
                },
                {
                    "parameter": "operationType",
                    "condition": "=",
                    "value": "'read'"
                }
            ]
        }
    },
    "severity": "UNDEFINED",
    "actions": {
        "persistToCassandra": "true",
        "persistToS3": "false",
        "customActions": ["47b8d550-eebc-11eb-a8b5-491d87242ca6"]
    }
}

Aggregation Rule - Scheduled

Example of an aggregation rule scheduled to run during a specified time window:

{
    "name": "testRule",
    "id": "b76423f0-b177-11eb-b778-f11b0c368e8c",
    "group": "tenant1.auto1",
    "user": "autoUser",
    "type": "AGGREGATION_RULE",
    "status": "ENABLED",
    "schedule": {
        "perpetual": false,
        "startTime": "2021-05-01T12:00:00",
        "endTime": "2022-12-31T23:00:00"
    },
    "filterType": "PARAMETERIZED",
    "filter": {
        "metric": "serialNumber",
        "aggregateFunction": "count(*)",
        "from": [
            {
                "stream": "IMPACT_RESPONSE_STREAM"
            }
        ],
        "where": {
            "and": [
                {
                    "parameter": "resultCode",
                    "condition": "!=",
                    "value": "200"
                },
                {
                    "parameter": "operationType",
                    "condition": "=",
                    "value": "'read'"
                }
            ]
        },
        "window": {
            "function": "TUMBLING",
            "sizeInSec": "5",
            "advanceInSec": "5"
        }
    },
    "actions": {
        "persistToCassandra": "true",
        "persistToS3": "true",
        "streamToExternalKafka": "true",
        "customActions": []
    }
}

Rule Field Descriptions

Note: Parameters marked with an asterisk (*) in Rule parameter descriptions are read-only and must not be set during rule creation. They are returned as part of GET APIs.
Table 1. Rule parameter descriptions
Field Descriptions
*id The unique ID of the rule that is used while accessing various rules APIs.

*Read-only parameter.

name Name of the rule. Must be unique within the tenant. Can contain alphanumeric characters and underscore.
*group The tenant for which the rule is created. This is set according to the tenant of the user creating the rule, or the impersonated tenant set with the tenant header.

*Read-only parameter.

*user The user that created the rule.

*Read-only parameter.

type Type of the rule.
  • ALERT_RULE

    Rules that are used to generate alerts when certain conditions are met.

  • AGGREGATION_RULE

    Rules that are used to aggregate data over specified windows.

severity The severity of the alert specified for the ALERT_RULE.
  • CRITICAL
  • MAJOR
  • MINOR
  • UNDEFINED
status The status of the rule. During rule creation status can be:
  • ENABLED (Default)
  • DISABLED

If the status is DISABLED, the rule does not run, and no data is processed into the topics. In GET APIs, the status may also be RUNNING or STOPPED. RUNNING indicates that the rule is currently processing data. STOPPED indicates that the rule is not processing data, for example when a scheduled rule has completed its run.

filterType Type of the filter, which determines how the rule condition is expressed.
  • RAW

    A RAW rule condition lets you provide a raw KSQL query, which is used directly against the stream of events. The filter is expressed as an SQL query.

  • PARAMETERIZED
    Parameterized query conditions can be set up using a combination of parameters. The rule filter contains various fields depending on the type of the rule being created.
    • If filterType of the rule is PARAMETERIZED and the type is ALERT_RULE, then you must provide input for the fields select, from, and where.

    • If filterType of the rule is PARAMETERIZED and the type is AGGREGATION_RULE, then you must provide input for the fields metric, aggregateFunction, from, where, and window. The having field is optional.

    The filter is expressed as a JSON node.

schedule Rule schedule option. A rule is either perpetual or scheduled.
  • PERPETUAL

    Set with the boolean field perpetual (true or false), which indicates whether the rule is to be run forever. If true, the rule is perpetual and runs as long as it is in ENABLED status.

  • SCHEDULED

    Rule scheduled with a start and end time.

    startTime: The time when the rule starts processing, provided it is ENABLED. Must be in the format yyyy-MM-ddTHH:mm:ss. For a scheduled rule, startTime is mandatory.

    endTime: The time when a running rule stops processing. Must be after startTime and in the format yyyy-MM-ddTHH:mm:ss. For a scheduled rule, endTime is mandatory. If both startTime and endTime are null, the rule is not a scheduled rule.

If the rule is specified as neither perpetual nor scheduled, an error is thrown.

filter The rule filter, which specifies the streams or tables to read from, fields to store, filter criteria to apply. Filter options are configurable.

For more information, see Rule Filter in this section.

actions Actions to be taken for the records generated by the rule. This is an optional parameter.
  • persistToCassandra: Stores the generated alarms or aggregations in the internal storage. If true, the data from the rule is stored in the internal storage and made available to the data access APIs. The default value is false.
  • persistToS3: Persists the generated data to S3 storage. If true, the data from the rule is processed by the S3 connector, provided that the S3 connector is enabled. If the S3 connector is not enabled, this does not result in an error.
  • streamToExternalKafka: Streams the data to an external Kafka using the mirror maker. If true, the data from the rule is mirrored to the target Kafka using the Connect mirror maker.
  • customActions: List of custom actions, previously created using the Actions API.
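The schedule constraints described in Table 1 can be checked on the client side before a rule is submitted. The following is a minimal sketch in Python, not part of the product. The function name check_schedule is hypothetical, and the assumption that a perpetual rule needs no further checks is ours.

```python
from datetime import datetime

# Python equivalent of the documented format yyyy-MM-ddTHH:mm:ss
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def check_schedule(schedule):
    """Return a list of problems found in a rule's schedule node.

    Hypothetical helper: it mirrors the documented constraints only and
    does not reproduce the validation performed by the server.
    """
    if schedule.get("perpetual", False):
        # Assumption: a perpetual rule needs no start or end time.
        return []
    start = schedule.get("startTime")
    end = schedule.get("endTime")
    if start is None or end is None:
        return ["a scheduled rule needs both startTime and endTime"]
    try:
        start_dt = datetime.strptime(start, TIME_FORMAT)
        end_dt = datetime.strptime(end, TIME_FORMAT)
    except ValueError:
        return ["times must use the format yyyy-MM-ddTHH:mm:ss"]
    if end_dt <= start_dt:
        return ["endTime must be after startTime"]
    return []
```

An empty list means that the schedule node satisfies the documented constraints. The server may apply additional checks.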

Rule Filter

The rule filter contains various fields depending on the type of rule being created. For more information refer to https://docs.ksqldb.io/en/latest/reference/.

Specify the following field if filterType of the rule is RAW. This field is mandatory.

Table 2. RAW filter type
Field Description
query Raw KSQL query providing the filter configurations for the rule. The query fields have to match the schema provided for the rules.

For more information, see the topic on query in this section.

Specify the following fields if filterType of the rule is PARAMETERIZED and the type is ALERT_RULE.

Table 3. PARAMETERIZED filterType for ALERT_RULE
Field Description
select Fields to select. These are stored in the details field of the schema, as a map of key-value pairs.

For more information, see the topic on select in this section.

from Streams or Tables to read from.

For more information, see the topic on from in this section.

where Condition to filter the data.

For more information, see the topic on where in this section.

Specify the following fields if filterType of the rule is PARAMETERIZED and the type is AGGREGATION_RULE.

Table 4. PARAMETERIZED filterType for AGGREGATION_RULE
Field Description
metric The field for which the data is being aggregated.

For more information, see the topic on metric in this section.

aggregateFunction The aggregate function applied over the field.

For more information, see the topic on aggregateFunction in this section.

from Streams or Tables to read from.

For more information, see the topic on from in this section.

where Condition to filter the data.

For more information, see the topic on where in this section.

window Window function and its fields.

For more information, see the topic on window in this section.

having To filter records from aggregated data. This is an optional field.

For more information, see the topic on having in this section.
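The field requirements in Tables 2 to 4 can be summarized as a lookup keyed by filterType and rule type. The following Python sketch is illustrative only. The names REQUIRED_FILTER_FIELDS and missing_filter_fields are hypothetical, and the sketch assumes that every field in the tables other than having is mandatory.

```python
# Mandatory filter fields per (filterType, type), as read from Tables 2 to 4.
# Assumption: only "having" is optional.
REQUIRED_FILTER_FIELDS = {
    ("RAW", "ALERT_RULE"): {"query"},
    ("RAW", "AGGREGATION_RULE"): {"query"},
    ("PARAMETERIZED", "ALERT_RULE"): {"select", "from", "where"},
    ("PARAMETERIZED", "AGGREGATION_RULE"): {
        "metric", "aggregateFunction", "from", "where", "window",
    },
}


def missing_filter_fields(rule):
    """Return the sorted names of mandatory filter fields absent from a rule."""
    required = REQUIRED_FILTER_FIELDS[(rule["filterType"], rule["type"])]
    present = set(rule.get("filter", {}))
    return sorted(required - present)


rule = {
    "type": "AGGREGATION_RULE",
    "filterType": "PARAMETERIZED",
    "filter": {"metric": "serialNumber", "aggregateFunction": "count(*)"},
}
print(missing_filter_fields(rule))  # ['from', 'where', 'window']
```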

The details on how to specify these fields are as follows:

  • query

    This is a mandatory field when the filterType is set to RAW.

    The output columns from the select query must match the alert or aggregation schema, according to the rule type. For more information, see Avro Schema.

    The output columns must match the schema in both datatype and order. For the aggregation schema, serverTime is handled internally, which makes it an optional column in the raw query.

    The aggregation query must start with the Kafka record key column, followed by the same column for metric. The metric field should use the AS_VALUE function. Filtering of the records by tenant must also be provided as part of the where clause of the query. The templates for ruleId, ruleName, groupName, and actions are mandatory.

    Table 5. Examples
    Alert raw query
    "filter": {
        "query": "select $groupName as groupName, serverTime, $ruleId as ruleId, $ruleName as ruleName, 'MAJOR' as severity, serialNumber, map('resourcePath':=resourcePath) as details, $actions as actions from impact_response_stream where groupName=$groupName;"
    }
    Aggregation raw query not having serverTime
    "filter": {
        "query": "select serialNumber as metricKey, as_value(serialNumber) as metric, $groupName as groupName, $ruleId as ruleId, $ruleName as ruleName, WINDOWSTART as startTime, WINDOWEND as endTime, count(*) as value, CAST(null as MAP<String,String>) as details, $actions as actions from impact_response_stream WINDOW tumbling (SIZE 30 seconds) where $groupName=GROUPNAME group By serialNumber;"
    }
    Aggregation raw query having serverTime
    "filter": {
        "query": "select serialNumber as metricKey, as_value(serialNumber) as metric, $groupName as groupName, $ruleId as ruleId, $ruleName as ruleName, WINDOWSTART as startTime, WINDOWEND as endTime, UNIX_TIMESTAMP() as serverTime, count(*) as value, CAST(null as MAP<String,String>) as details, $actions as actions from impact_response_stream WINDOW tumbling (SIZE 30 seconds) where $groupName=GROUPNAME group By serialNumber;"
    }
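Because the templates for ruleId, ruleName, groupName, and actions are mandatory in a raw query, it can be useful to check for them before submitting a rule. The following Python sketch is a hypothetical helper, not a product function. It only looks for the $name placeholders in the query text and does not validate the query itself.

```python
import re

# Templates that the text above states are mandatory in a raw query.
MANDATORY_TEMPLATES = ("ruleId", "ruleName", "groupName", "actions")


def missing_templates(query):
    """Return the mandatory $templates that do not appear in the query."""
    found = set(re.findall(r"\$(\w+)", query))
    return [name for name in MANDATORY_TEMPLATES if name not in found]


alert_query = (
    "select $groupName as groupName, serverTime, $ruleId as ruleId, "
    "$ruleName as ruleName, 'MAJOR' as severity, serialNumber, "
    "map('resourcePath':=resourcePath) as details, $actions as actions "
    "from impact_response_stream where groupName=$groupName;"
)
print(missing_templates(alert_query))  # []
```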
  • select

    An array of the fields to select from the streams or tables. These fields are made available in the details column of the resulting record after the rules are processed. For more information on the details column, see the sections Alerts Schema and Aggregation Schema. The data from each field becomes a value in the details map. If an alias is provided in the as field, it is used as the key in the map. If not, the param string itself is used as the key.

    param can be specified as * (star). If it is specified as just *, all the fields from all the streams and tables listed in from are selected. * can also be specified for only a specific stream or table. In this case, only the fields from the specified stream or table are selected.

    Note:
    • The details map stores string keys and string values. Hence, ensure that the selected fields are of string type. A column that is not of string type must be explicitly cast to string. No implicit casting is done, except when * is selected. For * selection, casting is done as needed.
    • Literal string values must be surrounded by single quotes.

    Each entry in the select array has the following fields:
    • param: The column from the stream, or a literal value, with functions applied to it as needed.
    • as: Alias for this field. This is used as the key in the details map of the record.
    Table 6. Examples
    Select Filter Example details map in record
    Single select
    "select": [
        {
            "param": "operationType"
        }
    ]
    "details" : {
        "operationType": "read"
    }
    Select with alias
    "select": [
        {
            "param": "operationType",
            "as": "responseType"
        }
    ]
    "details" : {
        "responseType": "read"
    }
    Select with cast
    "select": [
        {
            "param": "CAST(resultCode AS String)",
            "as": "resultCode"
        }
    ]
    "details" : {
        "resultCode": "100"
    }
    Select literals
    "select": [
        {
            "param": "'India'",
            "as": "country"
        }
    ]
    "details" : {
        "country": "India"
    }

    Literal value will be the same for every record.

    Select multiple
    "select": [
        {
            "param": "UCASE(operationType)",
            "as": "responseType"
        },
        {
            "param": "CAST(resultCode AS String)",
            "as": "resultCode"
        }
    ]
    "details" : {
        "responseType": "READ",
        "resultCode": "100"
    }
    Select all from a stream
    "select": [
        {
            "param": "IMPACT_OBSERVENOTIFY_STREAM.*"
        }
    ]

    If alias is specified for from.stream, then the alias has to be used here. Example: If alias for IMPACT_OBSERVENOTIFY_STREAM is obNotify, then param here should be: obNotify.*

    "details" : {
        "groupName": "t1",
        "serialNumber": "123",
        "serverTime": "1485339813341",
        "resourcePath": "vehicle/0/batteryVoltage",
        "value": "12.6",
        "customAttributes": "{}",
        "subscriptionId": "e7a570f4-a175-4f2a-8080-d44400065ada"
    }
    Select all from all input streams or tables
    "select": [
        {
            "param": "*"
        }
    ]
    "details" : {
        "groupName": "t1",
        "serialNumber": "123",
        "serverTime": "1485339813341",
        "resourcePath": "vehicle/0/batteryVoltage",
        "value": "12.6",
        "customAttributes": "{}",
        "subscriptionId": "e7a570f4-a175-4f2a-8080-d44400065ada"
    }
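The rule for choosing keys in the details map (the alias if one is given, otherwise the param string) can be sketched in Python as follows. The function name details_keys is hypothetical. Star selections are skipped because their keys depend on the columns of the source streams, which are not known to this sketch.

```python
def details_keys(select):
    """Return the details-map keys implied by a select array.

    Illustrative only: uses the "as" alias when present, else the
    "param" string, as described in the select topic above.
    """
    keys = []
    for item in select:
        param = item["param"]
        if param == "*" or param.endswith(".*"):
            # Expands to the columns of the stream(s); not resolvable here.
            continue
        keys.append(item.get("as", param))
    return keys


select = [
    {"param": "UCASE(operationType)", "as": "responseType"},
    {"param": "CAST(resultCode AS String)", "as": "resultCode"},
]
print(details_keys(select))  # ['responseType', 'resultCode']
```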
  • from

    An array of the streams or tables to read data from. If more than one entry is given, the entries are joined.

    Table 7. Filter - from
    Field Description
    stream Stream or table to select from. At least one stream is mandatory.

    For the list of default streams, see Default source streams.

    For the schema of default streams, see Avro Schema section.

    alias An alternative name for the stream or table.
    join The join operation. The choices are INNER JOIN and LEFT JOIN.
    • INNER JOIN - Returns only the records that have matching values in both streams.
    • LEFT JOIN - Returns all the records from the left stream, together with the values from the joined stream where the join condition matches.
    Mandatory for the second stream or table onwards.
    on The join condition. Currently, the join expression must be a single-column equality comparison. Non-equi joins and joins on multiple columns are not supported. Example: stream1.serialNumber=stream2.serialNumber
    within The time period for a windowed join. Records from the two streams that arrive within this time period of each other are evaluated for the join. The value can be in days, hours, minutes, seconds, or milliseconds.

    Example: 2 minutes.

    Table 8. Default source streams
    Event type Default streams Notes
    Lifecycle IMPACT_LIFECYCLE_STREAM Life cycle events as received from the Device.
    ObserveNotify IMPACT_OBSERVENOTIFY_STREAM Notification of data changes for device resources.
    Response IMPACT_RESPONSE_STREAM Response to downlink request from IMPACT.
    DJR IMPACT_DJR_STREAM Device Job Record
    Monte IMPACT_MONTE_STREAM Network events from either the Network Exposure Function (NEF) or the Service Capability Exposure Function (SCEF).
    Sms IMPACT_SMS_STREAM Short Message Delivery
    Table 9. Examples
    Single stream to read from
    "from": [
        {
            "stream": "IMPACT_RESPONSE_STREAM"
        }
    ]
    Stream with alias
    "from": [
        {
            "stream": "IMPACT_RESPONSE_STREAM",
            "alias": "resp"
        }
    ]
    Two streams joined
    "from": [
        {
            "stream": "stream1"
        },
        {
            "stream": "stream2",
            "join": "INNER JOIN",
            "on": "stream1.serialNumber=stream2.serialNumber"
        }
    ]
    Two streams with windowed join
    "from": [
        {
            "stream": "stream1"
        },
        {
            "stream": "stream2",
            "join": "INNER JOIN",
            "on": "stream1.serialNumber=stream2.serialNumber",
            "within": "1 minutes"
        }
    ]
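To show how the from array relates to a query, the following Python sketch renders it as a ksqlDB-style FROM clause. It is an illustration under assumptions, not the product's own translation. The function name render_from is hypothetical, and the sketch assumes that both join and on are required from the second entry onwards.

```python
def render_from(sources):
    """Render a from array as a ksqlDB-style FROM clause (illustrative)."""
    if not sources:
        raise ValueError("at least one stream is mandatory")
    parts = []
    for index, src in enumerate(sources):
        name = src["stream"]
        if "alias" in src:
            name += " AS " + src["alias"]
        if index == 0:
            parts.append(name)
            continue
        # Assumption: every joined entry carries both "join" and "on".
        if "join" not in src or "on" not in src:
            raise ValueError("join and on are needed for joined streams")
        clause = src["join"] + " " + name
        if "within" in src:
            # In ksqlDB syntax, WITHIN precedes the ON condition.
            clause += " WITHIN " + src["within"]
        clause += " ON " + src["on"]
        parts.append(clause)
    return "FROM " + " ".join(parts)


print(render_from([
    {"stream": "stream1"},
    {"stream": "stream2", "join": "INNER JOIN",
     "on": "stream1.serialNumber=stream2.serialNumber",
     "within": "1 minutes"},
]))
# FROM stream1 INNER JOIN stream2 WITHIN 1 minutes ON stream1.serialNumber=stream2.serialNumber
```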
  • where

    One or more conditions to filter data with:

    Table 10. Filter - where
    Field Description
    parameter The name of the parameter in the source event that is compared. Part of a simple condition.
    condition The comparison operator applied in the clause. Part of a simple condition.
    value The value or values against which the parameter is compared. Part of a simple condition.
    and Logical AND operator for multiple conditions. An array of conditions that are joined with AND.
    or Logical OR operator for multiple conditions. An array of conditions that are joined with OR.
    Note:
    • Literal string values must be surrounded by single quotes. For the column types, refer to the Avro Schema section.
    • The rule automatically adds a condition that restricts the records to the tenant of the rule. The effective condition is: groupName='<tenant>' AND <query representation of the where filter>.
    Table 11. Examples
    Filter type Query representation
    Single Condition
    "where": {
        "parameter": "resultCode",
        "condition": "=",
        "value": "200"
    }
    resultCode = 200
    And Condition
    "where": {
        "and": [
            {
                "parameter": "resultCode",
                "condition": "=",
                "value": "200"
            },
            {
                "parameter": "operationType",
                "condition": "=",
                "value": "'read'"
            }
        ]
    }
    (resultCode = 200 AND operationType = 'read')
    Or Condition
    "where": {
        "or": [
            {
                "parameter": "operationType",
                "condition": "=",
                "value": "'write'"
            },
            {
                "parameter": "operationType",
                "condition": "=",
                "value": "'read'"
            }
        ]
    }
    (operationType = 'write' OR operationType = 'read')
    Nested Condition
    "where": {
        "and": [
            {
                "parameter": "resultCode",
                "condition": "=",
                "value": "200"
            },
            {
                "or": [
                    {
                        "parameter": "operationType",
                        "condition": "=",
                        "value": "'read'"
                    },
                    {
                        "parameter": "operationType",
                        "condition": "=",
                        "value": "'write'"
                    }
                ]
            }
        ]
    }
    (resultCode = 200 AND (operationType = 'read' OR operationType = 'write'))
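The mapping from a where node to its query representation in Table 11 is recursive: a simple condition renders as parameter, condition, and value, and an and or or node renders its children joined by the operator and wrapped in parentheses. The following Python sketch reproduces the representations shown in the table. The function name render_where is hypothetical, and the sketch does not add the tenant condition that the rule adds automatically.

```python
def render_where(node):
    """Render a where node as its query representation (illustrative)."""
    for op in ("and", "or"):
        if op in node:
            rendered = [render_where(child) for child in node[op]]
            return "(" + (" " + op.upper() + " ").join(rendered) + ")"
    # Simple condition: parameter, condition, value.
    return f"{node['parameter']} {node['condition']} {node['value']}"


where = {
    "and": [
        {"parameter": "resultCode", "condition": "=", "value": "200"},
        {
            "or": [
                {"parameter": "operationType", "condition": "=", "value": "'read'"},
                {"parameter": "operationType", "condition": "=", "value": "'write'"},
            ]
        },
    ]
}
print(render_where(where))
# (resultCode = 200 AND (operationType = 'read' OR operationType = 'write'))
```

Because the having filter takes a conditional expression of the same shape, the same function can be applied to a having node.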
    
  • metric

    The metric is the column on which an aggregation rule aggregates data. Because aggregation rules are windowed queries, the metric is also used to group the records. Scalar functions can be applied to it. For example, CONCAT_WS(', ', serialNumber, resourcePath).

    The metric field is of string datatype. Hence, if the metric selected is of another datatype, an explicit cast to string must be provided. If more than one metric field is required, the fields must be combined into a single field.
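A metric expression that combines several columns, casting those that are not strings, can be built as in the following Python sketch. The function name combine_metrics and its non_string argument are hypothetical. CONCAT_WS is the scalar function used in the example above, and the caller is assumed to know which columns are not of string type.

```python
def combine_metrics(columns, non_string=()):
    """Build a single string-typed metric expression from column names."""
    if not columns:
        raise ValueError("at least one metric column is needed")
    exprs = [
        f"CAST({col} AS STRING)" if col in non_string else col
        for col in columns
    ]
    if len(exprs) == 1:
        return exprs[0]
    # Join with ', ' as in the CONCAT_WS example in the text.
    return "CONCAT_WS(', ', " + ", ".join(exprs) + ")"


print(combine_metrics(["serialNumber", "resourcePath"]))
# CONCAT_WS(', ', serialNumber, resourcePath)
```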

    Table 12. Examples
    Simple metric
    "metric": "serialNumber"
    Metric with scalar function
    "metric": "CONCAT_WS(', ', serialNumber, resourcePath)"
  • aggregateFunction

    The aggregated value column. This must include an aggregation function applied to the column. This field is of the double datatype.

    Table 13. Example
    Aggregate
    "aggregateFunction": "AVG(value)"

    For more details, refer to https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/.

  • window

    The window function to apply for aggregation rules. Users can define the window size for each of the rules.

    Table 14. Filter- window
    Field Description
    function The window function to apply. The window types can be:
    Window type Behavior Description
    Tumbling Window Time-based Fixed-duration, non-overlapping, gap-less windows
    Hopping Window Time-based Fixed-duration, overlapping windows
    Session Window Session-based Dynamically-sized, non-overlapping, data-driven windows
    sizeInSec The time interval in seconds for the size of the window. This field is mandatory for all three types of window functions.
    advanceInSec The time interval in seconds by which a HOPPING window has to be advanced.
    Table 15. Examples
    Hopping Window
    "window": {
        "function": "HOPPING",
        "sizeInSec": "10",
        "advanceInSec": "5"
    }
    Tumbling Window
    "window": {
        "function": "TUMBLING",
        "sizeInSec": "5"
    }
    Session Window
    "window": {
        "function": "SESSION",
        "sizeInSec": "5"
    }
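For comparison with the raw query examples, the following Python sketch renders a window node as a ksqlDB-style WINDOW clause. It is illustrative only. The function name render_window is hypothetical, and the sketch assumes that advanceInSec is required for a HOPPING window and ignored for the other window types.

```python
def render_window(window):
    """Render a window node as a ksqlDB-style WINDOW clause (illustrative)."""
    function = window["function"].upper()
    size = int(window["sizeInSec"])  # values are given as strings in the JSON
    if function == "TUMBLING":
        return f"WINDOW TUMBLING (SIZE {size} SECONDS)"
    if function == "HOPPING":
        if "advanceInSec" not in window:
            # Assumption: a hopping window cannot be built without an advance.
            raise ValueError("advanceInSec is needed for a HOPPING window")
        advance = int(window["advanceInSec"])
        return f"WINDOW HOPPING (SIZE {size} SECONDS, ADVANCE BY {advance} SECONDS)"
    if function == "SESSION":
        return f"WINDOW SESSION ({size} SECONDS)"
    raise ValueError(f"unknown window function: {function}")


print(render_window({"function": "HOPPING", "sizeInSec": "10", "advanceInSec": "5"}))
# WINDOW HOPPING (SIZE 10 SECONDS, ADVANCE BY 5 SECONDS)
```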
  • having

    This is an optional parameter that filters the aggregated data. It takes a conditional expression in the same form as the where clause.

    Table 16. Example
    Having clause Query representation
    Single Condition
    "having": {
        "parameter": "count(*)",
        "condition": ">",
        "value": "5"
    }
    HAVING count(*) > 5
Note:

Intelligent Data Store provides additional functions as user-defined functions (UDFs), which can be used while creating rules. Refer to Custom Functions.