03 November 2013

Summary of DataStage Design Patterns

Here is a list with the DataStage patterns I have published on this blog:

I will try to update this list whenever something new is posted.

DataStage Patterns–Compare Transformer Record with the Previous Record

A common requirement in a DataStage job is to compare the current record with the previous record inside a Transformer stage. For example, consider the following rule:

if record(T).TotalPrice - record(T-1).TotalPrice > 10 then 0 else 1

There are several ways to implement this in DataStage:

Use Stage Variables in the Transformer

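The straightforward approach is to define stage variables in the Transformer stage and keep the value(s) from the previous record there. The following screenshot illustrates this (please note that the order of the stage variables is important: they are evaluated from top to bottom, so the variable that stores the current value must come after the variable that still needs the previous value):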

image
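To make the ordering point concrete, here is a minimal Python sketch (not DataStage code) that mimics two stage variables evaluated from top to bottom for every row. The names sv_diff_flag and sv_prev_total_price are hypothetical, and the initial_prev parameter (the value the "previous" variable holds before the first row) is an assumption made for illustration.

```python
def flag_price_jumps(total_prices, initial_prev=0):
    """Emulate two stage variables that are evaluated top to bottom per row."""
    # A stage variable keeps its value from one row to the next.
    sv_prev_total_price = initial_prev
    flags = []
    for total_price in total_prices:
        # 1) Evaluated first: sv_prev_total_price still holds the value
        #    stored while processing the previous row.
        sv_diff_flag = 0 if total_price - sv_prev_total_price > 10 else 1
        # 2) Evaluated last: remember the current value for the next row.
        sv_prev_total_price = total_price
        flags.append(sv_diff_flag)
    return flags


flags = flag_price_jumps([100, 105, 120, 121], initial_prev=100)
print(flags)
```

If the two assignments were swapped, the "previous" value would already be the current one when the rule is evaluated, the difference would always be 0, and every row would get 1.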

Use Join Over a Dummy Column

The disadvantage of the previous method is its reliance on stage variables: the more columns we need from the previous record, the more stage variables we have to define, and the transformer quickly becomes too complicated.

Another idea is to use a transformer with two output links, each of which adds a new column called “dummy”. On the first link the dummy column takes the values 1..n (where n is the number of input records), and on the second link it takes the values 0..(n-1). If you later join both links on this dummy column, each joined row contains the values of a record (from the first link) and of the record that follows it (from the second link). The above is illustrated in the following screenshots:

image

image

image
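The join over the dummy column can be sketched in a few lines of Python. This is only an illustration of the idea, not DataStage code: the function name pair_with_next is hypothetical, the join is assumed to be an inner join, and the records are assumed to arrive in one ordered stream (a single partition).

```python
def pair_with_next(records):
    """Pair each record with its successor by joining on a generated key."""
    # First output link: the dummy column runs 1..n.
    first_link = {dummy: rec for dummy, rec in enumerate(records, start=1)}
    # Second output link: the dummy column runs 0..(n-1).
    second_link = {dummy: rec for dummy, rec in enumerate(records, start=0)}
    # Inner join: keep only the dummy values present on both links.
    common = sorted(first_link.keys() & second_link.keys())
    return [(first_link[dummy], second_link[dummy]) for dummy in common]


pairs = pair_with_next([100, 105, 120])
print(pairs)
```

With an inner join the result has n-1 rows, because dummy value 0 exists only on the second link and dummy value n exists only on the first one. Every column of both records is available in the joined row, so no extra stage variables are needed.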

02 November 2013

DataStage Patterns–Generate Sequence of Numbers in a Parallel Transformer

A common requirement in DataStage jobs is to create a sequence of unique numbers (in other words, a row id) in a parallel Transformer stage. The obvious solution below does not work if the job runs on multiple partitions:

image

To overcome this, use the @PARTITIONNUM, @NUMPARTITIONS and @INROWNUM system variables in a transformer derivation like this:

@PARTITIONNUM + (@NUMPARTITIONS * (@INROWNUM -1))

image
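To see why this derivation produces unique values, here is a small Python simulation. It is a sketch, not DataStage code: the function names are hypothetical, and it relies on @PARTITIONNUM being 0-based and @INROWNUM being 1-based and counted separately in each partition.

```python
def row_id(partition_num, num_partitions, in_row_num):
    """Same arithmetic as the transformer derivation above."""
    # partition_num is 0-based, in_row_num is 1-based within the partition.
    return partition_num + num_partitions * (in_row_num - 1)


def simulate(rows_per_partition):
    """Generate the ids for every row of every partition."""
    num_partitions = len(rows_per_partition)
    ids = []
    for partition_num, row_count in enumerate(rows_per_partition):
        for in_row_num in range(1, row_count + 1):
            ids.append(row_id(partition_num, num_partitions, in_row_num))
    return ids


ids = simulate([3, 2, 4])  # three partitions with 3, 2 and 4 rows
print(sorted(ids))
```

Each partition only ever produces numbers with the same remainder modulo the number of partitions, so two partitions can never generate the same id. Note that the ids start at 0 (add 1 to the derivation if you need them to start at 1) and that they have gaps when the partitions do not hold the same number of rows.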

26 October 2013

Discover the DataStage Transformer Code

DataStage generates code for each transformer stage and then compiles this code to produce the executable used during a job run. You cannot inspect the generated code from the DataStage Designer, but in this post I will show you how to view it using 3rd party tools.

Let’s say that we have a job with a transformer named tr_IncrCol that increments the value of a column:

image

View the TRX Code

To find the TRX code (a kind of pseudo code similar to C++) for the transformer, issue the following commands in the shell:

cd /opt/IBM/InformationServer/Server/Projects/YourProjectName/
find . -name '*YourJobName_YourTransformerName.trx'

For the transformer above, the command returns a file ./RT_SC6/V0S0_TransformerCodeDemo_tr_IncrCol.trx that has the following contents:


//
// Generated file to implement the V0S0_TransformerCodeDemo_tr_IncrCol transform operator.
//
 
// define our input/output link names
inputname 0 in;
outputname 0 out;
 
initialize {
        // define our control variables
        int8 RowRejected0;
        int8 NullSetVar0;
 
}
 
mainloop {
 
        // initialise the rejected row variable
        RowRejected0 = 1;
 
        // evaluate columns (no constraints) for link: out
        out.col1 = (in.col1 + 1);
        writerecord 0;
        RowRejected0 = 0;
}
 
finish {
}
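The three sections of the TRX file can be read as a simple per-record loop. The following Python sketch models that structure for the tr_IncrCol example. It is only an illustration under the assumption that initialize runs once before the first record, mainloop runs once per input record, and finish runs once after the last record; the class and function names are hypothetical.

```python
class IncrColTransform:
    """Python model of the generated TRX sections for tr_IncrCol."""

    def initialize(self):
        # Corresponds to the control variables declared in "initialize".
        self.row_rejected = 0

    def mainloop(self, record):
        # Mark the row as rejected until it has been written to a link.
        self.row_rejected = 1
        out = {"col1": record["col1"] + 1}  # out.col1 = (in.col1 + 1)
        # Corresponds to "writerecord 0" followed by clearing the flag.
        self.row_rejected = 0
        return out

    def finish(self):
        # The generated "finish" section is empty for this transformer.
        pass


def run(transform, records):
    """Drive the three sections in the assumed order."""
    transform.initialize()
    written = [transform.mainloop(record) for record in records]
    transform.finish()
    return written


print(run(IncrColTransform(), [{"col1": 1}, {"col1": 41}]))
```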

View the C Code


To view the C code, you need to define the environment variable APT_TRANSFORM_OPERATOR_DEBUG for your project (using the DataStage Administrator):


image


Now use “Force Compile” in the DataStage Designer to recompile your job, and issue the following commands to find the C file:



cd /opt/IBM/InformationServer/Server/Projects/YourProjectName/
find . -name '*YourJobName_YourTransformerName.C'

The find command returns a file ./RT_BP6.O/V0S0_TransformerCodeDemo_tr_IncrCol.C with the following contents:



// -*-Mode: C++-*-
/******************************************************************************
*
* Boiler plate for generated transform operator.
*
* Module: bp.transform.C
*
* Licensed Materials  Property of IBM
* "Restricted Materials of IBM"
* (C) Copyright IBM Corp. 2005, 2012
*
* (c) Copyright 2005 Ascential Software Corporation. All Rights Reserved
* This is unpublished proprietary source code of Ascential Software Corporation.
* The copyright notice above does not evidence any actual or intended
* publication of such source code.
*
******************************************************************************/
/*
#######################################################################
##
##      Maintenance log - insert most recent change descriptions at top
##
##      Date   RTC#    WHO     Description.
##   03/02/12  97190   lag     Added destructor logic
##
##      Date   CQ#     WHO     Description.
##   08/25/10  221869 msashiha tweaked stringToNumeric for var. length strings
##   10/20/09  180149  eaj     Transform code optimizations
##   06/18/08 prod93684 msashiha avoid passing STL objects in API methods
##   05/07/08  131689  eaj     Fix string constant conversion issues related to
##             131831          MS 2005 compiler behavior change
##   02/15/08  125689 jcallen  fix the copyright notice
##   06/08/07  112670 dsotkowi  Merge from main
##   02/02/07  112670 dsotkowi   Removed writeOutputRecordChild()
##   01/29/07  112670 dsotkowi   Move more code from transform.C generation 
##                               into transformbase.C & .h
##   01/24/07  112670 dsotkowi Only output writeRecordChild() if has tables.
##   01/22/06 112670  dsotkowi Transform refactoring
##   01/16/07   97851 dsotkowi If there is a null and no reject dataset abort
##                             if APT_EnvironmentFlag APT_TRANSFORM_ABORT_ON_REJECT_NULL 
##                             is set otherwise if there is no reject dataset drop the record.
##   10/06/06  110505 ahirshbe Ported next entry to main/LATEST.
##   10/06/06  107791  eaj     Correctly cleanup temp lookup fileset
##   12/05/06 112590  dsotkowi Fix table -save option.
##   09/08/06 104659  dsotkowi Fix friend APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol_issueConversionWarning linkage.
##      Date   Ecase#  WHO     Description.
##   08/10/06  97302   eaj     Fix createonly and fileset range lookup handling
##   03/07/05  None   ahirshbe Make issueConversionWarning a static member function.
##   03/06/06  88658  dsotkowi Restore declaration of convStat - 
##                             fails transformop_test_MK_bug7 among others
##   02/09/06  85328  vpechori Surrogate keys from DB Sequence.
##   08/18/05  69187   xpu     Added EOW support.
##   06/13/05  66571   eaj     Move sharing control from ops to tables
##   05/04/05  73547   eaj     Handle conversion failures better
##   04/29/05  73479   eaj     Explicitly force std::llabs on linux
##   02/17/05  n/a     eaj     Updatable functionality
##   01/31/05  68170   xpu     Redefine max() and min() on Linux.
##   07/22/04  30156   eaj     Handle a hex value in APT_STRING_PADCHAR
##   03/15/04  none    eaj     Use osh option -string_fields for inserted
##                             field type
##   02/28/04  n/a     xpu     Moved declaration of operator+ out of 
##                   the class.
##   01/26/04          linda   Adding OS/390 support
##   01/21/04  43656   xpu     Update output sort keys at run time.
##   11/21/03  n/a     xpu     Inserted spaces for indention.
##   09/26/03  39721   xpu     Added a newline at the end of the file to
##                avoid aCC 3.37 future fatal error 690.
##   09/04/03  39209   eaj     NLS this fix since it came from 6.0
##   09/04/03  39209   eaj     Added use of APT_STRING_PADCHAR for fixed
##                             length strings.
##   05/08/03  9607    xpu     Removed setting dataset schemas
##   04/09/03  20305   eaj     Added handling of + operator for ustring
##                             string mixes
##   04/09/03  20130   eaj     Add handling of collation sequence option
#######################################################################
*/
 
#include <apt_components/transformop/transformbasehdrs.h>
 
// user-defined function header file
 
 
#include <apt_components/transformop/transformbase.h>
 
// Allow for addition of mixed strings and ustrings
APT_DLL_BUILDOP APT_UString operator+ (const APT_UString&, const APT_String& );
APT_DLL_BUILDOP APT_UString operator+ (const APT_String&,  const APT_UString&);
 
class APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol: public APT_TransformBaseOP
{
public:
  APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol();
  ~APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol();
 
protected:
  // pure virtual functions in parent
  virtual void          processInputRecordChild(int inputDS, int curOutput[], int writtenOutput[]);
  virtual void          setRejectColumnValue(const APT_UString &errMsg, int rejectDs);
 
  // implement the parent's pure virtual methods
  virtual APT_Status    describeOperatorChild(APT_Schema* inputSchema, APT_Schema* outputSchema);
  virtual APT_Status    doInitialProcessingChild(APT_InputAccessorInterface* inpInt[]);
  virtual APT_Status    initializeWaveChild();
  virtual APT_Status     doFinalProcessingChild();
  virtual void          processEOWChild(int inputDS);
  virtual void          serializeChild(APT_Archive& ar, APT_UInt8);
  virtual APT_Status    setupInputInterfaceAccessorChild(APT_InputAccessorInterface** inCur);
  virtual APT_Status    setupOutputInterfaceAccessorChild(APT_OutputAccessorInterface** outCur);
 
    
private:
  // internal macros, required for persistence mechanism
  APT_DECLARE_RTTI(APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol);
  APT_DECLARE_PERSISTENT(APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol);
 
  // Helper conversion class, should be moved into transformbase class
  // in future releases
  template <class T> class stringToNumeric
  {
  public:
    static void convert(T& result, const APT_String& source, char* pBuf,
                        bool* msgPrinted, const APT_String& addlMsg);
    static void convert(T& result, const APT_UString& source, UChar* pBuf,
                        bool* msgPrinted, const APT_String& addlMsg);
    static void convert(APT_Decimal& result, const APT_String& source,
                        bool* msgPrinted, const APT_String& addlMsg);
    static void convert(APT_Decimal& result, const APT_UString& source,
                        bool* msgPrinted, const APT_String& addlMsg);
    static void convert64(APT_Int64& result, const APT_String& source, char* pBuf,
                          bool* msgPrinted, const APT_String& addlMsg);
    static void convert64(APT_Int64& result, const APT_UString& source, UChar* pBuf,
                          bool* msgPrinted, const APT_String& addlMsg);
    static void convertU64(APT_UInt64& result, const APT_String& source, char* pBuf,
                           bool* msgPrinted, const APT_String& addlMsg);
    static void convertU64(APT_UInt64& result, const APT_UString& source, UChar* pBuf,
                           bool* msgPrinted, const APT_String& addlMsg);
  private:
    static void issueWarning(bool* msgPrinted, const APT_String& addlMsg, bool isDecimal = false);
  };
 
  // input accessors
  APT_InputAccessorToInt32 input0Int32col1;
 
 
  // output accessors
  APT_OutputAccessorToInt32 output0Int32col1;
 
 
  // declare stage (global) variables.
  APT_Int8 stage0RowRejected0;
  APT_Int8 stage0NullSetVar0;
 
};
 
APT_IMPLEMENT_RTTI_ONEBASE(APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol, APT_CombinableOperator);
APT_IMPLEMENT_PERSISTENT(APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol);
 
APT_DEFINE_OSH_NAME(APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol, V0S0_TransformerCodeDemo_tr_IncrCol, ARG_DESC);
 
APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol()
 
{
  // initialize base class
  const unsigned nOutputs = 1;
  const unsigned nInputs = 1;
  numinputs_     = nInputs;
  numoutputs_    = nOutputs;
  numtransfers_  = 1;
  numrejects_    = 0;
  inTransAdapt_  = new APT_TransferAdapter[nInputs];
  numRealInputs_ = 1;
  numOfConvWithParams_  = 0;
  numOfStrDefaultConvs_ = 0;
 
  // initialize stage (global) variables.
  stage0RowRejected0 = 0;
  stage0NullSetVar0 = 0;
 
}
 
APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::~APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol()
{
  if (inTransAdapt_) { delete [] inTransAdapt_; inTransAdapt_ = 0; }
 
}
 
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::serializeChild(APT_Archive& ar, APT_UInt8)
{
  // serialize job parameters
 
}
 
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::serialize(APT_Archive& ar, APT_UInt8 n)
{
  serializeChild(ar,0);
  
}
 
 
// initialize operator before serialization and parallel distribution.
APT_Status APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::describeOperatorChild(APT_Schema* inputSchema, APT_Schema* outputSchema)
{
  // set input interface schema.
  inputSchema[0]=APT_Schema(APT_ConvertFromString_TRX("record\n( col1: int32;\n  APT_TRinput0Rec0: *;\n)")); 
  APT_Schema droppedFds;
  setupInputInterfaceSchema(inputSchema, droppedFds, 0, 0);
  unsigned int input;
 
 
  // set output interface schema.
  outputSchema[0]=APT_Schema(APT_ConvertFromString_TRX("record\n( col1: int32;\n  APT_TRoutput0Rec0: *;\n)")); 
  int output;
 
 
  // set view/modify adapter.
 
 
  // set transfer adapter.
  APT_UString* dropped = new APT_UString[1];
  dropUsedOutputFieldsWithSchema(droppedFds);
  inTransAdapt_[0].dropFromVar(APT_UString("APT_TRinput0Rec0"), APT_ConvertFromString_TRX("col1"));
  APT_UString* renamedFrom = new APT_UString[1];
  APT_UString* renamedTo   = new APT_UString[1];
  setInTransferAdapters(0, dropped, renamedFrom, renamedTo);
  delete [] dropped;
  delete [] renamedFrom;
  delete [] renamedTo;
 
 
  // declare transfer.
  declareTransfers();
 
 
  // set partitioner/sort insertion information.
  setModifyOutputSortKeys();
 
 
  // set job parameters
 
 
  return APT_StatusOk;
}
 
// parallel method, invoked in each partition.
APT_Status APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::doInitialProcessingChild(APT_InputAccessorInterface* inpInt[])
{
  APT_Status status = APT_StatusOk;
  // define global variable
 
  initNullMask(1);
 
 
  // set up input accessor.
 
 
  // set up output accessor.
  setupOutputInterfaceAccessor();
 
 
  return status;
}
 
APT_Status APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::setupInputInterfaceAccessorChild(APT_InputAccessorInterface** inCur)
{
  APT_STD::map<APT_UString, APT_InputAccessorBase*>* inAcc =
    new APT_STD::map<APT_UString, APT_InputAccessorBase*, lessAPTUStr>[numRealInputs_];
 
  inAcc[0].insert(APT_STD::map<APT_UString, APT_InputAccessorBase*, lessAPTUStr>::value_type(APT_ConvertFromString_TRX("col1"),  &input0Int32col1));
 
  for (unsigned int input=0; input < 1; input++) {
    APT_STD::map<APT_UString, APT_InputAccessorBase*, lessAPTUStr>::iterator iter;
    for (iter = inAcc[input].begin(); iter != inAcc[input].end(); ++iter)
      inCur[input]->setupAccessor(iter->first, iter->second);
  }
 
  delete [] inAcc;
 
 
  return APT_StatusOk;
}
 
 
APT_Status APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::setupOutputInterfaceAccessorChild(APT_OutputAccessorInterface** outCur)
{
  APT_STD::map<APT_UString, APT_OutputAccessorBase*>* outAcc =
    new APT_STD::map<APT_UString, APT_OutputAccessorBase*, lessAPTUStr>[numoutputs_ + numrejects_];
 
  outAcc[0].insert(APT_STD::map<APT_UString, APT_OutputAccessorBase*, lessAPTUStr>::value_type(APT_ConvertFromString_TRX("col1"),  &output0Int32col1));
 
  for (unsigned int output=0; output < 1; output++) {
    APT_STD::map<APT_UString, APT_OutputAccessorBase*, lessAPTUStr>::iterator iter;
    for (iter = outAcc[output].begin(); iter != outAcc[output].end(); ++iter)
      outCur[output]->setupAccessor(iter->first, iter->second);
  }
 
  delete [] outAcc;
 
 
  return APT_StatusOk;
}
 
 
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::processInputRecordChild(int inputDS, int curOutput[], int writtenOutput[])
{
  APT_Status convStat = APT_StatusOk;
  
  // declare local variables
 
  // process records.
  clearNullMask();
  // length of outputs is 1
  // length of fields is 1 for output 0
  stage0RowRejected0=1;
  output0Int32col1[0]=input0Int32col1[0]+1;
  transferAndPutRecord(0);
  stage0RowRejected0=0;
 
 
}
 
// parallel method, invoked in each partition.
APT_Status APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::doFinalProcessingChild()
{
  // cleanup tables and caches
 
     
  return APT_StatusOk;
}
 
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::setRejectColumnValue(const APT_UString &errMsg, APT_Int32 rejectDs)
{
  // Set the value of the reject column
 
}
 
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::processEOWChild(int inputDS)
{
  // declare local variable
 
 
  // "finish" code segment
 
}
 
APT_Status APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::initializeWaveChild()
{
  APT_Status status = APT_StatusOk;
  APT_Status convStat = APT_StatusOk;
  
  // declare local variable
 
 
  // "initialize" code segment
 
 
  return status;
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convert(T& result, const APT_String& source, char* pBuf,
                                     bool* msgPrinted, const APT_String& addlMsg)
{
  result = (T)strtod(source, &pBuf);
  if (APT_strlen(pBuf) > 0 || APT_strlen(source) == 0)
  {
    result = 0;
    issueWarning(msgPrinted, addlMsg);
  }
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convert(T& result, const APT_UString& source, UChar* pBuf,
                                     bool* msgPrinted, const APT_String& addlMsg)
{
  result = (T)APT_strtod(source, &pBuf);
  if (APT_strlen(pBuf) > 0 || APT_strlen(source) == 0)
  {
    result = 0;
    issueWarning(msgPrinted, addlMsg);
  }
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convert(APT_Decimal& result, const APT_String& source,
                                     bool* msgPrinted, const APT_String& addlMsg)
{
  APT_Status checkResult;
  result.assignFromString(source, -1, APT_Decimal::eRoundInf, &checkResult);
  if (checkResult == APT_StatusFailed)
  {
    result = 0;
    issueWarning(msgPrinted, addlMsg, true);
  }
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convert(APT_Decimal& result, const APT_UString& source,
                                     bool* msgPrinted, const APT_String& addlMsg)
{
  APT_Status checkResult;
  result.assignFromString(APT_ConvertToString(source), -1, APT_Decimal::eRoundInf, &checkResult);
  if (checkResult == APT_StatusFailed)
  {
    result = 0;
    issueWarning(msgPrinted, addlMsg, true);
  }
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convert64(APT_Int64& result, const APT_String& source, char* pBuf,
                                       bool* msgPrinted, const APT_String& addlMsg)
{
  result = APT_strtol64(source,&pBuf,10);
  if (APT_strlen(pBuf) > 0 || APT_strlen(source) == 0)
    stringToNumeric<APT_Int64>::convert(result, source, pBuf, msgPrinted, addlMsg);
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convert64(APT_Int64& result, const APT_UString& source, UChar* pBuf,
                                       bool* msgPrinted, const APT_String& addlMsg)
{
  result = APT_strtol64(source,&pBuf,10);
  if (APT_strlen(pBuf) > 0 || APT_strlen(source) == 0)
    stringToNumeric<APT_Int64>::convert(result, source, pBuf, msgPrinted, addlMsg);
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convertU64(APT_UInt64& result, const APT_String& source, char* pBuf,
                                        bool* msgPrinted, const APT_String& addlMsg)
{
  result = APT_strtoul64(source, &pBuf, 10);
  if (APT_strlen(pBuf) > 0 || APT_strlen(source) == 0)
    stringToNumeric<APT_UInt64>::convert(result, source, pBuf, msgPrinted, addlMsg);
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::convertU64(APT_UInt64& result, const APT_UString& source, UChar* pBuf,
                                        bool* msgPrinted, const APT_String& addlMsg)
{
  result = APT_strtoul64(source, &pBuf, 10);
  if (APT_strlen(pBuf) > 0 || APT_strlen(source) == 0)
    stringToNumeric<APT_UInt64>::convert(result, source, pBuf, msgPrinted, addlMsg);
}
 
template <class T>
void APT_TransformOperatorImplV0S0_TransformerCodeDemo_tr_IncrCol::stringToNumeric<T>::issueWarning(bool* msgPrinted, const APT_String& addlMsg, bool isDecimal)
{
  if (*msgPrinted)
    return;
 
  *msgPrinted = true;
  APT_String msg;
  static APT_String msgNumeric("Numeric string expected ");
  static APT_String msgDecimal("Numeric string expected of the appropriate decimal precision ");
  static APT_String msgEnd(". Use default value.");
  if (isDecimal)
    msg = msgDecimal + addlMsg + msgEnd;
  else
    msg = msgNumeric + addlMsg + msgEnd;
  
  APT_TOFunctions::get().print_message(msg);
}
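If you look up generated files often, the find commands used in this post can be scripted. Here is a minimal Python sketch that does the same search; the function name find_generated is hypothetical, and the project path, job name and transformer name are placeholders exactly as in the shell commands.

```python
from pathlib import Path


def find_generated(project_dir, job_name, transformer_name, suffix):
    """Search project_dir recursively for the generated file of a transformer.

    suffix is ".trx" for the TRX code or ".C" for the generated C file.
    """
    # Same pattern as: find . -name '*JobName_TransformerName<suffix>'
    pattern = f"*{job_name}_{transformer_name}{suffix}"
    return sorted(Path(project_dir).rglob(pattern))


# Example call (placeholders, adjust to your installation):
# find_generated("/opt/IBM/InformationServer/Server/Projects/YourProjectName",
#                "YourJobName", "YourTransformerName", ".trx")
```

Because the pattern is passed to Python as a string, there is no shell that could expand the wildcard before the search starts.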