Have you ever thought about how much data you personally generate every day?
Data: direct result of our actions
There’s data generated as a direct result of our actions and activities:
- Browsing Twitter
- Using mobile apps
- Performing financial transactions
- Using a navigator in your car
- Booking a train ticket
- Creating an online document
- Starting a YouTube live stream
Obviously, that’s not it.
Data: produced as a side effect
For example, a purchase where it seems like we’re buying just one thing might generate hundreds of requests that send and produce data along the way.
Saving a document in the cloud doesn’t mean storing it on one server, it means replicating it across multiple regions for fault-tolerance and availability.
Performing a financial transaction doesn’t mean just doing the domain-specific operation.
It also means storing logs and detailed information about every single micro step of the process, to be able to recover things if they go wrong.
It also means analyzing peripheral information about the transaction to determine whether it is fraudulent.
We log tons of data. We cache things for faster access. We replicate data and set up backups. We save data for future analysis.
Data: continuous metrics and events
There’s data we track that is being constantly produced by systems, sensors and IoT devices.
It can be regular system metrics that capture the state of web servers, their load, performance, and so on, or data that instrumented applications send out.
Heart rate or blood sugar level data. Airplane location and speed data, used to build trajectories and avoid collisions. Real-time camera feeds that observe spaces or objects to detect anomalies in process behavior.
If we think about it, some of the data is collected to be stored and analyzed later, and some of it is extremely time-sensitive.
Analyzing the logs of a regular website isn’t super urgent when no one’s life is at risk.
Keeping track of credit card transactions is much more time-sensitive, because we need to act immediately to block a transaction if it’s malicious.
Then there’s something much more critical, like monitoring health data of patients, where every millisecond matters.
All of these real-life criteria translate to technical requirements for building a data processing system:
- Where and how the data is generated
- What is the frequency of changes and updates in the data
- How fast we need to react to the change
We need to be able to build solutions that can:
- Receive data from a variety of sources
- Perform specific computation and analysis on data on the fly
- Possibly perform an action as a result
We can divide how we think about building such architectures into two conventional parts:
- Data ingestion and decoupling layer between sources of data and destinations of data
- Data processing and reaction
Let’s look at some challenges with the first part.
Data ingestion: producers and consumers
When we, as engineers, start building distributed systems that involve a lot of data coming in and out, we have to think about how flexibly these streams of data can be produced and consumed, and how the architecture supports that.
In the early stages, we might have just a few components: a web application that produces data about user actions, and a database where all this data is supposed to be stored. At this point we usually have a one-to-one mapping between the data producer (our web app) and the consumer (the database).
However, as our application grows, the infrastructure grows with it. We start introducing new software components, for example a cache or an analytics system for improving user flows, which requires the web application to send data to all of those new systems as well.
What if we also introduce a mobile app? Now we have two main sources of data, and even more data to keep track of.
Eventually we grow and end up with many independent data producers, many independent data consumers, and many different sorts of data flowing between them.
There are challenges!
How do we make sure our architecture doesn’t become cumbersome with so many producers and consumers? Especially when the same data should remain available to some consumers after it has already been read by others.
How do we prepare to scale as the rate of incoming events changes?
How do we ensure data is durable, so that we never lose important messages?
Apache Kafka
Apache Kafka is an open-source streaming system.
Kafka is used for building real-time streaming data pipelines that reliably get data between many independent systems or applications.
It allows:
- Publishing and subscribing to streams of records
- Storing streams of records in a fault-tolerant, durable way
It provides a unified, high-throughput, low-latency, horizontally scalable platform that is used in production in thousands of companies.
To understand how Kafka does these things, let’s explore a few concepts.
Kafka is run as a cluster on one or more servers that can span multiple datacenters. Those servers are usually called brokers.
Kafka uses Zookeeper to store metadata about brokers, topics and partitions.
Kafka Topics
The core abstraction Kafka provides for a stream of records is the topic.
You can think of a topic as a distributed, immutable, append-only, partitioned commit log, where producers can write data, and consumers can read data from.
Each record in a topic consists of a key, a value, and a timestamp.
A topic can have zero, one, or many consumers that subscribe to the data written to it.
The Kafka cluster durably persists all published records for a configurable retention period, whether or not those records have been consumed.
Partitions
Each partition in a topic is an ordered, immutable sequence of records that is continually appended to a structured commit log.
Each record in a partition has an offset, a number that uniquely identifies it within the partition.
Each individual partition must fit on the broker that hosts it, but a topic may have many partitions distributed over the brokers in the Kafka cluster so it can handle an arbitrary amount of data. Each partition can be replicated across a configurable number of brokers for fault tolerance.
Each partition has one broker which acts as the “leader” that handles all read and write requests for the partition, and zero or more brokers which act as “followers” that passively replicate the leader. Each broker acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
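To make partitions and replication a bit more concrete, here is a minimal sketch of creating a topic with Kafka’s AdminClient. The broker addresses and the topic name are the ones used later in this article and are assumptions; adjust them for your own cluster.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

val props = new Properties()
props.put("bootstrap.servers", "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092")

val admin = AdminClient.create(props)

// 3 partitions spread the topic's data and load across brokers;
// a replication factor of 3 keeps a copy of every partition on three brokers.
val topic = new NewTopic("azurefeedback", 3, 3.toShort)
admin.createTopics(Collections.singleton(topic)).all().get()
admin.close()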
Kafka Producers and Consumers
Producers publish data to the topics of their choice.
Consumers can subscribe to topics and receive messages. Consumers can act as independent consumers or be a part of some consumer group.
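For illustration, here is a minimal sketch of a standalone consumer written against the plain Kafka consumer API. The broker addresses and topic name match the example later in this article, and the group id is made up; in the example itself, Spark will play the consumer role instead.

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092")
// Consumers that share a group id split the topic's partitions between them
props.put("group.id", "feedback-readers")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("azurefeedback"))

while (true) {
  // Poll the brokers for newly arrived records and print each one with its offset
  val records = consumer.poll(100)
  for (record <- records.asScala) {
    println(s"offset=${record.offset()} key=${record.key()} value=${record.value()}")
  }
}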
So Kafka not only helps with ingesting large amounts of data, it also works really well for small data in environments with numerous systems that exchange data in a many-to-many fashion. It lets consumers and producers work at their own pace, scales really well, and is really fast.
Sometimes, though, we need a system that processes streams of events as soon as they arrive, on the fly, and then performs some action based on the result of the processing: an alert, a notification, something that has to happen in real time.
Systems for stream processing
Even though this article is about Apache Spark, that doesn’t mean it’s the best choice for all use cases. The choice of a streaming platform depends on latency guarantees, community adoption, interoperability with the libraries and ecosystem you’re using, and more.
When considering building a data processing pipeline, take a look at the leading stream processing frameworks and evaluate them against your requirements.
For example, Storm is the oldest framework and is considered a “true” stream processing system, because each message is processed as soon as it arrives (versus in micro-batches). It provides low latency, though it can be cumbersome and tricky to write logic for some advanced operations and queries on data streams. It has a rather big community.
Kafka Streams is a pretty new and fast, lightweight stream processing solution that works best if all of your data ingestion is coming through Apache Kafka.
Flink is another great, innovative streaming system that supports many advanced features. Its passionate community is somewhat smaller than Storm’s or Spark’s, but it has a lot of potential.
Spark is by far the most general, popular, and widely used stream processing system. It is primarily based on a micro-batch processing mode, where events are processed together in batches based on specified time intervals. Since the Spark 2.3.0 release, there is an option to switch between micro-batching and an experimental continuous streaming mode.
Apache Spark
Spark is an open source project for large scale distributed computations.
You can use Spark to build real-time and near-real-time streaming applications that transform or react to the streams of data.
Spark is similar to MapReduce, but more powerful and much faster: it supports more types of operations than just map and reduce, uses a directed acyclic graph (DAG) execution model, and operates primarily in memory.
As of the latest Spark release, it supports both micro-batch and continuous processing execution modes.
Spark can be used with a variety of cluster managers, including Hadoop YARN, Apache Mesos, and Kubernetes, or it can run in standalone mode.
We can do batch processing with Spark SQL, stream processing with Spark Streaming and Structured Streaming, machine learning with MLlib, and graph computations with GraphX.
How Spark works
We can submit jobs to run on Spark. At a high level, when we submit a job, Spark creates an operator graph from the code and submits it to the scheduler. There, operators are divided into stages of tasks, where each task corresponds to a partition of the input data.
Spark has physical nodes called workers, where all the work happens.
A driver coordinates workers and overall execution of tasks.
An executor is a distributed agent responsible for executing tasks; executors run on worker nodes.
A task is the smallest individual unit of execution.
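To see where the driver, executors, stages, and tasks come from, here is a tiny, self-contained Spark job, a sketch only: the input path "input.txt" is a placeholder. The transformations merely build up the operator graph; the final action is what makes the scheduler break it into stages of tasks and run them on the executors.

import org.apache.spark.sql.SparkSession

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("word-count-sketch")
      .getOrCreate()
    import spark.implicits._

    // Transformations only describe the computation (the DAG)...
    val counts = spark.read.textFile("input.txt")
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .groupByKey(identity)
      .count()

    // ...the action below triggers the scheduler to split the DAG into stages
    // of tasks, one task per partition, executed by executors on the workers.
    counts.show()

    spark.stop()
  }
}

You would package this and hand it to spark-submit, which sends it to whichever cluster manager you run on (YARN, Mesos, Kubernetes, or standalone).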
Apache Kafka + Spark FTW
Kafka is great for durable and scalable ingestion of streams of events coming from many producers to many consumers.
Spark is great for processing large amounts of data, including real-time and near-real-time streams of events.
How can we combine and run Apache Kafka and Spark together to achieve our goals?
Example: processing streams of events from multiple sources with Apache Kafka and Spark
I’m running my Kafka and Spark on Azure using services like Azure Databricks and HDInsight. This means I don’t have to manage infrastructure, Azure does it for me.
You’ll be able to follow the example no matter what you use to run Kafka or Spark.
Note:
Previously, I’ve written about using Kafka and Spark on Azure and Sentiment analysis on streaming data using Apache Spark and Cognitive Services. These articles might be interesting to you if you haven’t seen them yet.
There are many detailed instructions on how to create Kafka and Spark clusters, so I won’t spend time showing it here. Instead, we’ll focus on their interaction to understand real-time streaming architecture.
Existing infrastructure and resources:
- Kafka cluster (HDInsight or other)
- Spark cluster (Azure Databricks workspace, or other)
- Peered Kafka and Spark Virtual Networks
- Sources of data: Twitter and Slack
We are not looking at health data tracking, airplane collision avoidance, or any other life-or-death example, because people might use the example code for real-life solutions.
Instead, we are going to look at a small, self-contained example that can be a great starting point for many use cases.
The main points it will demonstrate are:
- How to build a decoupling event ingestion layer that would work with multiple independent sources and receiving systems
- How to do processing on streams of events coming from multiple input systems
- How to react to outcomes of processing logic
- How to do it all in a scalable, durable and simple fashion
Example story…
Imagine that you’re in charge of a company. You have quite a few competitors in your industry. You want to make sure your products and tools are top quality.
As an example, I am using Azure, because there are a lot of tweets about Azure and I’m interested in what people think about using it, so I can learn what goes well and help make it better for engineers.
Data sources: external, internal, and more
We can look at statements about Azure everywhere on the internet: Twitter, Facebook, GitHub, Stack Overflow, LinkedIn, anywhere. There are hundreds of potential sources.
There are many things we can do with statements about Azure in terms of analysis: some of it might require reaction and be time sensitive, some of it might not be.
Because I really want the example to be concise, atomic, and general, we’re going to analyze user feedback coming from a public data source, like Twitter, and an internal data source, like Slack, where people can also share their thoughts about the service.
As a result, we will watch and analyze the incoming feedback on the fly, and if it’s too negative, we will notify certain groups so they can fix things ASAP.
When it’s positive, we’d like to track it, but we don’t really need an immediate response.
Our input feedback data sources are independent, and even though in this example we’re using two input sources for clarity and conciseness, there could easily be hundreds of them, used for many processing tasks at the same time.
In this situation, we can build a streaming system that uses Kafka as a scalable, durable, fast decoupling layer that ingests data from Twitter, Slack, and potentially more sources. It would analyze the sentiment of events in near real time using Spark, and raise notifications when the processing outcome is especially positive or negative.
Feedback from Slack (Azure Databricks Notebook #1)
I’m using Azure Databricks interactive notebooks to run the code; they’re a great environment for demonstrations.
The first part of the example is programmatically sending messages to Slack, so we can generate user feedback via Slack.
A Slack Bot API token is necessary to run the code.
Slack API Token
val superSecretSlackToken = "xoxb-..."
Define Slack functions
import com.ullink.slack.simpleslackapi._
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory
import java.io.IOException

def instantiateSlackSession(token: String): SlackSession = {
  val session = SlackSessionFactory.createWebSocketSlackSession(token)
  session.connect()
  return session
}

def sendMessageToSlack(message: String, channel: SlackChannel, slackSession: SlackSession) {
  slackSession.sendMessage(channel, message)
}

val session = instantiateSlackSession(superSecretSlackToken)
val feedbackChannel = session.findChannelByName("feedback")
val hiChannel = session.findChannelByName("hi")
sendMessageToSlack("This is a message from bot. Hi!", hiChannel, session)
Listener for new Slack messages (Azure Databricks Notebook #2)
When users post statements that might be potential feedback, we need to be able to notice and track these messages. For example, we can check whether a message was posted in a specific Slack channel and is focused on a particular topic, and send it to a specific Kafka topic when it meets our “feedback” conditions.
We can do so by overriding the “onEvent” method of “SlackMessagePostedListener” from the Slack API and implementing the logic inside it, including sending qualifying events to a Kafka topic. After defining the listener class, we have to register an instance of it. We can also unregister it when we’d like to stop receiving feedback from Slack.
Slack Session
import com.ullink.slack.simpleslackapi._
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory
import java.io.IOException

def instantiateSlackSession(token: String): SlackSession = {
  val session = SlackSessionFactory.createWebSocketSlackSession(token)
  session.connect()
  return session
}
Slack API Token
val superSecretSlackToken = "xoxb-..."
Initialize Slack Channel
val session = instantiateSlackSession(superSecretSlackToken)
val feedbackChannel = session.findChannelByName("feedback")
Preparing Kafka producer to send Slack messages
import java.util.Properties
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

def buildKafkaProducerForSlack(): KafkaProducer[String, String] = {
  val kafkaBrokers = "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092"
  val props = new Properties()
  props.put("bootstrap.servers", kafkaBrokers)
  props.put("acks", "all")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  return producer
}

val kafka = buildKafkaProducerForSlack()
val feedbackTopic = "azurefeedback"
Defining a Listener for new Slack messages in #feedback channel
import com.ullink.slack.simpleslackapi.listeners._
import com.ullink.slack.simpleslackapi.SlackSession
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory
import com.ullink.slack.simpleslackapi.events.SlackMessagePosted
import java.io._

class MessageToKafkaTopicWriter extends SlackMessagePostedListener {
  override def onEvent(event: SlackMessagePosted, session: SlackSession) {
    val channelOnWhichMessageWasPosted = event.getChannel()
    val messageContent = event.getMessageContent()
    val messageSender = event.getSender()
    val targetChannel = session.findChannelByName("feedback")
    if (targetChannel.getId().equals(channelOnWhichMessageWasPosted.getId()) && messageContent.contains("Azure")) {
      sendEventToKafka(messageContent, feedbackTopic)
    }
  }

  def sendEventToKafka(message: String, topicName: String) = {
    val key = java.util.UUID.randomUUID().toString()
    kafka.send(new ProducerRecord[String, String](topicName, key, message))
  }
}

def registeringSlackToKafkaListener(session: SlackSession): MessageToKafkaTopicWriter = {
  val messagePostedListener = new MessageToKafkaTopicWriter
  session.addMessagePostedListener(messagePostedListener)
  return messagePostedListener
}

def removeSlackToKafkaListener(session: SlackSession, l: MessageToKafkaTopicWriter) {
  session.removeMessagePostedListener(l)
}
Register listener instance
val slackListener = registeringSlackToKafkaListener(session)
Remove listener instance registration
removeSlackToKafkaListener(session, slackListener)
Sending Twitter feedback to Kafka (Azure Databricks Notebook #3)
The majority of public feedback will probably arrive from Twitter. We need to get the latest tweets about a specific topic and send them to Kafka, so we can receive these events together with feedback from other sources and process them all in Spark.
Preparing Kafka Producer to send tweets
import java.util.Properties
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// Configuration for Kafka brokers
val kafkaBrokers = "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092"
val topicName = "azurefeedback"

val props = new Properties()
props.put("bootstrap.servers", kafkaBrokers)
props.put("acks", "all")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

def sendEvent(message: String) = {
  val key = java.util.UUID.randomUUID().toString()
  producer.send(new ProducerRecord[String, String](topicName, key, message))
  System.out.println("Sent event with key: '" + key + "' and message: '" + message + "'\n")
}
Twitter Configuration Keys
val twitterConsumerKey = ""
val twitterConsumerSecret = ""
val twitterOauthAccessToken = ""
val twitterOauthTokenSecret = ""
Getting latest #Azure tweets and sending them to Kafka topic
import java.util._
import scala.collection.JavaConverters._
import twitter4j._
import twitter4j.TwitterFactory
import twitter4j.Twitter
import twitter4j.conf.ConfigurationBuilder

val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
  .setOAuthConsumerKey(twitterConsumerKey)
  .setOAuthConsumerSecret(twitterConsumerSecret)
  .setOAuthAccessToken(twitterOauthAccessToken)
  .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

val twitterFactory = new TwitterFactory(cb.build())
val twitter = twitterFactory.getInstance()

val query = new Query(" #Azure ")
query.setCount(100)
query.lang("en")

var finished = false
while (!finished) {
  val result = twitter.search(query)
  val statuses = result.getTweets()
  var lowestStatusId = Long.MaxValue
  for (status <- statuses.asScala) {
    if (!status.isRetweet()) {
      sendEvent(status.getText())
      Thread.sleep(4000)
    }
    lowestStatusId = Math.min(status.getId(), lowestStatusId)
  }
  query.setMaxId(lowestStatusId - 1)
}
Result of the code output
Sent event with key: 'ffaead11-1967-48f1-a3f5-7c12a2d69123' and message: 'The latest Microsoft #Azure & #AzureUK Daily! https://t.co/lyrK0zLvbo #healthcare'
Sent event with key: 'd4f3fb86-5816-4408-bfa5-17ca03eb01de' and message: 'Image Classification using CNTK inside Azure Machine Learning Workbench | Microsoft Docs https://t.co/ZTFU6qJ5Uk #azure #azureml #ml'
Sent event with key: 'c78c7498-146f-43c8-9f14-d789da26feeb' and message: 'On the phone with Apple support. Their solutions generally involve some level of resetting settings/phone and resto… https://t.co/hxAAsS2khR'
Sent event with key: 'a26e8922-f857-4882-a36e-88b40570fe13' and message: 'Create happy customers. #Azure delivers massive scalability and globally consistent experiences to support stellar… https://t.co/7pwtLhR49p'
Sent event with key: 'd243cd59-4336-4bc9-a826-2abbc920156d' and message: 'Microsoft introduces Self-Deploying mode and Windows Autopilot reset to their Windows AutoPilot Deployment Program… https://t.co/jTf7tDbu6S'
Sent event with key: '60dd5224-57c5-44ed-bfdc-1ab958bb30b0' and message: 'Do you know the #Microsoft #Security Story? https://t.co/nC43z3n88B #virtualization #msft #aad #azure'
Sent event with key: 'd271ccda-9342-46e6-9b5a-cbcce26a899d' and message: 'RT @brunoborges: Say hello to #Jetty support on VS @code! #Azure https://t.co/9ht5VNtZyF #Developer #Coder… https://t.co/XY41wq68Lu'
Sent event with key: 'bad66873-b311-45f2-9c5a-8c51349221c4' and message: 'Creating #VirtualMachine (VM) On #Microsoft #Azure by @AkhilMittal20 cc @CsharpCorner https://t.co/yucpiOjf6G https://t.co/ULwTkGQ8ss'
Sent event with key: 'f7f152a3-74e6-4652-a6e8-f6f317b0c51c' and message: 'Microsoft Azure Stack expands availability to https://t.co/trBEqbqnDi #azureblogfeed #Azure #AzureStack #Cloud #CloudOps #Microsoft @Azure'
Sent event with key: '9011daef-c2a1-4e89-99a9-f88c9f7cc775' and message: 'December 2017 Meeting: Serverless Computing – N https://t.co/C2JgBBdOPM #pastmeetings #Azure #AzureStack #Cloud #CloudOps #Microsoft @Azure'
Sent event with key: 'be13d0f6-bf21-40e4-9036-4f63324d4ce3' and message: 'Azure Multi-Factor Authentication: Latency or for more .. https://t.co/FboIo3CfWt #azure #Multi-FactorAuthenticat… https://t.co/rvT61KcB1y'
Sent event with key: 'cb2eb344-3d88-4cb7-a3ae-e3fd960e0dc6' and message: 'Top story: David Armour on Twitter: "Learn how the #Azure cloud brings efficien… https://t.co/h7eBIEeGBz, see more https://t.co/dzNIo8XPyk'
Sent event with key: '128340ad-4e2a-4162-a6dd-dbeb5560cf0a' and message: 'That's it! My first blog post just got published! Learn how to build a nodeless CI/CD pipeline with Jenkins, Kubern… https://t.co/oZaKu29ykk'
Sent event with key: '03ae4371-b6c7-407b-b942-41506c631b22' and message: 'RT @pzerger: Azure AD Conditional Access support for blocking legacy auth is in Public Preview! https://t.co/l8uJo3A8Ek #azuread #azure'
Sent event with key: 'a98f7941-08e6-4875-b4be-f83275671b6c' and message: 'working with one of our great Ecosystem partners STMicroelectronics showing how to @MicrosfotIoT #Azure integrates… https://t.co/FbhkKKFYA0'
Sent event with key: '471e0244-f667-41f8-aa4d-d6075b6e7168' and message: 'Azure Marketplace new offers: May 1-15 by Azure https://t.co/tUFTeGYqLG #azure via DotNetKicks'
Sent event with key: '357ac31d-627f-42c5-a8c0-316ca53dfead' and message: 'Microsoft doubles Azure Stack's footprint, embiggens Azure VMs https://t.co/ywDPO4jc7N by @TheRegister #Cloud #Tech #Microsoft #Azure #VM'
Sent event with key: 'ef59abe6-0cf7-4faa-ac4d-16cb56963808' and message: 'Project Natick: Microsoft's Under the Sea Data Center. #BigData #Analytics #MachineLearning #DataScience #AI… https://t.co/uXjEqLpk0X'
Analyzing feedback in real-time (Azure Databricks Notebook #4)
Kafka is now receiving events from many sources.
Now we can proceed with the reaction logic. We’re going to do sentiment analysis on incoming Kafka events, and when the sentiment score is less than 0.3, we’ll send a notification to the #negative-feedback Slack channel for review. When the score is more than 0.9, we’ll send a message to the #positive-feedback channel.
Reading a stream of feedback data from Kafka
val kafkaBrokers = "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092"

// Setup connection to Kafka
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", "azurefeedback")
  .option("startingOffsets", "latest")
  .load()
Transforming binary data into readable representation
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{explode, split}

val kafkaData = kafka
  .withColumn("Key", $"key".cast(StringType))
  .withColumn("Topic", $"topic".cast(StringType))
  .withColumn("Offset", $"offset".cast(LongType))
  .withColumn("Partition", $"partition".cast(IntegerType))
  .withColumn("Timestamp", $"timestamp".cast(TimestampType))
  .withColumn("Value", $"value".cast(StringType))
  .select("Value")
What data is coming in?
kafkaData.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
Cognitive Services API Key
val accessKey = "..."
Sentiment Analysis Logic
import java.io._
import java.net._
import java.util._
import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._

class Document(var id: String, var text: String, var language: String = "", var sentiment: Double = 0.0) extends Serializable

class Documents(var documents: List[Document] = new ArrayList[Document]()) extends Serializable {
  def add(id: String, text: String, language: String = "") {
    documents.add(new Document(id, text, language))
  }
  def add(doc: Document) {
    documents.add(doc)
  }
}

class CC[T] extends Serializable {
  def unapply(a: Any): Option[T] = Some(a.asInstanceOf[T])
}
object M extends CC[scala.collection.immutable.Map[String, Any]]
object L extends CC[scala.collection.immutable.List[Any]]
object S extends CC[String]
object D extends CC[Double]

object SentimentDetector extends Serializable {
  val host = "https://eastus.api.cognitive.microsoft.com"
  val languagesPath = "/text/analytics/v2.0/languages"
  val sentimentPath = "/text/analytics/v2.0/sentiment"
  val languagesUrl = new URL(host + languagesPath)
  val sentimenUrl = new URL(host + sentimentPath)

  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    connection.setDoOutput(true)
    return connection
  }

  def prettify(json_text: String): String = {
    val parser = new JsonParser()
    val json = parser.parse(json_text).getAsJsonObject()
    val gson = new GsonBuilder().setPrettyPrinting().create()
    return gson.toJson(json)
  }

  // Handles the call to Cognitive Services API.
  // Expects Documents as parameters and the address of the API to call.
  // Returns an instance of Documents in response.
  def processUsingApi(inputDocs: Documents, path: URL): String = {
    val docText = new Gson().toJson(inputDocs)
    val encoded_text = docText.getBytes("UTF-8")
    val connection = getConnection(path)
    val wr = new DataOutputStream(connection.getOutputStream())
    wr.write(encoded_text, 0, encoded_text.length)
    wr.flush()
    wr.close()

    val response = new StringBuilder()
    val in = new BufferedReader(new InputStreamReader(connection.getInputStream()))
    var line = in.readLine()
    while (line != null) {
      response.append(line)
      line = in.readLine()
    }
    in.close()

    return response.toString()
  }

  // Calls the language API for specified documents.
  // Returns documents with the language field set.
  def getLanguage(inputDocs: Documents): Documents = {
    try {
      val response = processUsingApi(inputDocs, languagesUrl)
      // In case we need to log the json response somewhere
      val niceResponse = prettify(response)
      val docs = new Documents()
      val result = for {
        // Deserializing the JSON response from the API into Scala types
        Some(M(map)) <- scala.collection.immutable.List(JSON.parseFull(niceResponse))
        L(documents) = map("documents")
        M(document) <- documents
        S(id) = document("id")
        L(detectedLanguages) = document("detectedLanguages")
        M(detectedLanguage) <- detectedLanguages
        S(language) = detectedLanguage("iso6391Name")
      } yield {
        docs.add(new Document(id = id, text = id, language = language))
      }
      return docs
    } catch {
      case e: Exception => return new Documents()
    }
  }

  // Calls the sentiment API for specified documents. Needs a language field to be set for each of them.
  // Returns documents with the sentiment field set, taking a value in the range from 0 to 1.
  def getSentiment(inputDocs: Documents): Documents = {
    try {
      val response = processUsingApi(inputDocs, sentimenUrl)
      val niceResponse = prettify(response)
      val docs = new Documents()
      val result = for {
        // Deserializing the JSON response from the API into Scala types
        Some(M(map)) <- scala.collection.immutable.List(JSON.parseFull(niceResponse))
        L(documents) = map("documents")
        M(document) <- documents
        S(id) = document("id")
        D(sentiment) = document("score")
      } yield {
        docs.add(new Document(id = id, text = id, sentiment = sentiment))
      }
      return docs
    } catch {
      case e: Exception => return new Documents()
    }
  }
}

// User Defined Function for processing content of messages to return their sentiment.
val toSentiment = udf((textContent: String) => {
  val inputDocs = new Documents()
  inputDocs.add(textContent, textContent)
  val docsWithLanguage = SentimentDetector.getLanguage(inputDocs)
  val docsWithSentiment = SentimentDetector.getSentiment(docsWithLanguage)
  if (docsWithLanguage.documents.isEmpty) {
    // Placeholder value to display for no score returned by the sentiment API
    (-1).toDouble
  } else {
    docsWithSentiment.documents.get(0).sentiment.toDouble
  }
})
// Preparing a dataframe with Content and Sentiment columns
val streamingDataFrame = kafka
  .selectExpr("cast (value as string) AS Content")
  .withColumn("Sentiment", toSentiment($"Content"))

// Displaying the streaming data
streamingDataFrame.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
Slack API Token
val superSecretSlackToken = "xoxb-..."
Feedback analysis and action on a streaming Data Frame
import com.ullink.slack.simpleslackapi._
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory
import java.io.IOException

object SlackUtil extends Serializable {
  def sendSlackMessage(content: String, channelName: String) {
    val session = SlackSessionFactory.createWebSocketSlackSession(superSecretSlackToken)
    session.connect()
    val channel = session.findChannelByName(channelName)
    session.sendMessage(channel, content)
  }
}

val toSlack = udf((textContent: String, sentiment: Double) => {
  if (sentiment <= 0.3) {
    SlackUtil.sendSlackMessage(textContent, "negative-feedback")
    "Sent for review to #negative-feedback."
  } else if (sentiment >= 0.9) {
    SlackUtil.sendSlackMessage(textContent, "positive-feedback")
    "Logged as positive in #positive-feedback"
  } else {
    "Not logged."
  }
})

val comments = kafka
  .selectExpr("cast (value as string) AS Content")
  .withColumn("Sentiment", toSentiment($"Content"))
  .withColumn("Status", toSlack($"Content", $"Sentiment"))

comments
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
Result
What we have achieved here is a streaming system where we can watch user feedback from numerous independent sources that produce data at their own pace, and get real-time insight into when we need to take action and pay attention to things that need fixing.
We can plug in many additional independent processing scenarios, because once we send data to Kafka, it is retained and available for consumption many times.
We don’t have to worry about spikes in user activity or handling the load, because Kafka and Spark let us scale out and handle the pressure easily thanks to partitioning.
MORE THINGS
Spark and experimental “Continuous Processing” mode
Traditionally, Spark has been operating through the micro-batch processing mode.
In Apache Spark 2.3.0, Continuous Processing mode is an experimental feature for millisecond-level, low-latency end-to-end event processing. It provides at-least-once fault-tolerance guarantees. Currently, not all operations are supported (e.g. aggregation functions, current_timestamp() and current_date() are not supported), there are no automatic retries of failed tasks, and you need to make sure there are enough cluster cores to operate efficiently.
To use it, add a trigger:
trigger(Trigger.Continuous("1 second"))
A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second.
Currently, it works best with Kafka as both the source and the sink for the lowest end-to-end latency.
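As a sketch, assuming a SparkSession named spark, the same broker addresses used earlier, and a hypothetical output topic and checkpoint path, a continuous query reading from and writing to Kafka could look like this:

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092")
  .option("subscribe", "azurefeedback")
  .load()

// Continuous mode currently supports map-like operations, such as this cast/projection.
stream.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.0.0.11:9092,10.0.0.15:9092,10.0.0.12:9092")
  .option("topic", "azurefeedback-copy")                        // hypothetical output topic
  .option("checkpointLocation", "/tmp/checkpoints/continuous")  // placeholder path
  .trigger(Trigger.Continuous("1 second"))                      // checkpoint progress every second
  .start()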
How is it different from micro-batch
With micro-batch processing, the Spark streaming engine periodically checks the streaming source and runs a batch query on the new data that has arrived since the last batch ended.
This way, latencies end up being on the order of hundreds of milliseconds.
The Spark driver checkpoints progress by saving record offsets to a write-ahead log, which may then be used to restart the query.
The offsets for the next batch of records are recorded before that batch starts processing.
Because of this, some records have to wait until the end of the current micro-batch before they are processed, and this takes time.
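For contrast, a micro-batch query just uses a processing-time trigger: the engine starts a new batch on the interval and processes whatever has arrived since the previous one. This sketch reuses the kafkaData frame defined in the notebook above; the interval is only an illustration.

import org.apache.spark.sql.streaming.Trigger

kafkaData.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // a new micro-batch roughly every 10 seconds
  .start()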
How “Continuous Processing” mode works
Spark launches a number of long-running tasks. They constantly read, process and write data.
Events are processed as soon as they’re available at the source.
Unlike in micro-batch mode, the offsets of processed records are saved to the log after every epoch.
We can achieve end-to-end latencies of just a few milliseconds, because events are processed and written to the sink as soon as they are available at the source, without waiting for other records.
Also, checkpointing is fully asynchronous and uses the Chandy-Lamport algorithm, so nothing interrupts tasks and Spark is able to provide consistent millisecond-level latencies.
Another new thing: Event Hubs + Kafka = ❤️
For those of you who like to use cloud environments for big data processing, this might be interesting.
Azure now has an alternative to running Kafka on HDInsight. You can leave your existing Kafka applications as is and use Event Hubs as a backend through the Kafka API.
Event Hubs is a service for streaming data on Azure, conceptually very similar to Kafka.
In other words, Event Hubs for Kafka ecosystems provides a Kafka endpoint that can be used by your existing Kafka based applications as an alternative to running your own Kafka cluster.
It supports Apache Kafka 1.0 and newer client versions, and works with existing Kafka applications, including MirrorMaker. All you have to do is change the connection string and start streaming events from applications that use the Kafka protocol into Event Hubs.
Functionally, of course, Event Hubs and Kafka are two different things. But this feature can be useful if you already have services written to work with Kafka, and you’d like to not manage any infrastructure and try Event Hubs as a backend without changing your code.
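As a sketch of what that looks like in practice: the producer code stays the same, and only the configuration changes. The namespace and connection string below are placeholders for your own Event Hubs resources.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
// Event Hubs exposes its Kafka endpoint on port 9093 of the namespace hostname (placeholder below)
props.put("bootstrap.servers", "mynamespace.servicebus.windows.net:9093")
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
// The username is literally "$ConnectionString"; the password is your Event Hubs connection string
props.put("sasl.jaas.config",
  "org.apache.kafka.common.security.plain.PlainLoginModule required " +
  "username=\"$ConnectionString\" password=\"<your Event Hubs connection string>\";")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("azurefeedback", "key", "Hello Event Hubs, via the Kafka protocol"))
producer.close()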
Note:
I also wrote a tutorial on how to use Spark and Event Hubs here.
Kubernetes environment
Many businesses lean towards Kubernetes for big data processing, as it helps reduce costs, offers a lot of flexibility, and is convenient when many of your existing services already run on it.
Existing Kubernetes abstractions like StatefulSets are great building blocks for running stateful processing services, but they are most often not enough on their own for the correct operation of systems like Kafka or Spark.
I recommend looking into the topic of Operators, as they help make Kubernetes aware of the nuances required to run frameworks like these correctly.
Confluent has announced the upcoming Kafka operator coming out in a month or two, and I’m looking forward to trying it. More updates coming.
For those of you interested in running Spark on Kubernetes: Spark has had experimental (not production-ready) native Kubernetes support since Spark 2.3. I wrote about it here, here, and here.
Thank you for reading!
I’d be happy to know if you liked the article or if it was useful to you. Follow me on Twitter @lenadroid or on YouTube. Always happy to connect, feel free to reach out! For more on this tutorial and other projects I’m working on, please check out my website.