This is part 2 of a 2-post series that shows you how to use Trill, an open source .NET library designed to process one trillion events a day, for impression feedback.
In part 1, we walked through how to write Trill queries to find out 1) which impressions successfully joined to the feedback stream and 2) which impressions never joined to the feedback stream (expired). Today, we’ll walk through the final step: how to output impressions immediately upon arrival, whether or not they have joined with feedback.
(Note: steps 2 and 3 were covered in part 1, which you can find here.)
Compared to the previous case, this one is easy – an antijoin sounds perfect:
  internal static IStreamable JoinFeedbackToImpressions(
      this IStreamable impressions,
      IStreamable feedback)
  {
      // set the lifetime of the input streams
      impressions = impressions.SetDuration(TimeSpan.FromMinutes(40));
      feedback = feedback.SetDuration(TimeSpan.FromMinutes(30));

      // produce a stream of successful joins
      IStreamable joined = impressions
          .Join(
              feedback,
              imp => imp.RGUID,
              fdbk => fdbk.RGUID,
              (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk))
          .ToPointStream()
          ;

      // produce a stream of "expired" point events that coincide with the end time
      // of the impression stream, and suppress those which had a successful join
      IStreamable expired = impressions
          .PointAtEnd()
          .LeftAntiJoin(
              joined.SetDuration(TimeSpan.FromMinutes(40)),
              l => l.RGUID,
              r => r.RGUID)
          .Select(imp => new ImpressionsWithFeedback(imp, null))
          ;
+
+     // produce a stream of failed joins
+     IStreamable unjoined = impressions
+         .LeftAntiJoin(
+             feedback,
+             imp => imp.RGUID,
+             fdbk => fdbk.RGUID)
+         .Select(imp => new ImpressionsWithFeedback(imp, null))
+         ;

-     // union together the two streams and return the result
-     return joined.Union(expired);
+     // union together the three streams and return the result
+     return joined.Union(unjoined).Union(expired);
  }

Great! So now we’re done, right?
Of course not. (What fun would that be?) Let’s again look at what we are now outputting.
There are three problems with the above solution:
We need some way to signal to our downstream operators whether a given event should contribute to statistics and whether it is done receiving feedback. Let’s consider each case separately.
First, let’s examine the case of contributing to statistics. Remember that the requirement for statistics is that an impression must be counted exactly once, and it must be counted when it arrives. Although this doesn’t sound too bad, we can express this requirement even more simply: an impression may only be counted if its sync time upon output is equal to its sync time in the input.
This is much easier – all that we need is the ability to get an event’s sync time. Luckily, there is an operator which does exactly that – an overload of Select which accepts a delegate that performs a transformation on the payload and sync time of each event in a stream.
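To make the rule concrete, here is a minimal sketch in plain C# that does not use Trill at all. The `Impression` class, `Stamp`, and `ContributesToStats` are hypothetical names introduced only for illustration; `Stamp` plays the role of the sync-time-aware Select described above, and sync times are plain `long` values.

```csharp
using System;

public static class StatsSketch
{
    // Hypothetical stand-in for an impression payload; only the fields used here.
    public sealed class Impression
    {
        public string RGUID = "";
        public long OriginalSyncTime;
    }

    // Analogue of a Select that sees both the sync time and the payload:
    // record the sync time at which the impression entered the query.
    public static Impression Stamp(long inputSyncTime, Impression imp)
    {
        imp.OriginalSyncTime = inputSyncTime;
        return imp;
    }

    // An impression may be counted only if it is output at the same
    // sync time at which it arrived.
    public static bool ContributesToStats(long outputSyncTime, Impression imp)
        => imp.OriginalSyncTime == outputSyncTime;
}
```

Under these assumptions, an impression stamped at sync time 100 and output at 100 is counted, while the same impression output again at 130 (for example, when its feedback arrives) is not, so each impression is counted at most once.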
Now let’s consider the second problem, whether an impression should be considered done. An impression must be output as soon as it receives feedback, or once it expires if feedback never arrives. This case is more straightforward than the previous one since each of the internal streams is either always done or always not done. In this case, we can just pass true or false to the ImpressionsWithFeedback constructor to differentiate.
Finally, the third case (extra tail antijoin events) can be addressed by filtering out events from the unjoined stream whose final sync time does not match their original sync time, very similar to how ContributeToStats is computed.
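The three rules can be seen working together in the following sketch, again in plain C# (C# 9 records) rather than Trill. Everything here is an assumption made for illustration: `StreamKind`, `OutputEvent`, `Flagged`, and `Flag` are hypothetical, and each event is modeled as a flat record carrying the sync time at which it arrived and the sync time at which it is output.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FlagSketch
{
    // Which of the three internal streams produced the event (hypothetical).
    public enum StreamKind { Joined, Unjoined, Expired }

    // Hypothetical flattened view of an event leaving one of the three streams.
    public sealed record OutputEvent(string RGUID, long OriginalSyncTime, long SyncTime, StreamKind Kind);

    // The event as seen downstream, with the two Boolean signals attached.
    public sealed record Flagged(string RGUID, long SyncTime, bool Done, bool ContributeToStats);

    public static List<Flagged> Flag(IEnumerable<OutputEvent> events) =>
        events
            // Drop tail antijoin events: unjoined events that did not start at arrival time.
            .Where(e => e.Kind != StreamKind.Unjoined || e.SyncTime == e.OriginalSyncTime)
            .Select(e => new Flagged(
                e.RGUID,
                e.SyncTime,
                // Joined and expired events are always done; unjoined events never are.
                Done: e.Kind != StreamKind.Unjoined,
                // Count an impression only when it is output at its arrival time.
                ContributeToStats: e.SyncTime == e.OriginalSyncTime))
            .ToList();
}
```

For example, suppose impression "a" arrives at minute 0 and its feedback arrives at minute 5 and lasts until minute 35. The unjoined stream would emit an event at minute 0 and a tail event at minute 35, and the joined stream would emit one at minute 5. `Flag` keeps the minute-0 event (not done, counted) and the minute-5 event (done, not counted), and drops the tail event.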
After making these changes, let’s see how our query looks.
  internal static IStreamable JoinFeedbackToImpressions(
      this IStreamable impressions,
      IStreamable feedback)
  {
-     // set the lifetime of the input streams
-     impressions = impressions.SetDuration(TimeSpan.FromMinutes(40));
+     // set the lifetime of the input streams, record the input sync time of the
+     // events in the impressions stream
+     impressions = impressions
+         .SetDuration(TimeSpan.FromMinutes(40))
+         .Select((t, x) => { x.OriginalSyncTime = t; return x; })
+         ;
+
      feedback = feedback.SetDuration(TimeSpan.FromMinutes(30));

      // produce a stream of successful joins
      IStreamable joined = impressions
          .Join(
              feedback,
              imp => imp.RGUID,
              fdbk => fdbk.RGUID,
-             (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk))
+             (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk, done: true))
          .ToPointStream()
          ;

      // produce a stream of "expired" point events that coincide with the end time
      // of the impression stream, and suppress those which had a successful join
      IStreamable expired = impressions
          .PointAtEnd()
          .LeftAntiJoin(
              joined.SetDuration(TimeSpan.FromMinutes(40)),
              l => l.RGUID,
              r => r.RGUID)
-         .Select(imp => new ImpressionsWithFeedback(imp, null))
+         .Select(imp => new ImpressionsWithFeedback(imp, null, done: true))
          ;

-     // produce a stream of failed joins
+     // produce a stream of failed joins, dropping the tail join if present
      IStreamable unjoined = impressions
          .LeftAntiJoin(
              feedback,
              imp => imp.RGUID,
              fdbk => fdbk.RGUID)
-         .Select(imp => new ImpressionsWithFeedback(imp, null))
+         .Select(imp => new ImpressionsWithFeedback(imp, null, done: false))
+         .Where((t, x) => x.OriginalSyncTime == t)
          ;

-     // union together the three streams and return the result
-     return joined.Union(unjoined).Union(expired);
+     // union together the three streams, mark records which should contribute
+     // to stats, and return the result
+     return joined
+         .Union(unjoined)
+         .Union(expired)
+         .Select((t, x) =>
+         {
+             x.ContributeToStats = x.OriginalSyncTime == t;
+             return x;
+         })
+         ;
  }
Astute readers will notice that filtering out the tail antijoins is not strictly necessary, since these events will neither contribute to statistics (their original sync time is not equal to their final sync time) nor be considered done (antijoin events are incomplete by definition).
The reason to remove them here is to reduce the total number of events in the stream. Although they have no effect semantically, they still contribute to I/O costs related to checkpointing and cross-node communication, and they consume memory in any stateful operator that they flow through.
Phew! It was a long journey, but we are finally done! Let’s admire the fruits of our labor.
  internal static IStreamable JoinFeedbackToImpressions(
      this IStreamable impressions,
      IStreamable feedback)
  {
      // set the lifetime of the input streams, record the input sync time of the
      // events in the impressions stream
      impressions = impressions
          .SetDuration(TimeSpan.FromMinutes(40))
          .Select((t, x) => { x.OriginalSyncTime = t; return x; })
          ;

      feedback = feedback.SetDuration(TimeSpan.FromMinutes(30));

      // produce a stream of successful joins
      IStreamable joined = impressions
          .Join(
              feedback,
              imp => imp.RGUID,
              fdbk => fdbk.RGUID,
              (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk, done: true))
          .ToPointStream()
          ;

      // produce a stream of "expired" point events that coincide with the end time
      // of the impression stream, and suppress those which had a successful join
      IStreamable expired = impressions
          .PointAtEnd()
          .LeftAntiJoin(
              joined.SetDuration(TimeSpan.FromMinutes(40)),
              l => l.RGUID,
              r => r.RGUID)
          .Select(imp => new ImpressionsWithFeedback(imp, null, done: true))
          ;

      // produce a stream of failed joins, dropping the tail join if present
      IStreamable unjoined = impressions
          .LeftAntiJoin(
              feedback,
              imp => imp.RGUID,
              fdbk => fdbk.RGUID)
          .Select(imp => new ImpressionsWithFeedback(imp, null, done: false))
          .Where((t, x) => x.OriginalSyncTime == t)
          ;

      // union together the three streams, mark records which should contribute
      // to stats, and return the result
      return joined
          .Union(unjoined)
          .Union(expired)
          .Select((t, x) =>
          {
              x.ContributeToStats = x.OriginalSyncTime == t;
              return x;
          })
          ;
  }

And here is the same query visualized using a directed acyclic graph (DAG) of query operators:
Finally, let’s go back to our favorite chart one last time to make sure we’re producing the right thing. Whether an impression received feedback and the state of the two Boolean conditions (contribute to statistics, is done) are represented by the Joined, Stats, and Done monikers, respectively, preceded by ‘+’ and written in green for true and preceded by ‘-’ and written in red for false.
Yep, that looks perfect!
We have finally produced our user-defined join operator for associating feedback with impressions, keeping in mind the requirements of our downstream operators. Let’s reexamine the original stated requirements and briefly summarize how we addressed each one.
Although the problem seemed complicated at first, breaking it down into sub-problems and solving each of them individually made the entire process much more manageable. This is a skill that can take some time to master, but once you have a good understanding of what temporal operators are available and how they behave, the streamable APIs allow for near limitless expressiveness and power.
In our next Trill tutorial, we’ll cover a different impression feedback mechanism that the Bing Ads processing pipeline handles: visibility. Visibility feedback is distinct from impression feedback in that it is based on user activity and does not arrive all at once. Until then!
Feedback or questions? Let me know in the comments below.