Writing a MapReduce Program and using MRUnit to test it

Unit testing is essential when writing m/r jobs (in Java or using Streaming). Let’s imagine we want to write a m/r job that, for each initial letter, outputs the average length of the words in a text starting with that letter.


The Algorithm

Mapper

For each word in the line, emit the first letter of the word as a key, and the length of the word as a value

the key is the letter and the value is the length of the word

Reducer

Because of the sort/shuffle phase built into MapReduce, the Reducer receives each key (initial letter) along with the list of lengths of all the words starting with that letter,

and the output is the letter and the average of those lengths: for example, if the Reducer receives ("T", [3, 5]) it emits ("T", 4.0).

N> the Mapper output value type differs from the Reducer output value type: we have an INT (IntWritable) and a DOUBLE (DoubleWritable)

Implementation

Define the Driver… This class should configure and submit your basic job. Among the basic steps here, configure the job with the Mapper class and the Reducer class you will write, and the data types of the intermediate and final keys.

Define the Mapper… For each word in the line, emit the first letter (uppercased) as the key and the word length as the value. Useful String methods:

str.substring(0, 1) // String : first letter of str

str.length() // int : length of str

Define the Reducer… In a single invocation the Reducer receives a string containing one letter along with an iterator of integers. For this call, the reducer should emit a single output of the letter and the average of the integers.

Test your program… Compile, jar, and test it.
the Driver

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgWordLength {

  public static void main(String[] args) throws Exception {

    Job job = new Job();
    job.setJarByClass(AvgWordLength.class);
    job.setJobName("Average Word Length");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(LetterMapper.class);
    job.setReducerClass(AverageReducer.class);

    /*
     * The mapper's output keys and values have different data types than
     * the reducer's output keys and values. Therefore, you must call the
     * setMapOutputKeyClass and setMapOutputValueClass methods.
     */
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    /*
     * Specify the job's output key and value classes.
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

the Mapper

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String line = value.toString();
    for (String word : line.split("\\W+")) {
      String wordInitial = word.substring(0, 1).toUpperCase();
      Integer wordLength = word.length();
      context.write(new Text(wordInitial), new IntWritable(wordLength));
    }
  }
}

the Reducer

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    double averageValues = 0;
    int countValues = 0;
    int sumValues = 0;
    for (IntWritable val : values) {
      sumValues += val.get();
      countValues++;
    }
    if (countValues != 0) {
      averageValues = sumValues / (double) countValues;
    }
    context.write(key, new DoubleWritable(averageValues));
  }
}

N> we can incur an Exception for empty words (substring(0, 1) fails on an empty string)… you can spot it by checking the job task history.

Fixing the code to check the word length first:

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

  String line = value.toString();
  for (String word : line.split("\\W+")) {
    Integer wordLength = word.length();
    if (wordLength > 0) {
      String wordInitial = word.substring(0, 1).toUpperCase();
      context.write(new Text(wordInitial), new IntWritable(wordLength));
    }
  }
}

we can check the results…

@localhost log_file_analysis]$ hadoop fs -cat /user/training/avg0_0/part-r-00000 | grep "A\|E\|I\|O\|U"
A 3.275899648342265
E 5.307238813182565
I 2.1290417039114753
O 2.8046206567868732
U 4.588696504410324

for the vowels only…

Writing Unit Tests With the MRUnit Framework

Rather than using this time-consuming trial-and-error approach, we can use MRUnit (http://mrunit.apache.org/) to test the m/r program against a known set of inputs/outputs and be sure the code works as expected.

We need to test the Mapper, the Reducer and the Driver; the approach is unit testing, so we can isolate the data set to test and check the results.

Let’s see an example for the typical word count and then one for the average word length…

The first thing is to import the classes used in the Unit test

import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

Given a m/r job, we define a Test class as

public class TestWordCount {

First let’s define the objects used for the test

this depends on the nature of the m/r job; in this case we have

Map input (K, V) = (LongWritable, Text)

Reduce input (K, [V]) = (Text, [IntWritable])

MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

Let’s set up the test. This method will be called before every test; what we need to do is instantiate the 3 drivers above and set the Mapper and Reducer on them (the MapReduceDriver handles both).

@Before
public void setUp() {
/*
* Set up the mapper test harness.
*/
WordMapper mapper = new WordMapper();
mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
mapDriver.setMapper(mapper);
/*
* Set up the reducer test harness.
*/
SumReducer reducer = new SumReducer();
reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
reduceDriver.setReducer(reducer);
/*
* Set up the mapper/reducer test harness.
*/
mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>();
mapReduceDriver.setMapper(mapper);
mapReduceDriver.setReducer(reducer);
}

Now we can test individually the mapper, the reducer and the driver;
for the reducer we define an ArrayList to simulate the list of values received for a given key

/*
* Test the mapper.
*/
@Test
public void testMapper() {
/*
* For this test, the mapper's input will be "cat cat dog"
*/
mapDriver.withInput(new LongWritable(1), new Text("cat cat dog"));
/*
* The expected output is "cat 1", "cat 1", and "dog 1".
*/
mapDriver.withOutput(new Text("cat"), new IntWritable(1));
mapDriver.withOutput(new Text("cat"), new IntWritable(1));
mapDriver.withOutput(new Text("dog"), new IntWritable(1));
/*
* Run the test.
*/
mapDriver.runTest();
}
/*
* Test the reducer.
*/
@Test
public void testReducer() {
/*
* For this test, the reducer's input will be "cat 1 1".
*/
List<IntWritable> values = new ArrayList<IntWritable>();
values.add(new IntWritable(1));
values.add(new IntWritable(1));
reduceDriver.withInput(new Text("cat"), values);
/*
* The expected output is "cat 2"
*/
reduceDriver.withOutput(new Text("cat"), new IntWritable(2));
/*
* Run the test.
*/
reduceDriver.runTest();
}
/*
* Test the mapper and reducer working together.
*/
@Test
public void testMapReduce() {
/*
* For this test, the mapper's input will be "1  |  cat cat dog"
*/
mapReduceDriver.withInput(new LongWritable(1), new Text("cat cat dog"));
/*
* The expected output (from the reducer) is "cat 2", "dog 1".
*/
mapReduceDriver.addOutput(new Text("cat"), new IntWritable(2));
mapReduceDriver.addOutput(new Text("dog"), new IntWritable(1));
/*
* Run the test.
*/
mapReduceDriver.runTest();
}
}

So the pattern is:

Mapper … using withInput, withOutput

Reducer … using withInput (with an ArrayList of values), withOutput

Driver … using withInput, addOutput

Example: testing the job that calculates the average length of words…

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class TestWordCount {
MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
ReduceDriver<Text, IntWritable, Text, DoubleWritable> reduceDriver;
MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver;
@Before
public void setUp() {
LetterMapper mapper = new LetterMapper();
mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
mapDriver.setMapper(mapper);
AverageReducer reducer = new AverageReducer();
reduceDriver = new ReduceDriver<Text, IntWritable, Text, DoubleWritable>();
reduceDriver.setReducer(reducer);
mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>();
mapReduceDriver.setMapper(mapper);
mapReduceDriver.setReducer(reducer);
}
@Test
public void testMapper() {
mapDriver.withInput(new LongWritable(1), new Text("a test try"));
mapDriver.withOutput(new Text("A"), new IntWritable(1));
mapDriver.withOutput(new Text("T"), new IntWritable(4));
mapDriver.withOutput(new Text("T"), new IntWritable(3));
// !!! keys are uppercased by design
mapDriver.runTest();
}
/*
* Test the reducer.
*/
@Test
public void testReducer() {
List<IntWritable> values = new ArrayList<IntWritable>();
values.add(new IntWritable(3));
values.add(new IntWritable(4));
reduceDriver.withInput(new Text("T"), values);
reduceDriver.withOutput(new Text("T"), new DoubleWritable(3.5));
reduceDriver.runTest();
}
/*
* Test the mapper and reducer working together.
*/
@Test
public void testMapReduce() {
 
mapReduceDriver.withInput(new LongWritable(1),
new Text("bed ark bronx"));
 
mapReduceDriver.addOutput(new Text("A"), new DoubleWritable(3.0));
mapReduceDriver.addOutput(new Text("B"), new DoubleWritable(4.0));
// !!! output is sorted
/*
* Run the test.
*/
mapReduceDriver.runTest();
}
}

N> note the different Types used in the generic definitions; always keep clear what the Mapper and Reducer key/value Types are

Plan Big, an agile pomodoro spreadsheet

I’ve seen too many colleagues starting their day without spending 10 minutes thinking upfront about what to do during the day, ending up with loads of wasted time and confusion… honestly it’s much better to start your day with a thought about what you’ve done, what you need to do today and what you plan to achieve tomorrow; this is essential in a team, to have a scope and the feeling of working on something meaningful… I cannot get why these simple steps are still not a dogma in so many companies… are we supposed to be slaves of incompetent PMs/team leaders forever, making projects fail just for a lack of common sense?

You can use the Google doc below to help yourself; it takes 2 minutes of your precious time and it will help you be more focused and think about what you are working on…

to plan your daily task – https://docs.google.com/spreadsheet/ccc?key=0AsSPt8G9vLMjdDlzMk1DdlNkQl90TzhYZm1RV1dlZUE&usp=sharing 

1st you clone it into your Google Docs
 2nd you update it with the daily tasks and how long you think each is going to take, considering


1 = 25 min commitment (a pomodoro),
so for each task you update the column DONE 
 
Note the time is updated every minute and the Estimated Done state changes when you change the DONE column; in this way you can see in real time how long it is going to take before you finish your daily tasks
 
Last but not least, you might save the daily sheet to keep track of your week: just add a new sheet for each new week and copy and paste the columns at the end of the day, so you can switch off your brain and enjoy your free time

SSIS Incremental Pkg Definition

Let’s imagine a scenario where a family of similar entities needs to be processed; an example that you might have seen (working on BI projects) is producing a set of reports that are quite similar in structure but differ in some of the computations applied. Let’s say you cannot use SSRS, or the nature of the problem makes it unsuitable…

Scenario definition

Let’s define synthetically and in a simple way the problem to solve and then let’s see how we could address it:

Sources:

  • some ReportTypesID (to identify the type of report to produce)
  • 2 tables with data: TableX, TableY
  • 3 XLS templates (Template1, Template2, Template3)

Target:

  • 3 XLS files generated using the XLS templates, following this logic:

Using the data in TableX and TableY (they have some columns in common) and the given XLS templates, produce 3 report types as follows:
1. XLS = UNION of TableX & Y values and use Template1
2. XLS = INTERSECTION of TableX & Y values and use Template2
3. XLS = DIFFERENCE of TableX & Y values and use Template3

Given a parameter ReportTypeID, generate the related XLS

A non-flexible approach

A non-flexible approach would define 3 different packages (e.g. NotFlexibleUNION.dtsx, NotFlexibleINTERSECTION.dtsx and NotFlexibleDIFFERENCE.dtsx) with the same structure


plus some vars


ReportTypeID = an ID, 1 to 3 in our case (useful to log what we are running)
veSQLForCalculation = the sql to use in the Data Flow (to achieve UNION, INTERSECTION etc…)

and specific hardcoded logic/sql inside to address cases 1 to 3. In this way you end up with a lot of code duplication and many different packages to maintain.

A more flexible approach

The following might be suitable in a general situation, using child pkgs to do the calculations; each one keeps a different setup for ReportTypeID and veSQLForCalculation


Less code duplication, but we still have different child pkgs to manage and all the complexity of passing data between them; looking at our scenario we can reach a better compromise

Incremental approach

The main difference between the 3 reports is how we do the calculation (the sql snippet in veSQLForCalculation) to grab the data used to fill the xls;
so what about keeping the sql to run in a table, and using this table to configure the pkg at runtime, extracting the sql and mapping it to veSQLForCalculation?


SQL Get veSQLForCalculation grabs the veSQLForCalculation value for the given ReportTypeID, and that value is mapped to the var veSQLForCalculation. There is an optional SCR parse object to veSQLFor task that can be omitted to keep things simple; it copes with a more realistic scenario where you have several columns in the table below (not only veSQLForCalculation) and the sql may be big, so it does the trick in the code below:

public void Main()
{
    // Inside an SSIS Script Task (uses System.Data and System.Data.OleDb).
    OleDbDataAdapter A = new OleDbDataAdapter();
    System.Data.DataTable dt = new System.Data.DataTable();
    // The sql text is stored as varchar(max): filling a DataTable from the object
    // variable lets us copy it into package variables. You cannot map an Execute SQL
    // Task result to the vars directly; with varchar(8000) that would be possible,
    // but some sql is much bigger.
    A.Fill(dt, Dts.Variables["User::oveMetaSQL"].Value);

    foreach (DataRow row in dt.Rows)
    {
        object[] array = row.ItemArray;

        /*
        User::veSQLForSQLCalculateControlFileTotals, User::veSQLForSQLCalculateCreateFileTotals,
        User::veSQLForSQLCalculateUpdateFileTotals, User::veSQLForSQLIdentifyDiscrepancyDetails,
        User::veSQLForSQLReconcileTotals, User::veSQLForSQLUpdateReportStatus
        */

        Dts.Variables["veSQLForSQLTotals"].Value = array[0].ToString();
        Dts.Variables["veSQLForSQLControlFile"].Value = array[1].ToString();
        Dts.Variables["veSQLForSQLCalculateUpdateFile"].Value = array[2].ToString();
    }

    Dts.TaskResult = (int)ScriptResults.Success;
}

The table BLOG.tReportypeConfiguration is something like:

ReportTypeID (smallint) | veSQLForCalculation (varchar(max))
1 | -- UNION … SELECT Col1, Col2 … UNION ALL …
2 | -- INTERSECTION …
3 | -- DIFFERENCE …

So we just need to call the package (from a SQL Agent job or a PROC, passing the ReportTypeID parameter)…
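For illustration only, here is a minimal sketch of the dtexec call that a job step or a small wrapper could issue; the package path and file name are hypothetical, and the /Set property path assumes the variable is User::ReportTypeID as above:

// Sketch (not part of the actual package): run the package via dtexec,
// passing ReportTypeID through /Set. The .dtsx path below is a made-up example.
using System.Diagnostics;

class RunIncrementalReport
{
    static void Main()
    {
        int reportTypeId = 2; // e.g. 2 = INTERSECTION

        string arguments = string.Format(
            "/File \"C:\\SSIS\\IncrementalReport.dtsx\" " +
            "/Set \\Package.Variables[User::ReportTypeID].Properties[Value];{0}",
            reportTypeId);

        using (Process p = Process.Start("dtexec", arguments))
        {
            p.WaitForExit();
            System.Console.WriteLine("dtexec exit code: " + p.ExitCode); // 0 = success
        }
    }
}

In a SQL Agent job the same /Set switch typically goes in the job step command line; the point is that only the parameter changes, never the package.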

Adding new Types

This design is handy when you need to add new types: just add a new row to the BLOG.tReportypeConfiguration table (and probably a new related XLS template for the new type) and, without any code change to the package, we can expand to 4 the report types the pkg can manage. Another benefit is that when (if) the sql changes because of some fix or business requirement, we can simply update BLOG.tReportypeConfiguration without touching the pkg

ETL Architecture Smart Examples & Snippets (1 of N)

This topic is something that is usually very hard to find on blogs; there is a plethora of blogs about SSIS tips (sometimes I do that too 8P), but honestly I think that what makes a difference is how you develop the core part of a system, how you think of it, and the balance to keep between what you want to deliver and the amount of resources (time and skills) you have.

I am trying here to list the core components of the ETL systems I have seen in my past experience, focusing on the most relevant characteristics they had in common, so you can reuse this information when developing yours; I am not focusing on a particular problem as that would be too specific: DW projects face special design considerations on how to deal with SCD Dimensions and very deep Fact tables, while a pure Load and Transform system should be designed to be fast and balanced, with special attention to restartability in case of connection issues… etc.

Generally speaking an ETL system should have: (in random order)

Configurability

Each package has a list of Vars where the values that might change are maintained. These variables are usually configured using SSIS Configurations (usually an indirect configuration, where an Environment variable points to the path of an XML file that holds the connection to the sql database where all the configured values for the vars are kept)

Why is keeping the pkg configuration in sql a pro? Because in this way you can set up the following scenario:

For each XYZ SSIS pkg define these core sqls:

  • PkgConfiguration.sql
  • PROC_uspXYZ_OverrideMode.sql
  • SQLAgentJob_XYZ_NormalMode.sql
  • AddAgentJobSchedules.sql
  • SQLAgentJob_XYZ_OverrideMode.sql

I’ll describe each one, focusing on what it does and how it is built

PkgConfiguration.sql

It contains the pkg configuration that is maintained in the special table ETL.[SSISConfigurations]; this table has the standard columns

ConfigurationFilter, ConfiguredValue, PackagePath, ConfiguredValueType

that dtexec uses when a package is configured…

The empty PkgConfiguration.sql is

<PRE>

DECLARE @ConfigFilter VARCHAR(255) = 'Config_#XYZ#';

IF NOT EXISTS (
SELECT 1
FROM [ETL].[SSISConfigurations]
WHERE ConfigurationFilter = @ConfigFilter
)
BEGIN

-- *** INSERT (from xls)

-- ***

PRINT 'New [ETL].[SSISConfigurations] values created on ' + @@SERVERNAME + ' in ' + DB_NAME() + ' database';
END
ELSE
BEGIN
PRINT '[ETL].[SSISConfigurations] values already exists on ' + @@SERVERNAME + ' in ' + DB_NAME() + ' database' + ' - no action taken';
END
GO

</PRE>

The part to fill with the specific code is

<PRE>

-- *** INSERT (from xls)

-- ***

</PRE>

To simplify and make that (semi) automatic you can use an external XLS like the following

In the Pkg commit sheet make a check list of the Vars that need to be kept in configuration; at minimum the Sources/Logging/Targets (so you can test the pkg in the DEV env and then move it to QA/PROD just updating the sql configuration – don’t hardcode them!)

So how do we set up the:

<PRE>

-- *** INSERT (from xls)

-- ***

</PRE>

I came up with this approach:

Use the BIDS wizard to put in configuration all the vars listed in the Pkg commit sheet

When done, BIDS will AUTOMATICALLY fill [ETL].[SSISConfigurations] for the 'Config_PKG name NO ext' ConfigFilter

What you have to do is to use the DEV Build SQL Insert and run the following

DECLARE @ConfigFilter varchar(255) = 'Config_PKG name NO ext';

SELECT *

FROM [ETL].[SSISConfigurations]

WHERE ConfigurationFilter = @ConfigFilter

And place the values in the sheet to build the INSERT snippet:

Let’s have a look through screenshots using fake values:

And you can use the sql snippet from the last sheet to check the configured values:

" DECLARE @ConfigFilter varchar(255) = ‘Config_BLOG’;

SELECT *

FROM [ETL_Logging].[ETL].[SSISConfigurations]

WHERE ConfigurationFilter = @ConfigFilter"

Note> In XLS you can play a lot with cell references to propagate values and keep everything consistent… in this case the cell value is

=" DECLARE @ConfigFilter varchar(255) = ‘"& E3&"’;

SELECT *

FROM [ETL_Logging].[ETL].[SSISConfigurations]

WHERE ConfigurationFilter = @ConfigFilter"

Where E3 is

="Config_"&’Pkg Commit’!B2

For example the row

Config_BLOG | Y | Package.Variables[User::SourceReady].Properties[Value] | String

generates the line

SELECT @ConfigFilter ,'Y','Package.Variables[User::SourceReady].Properties[Value]','String' UNION ALL

which is calculated as

=IF(D15="DateTime",

" SELECT @ConfigFilter "&",’"&TEXT(B15,"DD/MM/YYYY hh:mm:ss")&"’,’"&C15&"’,’"&D15&"’ UNION ALL ",

" SELECT @ConfigFilter "&",’"&B15&"’,’"&C15&"’,’" &D15&"’ UNION ALL ")

So we can copy and paste the yellow code as:

<PRE>

DECLARE @ConfigFilter VARCHAR(255) = 'Config_BLOG';

IF NOT EXISTS (
SELECT 1
FROM [ETL].[SSISConfigurations]
WHERE ConfigurationFilter = @ConfigFilter
)
BEGIN
-- *** INSERT (from xls)

	INSERT INTO [ETL_Logging].[ETL].[SSISConfigurations] (ConfigurationFilter, ConfiguredValue, PackagePath, ConfiguredValueType)
	SELECT @ConfigFilter ,'Y','Package.Variables[User::SourceReady].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'Y','Package.Variables[User::TargetReady].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'ETL','Package.Variables[User::LoggingDB].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'OMEGA1','Package.Variables[User::LoggingServer].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'HOGAN','Package.Variables[User::SourceDB1].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'X','Package.Variables[User::SourceServer].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'ALLINONE','Package.Variables[User::TargetDB].Properties[Value]','String' UNION ALL
	SELECT @ConfigFilter ,'0','Package.Variables[User::TargetImportMonth].Properties[Value]','Int32' UNION ALL
	SELECT @ConfigFilter ,'OMEGAX','Package.Variables[User::TargetServer].Properties[Value]','String' -- UNION ALL

-- ***

PRINT 'New [ETL].[SSISConfigurations] values created on ' + @@SERVERNAME + ' in ' + DB_NAME() + ' database';
END
ELSE
BEGIN
PRINT '[ETL].[SSISConfigurations] values already exists on ' + @@SERVERNAME + ' in ' + DB_NAME() + ' database' + ' - no action taken';
END
GO

</PRE>

And you are done; what is left is to clone this with QA/PROD-specific values (changing the server at least) and to deploy the pkg as it is, plus the PkgConfiguration_QA.sql and PkgConfiguration_PROD.sql, so the pkg will configure itself at runtime… any further change will just be a DML against [ETL].[SSISConfigurations]; without the configuration you would have to edit the pkg and redeploy it each time… what a nightmare and how many things could go wrong…

Conclusion

So that’s, I think, a nice tip to use; we’ll see

PROC_uspXYZ_OverrideMode.sql

SQLAgentJob_XYZ_NormalMode.sql

AddAgentJobSchedules.sql

SQLAgentJob_XYZ_OverrideMode.sql

in the next posts; in the meantime feel free to comment, give your impressions or ask for clarifications ;)

SSIS Passing ObjVar from Child back to Parent

It’s straightforward to use a Parent variable configuration to alter a child package variable through the Execute Package task.

Doing it the other way around is a bit trickier. Let’s see how…

Let’s assume we have a SSIS project with the following:

ParentPkg.dtsx

With the following vars:

And

ChildPkg.dtsx

With the following vars:

And a simple Parent Var configuration to get the value of TargetMonthEnd

How does it work:

The flow is the following:

1. ParentPkg runs:
1.1. it sets the value of the TargetMonthEnd
1.2. it calls the child passing it
1.3. The ChildPkg runs:
1.3.1. Get the value of TargetMonthEnd from the parent
1.3.2. Calculate the value of TargetMonthEndChild
1.3.3. Copy it to TargetMonthEnd
1.4. Print both values

some snippets from the code by section:

1.1

<PRE>
Dts.Variables["TargetMonthEnd"].Value = 20140131;
</PRE>

1.3.2

<PRE>
int TargetEndMonth = Convert.ToInt32(Dts.Variables["User::TargetMonthEnd"].Value);

Dts.Variables["User::TargetEndMonthTextChild"].Value = string.Format("[ {0} ] ", TargetEndMonth);
</PRE>

The most relevant part is
1.3.3

<PRE>
public void Main()
{
    try
    {
        Variables vars = null;
        if (Dts.VariableDispenser.Contains("User::TargetEndMonthText")
            && Dts.VariableDispenser.Contains("User::TargetEndMonthTextChild"))
        {
            Dts.VariableDispenser.LockForWrite("User::TargetEndMonthText");
            Dts.VariableDispenser.LockForRead("User::TargetEndMonthTextChild");
            Dts.VariableDispenser.GetVariables(ref vars);
            vars["User::TargetEndMonthText"].Value = vars["User::TargetEndMonthTextChild"].Value;
            vars.Unlock();
        }
    }
    catch { }
    Dts.TaskResult = (int)ScriptResults.Success;
}
</PRE>

In this way we can run the child as standalone without breaking it in case it’s not called by a Parent.

The way it works behind the scenes is (probably) that, when using the Execute Package task, the Parent and the Child share the same collection of vars (passed by reference), so both can modify them.

Test and Execution

Let’s ‘unit test the Child’ and then go for an integration test:

As expected, the child just picks the default value when called standalone

Now let’s run the Parent with Child package enabled

And to double check … with child disabled

So the rule is:

match the 2 shared vars, and

have an extra var in the child that will do the trick.

BI consultants the hard way: SSIS template package my way

So you have some job spec to implement as one or more SSIS packages doing data extraction, data loading, file moving etc. Sometimes the approach to sql tasks involves parameters and dynamic queries: are you going to hardcode them or use sql parameters (whose syntax changes depending on whether you use an OLE DB or an ADO.NET connection…)?

Assuming you have already coded some sql snippets that address the logic parts of the Spec, let’s see how to build a package that integrates them, keeping the sql in the package in sync with the sql code from the Spec. This is very useful in real-case scenarios: the Spec changes and you need to apply the changes to the pkg while keeping everything together; we live in an Agile world, especially if you’re a consultant.

This screenshot represents the objects involved; let me detail them part by part.

I assume the package uses SSISConfigurations and that you have some sort of table where you log the execution of the package (Batch), and probably another table to keep the details of each Batch execution (a common approach for an etl logging framework).

The History SEQC

It’s very important to manage this, but having some nice text that gives info about the code changes is not the most important thing; what really matters is to increment the values of these 2 package-level properties:

<DTS:Property DTS:Name="VersionMajor">1</DTS:Property>

<DTS:Property DTS:Name="VersionMinor">0</DTS:Property>

This will help you a lot when troubleshooting a production deployment; it has happened to me to find old versions of packages deployed rather than the upgraded ones…

The title is mapped to a Var using an expression like the following

@[System::PackageName] + "" + @[User::BatchProcessDescription]
Where the latter is

"<Package purpose description to go here>" + "ver " +

(DT_WSTR, 11) @[System::VersionMajor] + "_" +

(DT_WSTR, 11) @[System::VersionMinor] + "_b" +

(DT_WSTR, 11) @[System::VersionBuild]

BatchProcessDescription is a Var used to log the current execution status in a table; in this way you can check the running state of the deployed packages by querying that table, and even use it as a simple semaphore/coordinator.

TaskFlow block

All the logic is in the following block

1) Print Vars is used to log all the Vars’ contents in the logging tables; this is great when you need to retrospect some batch execution looking for root cause issues (a rough sketch of such a task is shown below)

2) the SQL Log Started/Success tasks are just sql tasks that log in a table the main task flow details, such as LoadStart, End and Status

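As a rough idea (not the actual task), a Print Vars script task could be sketched like this; it assumes the variables to dump are listed in the task’s ReadOnlyVariables, and it raises an information event instead of writing to a specific logging table, which is an arbitrary choice here:

// Sketch of a Print Vars script task: dump the package variables so they end up
// in the SSIS log / logging tables. Only variables added to ReadOnlyVariables
// are visible through Dts.Variables.
public void Main()
{
    System.Text.StringBuilder sb = new System.Text.StringBuilder();

    foreach (Variable v in Dts.Variables)
    {
        if (!v.SystemVariable) // skip System:: variables, log only package vars
        {
            sb.AppendLine(string.Format("{0} = {1}", v.QualifiedName, v.Value));
        }
    }

    // Raise the dump as an information event; a log provider (or an Execute SQL task
    // reading this text) can then persist it into the logging tables.
    bool fireAgain = true;
    Dts.Events.FireInformation(0, "Print Vars", sb.ToString(), string.Empty, 0, ref fireAgain);

    Dts.TaskResult = (int)ScriptResults.Success;
}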

It might be necessary to do some pre/post checks before the actual execution, so the 2 containers come in handy for that;

Ex: if the package loads data from Source to Target, you might check that the data is not loaded already (in normal execution mode), so you can add some checks in SEQC Housekeeping Pre Tasks, as the main block is intended to log actual data movement…

Execution override

It’s very common to use some sort of configuration to set the package variables used in the dataflow; the configuration happens when the package is loaded, so this section is a sort of handy tool for development and testing.

The Boolean var Dev_Mode is used to set the behaviour of the pkg itself. When true, SEQC Dev Mode Pre is enabled so the Dev_Mode Setup is hit

Ex

You can override some values coming from the Config, so you can change the behaviour of the package without updating anything on the SSISConfigurations table (if you’re using it), for example
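As an illustration, the Dev_Mode Setup could be a small script task like the sketch below; apart from User::Dev_Mode, the overridden variable names and values are hypothetical and only show the idea, and the variables would need to be listed in the task’s ReadOnlyVariables/ReadWriteVariables:

// Sketch of the Dev_Mode Setup script task: when Dev_Mode is true, override a few
// values that normally come from the configuration. Variable names other than
// User::Dev_Mode are made up for illustration.
public void Main()
{
    bool devMode = (bool)Dts.Variables["User::Dev_Mode"].Value;

    if (devMode)
    {
        // Point the package at dev resources without touching SSISConfigurations.
        Dts.Variables["User::TargetServer"].Value = "DEVSERVER01";
        Dts.Variables["User::SourceReady"].Value = "Y";
    }

    Dts.TaskResult = (int)ScriptResults.Success;
}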

Another handy SCR is Parse Meta SQL veSQLFor. As you can end up with loads of sql scripts, you need to keep them in Vars (or expression Vars); this task parses some meta sql, very close to the sql you had written to test the spec in the pre-coding phase, and translates it into the SSIS expression you can copy and paste into the Var expression, rather than doing each of them manually…

Ex

Let’s assume you have these 2 sql snippets implementing this (dummy) logic

-- 1.1 Read Data.sql

SELECT GETDATE() AS CurrentDate

And

-- 2.1 Do something

DECLARE @dateToCheck DATETIME = '2014-01-01' -- PARAM

SELECT DATEDIFF(SECOND, @dateToCheck, GETDATE()) DiffInSeconds

You can define 2 sql vars

veSQLFor1 to keep the 1.1

veSQLFor2 for 2.1, but it’s trickier as it needs the value calculated by the previous code; assuming that value is saved in a datetime var CurrentDate, you have to write something like this:

"DECLARE @dateToCheck DATETIME = '" +

(DT_WSTR, 4) YEAR(@[User::CurrentDate]) + "" +

RIGHT("0" + (DT_WSTR, 2) MONTH(@[User::CurrentDate]), 2) + "" +

RIGHT("0" + (DT_WSTR, 2) DAY(@[User::CurrentDate]), 2) +

"' SELECT DATEDIFF(SECOND, @dateToCheck, GETDATE()) DiffInSeconds"

As you can see, apart from the complication of the SSIS type conversions, you have a nice match with the original sql.

The Parse Meta SQL veSQLFor task does exactly that: it parses the meta sql and prints the corresponding SSIS expression, so you can define the var expression without having to create it by hand in the expression dialogue (super time-expensive and not handy).

Ex

Given this input

-- 2.1 Do something

DECLARE @dateToCheck DATETIME = @User::CurrentDate

SELECT DATEDIFF(SECOND, @dateToCheck, GETDATE()) DiffInSeconds
it will print the SSIS expression, so you can copy and paste it and set the veSqlFor2 expression (this is done only at package development time of course; I will write a specific post to share the details)
public void Main()
{
    // N> these are the vars initially contained in the veSQLForxyz meta sql
    List<string> lVarsToReplaceInMetaSQL = new List<string> {
        "User::vSourceTable001",
        "User::vTargetTable001"
    };

    // TODO
    List<string> lVeSQLForVars = new List<string> {
        "veSQLFor1"
    };

    // dict that will contain the veSQLForxyz vars with the proper SSIS expression coming from the MetaSQL
    Dictionary<string, string> dictMetaSQL = new Dictionary<string, string>();

    // N> Add MetaSQL from Mng SQL
    AddMetaSQL(dictMetaSQL);

    // N> update ExpressifyVar()
    ParseMetaSQL(dictMetaSQL, lVarsToReplaceInMetaSQL);

    PersistInVars(dictMetaSQL);

    CheckLenghtNoUnreplaced(dictMetaSQL);

    PrintSQL(lVeSQLForVars);

    Dts.TaskResult = (int)ScriptResults.Success;
}

Dev_Mode Check for TOP is just a way to check whether you are still using TOP in some sql clause; this is very common when prototyping the flow, when you just need to process/extract a few rows to check the logic works, and then you can fully run it in a finalization stage to check for any etl performance issues…
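A possible sketch of that check, assuming the sql snippets live in variables whose names start with veSQLFor (per the naming convention used here) and that those variables are listed in the task’s ReadOnlyVariables; raising a warning is just one way to surface it:

// Sketch of Dev_Mode Check for TOP: warn when a veSQLFor* variable still contains
// a TOP clause left over from prototyping.
public void Main()
{
    foreach (Variable v in Dts.Variables)
    {
        if (v.Name.StartsWith("veSQLFor") && v.Value != null)
        {
            string sql = v.Value.ToString();

            // Crude check: look for the TOP keyword surrounded by spaces.
            if (sql.ToUpperInvariant().Contains(" TOP "))
            {
                Dts.Events.FireWarning(0, "Dev_Mode Check for TOP",
                    string.Format("{0} still contains a TOP clause", v.QualifiedName),
                    string.Empty, 0);
            }
        }
    }

    Dts.TaskResult = (int)ScriptResults.Success;
}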

Finally, all the main EventHandlers are enabled/disabled by Dev_Mode (mapped to their Disable property), as I noticed this speeds up the package execution.

BI consultants the hard way: from specs to SSIS

So you have some job spec to implement as one or more SSIS packages; let’s see an approach that will help you build a quality package in a short time.

A lot of BI developers just jump into BIDS and start coding, trying to match the specs and frequently reworking what they have done as they come to understand the data workflow: wrong. This approach is inconsistent, slow and prone to errors.

Unfortunately you are a BI consultant and not many tools are given to you, so let’s be smart; a cool approach I developed in my last roles is the following, please give me some feedback.

The idea is quite simple: use an Excel worksheet to abstract the business logic of the spec, so that the implementation becomes a matter of clicks and drag-and-drops in BIDS once the dataflow has been worked out with the necessary tasks, connections and vars.

This is an example of an xls:

The general structure of the xls has the following columns:

  • SPEC
  • TaskName
  • Connection Input
  • Input Var Used
  • veSQLFor Input Var
  • DFT Fields
  • ROW_Count
  • Output Var
  • Connection output
  • hyperlink to file

let’s see what they mean.

SPEC

It’s just a reference to the spec doc section (if any)

If they are missing I suggest adding them to split the spec logic in more manageable chunks.

TaskName

The name of the task; try to prefix the name itself with the type of task

Ex:

SQL Read Latest Business Date Active = sql task to read latest

SCR Extract CurrentZipDate = Script Task to extract …

DFT Import the file content = Data Flow Task to import …

In this way you can apply (as I did) a Conditional formatting on the column to see them in different colours

Connection Input

It’s the name of the connection used by the task (if any). This is very useful because you always know which data source (file, db or other) each task is pulling data from.

You can use the Connection worksheet to define the connections used by the tasks and to set up a data validation:

In this way, when you need a new connection you first add it to the Connection worksheet and then you can select it in the Task sheet for the Task itself… keeping it consistent; avoid copying and pasting the text.

Input Var Used

The idea is similar to the connections: the pkg vars are often a source of errors if not managed properly, so the best is to list them in a separate sheet and set up the Data validation to constrain the values you can select in the Task sheet for the Input Var Used column.

veSQLFor Input Var

This is an advanced topic, and I will post something specific about it. In general, avoid hardcoding sql snippets in the tasks: define them as vars and use expressions if they need to be dynamic (when they depend on some var’s value); at least you can use a formula to build the name of the vars based on the task, so the matching is straightforward:

Ex

SQL Update ControlTable = veSQLForSQLUpdateControlTable

So the Sql Task SQL Update ControlTable will get its sql from a var named veSQLForSQLUpdateControlTable… Easy.

You can use a simple formula like this to build the name

="veSQLFor"&SUBSTITUTE(CELL WITH TASKNAME," ","")

DFT Fields & ROW_Count

These can be optional; they hold the Fields of a DFT task and, if needed, the ROW_Count you want to log

Ex

Usually in data extraction you need to log the number of rows read/inserted/deleted etc… keep the naming consistent

Output Var &Connection output

The same as for the Input ones: if the task has outputs and connections, or it’s updating a var, here is where to track them

In this way you can see the dataflow even if you haven’t coded the dtsx yet, just by scrolling down the xls…

Hyperlink to file

This makes sense if you have read the previous post https://mamatucci.wordpress.com/2014/01/15/bi-consultants-the-hard-way-structuring-sql-and-ssis-solution/ where I advise writing the sql in separate snippets and keeping them in a separate sql solution; you can use a hyperlink to open the sql snippet from the xls and check its details.