Add a window-based operator

You will compute average speeds over a window separately for vehicles C101 and C133. Use a tumbling window that holds a fixed number of tuples: each time the window fills, the operator computes the result, submits an output tuple, discards the window contents, and starts collecting tuples again in the now-empty window.

Window partitioning based on a given attribute means that the operator will allocate a separate buffer for each value of that attribute—in effect, as if you had split the stream by attribute and applied a separate operator to each substream. The specifications are summarized in the following table:

Specification             Value
Operator type             Aggregate
Window specification      Tumbling, based on tuple count, 5 tuples
Window partitioning       Yes, based on vehicle ID (id)
Stream to be aggregated   Filtered
Output schema             id – rstring
                          time – int64
                          avgSpeed – float64
Aggregate computation     Average(speed)
Results destination       File: average.speeds
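
The behavior specified above can be sketched outside Streams to make the window semantics concrete. The following Python is only an illustration, not the Aggregate operator's implementation: the function name aggregate, the dictionary-based tuples, and the sample data are all assumptions made for this sketch. It keeps one buffer per vehicle id, emits a result when a buffer reaches 5 tuples, and then empties that buffer.

```python
from collections import defaultdict
from statistics import fmean

WINDOW_SIZE = 5  # tuple count from the specification table


def aggregate(stream, window_size=WINDOW_SIZE):
    """Yield one output tuple each time a partition's window fills up."""
    windows = defaultdict(list)  # partitioning: one buffer per vehicle id
    for tup in stream:
        buf = windows[tup["id"]]
        buf.append(tup)
        if len(buf) == window_size:
            last = buf[-1]  # id and time come from the most recent tuple
            yield {
                "id": last["id"],
                "time": last["time"],
                "avgSpeed": fmean([t["speed"] for t in buf]),
            }
            buf.clear()  # tumbling: discard the contents and start over


# Hypothetical input: the two vehicles alternate, five tuples each.
sample = [
    {
        "id": "C101" if t % 2 == 0 else "C133",
        "time": t,
        "speed": 10.0 * (t // 2 + 1) if t % 2 == 0 else 50.0,
    }
    for t in range(10)
]

for result in aggregate(sample):
    print(result)
```

Because the buffers are independent, tuples for C133 arriving between tuples for C101 do not affect the C101 average, which is the same effect as splitting the stream by id and aggregating each substream on its own.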

 

  1. Add the two required operators:
    1. In the graphical editor’s palette filter box, enter agg. Drag an Aggregate operator into the main composite. The editor calls it Aggregate_6. This is your main analytical operator.
    2. In the palette filter, enter filesink. Drag a FileSink into the main composite; the editor calls it FileSink_7. This operator lets you write the analytical results to a file.
  2. Fold the two new operators into the graph by connecting one existing stream and adding another:
    1. Drag a stream from Filtered to Aggregate_6. This means Aggregate_6 is tapping into the same stream that Writer is already consuming, so the schema is already defined. This is indicated in the editor by a solid arrow.
    2. Drag another stream from Aggregate_6 to FileSink_7. This stream does not yet have a schema, so the arrow is dashed.
    3. Click Layout and Fit to Content.
  3. Rename the new stream and operators:
    1. Rename the stream to Averaged.
    2. Rename the Aggregate operator to Averaged by blanking out its alias; with no alias, the operator takes the name of its output stream.
    3. Rename the FileSink to AvgWriter.
  4. Give the Averaged stream (output of the Aggregate operator) its own schema. In the Schema tab of the Properties view for the stream, enter attribute names and types:
    1. In the first field under Name, enter id. Press Tab.
    2. Under Type, enter rstring. Press Tab to go to the next name field.
    3. Continue in the same way, pressing Tab to move to the next field, until you have entered all the output schema attribute names and types listed in the table above.
  5. Tell the Aggregate operator what to do:
    1. Select the Averaged operator. In the Properties view, go to the Window tab. A placeholder window specification is already filled in, but you need to change it slightly.
      1. Click Edit.
      2. In the Add Window Mode dialog, leave Tumbling Window selected.
      3. Set Eviction policy value to 5.
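      4. Select Partitioned and leave the partition Eviction policy blank (this is separate from the window eviction policy value you set in the previous step).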
      5. Click OK.
    2. Configure the window as partitioned on vehicle ID (the id attribute).
      1. In the Param tab, click Add.
      2. In the Select parameters dialog, select partitionBy and click OK.
      3. In the partitionBy value field, enter id.
    3. Specify the output assignment in the Output tab. You might need to scroll down the list of tabs or make the Properties view taller. Expand the twisty in front of Averaged in the Name column, then widen the columns and the view until you can see the full Name and Value columns. By default, each output attribute is assigned from the input attribute of the same name in the most recent input tuple, so the Value column already shows id and time being copied that way.
      Because the window is partitioned by id, all tuples in a window partition have the same value for this attribute. This is not the case for time, but in this example it is reasonable to use the most recent value.

      1. Click Show Inputs. Expand the Filtered twisty and then the LocationType twisty. This shows the attributes that you can use to create an output assignment expression.
      2. Click in the value field for avgSpeed and press Ctrl+Space for content assist. In the list of possible entries, double-click Average(T) : T. (The syntax means that for any input type T, the output value will also be of type T.) This inserts Average(T) into the field.
      3. Again click in the value field for avgSpeed. Delete the T inside the parentheses and keep the cursor there. Press Ctrl+Space to show content assist, and this time select speed – float64.
        Tip for custom output functions: The functions shown in content assist are custom output functions specific to the Aggregate operator. They are not general-purpose SPL functions. Every output assignment must contain a call to one of these. The automatic assignments for id and time described above implicitly call the Last(T) custom output function.
  6. Specify where the results go. Select the newly added FileSink operator (AvgWriter).
    1. In the Param tab, set the file parameter to "average.speeds" (with the double quotation marks).
    2. Click Add. In the Select parameters dialog, select format and quoteStrings. Click OK. Set format to csv and quoteStrings to false.
    3. Save. Close the Properties view. Your application is ready to launch.
  7. Launch the application with a slight change to the configuration so that each operator gets its own Processing Element (PE). This is in preparation for your exploration of the Streams console.
    1. Right-click MyMainComposite in the Project Explorer and then select Launch > Launch Active Build Config To Running Instance.
    2. In the Edit Configuration dialog, scroll down until you see the Fusion section.
    3. Change the Fusion scheme to Manual. Leave the Target number of PEs set to 10.
    4. Click Apply and then Launch.
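
Once the application is running, AvgWriter writes one comma-separated line per Averaged tuple to the results file. As a rough illustration of what the quoteStrings setting from step 6 changes, the following Python sketch writes the same hypothetical tuple with and without quoted strings. The function write_csv and the sample values are assumptions made for this sketch, and the exact number formatting that Streams produces may differ from Python's.

```python
import csv
import io


def write_csv(tuples, out, quote_strings=False):
    """Write id,time,avgSpeed rows; wrap string values in quotes only if asked."""
    quoting = csv.QUOTE_NONNUMERIC if quote_strings else csv.QUOTE_MINIMAL
    writer = csv.writer(out, quoting=quoting, lineterminator="\n")
    for tup in tuples:
        writer.writerow([tup["id"], tup["time"], tup["avgSpeed"]])


# Hypothetical output tuple, not real results from the application.
rows = [{"id": "C101", "time": 8, "avgSpeed": 30.0}]

unquoted = io.StringIO()
write_csv(rows, unquoted, quote_strings=False)
print(unquoted.getvalue(), end="")

quoted = io.StringIO()
write_csv(rows, quoted, quote_strings=True)
print(quoted.getvalue(), end="")
```

With quoting turned off, the vehicle ID appears as a bare value, which is usually easier to read and to load into other tools as long as the strings contain no commas.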