Streaming analytics basics for Python developers



Course overview

Imagine you work for the IT department in a hospital and want to tap the life-saving potential of continuous data coming in from patients' vital sign monitors. A Streams application processing such data in real time could swiftly detect dangerous anomalies in vital signs and alert hospital staff when a patient needs immediate care. IBM® Streams® is an advanced stream processing platform that can ingest, filter, analyze, and correlate massive volumes of continuous data streams. Viewing and analyzing this data helps you make decisions while events are happening. Development in Streams has traditionally been done by using the Streams Processing Language (SPL). However, with the introduction of the Python Application API, developers like you can create streaming applications in Python without first having to learn SPL. This course introduces you to the IBM Streams Python Application API.
Get started

Essential Streams concepts

Lab 1: create a simple Python application

1. Develop a basic Python application
In this section, you will use the Streams Python API Topology class to frame your streaming application, and then create a simple simulator of a continuous data source by repeatedly sending identical tuples of sample data to your stream. Develop your application for this lab in the coding cell under Lab 1 – Create a simple Python application in the course exercise notebook. There are commented headings to organize and structure your application.

Sample patient data
You will copy this data to send repeated tuples to your stream. It is here for your reference; the instructions below show you how to use it in your application.

    {"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}

1. Before you begin, import the necessary modules from the streamsx package. To add them, include the following imports:

    from streamsx.topology.topology import Topology
    from streamsx.topology.context import *

2. Add a data source by creating a function called Observations. In your function, make these changes:
   - Declare a variable jsonStr and assign it the sample patient data:

    jsonStr = '''{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}'''

   - Import the json module and load the JSON data, assigning the result the name dictObj:

    import json
    dictObj = json.loads(jsonStr)

   - Import the time module and slow the input rate of your data stream to one tuple per second with the sleep function:

    import time
    time.sleep(1)

   - Enclose the body of your function in a while loop, ending each iteration with yield dictObj. The yield statement returns a generator, which is a kind of iterable. This completes your function (a full sketch follows these steps).

3. Create a new topology called topo, invoke the source operator on it with your defined data source, and assign the resulting stream a name of patientData:

    topo = Topology()
    patientData = topo.source(Observations)

4. Print the stream data in the console:

    patientData.sink(print)

5. Submit the application over your Bluemix Streaming Analytics service. You can use...
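For reference, here is a minimal sketch of the completed Lab 1 application, assembled from the steps above. It is only a sketch: the submission call at the end is left in outline form because the exact arguments depend on your own Streaming Analytics service configuration and credentials.

    import json
    import time

    from streamsx.topology.topology import Topology
    from streamsx.topology.context import *

    def Observations():
        # Simulated continuous data source: emit the same sample patient
        # reading roughly once per second, forever.
        jsonStr = '''{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}'''
        dictObj = json.loads(jsonStr)
        while True:
            time.sleep(1)      # slow the stream to about one tuple per second
            yield dictObj      # yield turns Observations into a generator

    topo = Topology()                          # frame the streaming application
    patientData = topo.source(Observations)    # create a stream from the generator
    patientData.sink(print)                    # print each tuple to the console log

    # Submission is service-specific; in the course notebook it typically looks
    # something like submit("ANALYTICS_SERVICE", topo, <your service config>),
    # as described in the exercise notebook.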

2. View output on the Streams Console
The Streams Console is a data visualization and administrative tool for IBM Streams. It has various dashboards and cards showing information about the flow graphs of your programs. You can view output, as well as submit and cancel jobs, directly from the Streams Console. In this step, you will learn to navigate the Streams Console, view the output of your application, and cancel a job.

Access your Streams Console from IBM Bluemix:

1. Log in to Bluemix.
2. Click IBM Bluemix to open your list of services, and then click your Streaming Analytics instance.
3. From the Manage pane, make sure your service is running. If it is not, your application submission will not have gone through while the service was stopped. Click Start and wait until the service is running. Then, go back to the notebook and run your application again.
4. Click Launch. You should see the Application Dashboard of your Streams Console.

Take note of the various cards that show your application:
   - The Summary card lists everything running in your Console at the moment.
   - The Streams tree shows a hierarchy of your Streams instances, jobs, and their health.
   - The Streams graph shows the data flow of your Streams programs.

This is all you need to know about the Streams Console right now, but feel free to explore! For a more detailed tour of the Streams Console, consider taking the Get Started with IBM Streams online course.

View your application's output:

1. If the side toolbar is not visible, click the Streams logo at the top left corner of the page. Then, click Log Viewer.
2. Expand the twisty next to your application's job. It begins with something like ipythoninput...
3. Click your application's PE. The Processing Element, or PE, is an individual execution program that includes the operators and streams that you defined in your application.
4. Click Console Log.
5. Click Load console messages. If nothing appears, wait a few seconds and then click Reload.

Now...


Lab 2: handle a variety of patient data

1. Filter data with a lambda function
Imagine that you are a doctor who is receiving all kinds of data from your patient, but you're only interested in her heart rate. In this section, you will filter out the unwanted data. Develop your application for this section in the code cell under the Lab 2 – Handle a variety of patient data heading.

1. Invoke the filter operation on the patientData stream and assign the name heartRate to the filtered stream:

    heartRate = patientData.filter()

2. As input to the filter operation, create a lambda function that takes a patient data tuple and returns True if its code value is equal to 8867-4. For your reference, the sample patient data is shown below:

    {"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}

    heartRate = patientData.filter(lambda tuple: tuple['reading']['readingType']['code'] == '8867-4')

3. Sink the filtered stream to the console. Whenever you add new stream operations, make sure that the downstream code matches correctly. In this case, change your existing sink invocation to reflect the newly declared heartRate stream:

    heartRate.sink(print)

4. Submit your application to the Bluemix Streaming Analytics service. This step is the same as in Lab 1.

5. Run your application and view the output on the Streams Console. Your output will look exactly the same as in the first exercise. Why? Look closely at your sample patient data: its readingType code is 8867-4. Each of the tuples that run through your application is a clone of this patient data, so they are all heart rate readings. If you were to manually change this type code to something else and then rerun the program, you'd notice that none of the tuples make it through to output.

6. Cancel all jobs running on the Streams Console.
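As an aside, you can convince yourself of why every tuple passed the filter by checking the predicate locally against the sample patient tuple, outside of Streams. This is only an illustrative check, not part of the lab's application code.

    import json

    # The sample patient tuple from the lab instructions.
    sample = json.loads('''{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}''')

    def is_heart_rate(t):
        # Same predicate that the filter lambda applies on the stream.
        return t['reading']['readingType']['code'] == '8867-4'

    print(is_heart_rate(sample))   # True: the sample's readingType code is 8867-4

Change the code to anything other than 8867-4 and the predicate returns False, which is exactly what happens on the stream: the tuple never reaches the sink.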
3. Subscribe to the simulator
Now that the patient simulator is running, you can subscribe to its data and use it in your own application. In this step, continue to develop your application in the coding cell under the Lab 2 > Filter Data and Subscribe section.

1. Add the following import:

    from streamsx.topology import schema

2. In your notebook, replace your Observations source with a subscribe call to the patient simulator. The topic to subscribe to is ingest-beacon and the data type is JSON:

    patientData = topo.subscribe('ingest-beacon', schema.CommonSchema.Json)

   From now on, you will be subscribing to the patient data simulator rather than using the Observations function. (You can omit it from your code if you want.) Make sure the simulator is running before you submit any subsequent applications.

3. Submit your application to the Bluemix Streaming Analytics service and run the cell. There should now be two jobs running in your Streams Console: one for the simulator and one for your application.

4. In the Log Viewer, examine the output of the simulator, the job whose name begins with com.ibm.streamsx.health……. Notice the different values for code, uom (unit of measure), and, of course, value. The simulator generates many different kinds of readings with varying values.

5. View the other job running on your Streams Console. This is your application. If you view the Console Log, you'll notice that only tuples of heart rate type (code=8867-4) are getting through. This shows that your filter is working.

Now that you have a more realistic application, complete with diverse tuples of data and a filter function, you'll transform the tuples that you are analyzing.
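Putting the pieces together, a minimal sketch of the revised Lab 2 topology might look like the following. It assumes the simulator is already running and publishing JSON tuples on the ingest-beacon topic, as described above.

    from streamsx.topology.topology import Topology
    from streamsx.topology import schema

    topo = Topology()

    # Subscribe to the simulator's JSON stream instead of generating tuples locally.
    patientData = topo.subscribe('ingest-beacon', schema.CommonSchema.Json)

    # Keep only heart rate readings, then print them to the console log.
    heartRate = patientData.filter(
        lambda t: t['reading']['readingType']['code'] == '8867-4')
    heartRate.sink(print)

    # Submit to the Bluemix Streaming Analytics service as in Lab 1.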

Lab 3: anonymize and average data

1. Anonymize patient data
Imagine that you are collecting your patient data for a study on the quality of care in your hospital. In a case like this, it makes sense to anonymize all identifying patient data before any operations are done on it. You will do this with the map function. For this step, develop your application in the coding cell under the Lab 3 – Anonymize and average data section of the course exercise notebook.

1. At the beginning of your application, import the hashlib module.

2. Make a function called anonymize that:
   - Has a single parameter, tuple.
   - Hashes the tuple's values for patientId and locationId by using the sha256 algorithm and saves the hashed values. Hint: to hash the data, you will have to call

    item_to_encode = hashlib.sha256(item_to_encode.encode('utf-8')).digest()

   - Returns the hashed tuple.

   A sketch of this function follows these steps.

3. Use the map function on a stream to anonymize its content, with a call to your new function. Name the resulting stream patientX. The data anonymization should come just before the filter function:

    patientX = patientData.map(anonymize)

4. Make all of the appropriate downstream alterations to your code to reflect the new changes.

5. Ensure the patient data simulator is running on your Bluemix account.

6. Submit your application to the Bluemix Streaming Analytics service as you did in the previous labs.

7. Run your application and view its output. When you view the output in the Console Log, notice that the values for patientId and locationId have been replaced by hashes, anonymizing the patients.

8. Cancel the job running your application.
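Here is one possible sketch of the anonymize function, following the hint above. It assumes the tuple layout of the sample patient data and replaces only patientId and the device's locationId with SHA-256 digests.

    import hashlib

    def _hash(value):
        # Hash a string and return the raw SHA-256 digest, as in the hint.
        return hashlib.sha256(value.encode('utf-8')).digest()

    def anonymize(tuple):
        # Replace the identifying fields with their hashes and pass the
        # rest of the tuple through unchanged.
        tuple['patientId'] = _hash(tuple['patientId'])
        tuple['device']['locationId'] = _hash(tuple['device']['locationId'])
        return tuple

    # Applied just before the filter, as described above:
    # patientX = patientData.map(anonymize)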
2. Keeping state
In this section, you will keep the state of the heart rate readings from the patient data stream. For each new tuple that comes in, you will calculate the moving average of the last 10 tuples. You can continue your development for this step in the same coding cell as the previous step.

1. Import the getReadingValue function from the ingest.Observation module. It takes a tuple as input and returns the reading value for you.

2. Create a new callable class called Avg.

3. Specify the __init__ function:
   - Give it one parameter: n.
   - Declare two variables: n for the number of tuples to average (assigned from the input) and last_n, which will store a list of the last n tuples and is empty when initialized.

    def __init__(self, n):
        self.n = n
        self.last_n = []

4. Specify the __call__ function:
   - Give it one parameter: tuple.
   - Append the heart rate reading of the input tuple to the list by using the getReadingValue function:

    self.last_n.append(getReadingValue(tuple))

   - If the length of the list is greater than n, pop the oldest tuple in the list.
   - Return the average of all the values in the list:

    return sum(self.last_n)/len(self.last_n)

   Your completed Avg class should look like the sketch at the end of this step.

5. Using your new class, calculate the moving average heart rate for the last 10 tuples. Call the new stream avgHr:

    avgHr = heartRate.map(Avg(10))

6. Make the appropriate downstream changes to your code.

7. Submit over Bluemix, run the cell, and view your output. When you run the application, the patient's data will no longer appear. Instead, you will have an output of the average heart rate readings.

That's the end of this lab. Now your application is fully functional. Not only does it anonymize and select important data from the input stream, it also performs an aggregation of this data and returns its real-time average value, greatly simplifying the output. In the next and final lab, you'll have the chance to use Matplotlib to visualize this...
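Assembled from the pieces above, the completed Avg class might look like this sketch. The import of getReadingValue from the course's ingest.Observation module is taken from the instructions.

    from ingest.Observation import getReadingValue

    class Avg:
        def __init__(self, n):
            self.n = n          # number of readings in the moving average
            self.last_n = []    # rolling window of the most recent readings

        def __call__(self, tuple):
            # Add the newest heart rate reading to the window.
            self.last_n.append(getReadingValue(tuple))
            # Drop the oldest reading once the window exceeds n entries.
            if len(self.last_n) > self.n:
                self.last_n.pop(0)
            # Return the average of the readings currently in the window.
            return sum(self.last_n) / len(self.last_n)

    # Moving average of the last 10 heart rate readings:
    # avgHr = heartRate.map(Avg(10))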


Lab 4: visualize data in a Python notebook

2. Fetch view data
Do your development for this section in the coding cell under Lab 4 – Visualize data in Python notebook > Step 2a – Fetch view data. Do not copy and paste your application from previous steps into this cell; this is a separate code segment.

1. Import the following items:

    from collections import deque
    from IPython.lib import backgroundjobs as bg

2. Create a double-ended queue (deque) that can hold up to 2000 tuples. Call it plotQueue:

    plotQueue = deque([], 2000)

3. Start your data fetch and call it view:

    view = avgHrView.start_data_fetch()

4. Create a data_collecter function that iterates through the view and appends each value to plotQueue:

    def data_collecter(view):
        for d in iter(view.get, None):
            plotQueue.append(float(d))

5. Create an instance of the BackgroundJobManager class. Name it jobs:

    jobs = bg.BackgroundJobManager()

6. Start a new background job and pass in both data_collecter and view:

    jobs.new(data_collecter, view)

7. Run the cell. You won't observe any output because it is merely saving your streams data in the background.

8. Run the third cell under Lab 4 – Visualize data in Python notebook > Step 2b – Visualize view data using Matplotlib. This cell contains everything you need to create a plot for your data view (a rough sketch of this kind of plotting code follows these steps). A data plot should appear under this last cell and update in real time with the average heart rate data from your streaming application.

With data visualization tools like this, you don't even need to go into the Streams Console to see the outputs of your program. You have now learned not only to create a life-like Streams application in the Python language, but also to generate visuals for your data.
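Because the Step 2b cell is provided for you in the course notebook rather than reproduced here, the following is only a rough sketch of the kind of plotting it performs: periodically redrawing the contents of plotQueue with Matplotlib inside the notebook. It assumes plotQueue is the deque filled by the background job from Step 2a.

    import time
    import matplotlib.pyplot as plt
    from IPython import display

    fig, ax = plt.subplots()
    for _ in range(60):                        # refresh once per second for a minute
        ax.clear()
        ax.plot(list(plotQueue))               # plot the readings collected so far
        ax.set_xlabel('reading')
        ax.set_ylabel('average heart rate (bpm)')
        display.clear_output(wait=True)        # redraw in place in the notebook
        display.display(fig)
        time.sleep(1)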

Wrap up

