22 September 2013

How to create a custom operator for InfoSphere DataStage 9.1

This post summarizes how to build a custom DataStage operator for the Unix and Windows platforms. The focus is on compiling and deploying the operator rather than on the fundamentals of the process. There is an article here from IBM that covers the fundamentals, but:
1) it uses Visual Studio 2003, which is rather outdated, and the build process differs in later versions;
2) it does not cover compilation for the Unix platform.
The following product versions are used:
  • IBM InfoSphere DataStage 9.1
  • Red Hat Enterprise Linux 5.9 64 bit
  • Microsoft Windows 2008 R2 64 bit
  • Microsoft Visual Studio 2010
  • Windows 7 64 bit (as a workstation to build the version for Windows)

Operator source

We are going to build a simple operator that reverses a string. It uses only one input column and one output column. The following code is for demonstration purposes only and should not be used in a production environment. A detailed description of each method can be found in the IBM article mentioned above.

#define APT_SWITCHOP_C
#define __NUTC__
 
#include <apt_framework/osh_name.h>
#include <apt_framework/coperator.h>
#include <apt_framework/accessor.h>
 
extern APT_Error::SourceModule APT_StrRevOpId("RSOP");
static APT_Error::SourceModule &APT_localErrorSourceModule = APT_StrRevOpId;
 
#define APT_STRREV_ERROR_START 0
#define APT_STRREV_ERROR_END 99
static APT_Error::IndexRange sFilterIndexRange(APT_STRREV_ERROR_START,
    APT_STRREV_ERROR_END,
    "Reverse String Operator",
    APT_StrRevOpId);
 
#define INPUT_COLUMN_NAME "toReverse"
#define OUTPUT_COLUMN_NAME "Reversed"
 
#define MESSAGE_ID_BASE 0
#define HELLO_ARGS_DESC \
    "{ otherInfo={ " \
    "             inputs={ " \
    "                 input={ " \
    "                    description='source data for strrevop', " \
    "                    once" \
    "                    } " \
    "              }, " \
    "             outputs={ " \
    "                 output={ " \
    "                    description='output data for strrevop', " \
    "                    minOccurrences=1, maxOccurrences=1 " \
    "                    } " \
    "              }, " \
    "             description='strrev operator:' " \
    "            } " \
    "}"
 
class APT_StrRevOp : public APT_Operator
{
    APT_DECLARE_PERSISTENT(APT_StrRevOp);
    APT_DECLARE_RTTI(APT_StrRevOp);
public:
    APT_StrRevOp();
    ~APT_StrRevOp();
 
protected:
    virtual APT_Status initializeFromArgs_(const APT_PropertyList &args, 
        APT_Operator::InitializeContext context);
    virtual APT_Status describeOperator();
    virtual APT_Status runLocally();
private:
    APT_String reverse(APT_String str);
};
 
 
APT_DEFINE_OSH_NAME(APT_StrRevOp, StrRev, (APT_UString)HELLO_ARGS_DESC);
APT_IMPLEMENT_RTTI_ONEBASE(APT_StrRevOp, APT_Operator);
APT_IMPLEMENT_PERSISTENT(APT_StrRevOp);
 
 
APT_StrRevOp::APT_StrRevOp()
{
    errorLog().setModuleId(APT_StrRevOpId);
}
 
APT_StrRevOp::~APT_StrRevOp() {}
 
APT_Status APT_StrRevOp::initializeFromArgs_(const APT_PropertyList &args,
    APT_Operator::InitializeContext context)
{
    return APT_StatusOk;
}
 
void APT_StrRevOp::serialize(APT_Archive& archive, APT_UInt8)
{
}
 
APT_Status APT_StrRevOp::describeOperator()
{
    setKind(eParallel);
 
    setInputDataSets(1);
    setOutputDataSets(1);
 
    setInputInterfaceSchema(APT_UString("record (") + APT_UString(INPUT_COLUMN_NAME) + APT_UString(":string;)"), 0);
 
    setOutputInterfaceSchema(APT_UString("record (") + APT_UString(OUTPUT_COLUMN_NAME) + APT_UString(":string;)"), 0);
 
    setCheckpointStateHandling(eNoState);
    return APT_StatusOk;
}
 
APT_Status APT_StrRevOp::runLocally()
{
    APT_Status status = APT_StatusOk;
 
    APT_InputCursor inCur;
    setupInputCursor(&inCur, 0);
 
    APT_OutputCursor outCur;
    setupOutputCursor(&outCur, 0);
 
    APT_InputAccessorToString field1in(INPUT_COLUMN_NAME, &inCur);
    APT_OutputAccessorToString field1out(OUTPUT_COLUMN_NAME, &outCur);
 
    while(inCur.getRecord())
    {
        *field1out = reverse(*field1in);
        outCur.putRecord();
    }
 
    return status;
}
 
APT_String APT_StrRevOp::reverse(APT_String str)
{
    APT_String reversed = APT_String("");
    for(int i=str.length() - 1; i >= 0; i--)
    {
        reversed.append(str[i]);
    }
 
    return reversed;
}
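
The reversal logic itself can be tried out without the PXEngine headers. The following is a minimal sketch, assuming std::string as a stand-in for APT_String; the names reverseString and checkReverseString are hypothetical and not part of the operator above.

```cpp
#include <algorithm>
#include <cassert>
#include <string>

// Stand-alone counterpart of APT_StrRevOp::reverse(), written against
// std::string (an assumption) so it compiles without DataStage installed.
// It reverses bytes, so multi-byte characters (e.g. UTF-8) are not preserved.
std::string reverseString(const std::string &str)
{
    std::string reversed(str);
    std::reverse(reversed.begin(), reversed.end());
    return reversed;
}

// Small sanity checks that can be called from any test harness.
void checkReverseString()
{
    assert(reverseString("abc") == "cba");
    assert(reverseString("").empty());
}
```

Using std::reverse on a copy avoids appending one character at a time, but the result is the same as the loop in the operator.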


Building and deploying – Red Hat Enterprise Linux


To build your custom operator under Linux you can (and must) use the same compiler and linker that DataStage uses for the Parallel Transformers. To find the compiler and the required options, open the DataStage Administrator, select your project and click Properties -> General tab -> Environment button -> Parallel category and then Compiler, as shown below:

image

Once you have these, the process is straightforward:

g++ -c -O -fPIC -Wno-deprecated -m64 -mtune=generic -mcmodel=small -I/opt/IBM/InformationServer/Server/PXEngine/include strrevop.c

g++ -shared -m64 -L/opt/IBM/InformationServer/Server/PXEngine/lib -lorchgeneralx86_64 -lorchx86_64 -lorchmonitorx86_64 -lorchcorex86_64 -lorchsortx86_64 strrevop.o -o libstrrevop.so

Note that your compile options will probably differ, especially if you are not running on a 64-bit platform. The first command compiles the source code and the second one produces the shared object (.so) file, which is loaded and executed by DataStage. You need to move libstrrevop.so into a directory referenced by LD_LIBRARY_PATH:

image
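
To double-check that the library ended up in one of those directories, a small helper can walk a colon-separated directory list. This is only a sketch: findInPathList and checkFindInPathList are hypothetical names, and the check only confirms that the file can be opened by the current user, not that DataStage will load it.

```cpp
#include <cassert>
#include <fstream>
#include <sstream>
#include <string>

// Returns the first directory in a colon-separated list (the format of
// LD_LIBRARY_PATH) in which fileName can be opened, or an empty string.
std::string findInPathList(const std::string &pathList, const std::string &fileName)
{
    std::istringstream dirs(pathList);
    std::string dir;
    while (std::getline(dirs, dir, ':'))
    {
        if (dir.empty())
            continue; // this sketch ignores empty entries such as "a::b"
        std::ifstream probe((dir + "/" + fileName).c_str());
        if (probe.good())
            return dir;
    }
    return std::string();
}

// Small sanity check: an empty list never yields a match.
void checkFindInPathList()
{
    assert(findInPathList("", "libstrrevop.so").empty());
}
```

A typical call would pass the value of std::getenv("LD_LIBRARY_PATH") (after checking it for a null pointer) together with "libstrrevop.so". Keep in mind that the environment of your shell may differ from the one DataStage jobs run with.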

The next step is to provide a mapping between the osh name and the .so file. Do this by adding the following line at the end of the /opt/IBM/InformationServer/Server/PXEngine/etc/operator.apt file:

StrRev libstrrevop 1
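
The entry consists of the osh name, the library name without its extension, and a trailing number. As an illustration of that layout only, here is a sketch that splits such a line into its three fields. The struct and function names are hypothetical, the meaning of the trailing number is not covered in this post, and rejecting extra tokens is an assumption of the sketch rather than a documented rule of operator.apt.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// One operator.apt entry in the three-field form used in this post.
struct OperatorEntry
{
    std::string oshName; // name used in osh, e.g. "StrRev"
    std::string library; // library name without extension, e.g. "libstrrevop"
    int flag;            // trailing number ("1" in this post)
};

// Parses a whitespace-separated line. Returns false if a field is missing,
// the last field is not a number, or extra tokens follow (sketch assumption).
bool parseOperatorEntry(const std::string &line, OperatorEntry &entry)
{
    std::istringstream in(line);
    std::string extra;
    if (!(in >> entry.oshName >> entry.library >> entry.flag))
        return false;
    return !(in >> extra);
}

// Small sanity check using the entry from this post.
void checkParseOperatorEntry()
{
    OperatorEntry entry;
    assert(parseOperatorEntry("StrRev libstrrevop 1", entry));
    assert(entry.oshName == "StrRev");
}
```

The first field has to match the name given to APT_DEFINE_OSH_NAME in the operator source (StrRev), and the second has to match the name of the library produced by the link step.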

Now register the operator in the DataStage Designer via New -> Other -> Parallel Custom Stage Type and fill in the first two tabs as shown below:

image

image

Now you are ready to use the new stage. Just define one input column for it named toReverse (VARCHAR) and one output column named Reversed (VARCHAR).
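
These column names come from the INPUT_COLUMN_NAME and OUTPUT_COLUMN_NAME macros, which describeOperator() uses to build the interface schemas, presumably the reason the stage expects columns with exactly these names. A minimal standalone sketch of that string construction, using std::string in place of APT_UString (the helper names interfaceSchema and checkInterfaceSchema are hypothetical):

```cpp
#include <cassert>
#include <string>

// Builds a single-column schema string the same way describeOperator()
// concatenates it: "record (" + column name + ":string;)".
std::string interfaceSchema(const std::string &columnName)
{
    return "record (" + columnName + ":string;)";
}

// Small sanity check for the output column used in this post.
void checkInterfaceSchema()
{
    assert(interfaceSchema("Reversed") == "record (Reversed:string;)");
}
```

If you rename a column in the stage, the corresponding macro in the operator source has to change as well, followed by a rebuild.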

Building and deploying – Windows


To build under Windows, start the “Visual Studio Command Prompt (2010)” from the Start menu as shown below:

image

Now use the following commands to produce the output .dll file for the custom operator:

set INCLUDE=%INCLUDE%;C:\Program Files (x86)\MKS Toolkit\include;C:\IBM\InformationServer\Server\PXEngine\include;C:\Program Files (x86)\Microsoft SDKs\Windows\v7.0A\Include;C:\Program Files (x86)\Microsoft Visual Studio 10.0\VC\include

set LIB=%LIB%;C:\Program Files (x86)\Microsoft SDKs\Windows\v7.0A\Lib;C:\Program Files (x86)\Microsoft Visual Studio 10.0\VC\lib;C:\IBM\InformationServer\Server\PXEngine\lib;C:\Program Files (x86)\MKS Toolkit\lib

cl -TP -EHa -DAPT_USE_ANSI_IOSTREAMS -MD -DNDEBUG -DIDM_LITTLE_ENDIAN -c StrRevOp.cpp

link -dll -base:0x50000000 -NODEFAULTLIB:MSVCRT.lib -NODEFAULTLIB:libcp.lib StrRevOp.obj liborchcorent.lib liborchnt.lib Kernel32.lib

The compilation requires the MKS Toolkit, which is a third-party library. You don’t have to download it, since it is part of the IBM Information Server installation and is already present on the server where DataStage is running. Just replace “C:\Program Files (x86)\MKS Toolkit” with the actual path in your environment (this could be a network share like “\\EtlServer\MKS Toolkit”).

Make sure to replace “C:\IBM\InformationServer\Server\PXEngine” with the actual directory where IBM Information Server resides (this could also be a network share).

Pay special attention to the “-NODEFAULTLIB:MSVCRT.lib -NODEFAULTLIB:libcp.lib” switches in the link step. These are required because DataStage uses a version of the MKS Toolkit that was compiled with an outdated version of the C++ compiler. Without them the linker tries to find libcp.lib, which is no longer part of Visual Studio 2010, and errors like the following are produced:

LINK : fatal error LNK1104: cannot open file 'libcp.lib'

MSVCRT.lib(ti_inst.obj) : error LNK2005: "private: __thiscall type_info::type_info(class type_info const &)" (??0type_info@@AAE@ABV0@@Z) already defined in c.lib(typinfo.obj)

MSVCRT.lib(ti_inst.obj) : error LNK2005: "private: class type_info & __thiscall type_info::operator=(class type_info const &)" (??4type_info@@AAEAAV0@ABV0@@Z) already defined in c.lib(typinfo.obj)

myhelloworld.dll : fatal error LNK1169: one or more multiply defined symbols found

More about this can be found here.

Once the build has finished successfully, the resulting StrRevOp.dll needs to be placed in a directory referenced in the PATH environment variable. I recommend creating a new directory for custom operators and registering it with the help of the DataStage Administrator as shown below:

image

The next step is to provide a mapping between the osh name and the .dll file. Do this by adding the following line at the end of the c:\IBM\InformationServer\Server\PXEngine\etc\operator.apt file:

StrRev StrRevOp 1

Now register the operator in the DataStage Designer via New -> Other -> Parallel Custom Stage Type and provide exactly the same information as in the Linux section above.

Once you have completed the steps above, the newly registered StrRev operator can be used like any other operator:

image

Further Reading


Here is a list with resources to explore further: