Big Data and KNIME
There is a lot of talk about big data these days.
I have always been somewhat skeptical about it. All of my projects so far have worked very well using KNIME alone, even with very large amounts of data (up to 50 million rows).

176 Million Rows Processed with KNIME
However, I recently worked on a very particular project on electricity usage data, where I had to aggregate and transform about 175 million data rows (give or take a million).
The project involved reading the time series of electricity usage, sampled every half hour between July 15th 2009 and January 1st 2011, for around 6000 meter IDs (households or businesses) located around Ireland. The data came from a pilot that the Irish electricity and gas company ran to evaluate how much information such meters could provide.

The final goal of the project was to predict the electricity usage for clusters of meter IDs.
The half-hourly electricity usage data was aggregated into hourly, daily, and monthly time series for each meter ID. Aggregated information (or KPIs) was extracted from each time series to describe the electricity consumption behavior of each meter ID. On the basis of these KPIs, the 6000 meter IDs were grouped into just 28 clusters. I then worked on the prediction of the hourly, daily, and monthly time series of the 28 cluster prototypes.
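For readers who prefer code to node names, the first aggregation step can be sketched in plain Java. This is only an illustration of the idea, not what KNIME does internally: the Reading class, its field names, and the kWh unit are assumptions, and the real files use a proprietary layout that is not reproduced here.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UsageAggregation {

    /** One half-hourly reading; field names and the kWh unit are assumptions. */
    static final class Reading {
        final int meterId;
        final LocalDateTime time;
        final double kwh;

        Reading(int meterId, LocalDateTime time, double kwh) {
            this.meterId = meterId;
            this.time = time;
            this.kwh = kwh;
        }
    }

    /** Sums the readings of each meter per hour (a group-by on meter ID and hour). */
    static Map<Integer, Map<LocalDateTime, Double>> hourly(List<Reading> readings) {
        Map<Integer, Map<LocalDateTime, Double>> result = new TreeMap<>();
        for (Reading r : readings) {
            // Drop the minutes so that 10:00 and 10:30 fall into the same bucket.
            LocalDateTime hour = r.time.truncatedTo(ChronoUnit.HOURS);
            result.computeIfAbsent(r.meterId, id -> new TreeMap<>())
                  .merge(hour, r.kwh, Double::sum);
        }
        return result;
    }

    /** Sums the readings of each meter per calendar day. */
    static Map<Integer, Map<LocalDate, Double>> daily(List<Reading> readings) {
        Map<Integer, Map<LocalDate, Double>> result = new TreeMap<>();
        for (Reading r : readings) {
            result.computeIfAbsent(r.meterId, id -> new TreeMap<>())
                  .merge(r.time.toLocalDate(), r.kwh, Double::sum);
        }
        return result;
    }
}
```

Calling UsageAggregation.hourly(readings) returns one sorted hourly series per meter ID. The monthly series follows the same pattern with a year-and-month key.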

The project is distributed over three KNIME workflows. The first workflow imports, aggregates, and transforms the raw data, as shown in the picture below.
The workflow ran on a 4-core laptop, with 8 GB RAM, 64-bit, 2.20 GHz, and Windows 7.
The first meta-node, named "Read all Data", loops over all files to read and concatenate their content. The output of that meta-node is a massive data table of 176 million rows containing all available raw data. Reading all those rows from 6 files takes the loop only half an hour, which is really astonishing considering the amount of data!

The second meta-node, named "String to Datetime", converts all date/time values (days, months, year, hours, and minutes) from the original proprietary format into a KNIME DateTime object. The last node of the sub-flow contained in this meta-node is a "Sorter" node, which sorts all rows in ascending order by time. All date/time transformation nodes executed relatively quickly, taking up to 2 hours altogether. The "Sorter" node, on the other hand, was the bottleneck of the whole sub-workflow, taking almost 5 hours.

Finally, the aggregations. Two parallel meta-nodes aggregate the data rows, one into daily, weekly, monthly, and yearly time series and the other into an hourly time series. They also calculate the average amount of energy used on weekends vs. business days, on each day of the week, and during the morning, afternoon, evening, night, early morning, and late afternoon. These two meta-nodes also calculate new time series for each meter ID on a daily, weekly, monthly, yearly, and hourly scale. Both meta-nodes are massively time consuming because of all the aggregations performed with "GroupBy" and "Pivoting" nodes.
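As a rough illustration of one of these KPIs, the weekend vs. business day average for a single meter could be computed as below. This is a sketch under assumptions rather than the workflow's actual logic: the class and method names are made up, the weekend is taken to be Saturday and Sunday, and public holidays are ignored.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.util.Map;

final class WeekendVsBusinessDays {

    /** Assumption: the weekend is Saturday and Sunday; public holidays are ignored. */
    static boolean isWeekend(LocalDate day) {
        DayOfWeek d = day.getDayOfWeek();
        return d == DayOfWeek.SATURDAY || d == DayOfWeek.SUNDAY;
    }

    /**
     * Takes the daily totals of one meter and returns
     * { average per business day, average per weekend day }.
     * An average is NaN if no day fell into that group.
     */
    static double[] averages(Map<LocalDate, Double> dailyKwh) {
        double businessSum = 0.0;
        double weekendSum = 0.0;
        int businessDays = 0;
        int weekendDays = 0;
        for (Map.Entry<LocalDate, Double> e : dailyKwh.entrySet()) {
            if (isWeekend(e.getKey())) {
                weekendSum += e.getValue();
                weekendDays++;
            } else {
                businessSum += e.getValue();
                businessDays++;
            }
        }
        return new double[] { businessSum / businessDays, weekendSum / weekendDays };
    }
}
```

The same split-and-average pattern applies to the day-of-week and time-of-day KPIs, with a different grouping key.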

The "GroupBy", "Sorter", and "Pivoting" nodes all share the same sorting algorithm and might take resources away from each other if they run at the same time. This effect is not noticeable for smaller data, even for 50 million rows. But with 176 million rows it quickly becomes a problem. In this case, you need to execute one node after the other, either manually or by setting artificial dependencies by means of flow variable connections across nodes. Still, even after inserting dependencies, these two meta-nodes took almost 3 days in total to run.

All calculated values are then joined together and a few additional percentages are also inserted. The execution of this meta-node, named "% values", adds just a few hours to the total execution of the workflow.

The good news is that KNIME does not break. If you have enough time and patience you can still run your workflow on 176 million rows, slowly but without major issues. In other projects using very large amounts of data, for example 50 million rows, KNIME has never crashed and the execution times on the laptop above have always been acceptable.

The same Workflow using "RushAccelerator for KNIME"
However, for this project I had only one week to present some kind of results, so three days of execution time were not really an option. For the first time, then, I decided to try the KNIME solution for big data, named RushAccelerator for KNIME and produced by Actian (http://bigdata.pervasive.com/Products/RushAccelerator-for-KNIME.aspx).
You can download a free trial or buy a license from their web site. I actually downloaded what is now an old version of the "RushAccelerator for KNIME" software, from when the company was still called Pervasive. Nowadays there is a new version with many more nodes available and a brand new company name, "Actian".
Picture
"RushAccelerator for KNIME" installs a new category in your KNIME "Node Repository", named "Pervasive DataRush" that contains a few specialized nodes to run on a big data platform (see picture on the left).
With those, I rebuilt my workflow using the RushAccelerator for KNIME nodes for datetime manipulation, value calculation/extraction, and, above all, aggregations.

On the plus side, you can mix most KNIME nodes and RushAccelerator nodes without problems, and some nodes have a GUI similar to that of their corresponding KNIME nodes.

On the minus side, the configuration window of some RushAccelerator nodes is quite different from the configuration window of the corresponding KNIME nodes, as for example in the "Rows to Columns" node, which is supposed to perform the same task as the "Pivoting" node. However, the highly time-consuming tasks were only a few, each repeated many times. In addition, RushAccelerator is not free, but the license costs are affordable.

RushAccelerator nodes also have a new tab in the configuration window, named "Job Manager Selection". Here you can set the engine you intend to run the workflow on: either the default engine or the DataRush executor.

The new workflow built with RushAccelerator nodes is shown in the figure below. The structure remained the same as in the original workflow, and most of the nodes contained in the meta-nodes actually mirror the original KNIME nodes.

The new Execution Time
And now about the execution time!
The result was quite impressive. The execution time of the whole workflow went down from circa 3 days to one hour and 16 minutes!
That was amazing and moved my attitude towards big data from full skepticism to cautious optimism.
I had found an example where big data really made the difference between the success of the project and a total failure!

This does not mean that all workflows need to run on some Big Data platform. However, depending on the amount of data, available time and machine, sometimes you might wish to speed up execution times enough to get results in a reasonable time!
 
 
How to calculate Year To Date values (YTD) with KNIME 2.0?
The question is not trivial, since KNIME does not allow interactions across rows but only across columns in a data set. It seems that this will become possible in KNIME version 2.1 inside the Java Snippet node. However, I am still working with version 2.0 and I had to implement a workflow that calculates YTD values. The final solution was actually not too complicated.

Let's suppose that we have a data set with a number of values on the x-axis and all the months of the current year on the y-axis. We receive the data in this format and we need to produce the output in the same format, with the months of the year on the y-axis. There are three main steps to calculate the YTD values.

    -  transpose the data set so that the months become the column headers
    -  calculate the YTD values across each row
    -  transpose the data back so that the months become the Row IDs and the original and new YTD values become the column headers
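Outside of KNIME, the three steps above can be sketched on a plain array to show why the round trip works. This is an illustration only: it assumes a rectangular table of doubles with no missing values, rows as months in calendar order, and the class and method names are made up for the example.

```java
final class YtdPipeline {

    /** Swaps rows and columns of a rectangular table. */
    static double[][] transpose(double[][] table) {
        int rows = table.length;
        int cols = rows == 0 ? 0 : table[0].length;
        double[][] result = new double[cols][rows];
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                result[j][i] = table[i][j];
            }
        }
        return result;
    }

    /** Replaces each value by the running total of its row (months run left to right). */
    static double[][] ytdAcrossRows(double[][] byItem) {
        double[][] result = new double[byItem.length][];
        for (int i = 0; i < byItem.length; i++) {
            result[i] = new double[byItem[i].length];
            double runningTotal = 0.0;
            for (int j = 0; j < byItem[i].length; j++) {
                runningTotal += byItem[i][j];
                result[i][j] = runningTotal;
            }
        }
        return result;
    }

    /** Step 1: transpose, step 2: YTD across each row, step 3: transpose back. */
    static double[][] ytd(double[][] byMonth) {
        return transpose(ytdAcrossRows(transpose(byMonth)));
    }
}
```

For a table with the rows Jan = {1, 10}, Feb = {2, 20}, Mar = {3, 30}, YtdPipeline.ytd returns the rows {1, 10}, {3, 30}, {6, 60}, still with one row per month.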
Picture
Transpose metanode  
  1. All rows must contain ONLY DOUBLE values. If a row mixes types, for example double and string, transposing it produces a column of unknown type, which causes a number of problems downstream (for example, the Java Snippet node does not work on columns of unknown type). This means that all non-double columns must be dropped from the data. This is what the first node, a Column Filter, does.
  2. The same problem occurs for rows in which all values are missing: a transpose operation produces a column of unknown type for such a row. Here we could use a "Missing Values" node, but if we do not want to alter the content of the data set, it is enough to insert a "temp" column with the constant value 0.0 at the end of the data set. The temp values force columns with all empty values to take the type DOUBLE. This is the task of the "Rule Engine" node (default = 0.0 as a String, column_name = temp).
  3. The String to Number node converts the 0.0 values in the temp column from String to Double.
  4. We can finally perform the Transpose operation. Now all months are the column headers and the names of our original value columns are the Row IDs.
  5. Now the data is in the desired form, in which a YTD operation across each row is possible. Let's remove all DOUBLE missing values in order to avoid undesired states in the following Java Snippet nodes.
  6. We now need to filter out the row "temp". This is not strictly necessary, but it makes the YTD operation cleaner. The column was created with the column header "temp", which means that we now have a row with RowID "temp". Since there is no other way to identify this extraneous row, we need to import all Row IDs into a real column (let's call it "items") and then filter out the row where the value of "items" is "temp". This is the task of the RowID node and the Row Filter node.
  7. Finally, the Column Filter node removes the artificially created column "items".
Picture
YTD metanode
Here we have 12 Java Snippet nodes, one for each month of the year, each calculating the YTD values for its month. For example, for the month of April:
   
       // YTD for April: the sum of all months from January through April
       double sum = $Jan$ + $Feb$ + $Mar$ + $Apr$;
       return sum;

The new value is then written to column Apr_YTD and appended at the end of the data set.
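Taken together, the 12 snippets amount to a running total over the month columns of one row. The plain Java sketch below, which is not KNIME Java Snippet syntax, makes that explicit. The class name is hypothetical, the month abbreviations beyond Jan to Apr are assumed to follow the same three-letter pattern, and counting a missing month as 0.0 is an assumption as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MonthlyYtd {

    // Assumption: month columns are named with three-letter English abbreviations.
    static final String[] MONTHS = {
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
    };

    /**
     * Takes one row as a map from month column to value and returns the
     * YTD columns, keyed "<month>_YTD" and kept in calendar order.
     */
    static Map<String, Double> ytdColumns(Map<String, Double> row) {
        Map<String, Double> ytd = new LinkedHashMap<>();
        double runningTotal = 0.0;
        for (String month : MONTHS) {
            Double value = row.get(month);
            // Assumption: a missing month contributes 0.0 to the total.
            runningTotal += (value == null) ? 0.0 : value;
            ytd.put(month + "_YTD", runningTotal);
        }
        return ytd;
    }
}
```

For a row with Jan = 1, Feb = 2, Mar = 3, and Apr = 4, the entry "Apr_YTD" is 10, matching what the April snippet computes.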
Picture
Transpose Back metanode
Now we need to transpose the whole data set back, to have the months on the y-axis and the values (original + YTD values this time) on the x-axis.
  1. The first Column Filter gets rid of the original values and keeps only the newly calculated YTD values.
  2. We rename the columns from "<month>_YTD" to just "<month>"; that is, column "Apr_YTD" is renamed to "Apr".
  3. We perform the transpose back operation.
  4. We rename the values that are now the column headers from "<column_header>" to "<column_header>_YTD".
  5. The RowID node copies the RowID into a new column containing the name of the month.
  6. The Rule Engine node writes the current year, taken from a workflow variable, into each row.
  7. Finally, the Row Filter selects the month whose YTD values we are interested in.
The result is now a data set with only one row (the month of interest) and all the YTD values for that month.
Picture