Rule Structure
A rule is represented in JSON across the APIs, configuration, and UI. The rule is divided into JSON nodes such as config, schedule, filter, and actions.
See Rule parameter descriptions for details on each parameter.
Alert Rule - Perpetual
Example of an alert rule scheduled for a perpetual run:
{
  "name": "testRule",
  "type": "ALERT_RULE",
  "status": "ENABLED",
  "schedule": {
    "perpetual": true
  },
  "filterType": "PARAMETERIZED",
  "filter": {
    "select": [
      {
        "param": "operationType"
      },
      {
        "param": "CAST(resultCode AS String)",
        "as": "resultCode"
      }
    ],
    "from": [
      {
        "stream": "IMPACT_RESPONSE_STREAM"
      }
    ],
    "where": {
      "and": [
        {
          "parameter": "resultCode",
          "condition": "=",
          "value": "200"
        },
        {
          "parameter": "operationType",
          "condition": "=",
          "value": "'read'"
        }
      ]
    }
  },
  "severity": "UNDEFINED",
  "actions": {
    "persistToCassandra": "true",
    "persistToS3": "false",
    "customActions": ["47b8d550-eebc-11eb-a8b5-491d87242ca6"]
  }
}
Aggregation Rule - Scheduled
Example of an aggregation rule scheduled to run during a specified time window:
{
  "name": "testRule",
  "id": "b76423f0-b177-11eb-b778-f11b0c368e8c",
  "group": "tenant1.auto1",
  "user": "autoUser",
  "type": "AGGREGATION_RULE",
  "status": "ENABLED",
  "schedule": {
    "perpetual": false,
    "startTime": "2021-05-01T12:00:00",
    "endTime": "2022-12-31T23:00:00"
  },
  "filterType": "PARAMETERIZED",
  "filter": {
    "metric": "serialNumber",
    "aggregateFunction": "count(*)",
    "from": [
      {
        "stream": "IMPACT_RESPONSE_STREAM"
      }
    ],
    "where": {
      "and": [
        {
          "parameter": "resultCode",
          "condition": "!=",
          "value": "200"
        },
        {
          "parameter": "operationType",
          "condition": "=",
          "value": "'read'"
        }
      ]
    },
    "window": {
      "function": "TUMBLING",
      "sizeInSec": "5",
      "advanceInSec": "5"
    }
  },
  "actions": {
    "persistToCassandra": "true",
    "persistToS3": "true",
    "streamToExternalKafka": "true",
    "customActions": []
  }
}
Alert field Descriptions
Field | Description |
---|---|
*id | The unique ID of the rule, used when accessing the various rule APIs. *Read-only parameter. |
name | Name of the rule. Must be unique within the tenant. Can contain alphanumeric characters and underscores. |
*group | The tenant for which the rule is created. This is set according to the tenant of the user creating the rule, or the impersonated tenant set with the tenant header. *Read-only parameter. |
*user | The user that created the rule. *Read-only parameter. |
type | Type of the rule, for example ALERT_RULE or AGGREGATION_RULE. |
severity | The severity of the alert specified for the ALERT_RULE. |
status | The status of the rule. During rule creation, status can be ENABLED or DISABLED. If DISABLED, the rule does not run, and no data is processed into the topics. In GET APIs, this may also be set to RUNNING or STOPPED. RUNNING indicates that the rule is currently processing data. STOPPED indicates that the rule is not processing data, which can occur when a scheduled rule has completed its run. |
filterType | Type of the filter that defines the rule condition, for example RAW or PARAMETERIZED. |
schedule | Rule schedule option: a perpetual run, or a time window bounded by startTime and endTime. If neither is specified, an error is thrown. |
filter | The rule filter, which specifies the streams or tables to read from, the fields to store, and the filter criteria to apply. Filter options are configurable. For more information, see Rule Filter in this section. |
actions | List of custom actions to be taken for the records. This is an optional parameter. |
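The schedule constraint in the table above can be checked on the client side before a rule is submitted. The following Python sketch is illustrative only: the function name `validate_schedule`, the error messages, and the check that startTime precedes endTime are assumptions, not part of the product API. It only encodes the idea that a schedule is either perpetual or bounded by startTime and endTime, as in the two example rules.

```python
from datetime import datetime


def validate_schedule(schedule: dict) -> str:
    """Return 'perpetual' or 'scheduled', or raise ValueError.

    Hypothetical helper: mirrors the documented behavior that a schedule
    matching neither option results in an error.
    """
    if schedule.get("perpetual") is True:
        return "perpetual"
    start = schedule.get("startTime")
    end = schedule.get("endTime")
    if start is None or end is None:
        raise ValueError("schedule must be perpetual or define startTime and endTime")
    # The example rules use timestamps of the form 2021-05-01T12:00:00.
    # Assumption: a window that ends before it starts is rejected.
    if datetime.fromisoformat(start) >= datetime.fromisoformat(end):
        raise ValueError("startTime must be earlier than endTime")
    return "scheduled"


print(validate_schedule({"perpetual": True}))
print(validate_schedule({
    "perpetual": False,
    "startTime": "2021-05-01T12:00:00",
    "endTime": "2022-12-31T23:00:00",
}))
```

The server remains the authority on what it accepts; a check like this only catches obvious mistakes earlier.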
Rule Filter
The rule filter contains various fields depending on the type of rule being created. For more information refer to https://docs.ksqldb.io/en/latest/reference/.
Specify the following field if filterType of the rule is RAW. This field is mandatory.
Field | Description |
---|---|
query | Raw KSQL query providing the filter configurations for the rule. The query fields have to match the schema provided for the rules. For more information, see the topic on query in this section. |
Specify the following fields if filterType of the rule is PARAMETERIZED and the type is ALERT_RULE.
Field | Description |
---|---|
select | Fields to select. These are stored in the details field of the schema, as a map of key-value pairs. For more information, see the topic on select in this section. |
from | Streams or Tables to read from. For more information, see the topic on from in this section. |
where | Condition to filter the data. For more information, see the topic on where in this section. |
Specify the following fields if filterType of the rule is PARAMETERIZED and the type is AGGREGATION_RULE.
Field | Description |
---|---|
metric | The field for which the data is being aggregated. For more information, see the topic on metric in this section. |
aggregateFunction | The aggregate function applied over the field. For more information, see the topic on aggregateFunction in this section. |
from | Streams or Tables to read from. For more information, see the topic on from in this section. |
where | Condition to filter the data. For more information, see the topic on where in this section. |
window | Window function and its fields. For more information, see the topic on window in this section. |
having | To filter records from aggregated data. This is an optional field. For more information, see the topic on having in this section. |
The details on how to specify these fields are as follows:
- query
This is a mandatory field when the filterType is set to RAW.
The output columns from the select query must match the alert or aggregation schema, according to the rule type. For more information, see Appendix F: AVRO schema in Connect Deployment guide.
The output columns must match the schema in both datatype and order. For the aggregation schema, however, serverTime is handled internally, which makes it an optional column in a raw query.
The aggregation query should start with the Kafka record key column, followed by the same column as the metric. The metric field should use the AS_VALUE function. In addition, filtering of the records by tenant must be provided as part of the where clause of the query. The templates for ruleId, ruleName, groupName, and actions are mandatory.
Table 5. Examples
Alert raw query:
"filter": { "query": "select $groupName as groupName, serverTime, $ruleId as ruleId, $ruleName as ruleName, 'MAJOR' as severity, serialNumber, map('resourcePath':=resourcePath) as details, $actions as actions from impact_response_stream where groupName=$groupName;" }
Aggregation raw query not having serverTime:
"filter": { "query": "select serialNumber as metricKey, as_value(serialNumber) as metric, $groupName as groupName, $ruleId as ruleId, $ruleName as ruleName, WINDOWSTART as startTime, WINDOWEND as endTime, count(*) as value, CAST(null as MAP<String,String>) as details, $actions as actions from impact_response_stream WINDOW tumbling (SIZE 30 seconds) where $groupName=GROUPNAME group By serialNumber;" }
Aggregation raw query having serverTime:
"filter": { "query": "select serialNumber as metricKey, as_value(serialNumber) as metric, $groupName as groupName, $ruleId as ruleId, $ruleName as ruleName, WINDOWSTART as startTime, WINDOWEND as endTime, UNIX_TIMESTAMP() as serverTime, count(*) as value, CAST(null as MAP<String,String>) as details, $actions as actions from impact_response_stream WINDOW tumbling (SIZE 30 seconds) where $groupName=GROUPNAME group By serialNumber;" }
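Because the ruleId, ruleName, groupName, and actions templates are mandatory in a raw query, it can help to check a query string for them before creating the rule. The Python sketch below is a hypothetical client-side check: the helper name `missing_templates` is an assumption, and it only looks for the `$name` placeholders textually; it does not validate the query against the schema.

```python
import re

# Templates the text above lists as mandatory in a raw query.
MANDATORY_TEMPLATES = ("ruleId", "ruleName", "groupName", "actions")


def missing_templates(query: str) -> list:
    """Return the mandatory $templates that do not appear in the query."""
    found = set(re.findall(r"\$([A-Za-z_]\w*)", query))
    return [name for name in MANDATORY_TEMPLATES if name not in found]


alert_query = (
    "select $groupName as groupName, serverTime, $ruleId as ruleId, "
    "$ruleName as ruleName, 'MAJOR' as severity, serialNumber, "
    "map('resourcePath':=resourcePath) as details, $actions as actions "
    "from impact_response_stream where groupName=$groupName;"
)

print(missing_templates(alert_query))  # []
print(missing_templates("select serialNumber from impact_response_stream;"))
# ['ruleId', 'ruleName', 'groupName', 'actions']
```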
- select
An array of the fields to be selected from the streams or tables. These fields are made available in the details column of the resulting record after the rules are processed. For more information on the details column, see the sections Alerts Schema and Aggregation Schema. The data from each field becomes a value in the details map. If an alias is provided in the as field, it is used as the key in the map. If not, the param string itself is used as the key.
param can be specified as * (star). If it is specified as just *, all the fields from all the streams and tables listed in from are selected. * can also be specified for a specific stream or table only. In this case, only the fields from the specified stream are selected.
Note:- The map field stores string keys and string values. Hence, ensure that the selected fields are of string type. If a column is not of string type, it must be explicitly cast to string. No implicit casting is done, except when * is selected. For * selection, casting is done as needed.
- Literal string values should be surrounded by single quotes.
Each entry in the array has the following fields:
- param: The column from the stream, or a literal value, with functions applied to it as needed.
- as: Alias for this field. This is used as the key in the details map of the record.
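The way select entries map to keys and values in the details map can be sketched in a few lines of Python. This is a simplified illustration, not the product's implementation: the function name `build_details` is an assumption, and the sketch handles only plain column names, single-quoted literals, and a bare `*`. Functions such as CAST or UCASE are evaluated by the query engine and are out of scope here.

```python
def build_details(select, record):
    """Build a details map for one record (simplified illustration)."""
    details = {}
    for item in select:
        param = item["param"]
        if param == "*":
            # For * selection, values are cast to string as needed.
            details.update({key: str(value) for key, value in record.items()})
            continue
        # The alias is the key if given; otherwise the param string itself.
        key = item.get("as", param)
        if param.startswith("'") and param.endswith("'"):
            # A literal: the same value for every record.
            details[key] = param[1:-1]
        else:
            value = record[param]
            if not isinstance(value, str):
                # No implicit casting for explicitly selected columns.
                raise TypeError(f"{param} must be cast to string explicitly")
            details[key] = value
    return details


record = {"operationType": "read", "resultCode": 100}
print(build_details([{"param": "operationType", "as": "responseType"}], record))
# {'responseType': 'read'}
print(build_details([{"param": "'India'", "as": "country"}], record))
# {'country': 'India'}
print(build_details([{"param": "*"}], record))
# {'operationType': 'read', 'resultCode': '100'}
```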
Table 6. Examples
Select filter | Example details map in record |
---|---|
Single select: "select": [ { "param": "operationType" } ] | "details" : { "operationType": "read" } |
Select with alias: "select": [ { "param": "operationType", "as": "responseType" } ] | "details" : { "responseType": "read" } |
Select with cast: "select": [ { "param": "CAST(resultCode AS String)", "as": "resultCode" } ] | "details" : { "resultCode": "100" } |
Select literals: "select": [ { "param": "'India'", "as": "country" } ] | "details" : { "country": "India" } The literal value is the same for every record. |
Select multiple: "select": [ { "param": "UCASE(operationType)", "as": "responseType" }, { "param": "CAST(resultCode AS String)", "as": "resultCode" } ] | "details" : { "responseType": "READ", "resultCode": "100" } |
Select all from a stream: "select": [ { "param": "IMPACT_OBSERVENOTIFY_STREAM.*" } ] If an alias is specified for from.stream, the alias must be used here. Example: if the alias for IMPACT_OBSERVENOTIFY_STREAM is obNotify, then param should be obNotify.* | "details" : { "groupName": "t1", "serialNumber": "123", "serverTime": "1485339813341", "resourcePath": "vehicle/0/batteryVoltage", "value": "12.6", "customAttributes": "{}", "subscriptionId": "e7a570f4-a175-4f2a-8080-d44400065ada" } |
Select all from all input streams or tables: "select": [ { "param": "*" } ] | "details" : { "groupName": "t1", "serialNumber": "123", "serverTime": "1485339813341", "resourcePath": "vehicle/0/batteryVoltage", "value": "12.6", "customAttributes": "{}", "subscriptionId": "e7a570f4-a175-4f2a-8080-d44400065ada" } |
- from
An array of the streams or tables to read data from. When more than one is listed, they are joined.
Table 7. Filter - from
Field | Description |
---|---|
stream | Stream or table to select from. At a minimum, stream is mandatory. For the list of default streams, see Default source streams. For the schema of default streams, refer to Avro Schema section in Appendix F: Messaging schema in Connect Module Deployment guide. |
alias | An alternative name for the stream or table. |
join | The join operation. The choices are INNER JOIN and LEFT JOIN. INNER JOIN returns values that match in both streams. LEFT JOIN returns values on the joined stream if the conditions match. |
on | The join condition. Currently, the join expression must be a single-column equality comparison. Non-equi joins and joins on multiple columns are not supported. Example: stream1.serialNumber=stream2.serialNumber |
within | The time period for windowed joins. Records from the two streams that arrive within this time period are evaluated. The value can be in days, hours, minutes, seconds, or milliseconds. Example: 2 minutes. |
Table 8. Default source streams
Event type | Default streams | Notes |
---|---|---|
Lifecycle | IMPACT_LIFECYCLE_STREAM | Life cycle events as received from the Device. |
ObserveNotify | IMPACT_OBSERVENOTIFY_STREAM | Notification of data changes for device resources. |
Response | IMPACT_RESPONSE_STREAM | Response to downlink request from IMPACT. |
DJR | IMPACT_DJR_STREAM | Device Job Record |
Monte | IMPACT_MONTE_STREAM | Network Events from either the Network Exposure Function (NEF) or Service Capability and Exposure Function (SCEF). |
Sms | IMPACT_SMS_STREAM | Short Message Delivery |
Table 9. Examples
Single stream to read from:
"from": [ { "stream": "IMPACT_RESPONSE_STREAM" } ]
Stream with alias:
"from": [ { "stream": "IMPACT_RESPONSE_STREAM", "alias": "resp" } ]
Two streams joined:
"from": [ { "stream": "stream1" }, { "stream": "stream2", "join": "INNER JOIN", "on": "stream1.serialNumber=stream2.serialNumber" } ]
Two streams with windowed join:
"from": [ { "stream": "stream1" }, { "stream": "stream2", "join": "INNER JOIN", "on": "stream1.serialNumber=stream2.serialNumber", "within": "1 minutes" } ]
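To see how a from array relates to a query, the following Python sketch renders it as a FROM clause. It is an assumption-laden illustration: the function name `render_from` is hypothetical, the output follows general ksqlDB join syntax (WITHIN placed before ON), and the query that the product actually generates may differ.

```python
def render_from(sources):
    """Render a from array as a ksqlDB-style FROM clause (illustrative)."""
    parts = []
    for index, src in enumerate(sources):
        name = src["stream"]
        if "alias" in src:
            name += " " + src["alias"]
        if index == 0:
            # The first entry is the stream or table that is read from.
            parts.append("FROM " + name)
            continue
        # Later entries are joined to what precedes them.
        clause = src["join"] + " " + name
        if "within" in src:
            clause += " WITHIN " + src["within"]
        clause += " ON " + src["on"]
        parts.append(clause)
    return " ".join(parts)


print(render_from([{"stream": "IMPACT_RESPONSE_STREAM", "alias": "resp"}]))
# FROM IMPACT_RESPONSE_STREAM resp
print(render_from([
    {"stream": "stream1"},
    {"stream": "stream2", "join": "INNER JOIN",
     "on": "stream1.serialNumber=stream2.serialNumber", "within": "1 minutes"},
]))
# FROM stream1 INNER JOIN stream2 WITHIN 1 minutes ON stream1.serialNumber=stream2.serialNumber
```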
- where
One or more conditions to filter data with:
Table 10. Filter - where
Field | Description |
---|---|
parameter | The name of the parameter present in the source event, that is, the field to be compared. Part of a simple condition. |
condition | The conditional operator applied in the clause. Part of a simple condition. |
value | The value or values against which the parameter is compared. Part of a simple condition. |
and | Logical AND operator for multiple conditions: an array of conditions that are joined with and. |
or | Logical OR operator for multiple conditions: an array of conditions that are joined with or. |
Note:- Literal string values should be surrounded by single quotes. For the column types, refer to Avro Schema section in Appendix F: Messaging schema in Connect Module Deployment guide.
- The rules automatically add a condition to filter the records belonging to the tenant of the rule. That is, the effective condition is groupName='<tenant>' and the query representation of the where filter.
Table 11. Examples
Filter type | Query representation |
---|---|
Single Condition: "where": { "parameter": "resultCode", "condition": "=", "value": "200" } | resultCode = 200 |
And Condition: "where": { "and": [ { "parameter": "resultCode", "condition": "=", "value": "200" }, { "parameter": "operationType", "condition": "=", "value": "'read'" } ] } | (resultCode = 200 AND operationType = 'read') |
Or Condition: "where": { "or": [ { "parameter": "operationType", "condition": "=", "value": "'write'" }, { "parameter": "operationType", "condition": "=", "value": "'read'" } ] } | (operationType = 'write' OR operationType = 'read') |
Nested Condition: "where": { "and": [ { "parameter": "resultCode", "condition": "=", "value": "200" }, { "or": [ { "parameter": "operationType", "condition": "=", "value": "'read'" }, { "parameter": "operationType", "condition": "=", "value": "'write'" } ] } ] } | (resultCode = 200 AND (operationType = 'read' OR operationType = 'write')) |
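The mapping from a where object to its query representation is recursive: a simple condition renders as parameter, condition, and value, while and/or nodes render their children joined inside parentheses. The Python sketch below reproduces the query representations shown in the examples above. The function names `render_where` and `with_tenant` are hypothetical, and the way the tenant condition is combined is an assumption based on the note in this topic.

```python
def render_where(node):
    """Render a where object as its query representation."""
    if "and" in node:
        return "(" + " AND ".join(render_where(child) for child in node["and"]) + ")"
    if "or" in node:
        return "(" + " OR ".join(render_where(child) for child in node["or"]) + ")"
    # Simple condition: parameter, operator, and value are emitted as given,
    # so string literals must already carry their single quotes.
    return f'{node["parameter"]} {node["condition"]} {node["value"]}'


def with_tenant(tenant, where):
    """Prepend the tenant condition that the rules add automatically (assumed form)."""
    return f"groupName='{tenant}' AND {render_where(where)}"


nested = {
    "and": [
        {"parameter": "resultCode", "condition": "=", "value": "200"},
        {"or": [
            {"parameter": "operationType", "condition": "=", "value": "'read'"},
            {"parameter": "operationType", "condition": "=", "value": "'write'"},
        ]},
    ]
}
print(render_where(nested))
# (resultCode = 200 AND (operationType = 'read' OR operationType = 'write'))
```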
- metric
Metric is the column on which an aggregation rule operates. Because aggregation rules are windowed queries, the metric is also used to group the records. Scalar functions can be applied to it, for example CONCAT_WS(', ', serialNumber, resourcePath). The metric field is of string datatype. Hence, if the selected metric is of another datatype, an explicit cast to string must be provided. If more than one metric field is required, the fields need to be combined into a single field.
Table 12. Examples
Simple metric:
"metric": "serialNumber"
Metric with scalar function:
"metric": "CONCAT_WS(', ', serialNumber, resourcePath)"
- aggregateFunction
The aggregated value column. This should include an aggregation function applied to the column. This field is of the double datatype.
Table 13. Example
Aggregate:
"aggregateFunction": "AVG(value)"
For more details, refer to https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/.
- window
The window function to apply for aggregation rules. Users can define the window size for each of the rules.
Table 14. Filter - window
Field | Description |
---|---|
function | The window function to apply: TUMBLING, HOPPING, or SESSION. |
sizeInSec | The time interval in seconds for the size of the window. This field is mandatory for all three types of window functions. |
advanceInSec | The time interval in seconds by which a HOPPING window has to be advanced. |
The window types can be:
Window type | Behavior | Description |
---|---|---|
Tumbling Window | Time-based | Fixed-duration, non-overlapping, gap-less windows |
Hopping Window | Time-based | Fixed-duration, overlapping windows |
Session Window | Session-based | Dynamically-sized, non-overlapping, data-driven windows |
Table 15. Examples
Hopping Window:
"window": { "function": "HOPPING", "sizeInSec": "10", "advanceInSec": "5" }
Tumbling Window:
"window": { "function": "TUMBLING", "sizeInSec": "5" }
Session Window:
"window": { "function": "SESSION", "sizeInSec": "5" }
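The difference between tumbling and hopping windows is easiest to see by computing which windows a given record timestamp falls into. The Python sketch below is a conceptual model only: the function name `window_starts` is hypothetical, timestamps and sizes are plain integers in seconds (the rule JSON carries them as strings), and windows are assumed to be aligned to time zero. Session windows are not modeled because their boundaries depend on the data.

```python
def window_starts(ts_sec, function, size_sec, advance_sec=None):
    """Return the start times of the time-based windows containing ts_sec."""
    if function == "TUMBLING":
        # Non-overlapping and gap-less: exactly one window per timestamp.
        return [ts_sec - ts_sec % size_sec]
    if function == "HOPPING":
        if advance_sec is None:
            raise ValueError("advance_sec is required for HOPPING windows")
        starts = []
        # Walk back from the latest window start that is <= ts_sec.
        start = ts_sec - ts_sec % advance_sec
        while start > ts_sec - size_sec and start >= 0:
            starts.append(start)
            start -= advance_sec
        return sorted(starts)
    raise ValueError("only TUMBLING and HOPPING windows have fixed boundaries")


print(window_starts(12, "TUMBLING", 5))     # [10]
print(window_starts(12, "HOPPING", 10, 5))  # [5, 10]
```

With a size of 10 and an advance of 5, the record at second 12 belongs to two overlapping windows, which is why a hopping window can count the same record more than once.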
- having
This is an optional parameter that can be specified to filter out the aggregated data. A conditional expression has to be specified for this, similar to the where clause.
Table 16. Example
Having clause | Query representation |
---|---|
Single Condition: "having": { "parameter": "count(*)", "condition": ">", "value": "5" } | HAVING count(*) > 5 |
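Conceptually, having is applied after the records of a window have been grouped by the metric and aggregated. The Python sketch below models this for count(*) only. The function name `apply_having` and the operator table are assumptions made for illustration; which conditional operators the product accepts in having is not stated here.

```python
import operator
from collections import Counter

# Assumed operator set; only "=", "!=" and ">" appear in this topic's examples.
OPS = {
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "=": operator.eq, "!=": operator.ne,
}


def apply_having(records, metric, having):
    """Count the records per metric value and keep the groups that satisfy having."""
    counts = Counter(record[metric] for record in records)
    compare = OPS[having["condition"]]
    limit = float(having["value"])
    return {key: count for key, count in counts.items() if compare(count, limit)}


window_records = [{"serialNumber": "A"}] * 6 + [{"serialNumber": "B"}] * 2
having = {"parameter": "count(*)", "condition": ">", "value": "5"}
print(apply_having(window_records, "serialNumber", having))  # {'A': 6}
```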
Intelligent Data Store provides additional functions as UDFs, which can be used while creating rules. Refer to Custom Functions.