Add a test for unexpected data

A best practice is to validate the data that you receive from a feed. Data formats might not be well defined, records can be ill-formed, and transmission noise can corrupt values. You do not want your application to fail when the data does not conform to its expectations. In this lab, you will be receiving live data.

As an example of this kind of validation, you will add an operator that checks one attribute: the vehicle ID (id). In the data file, all records have an id value of the form Cnnn (presumably, with “C” for “car”). Even though it doesn’t at the moment, assume that your application depends on this format; for example, it could take a different action depending on the type of vehicle indicated by that first letter (say, “B” for “bus”). Also, there might be a system requirement that all vehicle IDs must be exactly four characters.

Rather than silently dropping the tuple when it does not match requirements, it is better practice to save the “bad” data so that you can audit what happened and later perhaps enhance the application.

In summary, the vehicle ID (id attribute) specifications are as follows:

Criterion         Value
First character   “C”
Length            4


Therefore, if any data comes in with an unexpected value for id, your program will shunt it aside as invalid data. There are several operators that can take care of this. Which one you use is to some degree a matter of taste. You have already used one that works well, which is the Filter. So let’s use a different one here.
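For comparison, the same check could be written with the Filter operator you used earlier. The sketch below is illustrative only: it assumes the stream names Throttled, Valid, and Rejected, and relies on Filter’s optional second output port, which (if present) receives the tuples for which the filter expression is false.

```
// Sketch (illustrative names): Filter keeps matching tuples on port 0;
// the optional second output port receives the tuples that fail the test.
(stream<LocationType> Valid; stream<LocationType> Rejected) = Filter(Throttled) {
    param
        filter : substring(id, 0, 1) == "C" && length(id) == 4;
}
```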

The Split operator sends tuples to different output ports (or none) depending on the evaluation of an arbitrary expression. This expression can, but does not need to, involve attribute values from the incoming tuple. It can have as many output ports as you need. In this case, you need only two: one for the regular flow that you’ve been dealing with (the “valid” values), and one for the rest (the “not valid” ones).

The Split mechanism works as follows. (See the example in the figure below.)

  • The N output ports are numbered 0, 1, …, N-1.
  • A parameter called index contains an arbitrary expression that returns a 64-bit integer (an int64 or, if unsigned, a uint64).
  • This expression is evaluated for each incoming tuple.
  • The expression’s value, n, determines which output port p the tuple is submitted to:
    • If n ≥ 0, p = n modulo N
    • If n < 0, the tuple is dropped
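The mechanism above can be sketched as an SPL invocation; the operator, stream, type, and attribute names here are illustrative assumptions, not part of the lab:

```
// Sketch (illustrative names): with N = 2 output ports,
// an index value n >= 0 sends the tuple to port n % 2,
// and a negative value drops the tuple.
(stream<SomeType> Port0; stream<SomeType> Port1) = Split(In) {
    param
        // n is assumed to be an int64 attribute of the incoming tuple
        index : n;
}
```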

    Tip: At any time as you follow the steps below, use Layout and Fit to Content to keep the graph organized and visible.

  1. Add a Split operator to the graph. In this case, you need one with two output ports.
    1. In the graphical editor, find the Split operator by using the Find box or browsing to Toolkits > spl > spl.utility > Split.
    2. Drag a Split operator (not a template) into the canvas and drop it directly onto the Throttled stream (output of the Throttled operator).
    3. Drag an Output Port from the palette (under Design) onto the Split operator. This gives it a second output port.
  2. Capture the Split’s second output stream in a file:
    1. Add a FileSink to the graph.
    2. Drag a stream from the second output of the Split to the input of the new FileSink.
    3. Drag the LocationType schema from the palette (under Current Graph/Schemas) onto the new stream. This turns it from a dashed line into a solid line.
      Notice that the new stream from the Split’s first output port is already solid. It automatically inherits the schema from the original Throttled stream.
  3. Edit the properties of each of the new streams. In the General tab, rename the first stream, which is the one that goes to Filtered, to Expected. Rename the second one to Unexpected.
  4. Configure the Split operator. Because it has two output streams, it is better to set a descriptive alias than to blank it out; otherwise, it would be known by the name of the first output stream (Expected), which is misleading.
    1. Edit the operator properties. In the General tab, rename it to IDChecker.
    2. In the Param tab, click Add. Then, select index and click OK.
    3. In the Value field for the index parameter, enter the following string exactly as shown. The “l” after 0 and 1 is a lowercase letter L that indicates a “long” 64-bit integer:
      substring(id,0,1) == "C" && length(id) == 4 ? 0l : 1l
      This means that if the substring of the id attribute starting at offset zero with length one (in other words, the first character of id) is “C” and the length of the id attribute is four, then the expression evaluates to zero; otherwise, to one. Therefore, proper IDs go out from the first port (Expected), and everything else goes out from the second port (Unexpected).
      SPL expression language syntax: The syntax <Boolean-expression> ? <action-if-true> : <action-if-false> is known from C, Java, and other languages. The functions substring(string,start,length) and length(string) are from the Standard Toolkit. The suffix “l” (the lowercase letter L) indicates that the numbers are 64-bit values (“long” integers). SPL does not make implicit type conversions. Integer numbers with no suffix are 32-bit values and assigning one to a 64-bit parameter would result in an error.
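Putting the pieces together, the IDChecker configuration corresponds to an invocation like the following in SPL source. This is a sketch that assumes the stream names from the steps above (Throttled, Expected, Unexpected) and the LocationType schema:

```
// Sketch: Split with the IDChecker alias and the index expression from the steps above.
// Valid IDs (value 0l) go to Expected; everything else (1l) goes to Unexpected.
(stream<LocationType> Expected; stream<LocationType> Unexpected)
    as IDChecker = Split(Throttled) {
    param
        index : substring(id, 0, 1) == "C" && length(id) == 4 ? 0l : 1l;
}
```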
  5. Configure the new FileSink operator. You’ve used a FileSink in two previous labs, so refer to those if you forgot how to do it.
    1. Rename the FileSink operator to ErrWriter.
    2. Set the following parameter values:
      Parameter      Value
      file           “error.observations”
      flush          2u
      format         csv
      quoteStrings   false

      Flushing buffered file writes: FileSink performs buffered file I/O, meaning that it writes to buffers maintained by system libraries rather than directly to disk. These buffers are only written out to disk (flushed) as they fill up, or when the requesting application terminates. When the output is a slow trickle, this can mean that you will not see anything in the file for a long time. Setting flush to 2u (the u is for “unsigned” integer) guarantees that you will see data at least in batches of two records.
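For reference, the ErrWriter configuration above corresponds to an SPL invocation like this sketch; the Unexpected stream name is the one assigned in the earlier renaming step:

```
// Sketch: FileSink with the parameter values from the table above.
() as ErrWriter = FileSink(Unexpected) {
    param
        file         : "error.observations";
        flush        : 2u;       // write out buffers after every 2 tuples
        format       : csv;
        quoteStrings : false;    // write strings without surrounding quotes
}
```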

  6. Save, wait for the build to finish, launch the app, and verify that the original output files, including average.speeds, are still being written to the data directory as before, and that the new output file error.observations contains at least two records after a suitable amount of time. The input file contains two records with a malformed ID.