Change Data Capture (CDC) is a design pattern that tracks only the changes in data, and it can be implemented in several ways. In IoT solutions, the CDC pattern is used to send only changes from the device to the cloud. But how do you deal with this data in Azure Data Explorer for further analysis?

In my last article, we looked at an IoT implementation with Change Data Capture (CDC) and how to deal with it within Azure Data Explorer by saving snapshots of the calculated values with an Azure Function. In this article, we investigate a somewhat different approach using a Kusto update policy.

This article is part of a series about Change Data Capture in Azure Data Explorer.

Implementation

For those who did not read part 1, I will briefly explain the implementation.

In many IoT solutions, we see an implementation where the cloud only receives a value when it is changed. In the screenshot below, I added an example of a dataset where Change Data Capture is used.

Even though not all values are sent to the cloud every second, they still have a value (the last value sent). So we have to interpret the data differently than how we receive it. Below is another screenshot showing how the data (filtered by key == 'model') should be interpreted. We didn't receive a value for the model at 23:00:01.000, but it is still the same as the last value we received, and it will not change until we receive another value (in this example at 23:00:05).
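For reference, the raw Telemetry table itself is not shown in this article. Based on the inline ingest commands used further down, its schema presumably looks something like this (an assumption on my part, not a command taken from part 1):

```kusto
// Assumed schema of the raw CDC table: one row per changed key/value pair.
.create table Telemetry (deviceId: string, timeStamp: datetime, key: string, value: string)
```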

Solution part 1

In part one of this series, we discussed several ways to fill forward the data, and we built an Azure Function that saves the last record of each day to a SavedTelemetry table.

This table is then used to query the data and prevent any gaps.
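As a rough sketch of that approach (the exact query from part 1 may differ, and it assumes SavedTelemetry shares the Telemetry schema), the gap-free query starts by unioning the daily snapshots with the raw telemetry before filling forward:

```kusto
// Sketch: combine the daily snapshots with the raw CDC rows so the
// fill-forward logic never starts a day from an empty value.
Telemetry
| union SavedTelemetry
| order by deviceId, timeStamp asc
```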

If you want to know more, please check part one of this series.

Another solution

When discussing my latest blog with Sander van de Velde, we came up with another solution to this problem. Why spend money on an Azure Function when you already have update policies in Azure Data Explorer? So let's investigate this.

As you might know, the Kusto function in an update policy only queries the newly ingested data. Let me explain this with a simple example. I created a simple update policy function on the table Telemetry:

.create-or-alter function UpdatePolicy()  
{
    Telemetry        
}

When the first record is ingested into the table, the function returns that first row. When the second record is ingested, the function returns only the second row (not both rows).
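Note that this scoping only applies while the update policy itself runs; invoking the function manually simply queries the whole table:

```kusto
// Run directly, the function returns every row currently in Telemetry.
// The "newly ingested data only" behavior applies only when the function
// is executed as part of an update policy during ingestion.
UpdatePolicy()
```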

So we need to keep this in mind when building the new solution.

Elaboration

The idea was to build a running window: we save the calculated values in a separate table, and whenever a new record is ingested, we append a record with the updated values to that table. That way, every record is stored in the calculation table. Let's take a look at how to implement this.

Implementation

We start with creating a table for storing the calculated values:

.create table SavedCalculatedTelemetry (deviceId: string, timeStamp: datetime, lokatie: long, model: string, ['status']: string, gps: dynamic) 

Now we create a function that combines the latest record of SavedCalculatedTelemetry with the newly ingested record of Telemetry:

.create-or-alter function UpdatePolicy()  
{
    Telemetry    
    | extend type_value_bag = bag_pack(key, value)
    //Added an OutputSchema so all the columns are available and in the right format
    | evaluate bag_unpack(type_value_bag) : (deviceId:string, timeStamp:datetime, gps:dynamic, lokatie:long, model:dynamic, status:dynamic)
    //Union the saved calculated telemetry, so it can be used to fill forward
    | union (   SavedCalculatedTelemetry
                | join kind=inner Telemetry on $left.deviceId==$right.deviceId
                | top 1 by timeStamp
            )
    | order by deviceId, timeStamp asc 
    //fill forward
    | scan declare (gps_filled: dynamic = dynamic(null), lokatie_filled: long = long(null), model_filled: dynamic = dynamic(null), status_filled: dynamic = dynamic(null)) with
    (
        step s1: true => gps_filled = iff(isempty(gps), iff(deviceId == s1.deviceId, s1.gps_filled, dynamic(null)), gps),
                         lokatie_filled = iff(isempty(lokatie), iff(deviceId == s1.deviceId, s1.lokatie_filled, long(null)), lokatie),
                         model_filled = iff(isempty(model), iff(deviceId == s1.deviceId, s1.model_filled, dynamic(null)), model),
                         status_filled = iff(isempty(status), iff(deviceId == s1.deviceId, s1.status_filled, dynamic(null)), status);
    )
    | project deviceId, timeStamp, lokatie = lokatie_filled, model = model_filled, status = status_filled, gps = gps_filled
    //Only use the latest record
    | top 1 by timeStamp
}

NOTE: This function can only handle telemetry that is sent in sequence. This can be solved by changing the function, but for this short demonstration it is sufficient.

Finally, we assign the update policy to the table:

.alter table SavedCalculatedTelemetry policy update 
@'[{ "IsEnabled": true, "Source": "Telemetry", "Query": "UpdatePolicy()", "IsTransactional": true, "PropagateIngestionProperties": false}]'
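To verify that the policy was attached correctly, you can inspect it:

```kusto
// Shows the update policy currently attached to the target table.
.show table SavedCalculatedTelemetry policy update
```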

Results

Let’s look at the implementation and results when both tables (Telemetry and SavedCalculatedTelemetry) are empty. We start by manually ingesting a record to Telemetry:

.ingest inline into table Telemetry <|
    device1,datetime(2023-10-31T08:08:40),status,Started

Which results in:

Now we add a second record:

.ingest inline into table Telemetry <|
    device1,datetime(2023-10-31T08:08:50Z),model,carx

Which results in:

After a few more ingests, this results in a table that a user can easily understand:

The advantage of this solution is that the data doesn't have to be calculated at query time. By moving the calculation to the ingestion process, querying the data becomes faster.
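For example, reading the filled-forward state of a device is now a plain filter over the pre-calculated table (the deviceId value is just an illustration):

```kusto
// No bag_unpack or scan needed at query time: the rows in
// SavedCalculatedTelemetry are already filled forward.
SavedCalculatedTelemetry
| where deviceId == "device1"
| order by timeStamp asc
```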

Existing data

The previous example was easy because neither table contained any data. But what if the Telemetry table is already filled? Then we need to make sure the calculated values are inserted into SavedCalculatedTelemetry in advance.

To demonstrate this situation, I empty the table:

.clear table SavedCalculatedTelemetry data

Then, I use a Kusto query to calculate the values and append them to the SavedCalculatedTelemetry table:

.append SavedCalculatedTelemetry <|
Telemetry    
    | extend type_value_bag = bag_pack(key, value)
    //Added an OutputSchema so all the columns are available and in the right format
    | evaluate bag_unpack(type_value_bag) : (deviceId:string, timeStamp:datetime, gps:dynamic, lokatie:long, model:dynamic, status:dynamic)
    | order by deviceId, timeStamp asc 
    //fill forward
    | scan declare (gps_filled: dynamic = dynamic(null), lokatie_filled: long = long(null), model_filled: dynamic = dynamic(null), status_filled: dynamic = dynamic(null)) with
    (
        step s1: true => gps_filled = iff(isempty(gps), iff(deviceId == s1.deviceId, s1.gps_filled, dynamic(null)), gps),
                         lokatie_filled = iff(isempty(lokatie), iff(deviceId == s1.deviceId, s1.lokatie_filled, long(null)), lokatie),
                         model_filled = iff(isempty(model), iff(deviceId == s1.deviceId, s1.model_filled, dynamic(null)), model),
                         status_filled = iff(isempty(status), iff(deviceId == s1.deviceId, s1.status_filled, dynamic(null)), status);
    )
    | project deviceId, timeStamp, lokatie = lokatie_filled, model = model_filled, status = status_filled, gps = gps_filled

Now, we can insert a record again:

.ingest inline into table Telemetry <|
    device1,datetime(2023-10-31T08:09:20Z),model,carx

Which results in the new record being added at the end:

Conclusion

In this article, we looked at an IoT implementation with Change Data Capture (CDC) and took another approach to dealing with this kind of data in Azure Data Explorer. By using a Kusto update policy, we created a rolling window by saving the calculated values on every ingest. By moving the calculation to the ingestion process, the performance of querying the data improves.