1. Develop a basic Python application

In this section, you will use the Streams Python API Topology class to frame your streaming application, and then create a simple simulator of a continuous data source by repeatedly sending identical tuples of sample data to your stream.

Develop your application for this lab in the coding cell under Lab 1 – Create a simple Python application in the course exercise notebook. There are commented headings to organize and structure your application.

Sample patient data

You will copy this data to send repeated tuples to your stream. It is here for your reference. The instructions below show you how to use it in your application.

{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}

  1. Before you begin, import the necessary modules from the streamsx package. To add them, include the following imports:
    from streamsx.topology.topology import Topology
    from streamsx.topology.context import *
  2. Add a data source by creating a function called Observations. In your function, make these changes:
    • Declare a variable jsonStr and assign it the sample patient data.
      jsonStr = '''{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}'''
    • Import the json module, parse the JSON string, and assign the resulting dictionary to dictObj:
      import json
      dictObj = json.loads(jsonStr)
    • Import the time module, and limit the input rate of your data stream to about one tuple per second by pausing with the sleep function:
      import time
      time.sleep(1)
    • Enclose the repeated work in a while loop, ending each iteration with yield dictObj. Because it contains a yield statement, Observations is a generator function: calling it returns a generator, which is a kind of iterable. This completes your function.
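Putting the bullets of step 2 together, the finished function might look like the following. This is a minimal sketch rather than the notebook's reference solution: it assumes the JSON is parsed once before the loop and that only the pause and the yield repeat, so the version in your notebook may arrange these statements differently.

```python
def Observations():
    # Imports live inside the function, as the steps above describe.
    import json
    import time

    # Sample patient data from this lab, kept as a JSON string.
    jsonStr = '''{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource": {"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.health/1.0"}}}'''

    # Assumption: parse once and send the same dictionary on every iteration.
    dictObj = json.loads(jsonStr)

    while True:
        # Pause so that roughly one tuple is produced per second.
        time.sleep(1)
        yield dictObj
```

Calling Observations() does not run the loop right away. It returns a generator, and each request for the next item runs the body up to the next yield.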
  3. Create a new topology called topo, invoke the source operator on it with your defined data source, and assign the resulting stream a name of patientData:
    topo = Topology()
    patientData = topo.source(Observations)
  4. Print the stream data in the console:
    patientData.sink(print)
  5. Submit the application to your Bluemix Streaming Analytics service, using the provided configuration cfg:
    submit('STREAMING_ANALYTICS_SERVICE', topo, cfg)
  6. Run your Python application by selecting the cell with your code and clicking the Run button at the top of the notebook. It might take a minute, but you will eventually see a message showing that your application has been submitted.

    The message begins with streamsx.topology.context.SubmissionResult..., and your cell then stops running.

    You’ve now created your first Python Streams application! You won’t see the output yet. You’ll need to access your Streams Console to see it.
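Because the printed tuples only appear in the Streams Console, it can help to check a data source locally before submitting. The sketch below uses only the Python standard library and is not part of the Streams API. The helper preview, the stand-in source observations, and its delay parameter are hypothetical names introduced for illustration, and the record is abbreviated from the sample patient data.

```python
import itertools
import json
import time

# Abbreviated copy of the sample record, for illustration only.
SAMPLE = '{"patientId": "patient-1", "reading": {"ts": 605, "uom": "bpm", "value": 82.56785326532197}}'


def observations(delay=0.0):
    """Hypothetical stand-in for the lab's data source; delay is in seconds."""
    record = json.loads(SAMPLE)
    while True:
        time.sleep(delay)
        yield record


def preview(source, count):
    """Return the first `count` tuples from an endless source as a list."""
    # islice stops after `count` items, so the infinite loop is never exhausted.
    return list(itertools.islice(source, count))


# Roughly what applying print to each tuple would show, limited to three tuples.
for tup in preview(observations(), 3):
    print(tup)
```

Limiting the preview with itertools.islice matters here because the source never ends on its own. Iterating over it directly in a notebook cell would run until you interrupt the kernel.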