The pipeline we developed previously works, but it lacks a processing stage, and the Dataframe does not carry any data yet.
In this part we learn how to add data to the Dataframe, set the frame number, and add a processing block, called a Plugin, to our pipeline. Let's add the Plugin first and give it a name.
def example1():
    aDataframe = pipeline.Dataframe()
    aDatasource = pipeline.Datasource()
    aDatasink = pipeline.Datasink()
    aPlugin = pipeline.Plugin()

    aDatasource.setName("Our Datasource")
    aDatasink.setName("Our Datasink")
    aPlugin.setName("Our Plugin")

    # Print the name and ID of every pipeline object
    print(aDatasource.getName())
    print(aDatasource.getID())
    print(aDatasink.getName())
    print(aDatasink.getID())
    print(aPlugin.getName())
    print(aPlugin.getID())

    # Wire the pipeline: Datasource -> Plugin -> Datasink
    aDatasource.setOutput(aPlugin)
    aPlugin.setOutput(aDatasink)

    aDatasource.addDataframe(aDataframe)
    aDataframe.setFrameNumber(1234)

    aDatasource.run()
Our shiny new pipeline now has a Plugin between the Datasource and the Datasink. Plugins do the actual work and turn the pipeline into a data processing pipeline. Let's run the new and improved pipeline.
Our Datasource
67762201
Our Datasink
92842991
Our Plugin
10391329
Hello from: Our Plugin ID: 10391329
Frame number: 1234
Hello from: Our Datasink ID: 92842991
Frame number: 1234
The Plugin also greets us with its name and ID and, like the Datasink, reports the frame number of the processed Dataframe. Before generating a continuous stream of Dataframes, let's see what happens if we add more than one Dataframe to the pipeline before running it. We give them different frame numbers to tell them apart, and we remove the print statements to shorten the code a bit.
def example2():
    aDataframe0 = pipeline.Dataframe()
    aDataframe1 = pipeline.Dataframe()
    aDatasource = pipeline.Datasource()
    aDatasink = pipeline.Datasink()
    aPlugin = pipeline.Plugin()

    aDatasource.setName("Our Datasource")
    aDatasink.setName("Our Datasink")
    aPlugin.setName("Our Plugin")

    # Wire the pipeline: Datasource -> Plugin -> Datasink
    aDatasource.setOutput(aPlugin)
    aPlugin.setOutput(aDatasink)

    # Give each Dataframe its own frame number
    aDataframe0.setFrameNumber(1234)
    aDataframe1.setFrameNumber(4711)

    # Add both Dataframes before running the pipeline
    aDatasource.addDataframe(aDataframe0)
    aDatasource.addDataframe(aDataframe1)

    aDatasource.run()
Running the new pipeline gives us:
Hello from: Our Plugin ID: 29904399
Frame number: 1234
Hello from: Our Datasink ID: 25859330
Frame number: 1234
Hello from: Our Plugin ID: 29904399
Frame number: 4711
Hello from: Our Datasink ID: 25859330
Frame number: 4711
Surprised? All the pipelines we build are able to buffer data before running! This is because every pipeline object has an internal buffer organized as a FIFO (First In, First Out) queue: the Dataframe that was added first is also processed first. Such a queue is often implemented as a circular buffer, also known as a ring buffer. For online processing of continuous data, buffering is very useful when new data arrives before the pipeline object has finished processing the previous data.
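The first-in, first-out behaviour can be illustrated without the pipeline module. The following is only a minimal sketch using Python's standard `collections.deque`; the name `fifo` and the use of plain frame numbers instead of Dataframe objects are assumptions made for illustration, not the pipeline's actual internals.

```python
from collections import deque

# Hypothetical stand-in for a pipeline object's internal buffer.
fifo = deque()

# Frames arrive before processing starts and are queued in arrival order.
for frame_number in (1234, 4711):
    fifo.append(frame_number)

# Processing drains the buffer from the front: first in, first out.
processed = []
while fifo:
    processed.append(fifo.popleft())

print(processed)  # [1234, 4711]
```

The frames leave the buffer in the order in which they were added, which matches the order of the frame numbers in the output above.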
The next thing we are going to do is generate a stream of data, attach the data to Dataframes, and pass them down the pipeline. We will then run the pipeline on this continuous stream of data and see what happens.
import numpy as np
def example3():
    aDatasource = pipeline.Datasource()
    aDatasink = pipeline.Datasink()
    aPlugin = pipeline.Plugin()

    aDatasource.setName("Our Datasource")
    aDatasink.setName("Our Datasink")
    aPlugin.setName("Our Plugin")

    # Wire the pipeline: Datasource -> Plugin -> Datasink
    aDatasource.setOutput(aPlugin)
    aPlugin.setOutput(aDatasink)

    # Create 100 Dataframes, each carrying one random number
    for n in range(100):
        aDataframe = pipeline.Dataframe()
        aDataframe.setFrameNumber(n)
        aDataframe.setData(np.random.random(1))
        aDatasource.addDataframe(aDataframe)

    aDatasource.run()
The output looks like this:
Hello from: Our Plugin ID: 1319679
Frame number: 0
Hello from: Our Datasink ID: 93881903
Frame number: 0
Hello from: Our Plugin ID: 1319679
Frame number: 1
Hello from: Our Datasink ID: 93881903
Frame number: 1
Hello from: Our Plugin ID: 1319679
Frame number: 2
Hello from: Our Datasink ID: 93881903
Frame number: 2
Hello from: Our Plugin ID: 1319679
Frame number: 3
...
Hello from: Our Plugin ID: 1319679
Frame number: 97
Hello from: Our Datasink ID: 93881903
Frame number: 97
Hello from: Our Plugin ID: 1319679
Frame number: 98
Hello from: Our Datasink ID: 93881903
Frame number: 98
Hello from: Our Plugin ID: 1319679
Frame number: 99
Hello from: Our Datasink ID: 93881903
Frame number: 99
Quite a lot is happening here.
Our code produces 100 Dataframes, each containing a random number as data. We also set the frame number of each Dataframe.
All Dataframes are added to the Datasource, which passes them to the Plugin, which in turn passes them to the Datasink.
Finally, the Datasink removes all the Dataframes from memory.
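The output above shows that each frame visits the Plugin and then the Datasink before the next frame is handled. One way to get this ordering is for every stage to forward a frame to its output as soon as it has processed it. The sketch below is a toy model of that idea only: `ToyStage`, `set_output`, and `process` are hypothetical names, and the real pipeline classes may work differently.

```python
class ToyStage:
    """Hypothetical stand-in for a pipeline object; not the tutorial's real classes."""

    def __init__(self, name, log):
        self.name = name
        self.log = log        # shared list recording the processing order
        self.output = None    # next stage in the chain, if any

    def set_output(self, stage):
        self.output = stage

    def process(self, frame_number):
        # Record the visit, then hand the frame to the next stage right away.
        self.log.append((self.name, frame_number))
        if self.output is not None:
            self.output.process(frame_number)


log = []
plugin = ToyStage("Our Plugin", log)
sink = ToyStage("Our Datasink", log)
plugin.set_output(sink)

# Playing the role of the source: hand each buffered frame to the first stage in turn.
for frame_number in range(3):
    plugin.process(frame_number)

for name, frame_number in log:
    print(f"{name}: frame {frame_number}")
```

In this toy model the log alternates between the two stages for each frame number, the same pattern as the "Hello from" lines in the tutorial output.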
Important technical detail:
No Dataframe is actually copied as it moves down the pipeline. Instead, only references (https://en.wikipedia.org/wiki/Reference_(computer_science)) to the Dataframes are passed from one pipeline object to the next.
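What passing a reference means can be shown in plain Python. This is a minimal sketch, assuming a hypothetical `ToyFrame` class and two plain lists standing in for the buffers of two pipeline objects; it does not use the pipeline module itself.

```python
class ToyFrame:
    """Hypothetical stand-in for a Dataframe; not the tutorial's real class."""

    def __init__(self, frame_number):
        self.frame_number = frame_number


frame = ToyFrame(1234)

source_buffer = [frame]
# "Passing" the frame on stores another reference to the same object, not a copy.
plugin_buffer = [source_buffer[0]]

same_object = plugin_buffer[0] is frame

# A change made through one reference is visible through the other.
frame.frame_number = 4711
seen_downstream = plugin_buffer[0].frame_number

print(same_object, seen_downstream)
```

This is consistent with the first example, where the frame number was set after the Dataframe had already been added to the Datasource and still showed up in the output.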
Awesome! We are now ready to write Plugins that process and display the data we are generating.
We will then add those Plugins to our existing pipeline and run it!