Trill 103: Ingress, Egress, and Trill’s notion of time
Written by James Terwilliger
Congratulations! You’ve made it to the next installment of our overview of Trill, Microsoft’s open source streaming data engine. As noted in our previous posts about basic queries and joins, Trill is a temporal query processor: it works with data that has some intrinsic notion of time. However, Trill doesn’t assign any semantics to that notion of time, other than that it is a value that generally increases. So, what could qualify as “time” to Trill? It could be just about anything:
- Event time, or a time that is naturally associated with each event, such as the time a sensor reading is taken
- System time, or the time when an event arrives at the server or event queue
- Processing time, or the time when an event arrives at the given processing node running Trill
- Incremental time, or just a value that increments for each event seen
No matter where your “time” concept comes from, Trill will work with it. All Trill needs is a long-valued field to treat as time.
Tell me what to do
There are several ways to tell Trill what the time component of the data is. The most explicit way is to use StreamEvent objects, which attach a lifespan to every event. StreamEvent objects are created using static factory methods on the StreamEvent class. For instance, StreamEvent.CreateInterval creates an event with a set interval, given a start time, an end time, and the payload.
Once you have an observable of StreamEvent objects, getting data into Trill is straightforward: call the ToStreamable method on the observable, which returns an IStreamable.
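To make the idea of a lifespan concrete, here is a minimal Python model of an event that carries a start time, an end time, and a payload. It is only a sketch and not Trill's C# API: the snake_case factory names loosely mirror the StreamEvent factories, and the INFINITY constant and the half-open [start, end) convention are assumptions made for this illustration.

```python
from dataclasses import dataclass
from typing import Any

# Assumed stand-in for "this event has no known end yet".
INFINITY = 2**63 - 1


@dataclass(frozen=True)
class StreamEvent:
    start: int    # time at which the event becomes active
    end: int      # time at which it stops being active (exclusive, by assumption)
    payload: Any  # the user data carried by the event

    @staticmethod
    def create_interval(start: int, end: int, payload: Any) -> "StreamEvent":
        # An event with a set interval.
        if end <= start:
            raise ValueError("end must be later than start")
        return StreamEvent(start, end, payload)

    @staticmethod
    def create_start(start: int, payload: Any) -> "StreamEvent":
        # An event whose end is not known yet.
        return StreamEvent(start, INFINITY, payload)

    @staticmethod
    def create_point(time: int, payload: Any) -> "StreamEvent":
        # An event that is active for a single tick.
        return StreamEvent(time, time + 1, payload)


events = [
    StreamEvent.create_interval(10, 20, "sensor-a"),
    StreamEvent.create_point(12, "sensor-b"),
]
```

A list like `events` plays the role of the observable here; in this model, "ingress" would simply mean handing that sequence to the engine.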
Or just give me a hint
If you don’t want to go through the extra step of creating StreamEvent objects, there are a few ways to bring data directly into Trill while also telling it how to reason about time. How you do it depends on whether your data already has a time component or you want Trill to assign time to your data. Let’s look at one example, given just an observable of data values:
Breaking this example down, note first that the result of the method call is the same IStreamable type as with the StreamEvent example. The only difference is that instead of assigning time in the observable, we are assigning it in the ToStreamable call. The method name has changed to ToTemporalStreamable because the input data has a natural time component called “Time”. This method is the logical equivalent of wrapping each element in the observable with a StreamEvent.CreateStart call beforehand, but without allocating the StreamEvent objects.
What if instead of a single time field in TPayload there are two temporal fields, indicating start and end time? There is an overload for that as well, with lambdas for identifying the start and end fields:
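The two selector-based forms described above can be modeled in a few lines of Python. This is a sketch of the idea only, not Trill's ToTemporalStreamable: the function name `to_temporal`, the tuple layout, and the INFINITY constant are hypothetical choices made for this example.

```python
from typing import Callable, Iterable, Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")

# Assumed stand-in for "no end time".
INFINITY = 2**63 - 1


def to_temporal(
    items: Iterable[T],
    start_of: Callable[[T], int],
    end_of: Optional[Callable[[T], int]] = None,
) -> Iterator[Tuple[int, int, T]]:
    """Attach a lifespan to each item using selector functions.

    With only start_of, each item behaves like a start event (open-ended).
    With both selectors, each item gets an explicit [start, end) interval.
    No wrapper object is created per item; time is read from the payload.
    """
    for item in items:
        start = start_of(item)
        end = INFINITY if end_of is None else end_of(item)
        yield (start, end, item)


# One time field called "Time".
readings = [{"Time": 1, "Value": 10}, {"Time": 4, "Value": 12}]
timed = list(to_temporal(readings, lambda r: r["Time"]))

# Two time fields, start and end.
sessions = [{"Start": 2, "End": 9, "User": "u1"}]
spans = list(to_temporal(sessions, lambda s: s["Start"], lambda s: s["End"]))
```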
Order! Order!
While the section above demonstrates how to assign time to events in Trill, there is one more wrinkle that must be dealt with, and it must be dealt with at ingress. Trill’s query engine is an in-order data processor. It is incredibly efficient in memory usage with high throughput, but many of the tricks it uses internally rely on data being presorted. Real data does not always arrive that way, so the query author has to tell Trill what to do with events that show up out of order.
The strictest option is to treat any out-of-order event as an error. This solution is not exactly relevant for most online streaming scenarios. However, if your temporal data has been stored to disk and has been sorted by time (or at least should have been), it’s a good safety check to have. It’s also a good option if the time associated with each event is assigned at ingress time.
Another option is to drop any data that arrives out of order.
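The basic choices for out-of-order data (fail, drop the event, or adjust its timestamp) can be compared side by side with a small Python model. This is a hedged sketch rather than Trill's implementation: the string policy names are invented for the example, and "adjust" is assumed to mean moving a late event's timestamp up to the current high watermark.

```python
from typing import Iterable, List, Optional, Tuple

Event = Tuple[int, str]  # (timestamp, payload)

POLICIES = {"throw", "drop", "adjust"}


def apply_disorder_policy(events: Iterable[Event], policy: str) -> List[Event]:
    """Return the in-order sequence that results from applying a policy."""
    if policy not in POLICIES:
        raise ValueError(f"unknown policy: {policy}")

    out: List[Event] = []
    high_watermark: Optional[int] = None  # largest timestamp seen so far

    for time, payload in events:
        if high_watermark is None or time >= high_watermark:
            # In order: pass through and advance the watermark.
            high_watermark = time
            out.append((time, payload))
        elif policy == "throw":
            # Safety check: out-of-order input is treated as an error.
            raise ValueError(f"out-of-order event at time {time}")
        elif policy == "drop":
            # Ignore the late event entirely.
            continue
        else:
            # "adjust": keep the payload but move it to the watermark.
            out.append((high_watermark, payload))
    return out


events = [(1, "a"), (5, "b"), (3, "c"), (6, "d")]
dropped = apply_disorder_policy(events, "drop")
adjusted = apply_disorder_policy(events, "adjust")
```

With the sample input, the event at time 3 is the only late one: the drop policy removes it, and the adjust policy re-stamps it at time 5.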
Patience is a virtue
The three options presented in the previous section work on their own for some scenarios but leave many scenarios unsupported. Far more often than not, the prospect of dropping data or adjusting timestamps is, to put it lightly, a major no-no. To cover more cases of disordered real-world streaming data, Trill offers two additional features that address common patterns.
Firstly, Trill allows the query author to specify a lag allowance, a fixed period during which Trill buffers incoming data and sorts it by timestamp. Think of the lag allowance as a traveling window of time within which data is held back and sorted. Trill uses a sorting algorithm that is very fast in most cases but is especially fast in scenarios where the data is “almost sorted”, which is a great way to characterize incoming stream data. In nearly all real-world cases, the high watermark for time mostly moves forward as streaming data arrives. There are fluctuations and bits of disorder, yes, but generally speaking, time moves in one direction rather than data arriving at random times.
Consider again the example of ingress data from the previous section, but this time with a lag allowance of time L:
When the first event arrives, it is not immediately processed. Instead, it is held in a buffer, and a window of time L is established. When the second event arrives, it is more than time L ahead of the first point. The first event leaves the buffer and gets processed, while the second event now gets buffered and sets a new window.
Now, the first out-of-order point arrives. It trails in time from the second point, but it is within the sorting window. It is added to the buffer, timestamp unchanged, in sorted order:
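The buffering behavior walked through above can be sketched as a small Python class. This is an illustrative model under stated assumptions, not Trill's code: it uses a binary heap instead of Trill's own sorting algorithm, the class name `LagBuffer` is hypothetical, and an event is assumed to be released once it trails the high watermark by more than the lag allowance L.

```python
import heapq
from typing import Any, List, Optional, Tuple

Event = Tuple[int, Any]  # (timestamp, payload)


class LagBuffer:
    """Hold events back for a lag allowance and release them in time order."""

    def __init__(self, lag: int) -> None:
        self.lag = lag
        self.high_watermark: Optional[int] = None
        self._heap: List[Tuple[int, int, Any]] = []
        self._seq = 0  # tie-breaker so payloads are never compared

    def push(self, time: int, payload: Any) -> Tuple[List[Event], List[Event]]:
        """Add one event; return (released_in_order, too_late)."""
        released: List[Event] = []
        too_late: List[Event] = []

        if self.high_watermark is None or time > self.high_watermark:
            self.high_watermark = time
        cutoff = self.high_watermark - self.lag

        if time < cutoff:
            # Outside the sorting window: left to a disorder policy.
            too_late.append((time, payload))
        else:
            # Inside the window: buffered with its timestamp unchanged.
            heapq.heappush(self._heap, (time, self._seq, payload))
            self._seq += 1

        # Everything that now trails the watermark by more than the lag leaves.
        while self._heap and self._heap[0][0] < cutoff:
            t, _, p = heapq.heappop(self._heap)
            released.append((t, p))
        return released, too_late

    def flush(self) -> List[Event]:
        """Release whatever is still buffered, in time order."""
        out: List[Event] = []
        while self._heap:
            t, _, p = heapq.heappop(self._heap)
            out.append((t, p))
        return out
```

For example, with a lag of 10, feeding timestamps 0, 20, 15, 18, 40 and then flushing releases them as 0, 15, 18, 20, 40. An event at time 5 arriving after the event at time 20 would fall outside the window and be reported as too late.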
Going their separate ways
The second trick up Trill’s sleeve for handling disordered input is especially suited for an Internet-of-Things (IoT) environment. A common scenario in IoT settings is for the clocks on the devices not to be entirely in sync. Devices may join and leave the network at any time, either purposefully or because of network partitioning. Skew between device times is not only common but expected.
Also common in IoT settings are queries that are run per device as opposed to over the entire network. If the user wants, for instance, to know the maximum temperature reading per hour per device, we absolutely do not want any data from one sensor to be adjusted relative to another. If anything, we want each device in the network to essentially have its own timeline independent of one another:
In the example above, the data does not arrive in a single global order. However, notice that the data does arrive in temporal order for each device.
Trill supports this scenario with a feature called partitioned streams. Each partition gets its own timeline with its own high watermark. The disorder policies are applied per partition (here, per device), if they are needed at all. Trill can then compute the query over each device without needing a global order.
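The difference between one global timeline and one timeline per partition can be seen in a short Python model. It is only a sketch of the watermark bookkeeping, not of Trill's partitioned streams: the function name `find_disorder` and the (device, timestamp) tuple layout are assumptions made for this example.

```python
from typing import Dict, Hashable, Iterable, List, Optional, Tuple

Reading = Tuple[str, int]  # (device id, timestamp)


def find_disorder(events: Iterable[Reading], partitioned: bool) -> List[Reading]:
    """Return the events that trail the high watermark of their timeline.

    With partitioned=False there is a single global timeline.
    With partitioned=True each device id has its own high watermark.
    """
    watermarks: Dict[Optional[Hashable], int] = {}
    late: List[Reading] = []
    for device, time in events:
        timeline = device if partitioned else None
        high_watermark = watermarks.get(timeline)
        if high_watermark is not None and time < high_watermark:
            late.append((device, time))
        else:
            watermarks[timeline] = time
    return late


# Device A's clock runs far ahead of device B's, but each is ordered.
events = [("A", 100), ("B", 3), ("A", 101), ("B", 4), ("A", 102), ("B", 5)]
global_late = find_disorder(events, partitioned=False)
per_device_late = find_disorder(events, partitioned=True)
```

On a single global timeline every reading from device B looks late. With one watermark per device, nothing is out of order, so no disorder policy has to fire.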
There are two ways to enable this feature in Trill. The more explicit way to do so is to create PartitionedStreamEvent objects instead of StreamEvent objects:
Parting is such sweet sorrow
So now data has been brought into Trill and, using guidance from our other blog posts, has been processed by your amazing query. It’s now time to get data back out of Trill and into an observable. Without having to worry about ordering, egressing data is a much simpler prospect. We already saw an example of egress earlier:
Let’s break this example down:
- The ToTemporalObservable method says that we want to create an observable output, and that we may want to peek at the temporal information when we do.
- The lambda expression parameter describes how to construct results for the observable. In this case we are dropping the time information because the example was about giving partial results of an average and the time component is less relevant, but we could just as easily have kept the start and end times in the result.
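The role of the result-constructing lambda can be illustrated with a tiny Python model of egress. This is a sketch, not Trill's ToTemporalObservable: the generator name is borrowed for readability, and the assumption that the selector receives the start time, end time, and payload of each result is made for this example.

```python
from typing import Callable, Iterable, Iterator, Tuple, TypeVar

P = TypeVar("P")
R = TypeVar("R")


def to_temporal_observable(
    events: Iterable[Tuple[int, int, P]],
    selector: Callable[[int, int, P], R],
) -> Iterator[R]:
    """Turn (start, end, payload) results into whatever shape the caller wants."""
    for start, end, payload in events:
        yield selector(start, end, payload)


# Query output: a running average valid over each interval.
results = [(0, 10, 2.5), (10, 20, 3.0)]

# Drop the time information and keep only the average.
averages = list(to_temporal_observable(results, lambda s, e, avg: avg))

# Or keep the time information alongside the payload.
detailed = list(
    to_temporal_observable(
        results, lambda s, e, avg: {"Start": s, "End": e, "Average": avg}
    )
)
```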
Class is almost dismissed
We are always interested in identifying common patterns of time disorder that can be resolved at ingress. For instance, one pattern we have noticed but not yet implemented is future-outliers:
A data point may arrive seemingly so far out of order that it must be an error. For instance, what if a data point arrives that claims to be three years beyond the most recently seen timestamp? Such a situation could be caused by a network partition closing, or a new device coming online with a defective clock, or any number of other reasons. This pattern, among others, could be recognized at ingress time and dealt with in a user-defined way. Identifying and defining these patterns is a small task that is an excellent entry point for anyone who is interested in contributing to Trill.
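As a starting point for discussion, the future-outlier pattern could be sketched in Python roughly as follows. Everything here is hypothetical, since the text above says the pattern is not implemented in Trill: the function name, the `max_jump` threshold, and the `on_outlier` callback are all invented for illustration.

```python
from typing import Any, Callable, Iterable, List, Optional, Tuple

Event = Tuple[int, Any]  # (timestamp, payload)


def split_future_outliers(
    events: Iterable[Event],
    max_jump: int,
    on_outlier: Callable[[int, Any], None],
) -> List[Event]:
    """Pass events through unless they jump too far ahead of the watermark.

    An event counts as a future outlier when its timestamp exceeds the
    highest accepted timestamp by more than max_jump. Outliers are handed
    to the user-defined callback and do not advance the watermark.
    Accepted events are returned in arrival order, not sorted.
    """
    accepted: List[Event] = []
    high_watermark: Optional[int] = None
    for time, payload in events:
        if high_watermark is not None and time - high_watermark > max_jump:
            on_outlier(time, payload)
            continue
        accepted.append((time, payload))
        if high_watermark is None or time > high_watermark:
            high_watermark = time
    return accepted


outliers: List[Event] = []
accepted = split_future_outliers(
    [(1, "a"), (2, "b"), (1000, "c"), (3, "d")],
    max_jump=100,
    on_outlier=lambda t, p: outliers.append((t, p)),
)
```

Keeping the outlier from advancing the watermark is the key design choice in this sketch: otherwise a single defective clock would make every later, legitimate event look hopelessly late.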
In the meantime, our code can be found here, and our amazing samples can give you a good head start on programming over Trill. Reach out to us anytime if there’s something we can do to help you on your way. We hope to see you next time with our last entry in the Introduction to Trill blog series when we deep-dive into aggregation and pattern matching, possibly the most powerful feature Trill offers.