Change Data Capture (CDC) is a design pattern that tracks only the changes in data. There are several ways it can be implemented. In IoT solutions, the Change Data Capture (CDC) pattern is used to send only changes from the device to the cloud. But how do you deal with this data in Azure Data Explorer for further analysis?
In this article, we look at an IoT implementation with Change Data Capture (CDC), and how to deal with it within Azure Data Explorer.
This article is part of a series about Change Data Capture in Azure Data Explorer:
- How to deal with Change Data Capture in Azure Data Explorer
- How to deal with Change Data Capture in Azure Data Explorer – Part 2
Implementation
In many IoT solutions, we see an implementation where the cloud only receives a value when it changes. The screenshot below shows an example of a dataset where Change Data Capture is used.
Even though not all values are sent to the cloud every second, they still have a value (the last value sent). So we have to interpret the data differently than how we receive it. Below is another screenshot showing how the data (filtered by key == 'model') should be interpreted. We didn't receive a value for model at 23:00:01.000, but it is still the same as the last value we received, and it will not change until we receive another value (in this example at 23:00:05).
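As a minimal sketch (with assumed values), this is what the received data looks like for the model key:

// CDC data as received (assumed sample values, sketch only)
datatable(deviceId: string, timeStamp: datetime, ['key']: string, value: dynamic) [
    "device1", datetime(2023-01-01T23:00:00Z), "model", dynamic("carx"),
    "device1", datetime(2023-01-01T23:00:05Z), "model", dynamic("cary")
]
// Interpreted: model is "carx" for every second from 23:00:00 through 23:00:04,
// even though no rows arrived in between; from 23:00:05 on it is "cary".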
Different approaches
To deal with CDC data, there are different approaches possible within Azure Data Explorer. The first two approaches that popped up were:
- Use of subsets and joins
- Use of series_fill_forward
- Use of the scan operator (Update 10-11-2023)
To investigate the approaches, we need data. So, I created a table Telemetry in a free Azure Data Explorer cluster.
.create table Telemetry (deviceId: string, timeStamp: datetime, ['key']: string, value: dynamic)
Next, I created a C# application that generates sample data for two devices, one record per second, from the first of January 2023 until now. The code I created was a bit sloppy and not blog-worthy, so no example code here. But I used the sample app created on the Azure Data Explorer portal as a base.
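If you would rather skip the application entirely, something like the sketch below generates comparable sample data directly in KQL. This is my own rough sketch, not the code from my app: the keys, modulo intervals, and values are assumptions, and it emits a row every second instead of only on changes, so it does not reproduce true CDC semantics.

// Sketch: generate one key/value row per second, alternating between two devices
.set-or-append Telemetry <|
    range i from 0 to 3599 step 1
    | extend timeStamp = datetime(2023-01-01T23:00:00Z) + 1s * i
    | extend deviceId = iff(i % 2 == 0, "device1", "device2")
    // assumed keys and intervals, just to get a mix of keys
    | extend ['key'] = case(i % 97 == 0, "model", i % 53 == 0, "lokatie", i % 13 == 0, "status", "gps")
    | extend value = case(
        ['key'] == "gps", pack_array(toint(rand(100)), toint(rand(100))),
        ['key'] == "lokatie", todynamic(tostring(toint(rand(50)))),
        ['key'] == "model", dynamic("carx"),
        dynamic("running"))
    | project deviceId, timeStamp, ['key'], value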
If you want to create your own application, the Azure Data Explorer portal helps you with a sample app (C#, Java, Node.js, or Python). The wizard that follows is self-explanatory, but if you need help, check this article.
So, after executing the app, I have a table with a lot of sample data. To be more precise, around 51 million records:
Now that we have data, let's investigate the approaches, starting with a small dataset. In each approach, we select the first 100 records of device1 and unpivot the key-value pairs to columns with bag_pack and bag_unpack:
Telemetry
| where deviceId == "device1"
| top 100 by timeStamp asc
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
Which (in my scenario) results in:
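To see what bag_pack and bag_unpack do in isolation, here is a minimal sketch with assumed values:

datatable(['key']: string, value: string) [
    "model", "carx",
    "status", "running"
]
| extend type_value_bag = bag_pack(['key'], value)
| evaluate bag_unpack(type_value_bag)
// The bag's properties become the columns model and status (null where a key
// is absent in a row); the original key and value columns remain.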
Subsets and joins
When using the subsets and joins approach, we need to determine which records belong together. I used row_rank_dense for this. The row rank starts by default at 1 for the first row and is incremented by 1 whenever the value changes.
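In isolation, that behavior looks like this (a minimal sketch with assumed values):

datatable(model: string) ["carx", "carx", "cary", "cary", "carx"]
| serialize
| extend RankM = row_rank_dense(model)
// RankM: 1, 1, 2, 2, 3 -- the rank only increments when the value changes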
Telemetry
| where deviceId == "device1"
| top 100 by timeStamp asc
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
| extend gps = tostring(gps) // gps cannot be converted implicitly
| extend RankL = row_rank_dense(lokatie),
         RankM = row_rank_dense(model),
         RankS = row_rank_dense(status),
         RankG = row_rank_dense(gps)
Result:
Next, I used iif to assign the previous rank when the current value is empty, so that empty records are grouped with the last received value.
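This works because consecutive empty values share one rank (the rank only increments on a change), so subtracting 1 maps every empty row back onto the rank of the last non-empty value. A minimal sketch with assumed values:

datatable(model: string) ["carx", "", "", "cary", ""]
| serialize
| extend RankM = row_rank_dense(model)                  // 1, 2, 2, 3, 4
| extend RankM = iif(isempty(model), RankM - 1, RankM)  // 1, 1, 1, 3, 3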
Telemetry
| where deviceId == "device1"
| top 100 by timeStamp asc
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
| extend gps = tostring(gps) // gps cannot be converted implicitly
| extend RankL = row_rank_dense(lokatie),
         RankM = row_rank_dense(model),
         RankS = row_rank_dense(status),
         RankG = row_rank_dense(gps)
// If the value of the column is empty, then we should use the previous rank
| extend RankL = iif(isempty(lokatie), RankL - 1, RankL),
         RankM = iif(isempty(model), RankM - 1, RankM),
         RankS = iif(isempty(status), RankS - 1, RankS),
         RankG = iif(isempty(gps), RankG - 1, RankG)
This resulted in the screenshot below. I highlighted the first couple of records of model to show that these records belong together based on the RankM column.
The next step is to create a subset per column and join them all together.
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where deviceId == "device1"
        | top 100 by timeStamp asc
        | extend type_value_bag = bag_pack(key, value)
        | evaluate bag_unpack(type_value_bag)
        | order by deviceId, timeStamp asc
        | extend gps = tostring(gps) // gps cannot be converted implicitly
        | extend RankL = row_rank_dense(lokatie), RankM = row_rank_dense(model), RankS = row_rank_dense(status), RankG = row_rank_dense(gps)
        | extend RankL = iif(isempty(lokatie), RankL - 1, RankL), RankM = iif(isempty(model), RankM - 1, RankM), RankS = iif(isempty(status), RankS - 1, RankS), RankG = iif(isempty(gps), RankG - 1, RankG)
    );
let telL = MyTelemetry | where isnotempty(lokatie) | distinct deviceId, RankL, lokatie;
let telM = MyTelemetry | where isnotempty(model) | distinct deviceId, RankM, model;
let telS = MyTelemetry | where isnotempty(status) | distinct deviceId, RankS, status;
let telG = MyTelemetry | where isnotempty(gps) | distinct deviceId, RankG, gps;
MyTelemetry
| project deviceId, timeStamp, RankL, RankM, RankS, RankG
| lookup kind=leftouter telL on deviceId, RankL
| lookup kind=leftouter telM on deviceId, RankM
| lookup kind=leftouter telS on deviceId, RankS
| lookup kind=leftouter telG on deviceId, RankG
| project deviceId, timeStamp, lokatie, model, status, gps
| order by timeStamp asc
Which results in a dataset that a user understands better:
(The status column looks empty because the first value of status was received at 23:00:23.)
OK, the subsets and joins approach works, but how about the series_fill_forward approach?
Series_fill_forward
series_fill_forward sounds like the perfect function for our problem. But there is a downside: it only works on series (dynamic arrays of numeric values), and we have all kinds of values in our dataset.
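On a plain numeric array, it behaves like this (a minimal sketch):

print arr = dynamic([4, null, null, 7, null])
| extend filled = series_fill_forward(arr)
// filled: [4, 4, 4, 7, 7] -- each null is replaced by the last preceding value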
So I first created a dataset with mapping data.
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where deviceId == "device1"
        | top 100 by timeStamp asc
    );
MyTelemetry
| distinct description = tostring(value)
| serialize
| extend id = row_number()
This dataset consists of every unique value with a unique id:
Next, we need to join the mapping dataset to the original dataset, so that we can extend the original dataset with the unique id from the mapping dataset.
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where deviceId == "device1"
        | top 100 by timeStamp asc
    );
// create a mapping table
let mapping =
    // materialize the dataset for performance
    materialize(
        MyTelemetry
        | distinct description = tostring(value)
        | serialize
        | extend id = row_number()
    );
MyTelemetry
| extend mappingvalue = tostring(value)
| join kind=leftouter mapping on $left.mappingvalue == $right.description
| extend value = iif(isnotempty(id), id, value)
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
This results in the dataset:
We can now use series_fill_forward to fill forward the values of each column. Or at least, that was what I thought. Nope! It is necessary to first turn the columns into series with make-series before using series_fill_forward:
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where deviceId == "device1"
        | top 100 by timeStamp asc
    );
// create a mapping table
let mapping =
    // materialize the dataset for performance
    materialize(
        MyTelemetry
        | distinct description = tostring(value)
        | serialize
        | extend id = row_number()
    );
MyTelemetry
| extend mappingvalue = tostring(value)
| join kind=leftouter mapping on $left.mappingvalue == $right.description
| extend value = iif(isnotempty(id), id, value)
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
| make-series lokaties = any(lokatie) default=int(null),
              models = any(model) default=int(null),
              statuses = any(status) default=int(null),
              gps = any(gps) default=int(null)
    on timeStamp step 1s by deviceId
| extend lokaties = series_fill_forward(lokaties),
         models = series_fill_forward(models),
         statuses = series_fill_forward(statuses),
         gps = series_fill_forward(gps)
This creates a series for each column:
Be aware when using make-series
In this example, we use make-series with a step size of one second and take any value (the any aggregation) within that second. Because I receive data every second and take any value within that second, this is not a problem.
But what if you receive data more frequently than the selected step size (for example, 5 records every second)? make-series will pick an arbitrary one of the 5 records within that second, and there is no aggregation to select only the last value received.
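If you run into this, a possible workaround (a sketch of my own, not part of the original approach) is to reduce the data to the last received value per second with summarize arg_max before running make-series:

// Sketch: keep only the last record per device, key, and second
Telemetry
| where deviceId == "device1"
| summarize arg_max(timeStamp, value) by deviceId, ['key'], second = bin(timeStamp, 1s)
| project deviceId, timeStamp, ['key'], value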
On the other hand, what if you receive data less frequently than the selected step size, for example a record every 5 seconds? make-series will now generate a step for every second, even if there isn't an original record. Because we use series_fill_forward this won't affect the values in the dataset, but the dataset will become five times as big (a record for every second instead of every five seconds).
So be aware of this!
Continue with the dataset
To make the dataset (the screenshot above with the series) more suitable for users, I expanded the arrays with the mv-expand operator. Next, I rejoined the mapping dataset to restore the original values instead of the unique ids.
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where deviceId == "device1"
        | top 100 by timeStamp asc
    );
// create a mapping table
let mapping =
    // materialize the dataset for performance
    materialize(
        MyTelemetry
        | distinct description = tostring(value)
        | serialize
        | extend id = row_number()
    );
MyTelemetry
| extend mappingvalue = tostring(value)
| join kind=leftouter mapping on $left.mappingvalue == $right.description
| extend value = iif(isnotempty(id), id, value)
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
| make-series lokaties = any(lokatie) default=int(null),
              models = any(model) default=int(null),
              statuses = any(status) default=int(null),
              gps = any(gps) default=int(null)
    on timeStamp step 1s by deviceId
| extend lokaties = series_fill_forward(lokaties),
         models = series_fill_forward(models),
         statuses = series_fill_forward(statuses),
         gps = series_fill_forward(gps)
| mv-expand timeStamp to typeof(datetime), lokaties to typeof(int), models to typeof(int), statuses to typeof(int), gps to typeof(int)
| join kind=leftouter mapping on $left.lokaties == $right.id
| join kind=leftouter mapping on $left.models == $right.id
| join kind=leftouter mapping on $left.statuses == $right.id
| join kind=leftouter mapping on $left.gps == $right.id
| project deviceId, timeStamp, lokaties = description, models = description1, statuses = description2, gps = description3
| order by deviceId asc, timeStamp asc
This resulted in the same dataset as the other approach:
Scan operator (Update 10-11-2023)
After I contacted the Azure Data Explorer team, they pointed out the scan operator. With this operator, you can build sequences based on predicates, and it is the simplest solution to fill the values forward.
In our situation, we only need one step, but it is possible to build several steps in sequence.
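Reduced to a single column, the fill-forward step looks like this (a minimal sketch with assumed values):

datatable(model: string) ["carx", "", "", "cary", ""]
| serialize
| scan declare (model_filled: string = "") with (
    // carry the previous state (s1) forward whenever the current value is empty
    step s1: true => model_filled = iff(isempty(model), s1.model_filled, model);
)
// model_filled: carx, carx, carx, cary, cary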
Telemetry
| where deviceId == "device1"
| top 100 by timeStamp asc
| extend type_value_bag = bag_pack(key, value)
| evaluate bag_unpack(type_value_bag)
| order by deviceId, timeStamp asc
| scan declare (
        gps_filled: string = "",
        lokatie_filled: long = long(null),
        model_filled: string = "",
        status_filled: string = ""
    ) with (
        step s1: true =>
            gps_filled = iff(isempty(gps), s1.gps_filled, gps),
            lokatie_filled = iff(isempty(lokatie), s1.lokatie_filled, lokatie),
            model_filled = iff(isempty(model), s1.model_filled, model),
            status_filled = iff(isempty(status), s1.status_filled, status);
    )
| project deviceId, timeStamp, lokatie = lokatie_filled, model = model_filled, status = status_filled, gps = gps_filled
This resulted again in the same dataset as the other approaches:
OK, all approaches work, but the scan operator has the best performance and the simplest query. In the code examples above, I only select 100 records, but with larger datasets the performance of the scan operator was also better.
Which approach to use depends on the situation. But because of the performance and ease of use, my personal preference is the scan operator approach.
Point in time
There is still one thing that bothers me: when selecting a specific point in time, the first couple of records are empty. This applies to all approaches, but to illustrate the issue, I used the subsets and joins query. Take for example this code, where I only wanted to see the data of 2023-01-27:
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where timeStamp between (todatetime('2023-01-27') .. todatetime('2023-01-28'))
        | where deviceId == "device1"
        | extend type_value_bag = bag_pack(key, value)
        | evaluate bag_unpack(type_value_bag)
        | order by deviceId, timeStamp asc
        | extend gps = tostring(gps) // gps cannot be converted implicitly
        | extend RankL = row_rank_dense(lokatie), RankM = row_rank_dense(model), RankS = row_rank_dense(status), RankG = row_rank_dense(gps)
        | extend RankL = iif(isempty(lokatie), RankL - 1, RankL), RankM = iif(isempty(model), RankM - 1, RankM), RankS = iif(isempty(status), RankS - 1, RankS), RankG = iif(isempty(gps), RankG - 1, RankG)
    );
let telL = MyTelemetry | where isnotempty(lokatie) | distinct deviceId, RankL, lokatie;
let telM = MyTelemetry | where isnotempty(model) | distinct deviceId, RankM, model;
let telS = MyTelemetry | where isnotempty(status) | distinct deviceId, RankS, status;
let telG = MyTelemetry | where isnotempty(gps) | distinct deviceId, RankG, gps;
MyTelemetry
| project deviceId, timeStamp, RankL, RankM, RankS, RankG
| lookup kind=leftouter telL on deviceId, RankL
| lookup kind=leftouter telM on deviceId, RankM
| lookup kind=leftouter telS on deviceId, RankS
| lookup kind=leftouter telG on deviceId, RankG
| project deviceId, timeStamp, lokatie, model, status, gps
| order by timeStamp asc
Look at the first record of model: it is empty.
But I know for sure that there should be a value. The original dataset also confirms this:
So, what is happening here?
In this example, the highlighted record was excluded by the where filter, so it cannot be used to calculate the value for 2023-01-27 00:00:00. This results in an empty value in the first record (or in the first couple of records, depending on when the value changed).
My first thought was to just move the where filter to the end of the query. Then the MyTelemetry subset would be calculated with all the values and the problem would disappear.
But unfortunately, the query cannot be executed due to insufficient memory. The error might be caused by the fact that I'm using the free Azure Data Explorer cluster. Nevertheless, the MyTelemetry subset can become quite big and could cause performance issues.
My next thought was to just subtract one day from the starting period; in our example, select all records from 2023-01-26 onwards. This might solve the problem in many cases, but not in all. Because what if a value is only sent on the first day of every month? Then the value would still be empty in all the records.
My solution
To solve this problem, we have to combine a little bit of both: calculate with a dataset that is large enough to contain all the values, but not so large that it causes errors or performance issues.
After a discussion with Sander van de Velde, we came up with a solution: adding records that are already calculated. What I mean by that is: why not save the last calculated record of a certain period? In this scenario, every day should be a good choice, for example the last record of 2023-01-26 (as highlighted in the screenshot below). Depending on your situation, the period at which you save data might differ.
Before building this solution, I checked whether it works. So, I created a new table and inserted the last record of 2023-01-26:
.create table SavedTelemetry (deviceId: string, timeStamp: datetime, lokatie: long, model: string, ['status']: string, gps: dynamic)

.ingest inline into table SavedTelemetry <|
device1,datetime(2023-01-26T23:59:59Z),38,"carx","running","[49,15]"
Now I changed the original query so that it uses this new table.
let MyTelemetry =
    // materialize the dataset for performance
    materialize(
        Telemetry
        | where timeStamp between (todatetime('2023-01-27') .. todatetime('2023-01-28'))
        | where deviceId == "device1"
        | extend type_value_bag = bag_pack(key, value)
        | evaluate bag_unpack(type_value_bag)
        // union the saved telemetry with the original set
        | union (
            SavedTelemetry
            | where timeStamp between (todatetime('2023-01-26') .. todatetime('2023-01-27'))
          )
        | order by deviceId, timeStamp asc
        | extend gps = tostring(gps) // gps cannot be converted implicitly
        | extend RankL = row_rank_dense(lokatie), RankM = row_rank_dense(model), RankS = row_rank_dense(status), RankG = row_rank_dense(gps)
        | extend RankL = iif(isempty(lokatie), RankL - 1, RankL), RankM = iif(isempty(model), RankM - 1, RankM), RankS = iif(isempty(status), RankS - 1, RankS), RankG = iif(isempty(gps), RankG - 1, RankG)
    );
let telL = MyTelemetry | where isnotempty(lokatie) | distinct deviceId, RankL, lokatie;
let telM = MyTelemetry | where isnotempty(model) | distinct deviceId, RankM, model;
let telS = MyTelemetry | where isnotempty(status) | distinct deviceId, RankS, status;
let telG = MyTelemetry | where isnotempty(gps) | distinct deviceId, RankG, gps;
MyTelemetry
| project deviceId, timeStamp, RankL, RankM, RankS, RankG
| lookup kind=leftouter telL on deviceId, RankL
| lookup kind=leftouter telM on deviceId, RankM
| lookup kind=leftouter telS on deviceId, RankS
| lookup kind=leftouter telG on deviceId, RankG
| project deviceId, timeStamp, lokatie, model, status, gps
| order by timeStamp asc
Which resulted in:
As you can see, the first record contains all the values. So, the proposed solution worked; now we need to find a way to integrate it into the IoT solution.
Integration
To fill the SavedTelemetry table, there are several possibilities, like:
- let the device generate the calculated values
- create an Azure Function with a trigger that calculates the data
Because you do not always have control over the device, I created an Azure Function. The function triggers every day around midnight and calculates the values of the day before. The repository can be found on GitHub.
For convenience, I converted the Kusto query into a stored function so that it can be easily called from the Azure Function. As you may have noticed, I also added two parameters: the start and end date.
.create-or-alter function Fill_forward(_startdate: datetime, _enddate: datetime) {
    let MyTelemetry =
        // materialize the dataset for performance
        materialize(
            Telemetry
            | where timeStamp between (_startdate .. _enddate)
            | extend type_value_bag = bag_pack(key, value)
            | evaluate bag_unpack(type_value_bag)
            // union the saved telemetry with the original set
            | union (
                SavedTelemetry
                | where timeStamp between (datetime_add('day', -1, _startdate) .. datetime_add('second', -1, _startdate))
              )
            | order by deviceId, timeStamp asc
            | extend gps = tostring(gps) // gps cannot be converted implicitly
            | extend RankL = row_rank_dense(lokatie), RankM = row_rank_dense(model), RankS = row_rank_dense(status), RankG = row_rank_dense(gps)
            | extend RankL = iif(isempty(lokatie), RankL - 1, RankL), RankM = iif(isempty(model), RankM - 1, RankM), RankS = iif(isempty(status), RankS - 1, RankS), RankG = iif(isempty(gps), RankG - 1, RankG)
        );
    let telL = MyTelemetry | where isnotempty(lokatie) | distinct deviceId, RankL, lokatie;
    let telM = MyTelemetry | where isnotempty(model) | distinct deviceId, RankM, model;
    let telS = MyTelemetry | where isnotempty(status) | distinct deviceId, RankS, status;
    let telG = MyTelemetry | where isnotempty(gps) | distinct deviceId, RankG, gps;
    MyTelemetry
    | project deviceId, timeStamp, RankL, RankM, RankS, RankG
    | lookup kind=leftouter telL on deviceId, RankL
    | lookup kind=leftouter telM on deviceId, RankM
    | lookup kind=leftouter telS on deviceId, RankS
    | lookup kind=leftouter telG on deviceId, RankG
    | project deviceId, timeStamp, lokatie, model, status, gps
    | order by deviceId, timeStamp asc
}
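To give an idea of what the function app executes, below is a hedged sketch of my own of the daily command it could issue. The dates are example values that the Azure Function would fill in, and todynamic is needed because Fill_forward returns gps as a string:

// Sketch: store yesterday's last calculated record per device
.set-or-append SavedTelemetry <|
    Fill_forward(datetime(2023-01-27), datetime(2023-01-27T23:59:59Z))
    | summarize arg_max(timeStamp, lokatie, model, status, gps) by deviceId
    | extend gps = todynamic(gps) // Fill_forward returns gps as a string
    | project deviceId, timeStamp, lokatie, model, status, gps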
We now have a solution that regularly stores the calculated values and a Kusto function that retrieves the Change Data Capture (CDC) data in a way that users will understand.
Conclusion
In this article, we looked at an IoT implementation with Change Data Capture (CDC) and how to deal with this kind of data in Azure Data Explorer. It is possible to create a dataset that users understand better by using bag_pack and bag_unpack to unpivot the data and the scan operator to fill values forward. But you have to add calculated values to build a really good and complete solution.
Don’t forget to take a look at part 2: How to deal with Change Data Capture in Azure Data Explorer – Part 2