rvs-Systems GmbH presents



rvs-Systems zKafka Connector User Manual for Release v1.2

Alexander-Fleming-Ring 19
65428 Rüsselsheim
Germany


Email: info@rvs-systems.com
Internet: www.rvs-systems.com

 

CEO: Carl-Heinz Benecke
Registration: Amtsgericht Lübeck HRB 14223 HL



Apache Kafka and the z/OS

Apache Kafka has become one of the best-known platforms for event-based processing and streaming of data. A Kafka cluster shares the mainframe's core capabilities: high availability (resilience), scalability and permanent storage. Kafka client libraries allow you to connect to your Kafka cluster from almost anywhere and anything.

On z/OS, though, connecting to a Kafka cluster in the open world is quite complex. There is no easy way to integrate Kafka client libraries. Still, many of the most important business processes and a huge amount of business data reside on the mainframe and need to be integrated into your cloud infrastructure.

We developed the rvs-Systems zKafka Connector (RzK) as an easy-to-implement and easy-to-use interface from your z/OS applications to any Kafka cluster. Our first intention was to provide a secure and direct way for mainframe (z/OS) data into open world Kafka clusters. Further functions have been added in recent months.

There are two ways to continue reading:

What is the zKafka Connector?


The zKafka Connector (RzK) is a smart link between mainframe z/OS applications and your Kafka clusters.

RzK offers an easy way to produce (write) and consume (read) data from any z/OS application to and from your Kafka clusters. It does not require any further program product, nor middleware. The zKafka Connector significantly simplifies your mainframe data integration with the cloud.

RzK itself is z/OS (MVS) middleware that may be used in any program to communicate directly with your Kafka cluster. RzK does not require additional components, OMVS/USS, or Java.

An API is available for IBM Language Environment (LE) programs in

as well as a standard z/OS CALL API to be used from non-LE languages like

and there are further easy and fast integration methods available:


If you can't wait to test the zKafka Connector, start with chapter z/OS.

As a software developer, you will find numerous ready-to-run source code examples in the SAMPLIB library. A description of all these samples can be found in chapter RzK API Example Code.

Installation & Quickstart

The zKafka Connector distribution file that you received through email or downloaded from the rvs-Systems Software Download Portal is usually in zip format. Please extract (unzip) the contained files first. You will find an archive for each platform: z/OS (xmi), Linux (tar) and Windows (zip), as well as the user manual.

z/OS

Currently, RzK consists of only a few z/OS modules with some ALIASes and a lot of code examples. The RzK programs do not run APF-authorized and may, but do not have to, be placed in a z/OS system library. RzK can also be used directly from a user library (via STEPLIB). An installation in the classical sense with SMP/E is therefore not necessary.

Only a few steps are required for installation and a first successful z/OS-to-Kafka communication:

  1. Transfer the zKafka Connector distribution file to your z/OS target system.

  2. Unpack the distribution file on z/OS to create the contained product libraries.

  3. Run the provided RzK Command Line Interface (CLI) sample job to produce and/or consume some test messages into/from your Kafka cluster.

For more details, see the sections below.


Transfer the distribution file to the z/OS target system

The zKafka Connector distribution file that you received through email or downloaded from the rvs-Systems Software Download Portal is usually in zip format. Please extract (unzip) the contained XMI file first before transferring it to your z/OS host.

Now use any file transfer program of your choice, like FTP or TSO IND$FILE, to BINARY transfer the XMI distribution file to the z/OS system. 

XMI data sets on z/OS are files in the TSO TRANSMIT format. They have to be allocated with the DCB attributes RECFM=FB and LRECL=80; otherwise, they cannot be extracted. Depending on your z/OS installation defaults, it may be helpful to pre-allocate the target data set with sufficient space and the attributes above.
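
As an illustration of the binary transfer with the DCB attributes named above, the following Python sketch uploads the XMI file through an FTP connection. It assumes that the FTP server on your z/OS system accepts a SITE RECFM=FB LRECL=80 command to set the attributes of the data set it allocates. The function name upload_xmi is a hypothetical helper, not part of RzK, and the check that the file size is a multiple of 80 bytes is only a plausibility test derived from LRECL=80.

```python
import os


def upload_xmi(ftp, local_path: str, dataset: str) -> None:
    """Binary-upload an XMI file using an ftplib.FTP-like connection object.

    Hypothetical helper: 'ftp' must offer sendcmd() and storbinary(),
    as ftplib.FTP from the Python standard library does.
    """
    size = os.path.getsize(local_path)
    # TRANSMIT files consist of 80-byte records, so an intact file
    # should have a size that is a multiple of 80 (plausibility check).
    if size % 80 != 0:
        raise ValueError(f"{local_path}: size {size} is not a multiple of 80")

    # Assumption: the z/OS FTP server honors these SITE parameters
    # when it allocates the target data set.
    ftp.sendcmd("SITE RECFM=FB LRECL=80")

    # storbinary() transfers the bytes unchanged (BINARY transfer).
    with open(local_path, "rb") as xmi:
        ftp.storbinary(f"STOR '{dataset}'", xmi)
```

To use it, open a connection with ftplib.FTP from the Python standard library, log in, and call upload_xmi(ftp, "RZKV120.XMI", "MYHLQ.RZKV120.XMI"); the file and data set names here are placeholders.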


Unpack the distribution file on z/OS

Unpacking the sequential distribution file is a two-stage process:

  1. The XMI data set is TSO RECEIVEd to create a single self-extracting distribution library.

  2. The resulting library is TSO EXECuted to unpack itself and create all required zKafka Connector product libraries.

You may do this online within a TSO session or just copy and use the following batch job. If you have previously installed a version of the zKafka Connector, you should find this JCL also in SAMPLIB($RZKUNPK).

If you are using the HTML version of the RzK documentation, you may hover the mouse over a code box and use the copy button in its top right corner to copy the code box content to your clipboard.

//jobname  JOB               
//*                                                                  *
//*  Product: RVS z/OS Kafka Connector (RzK)                         *
//*                                                                  *
//*   Member: SAMPLIB($RZKUNPK)                                      *
//*                                                                  *
//* Function: Unpack the sequential RzK distribution XMI file and    *
//*           create a self-extracting distribution library.         *
//*           Execute this library to create the contained RzK       *
//*           product libraries.                                     *
//*                                                                  *
//*    Notes: o Adapt the input (XMI) and output (OUT) data set      *
//*             names to your needs.                                 *
//*                                                                  *
//*               (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*                                                   
//    SET XMI=myHlq.RZKV120.XMI                                       
//    SET OUT=myHlq.RZKV120                                           
//*                                                                   
//UNPACK   EXEC PGM=IKJEFT01,DYNAMNBR=50                              
//*                                                                   
//SYSTSPRT DD  SYSOUT=*                                               
//SYSTSIN  DD  *,SYMBOLS=JCLONLY                                      
RECEIVE INDATASET('&XMI')                                             
DATASET('&OUT')                                                       
EXEC '&OUT'                                                           
/*                                                                    
//                                                                    

Once the unpacking is completed, you find the following libraries as part of the zKafka Connector distribution:

To keep the installation as simple as possible, the zKafka Connector contains only a few non-authorized modules. No APF-authorization is required. This has the advantage that you may test or use it now out-of-the-box without any risk.


Run the RzK Command Line Interface (CLI)

Use the RzK Command Line Interface to test your zKafka Connector installation immediately. The RzK CLI can produce (or consume) one or more test messages directly to (or from) your Kafka cluster without any programming effort.

You need the following information to proceed:

Name | Description | JCL Symbol Name
Broker | Kafka broker host name or IP address and port. RzK CLI default is 127.0.0.1:9092. | BRK
Topic | Kafka topic name to be used. RzK CLI default is "quickstart-events". | TPC
STEPLIB | Fully qualified RzK LOADLIB data set name from the unpack step above. | RZK
Message Key | Optional. Message key for the Kafka Produce. RzK CLI default is no key "". | PRO
Consumer Group | Optional. Consumer group name for Kafka Consume. RzK CLI default is "RzkGroup". | CON
Message | Optional. Test message to be produced into your Kafka cluster. RzK CLI default is a message containing the current RzK version information. | MSG
RZK Options | Optional. -r Repeat messages, -v Debug level [0..15] | OPT

For more details, see chapter RzK Command Line Interface (CLI).


The sample batch job in SAMPLIB(RZKCLIJM) contains the JCL listed below. Update at least the job card and the JCL symbols BRK, TPC and RZK before submitting the job.

//jobname  JOB                                                        
//*                                                                  *
//*  Product: RVS z/OS Kafka Connector (RzK)                         *
//*                                                                  *
//*   Member: SAMPLIB(RZKCLIJM)                                      *
//*                                                                  *
//* Function: Use the RZK Command Line Interface to produce and      *
//*           consume one or more test messages.                     *
//*                                                                  *
//*    Notes: o Adapt the JCL symbol values below to your needs.     *
//*                                                                  *
//*           o The STEPLIB DD statements are only required,         *
//*             if the RZK* modules do not (yet) reside in the       *
//*             z/OS LNKLST or LPALST concatenation.                 *
//*                                                                  *
//*               (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*                                                   
//    SET OPT='-r5 -v1'                            Repeat  DbgLvl     
//    SET PRO='-p -k myMsg'                        PRODUCE MsgKey     
//    SET CON='-c -g myGrp -t10'                   CONSUME ConGrp Tout
//    SET TPC='myTopic'                            Kafka   Topic      
//    SET BRK='myKafka.myDomain.de:9092'           Kafka   Broker:Port
//*                                                                   
//    SET MSG='"Test msg sent via RZK CLI."'                          
//*                                                                   
//    SET RZK=myHlq.RZKV120.LOADLIB                RZK PgmObj PDSE    
//*                                                                   
//RZKPROD  EXEC PGM=RZK,PARM='&PRO &OPT &BRK &TPC &MSG'               
//STEPLIB    DD DISP=SHR,DSN=&RZK                  RZK PgmObj PDSE    
//SYSPRINT   DD SYSOUT=*                           RZK Messages       
//*                                                                   
//RZKCONS  EXEC PGM=RZK,PARM='&CON &OPT &BRK &TPC'                    
//STEPLIB    DD DISP=SHR,DSN=&RZK                  RZK PgmObj PDSE    
//SYSPRINT   DD SYSOUT=*                           RZK Messages       
//                                                                    

Using RzK with TLS/SSL Encrypted Connections

If you need to use RzK with TLS/SSL encrypted connections, please refer to section AT-TLS Configuration for RzK.

Next Steps

If you have successfully done the steps above and are keen for more, you have the following choices to proceed:

LINUX

The zKafka Connector LINUX package is intended for development and testing of your application.
If, for example, you develop your application in C, you can do this in a LINUX environment and benefit from easy development and debugging with LINUX tools. The application can then be transferred to z/OS and compiled using the z/OS load library of the zKafka Connector.

The LINUX package contains:

•    librzk.so: the LINUX shared library of zKafka
•    rzk.h: the include file for your C application
•    rzk_sample.c: sample
•    rzk_sample: executable version of the sample
•    rzk_linux: LINUX version of the RzK Command Line Interface (CLI).

Only a few steps are required for installation and a first successful Kafka communication:

  1. Unpack the LINUX package.

  2. Run the RzK Command Line Interface (CLI) to produce and/or consume some test messages into/from your Kafka cluster.

Run the RzK Command Line Interface (CLI)

Use the RzK Command Line Interface to test your zKafka Connector installation immediately. The RzK CLI can produce (or consume) one or more test messages directly to (or from) your Kafka cluster without any programming effort.

You need the following information to proceed:

Name | Description
Broker | Kafka broker host name or IP address and port. RzK CLI default is 127.0.0.1:9092.
Topic | Kafka topic name to be used. RzK CLI default is "quickstart-events".
Message Key | Optional. Message key for the Kafka Produce. RzK CLI default is no key "".
Consumer Group | Optional. Consumer group name for Kafka Consume. RzK CLI default is "RzkGroup".
Message | Optional. Test message to be produced into your Kafka cluster. RzK CLI default is a message containing the current RzK version information.
RZK Options | Optional. -r Repeat messages, -v Debug level [0..15]

For more details, see chapter RzK Command Line Interface (CLI).


Sample for producing 5 test messages to your Kafka cluster into topic "myTopic":
rzk_linux -r5 -v1 -p mykafka.mydomain.com:9092 myTopic "Test msg sent via LINUX RzK CLI"


Windows

The zKafka Connector Windows package is intended for development and testing of your application.
As an example, if you develop your application in C, you can do this in a Windows environment and do easy development and debugging using Windows tools. The application can then be transferred to z/OS and compiled using the z/OS load library of the zKafka Connector.

The Windows package contains:

•    rzk.dll: the Windows DLL of zKafka
•    rzk.h: the include file for your C application
•    rzk.lib: the link library
•    rzk_sample.c: sample
•    rzk_sample.exe: executable version of the sample
•    rzk_win.exe: Windows version of the RzK Command Line Interface (CLI).

Only a few steps are required for installation and a first successful Kafka communication:

  1. Unpack the Windows package.

  2. Run the RzK Command Line Interface (CLI) to produce and/or consume some test messages into/from your Kafka cluster.

Run the RzK Command Line Interface (CLI)

Use the RzK Command Line Interface to test your zKafka Connector installation immediately. The RzK CLI can produce (or consume) one or more test messages directly to (or from) your Kafka cluster without any programming effort.

You need the following information to proceed:

Name | Description
Broker | Kafka broker host name or IP address and port. RzK CLI default is 127.0.0.1:9092.
Topic | Kafka topic name to be used. RzK CLI default is "quickstart-events".
Message Key | Optional. Message key for the Kafka Produce. RzK CLI default is no key "".
Consumer Group | Optional. Consumer group name for Kafka Consume. RzK CLI default is "RzkGroup".
Message | Optional. Test message to be produced into your Kafka cluster. RzK CLI default is a message containing the current RzK version information.
RZK Options | Optional. -r Repeat messages, -v Debug level [0..15]

For more details, see chapter RzK Command Line Interface (CLI).


Sample for producing 5 test messages to your Kafka cluster into topic "myTopic":
rzk_win -r5 -v1 -p mykafka.mydomain.com:9092 myTopic "Test msg sent via windows RzK CLI"


RzK Command Line Interface (CLI)

Overview

The RzK Command Line Interface (CLI) provides an easy-to-use utility to execute and/or test RzK functions.

You require the following information about the Kafka cluster to start the RzK CLI:


RzK CLI Syntax

RZK [options] [broker[:port] [topic [message]]]


Options 

Options are the different RzK functions that can be called via the following parameter set. 

For ease of use, the input parameters of the options are provided here. A detailed explanation of the functions and their parameter set is provided in the respective chapters (referenced for each option below).   

RZKDBUG - Debug Options

RZKAUTH - Authentication

RZKPROD - Produce

RZKCONS - Consume

RZKCLDB - Load Balancing

RZKSYNC - Set Synchronous Mode

Specific CLI-only options

RzK CLI Examples

The following sections contain some examples of how to use the RzK CLI:

To use the RzK CLI as a TSO command, the RZK* modules must reside in a STEPLIB used during TSO LOGON, in the ISPF ISPLLIB concatenation, or in the z/OS LPALST or LNKLST concatenation.

The STEPLIB DD statements for the RzK load library in all sample jobs are only required if the RZK* modules do not (yet) reside in the z/OS LPALST or LNKLST concatenation.


Use RZK as TSO Command

TSO RZK -?

RzK, the rvs-Systems zKafka Connector

z/OS CLI, Version 1.2 Build Jul  3 2023 18:35:41
All rights reserved. Copyright 2021-2023 rvs-Systems GmbH, Ruesselsheim


Invocation syntax is rzk [options] [broker[:port] [topic [message]]]
        broker   hostname or IP-address of one of the kafka brokers, default is 127.0.0.1
        port     the listen port of one of the kafka brokers, default is 9092
        topic    the name of the Topic that should be used for produce and consume.
                 Default is "quickstart-events"
        message  will be sent to Kafka as value to be published (produced) in the Topic
        options are:
                -v[x] to generate debug output (0-15, default is 0)
                -u specify username and password as e.g. -u kafkabroker1:kafkabroker1-secret
                -p produce a message
                -c consume/fetch messages
                -k msgkey (for produce, default is no MsgKey)
                -g group (for consume, default is "RzkGroup")
                -q do not display messages returned by consume
                -l enable consumer load balancing
                -b binary, do not use EBCDIC to UTF-8 conversion for produce and consume Data
                -r[#] repeat action # times, default is 1
                -m[#] make produce message # bytes
                -s synchronous mode
                -R[#] set Receiving window size for asynchronous consume
                -S[#] set Sending   window size for asynchronous produce
                -t[#] set timeout for consume to # seconds
                -? to show this message


Produce Messages with the RzK CLI

The sample JCL below can also be found in SAMPLIB(RZKCLIJP).

//jobname  JOB                                                        
//*                                                                  *
//*  Product: RVS z/OS Kafka Connector (RzK)                         *
//*                                                                  *
//*   Member: SAMPLIB(RZKCLIJP)                                      *
//*                                                                  *
//* Function: Use the RzK Command Line Interface to produce one or   *
//*           more test messages.                                    *
//*                                                                  *
//*    Notes: o Adapt the JCL symbol values below to your needs:     *
//*                                                                  *
//*             - OPT: RzK CLI Options                               *
//*             - PRO: Produce Parameters                            *
//*             - TPC: Kafka Topic                                   *
//*             - BRK: Kafka Broker & Port                           *
//*             - MSG: Message text to be produced.                  *
//*                                                                  *
//*           o The STEPLIB DD statements are only required,         *
//*             if the RZK* modules do not (yet) reside in the       *
//*             z/OS LNKLST or LPALST concatenation.                 *
//*                                                                  *
//*               (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*                                                   
//    SET OPT='-r5 -v1'                            Repeat  DbgLvl     
//    SET PRO='-p -k myKey'                        PRODUCE MsgKey     
//    SET TPC='myTopic'                            Kafka   Topic      
//    SET BRK='myKafka.myDomain.de:9092'           Kafka   Broker:Port
//*                                                                   
//    SET MSG='"Test msg sent via RZK CLI."'                          
//*                                                                   
//    SET RZK=myHlq.RZKV120.LOADLIB                RZK PgmObj PDSE    
//*                                                                   
//RZKCLI   EXEC PGM=RZK,PARM='&PRO &OPT &BRK &TPC &MSG'               
//*                                                                   
//STEPLIB    DD DISP=SHR,DSN=&RZK                  RZK PgmObj PDSE    
//SYSPRINT   DD SYSOUT=*                           RZK Messages       
//                                                                    


Consume Messages with the RzK CLI

The sample JCL below can also be found in SAMPLIB(RZKCLIJC).

//jobname  JOB                                                        
//*                                                                  *
//*  Product: RVS z/OS Kafka Connector (RzK)                         *
//*                                                                  *
//*   Member: SAMPLIB(RZKCLIJC)                                      *
//*                                                                  *
//* Function: Use the RzK Command Line Interface to consume one or   *
//*           more test messages.                                    *
//*                                                                  *
//*    Notes: o Adapt the JCL symbol values below to your needs:     *
//*                                                                  *
//*             - OPT: RzK CLI Options                               *
//*             - CON: Consume Parameters                            *
//*             - TPC: Kafka Topic                                   *
//*             - BRK: Kafka Broker & Port                           *
//*                                                                  *
//*           o The STEPLIB DD statements are only required,         *
//*             if the RZK* modules do not (yet) reside in the       *
//*             z/OS LNKLST or LPALST concatenation.                 *
//*                                                                  *
//*               (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*                                                   
//    SET OPT='-r5 -v1'                            Repeat  DbgLvl     
//    SET CON='-c -g myGrp -t10'                   CONSUME ConGrp Tout
//    SET TPC='myTopic'                            Kafka   Topic      
//    SET BRK='myKafka.myDomain.de:9092'           Kafka   Broker:Port
//*                                                                   
//    SET RZK=myHlq.RZKV120.LOADLIB                RZK PgmObj PDSE    
//*                                                                   
//RZKCLI   EXEC PGM=RZK,PARM='&CON &OPT &BRK &TPC'                    
//*                                                                   
//STEPLIB    DD DISP=SHR,DSN=&RZK                  RZK PgmObj PDSE    
//SYSPRINT   DD SYSOUT=*                           RZK Messages       
//                                                                    




RZKCOPY Utility

The zKafka Connector (RzK) allows you to easily use the full Kafka capabilities without any program changes or programming knowledge.

With the RZKCOPY utility you can

RZKCOPY works similarly to a standard z/OS copy utility (e.g. IEBGENER, IEBCOPY) and may be used via JCL in batch, as a TSO command in the foreground, or within REXX or USS shell scripts. If required, it may also be called from any program in any language.

For a produce operation, RZKCOPY reads all records from all MVS data sets and/or USS files found allocated to DD name RZKFILE and produces them as messages directly to the specified Kafka topic.

For a consume operation, RZKCOPY consumes messages directly from the specified Kafka topic and writes them as records into the MVS data set or USS file allocated to DD name RZKFILE. Consume processing ends when the specified number of messages has been consumed or when a timeout interval has expired in which no new messages were received.

Message Processing Details

Due to the unique aspects of z/OS MVS data sets and USS files on one side and Unicode UTF-8 on the other side, a few conversions related to message encoding and message length have to be made prior to producing records read from a data set or file. Similar conversions must be applied after messages have been consumed from a Kafka topic and before they can be written into an MVS data set or a USS file.

Message EBCDIC/UTF-8 Conversion

Most z/OS data is encoded in EBCDIC, not in ASCII or UTF-8. As long as you produce and consume your data only from z/OS systems, no EBCDIC/UTF-8 conversion is required. As soon as other platforms are involved, you will probably want to convert EBCDIC-encoded data to UTF-8 and vice versa.

RzK supports this conversion automatically during produce and consume processing. By default, RZKCOPY will convert all records read from an MVS data set or USS file to UTF-8 during produce processing. During consume, all messages consumed will be converted by default from UTF-8 to EBCDIC before they are written into the target MVS data set or USS file.

If you do not need or want automatic EBCDIC/UTF-8 conversion during RZKCOPY produce or consume processing, specify the binary option -b in the RZKCOPY PARM string.

Unicode encoded as UTF-8 is a variable-length multi-byte code. While standard characters (like A..Z, a..z, 0..9) always occupy exactly one byte, other special characters from the EBCDIC character set (like ÄÖÜ, äöü) are translated into two-byte codes. For this reason, the length of some records increases when they are converted to UTF-8 messages. During consume processing, the reverse effect occurs: the length of records converted from received messages decreases because some two-byte UTF-8 codes are converted (back) to only one byte.
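
The length effect can be reproduced outside of RzK with a few lines of Python. This is only a sketch of the principle: it assumes the EBCDIC code page cp500 (International), which may differ from the code page used on your system, and the function names are hypothetical helpers rather than RzK functions.

```python
def ebcdic_to_utf8(record: bytes, codepage: str = "cp500") -> bytes:
    """Convert an EBCDIC record to a UTF-8 message (produce direction)."""
    return record.decode(codepage).encode("utf-8")


def utf8_to_ebcdic(message: bytes, codepage: str = "cp500") -> bytes:
    """Convert a UTF-8 message to an EBCDIC record (consume direction)."""
    return message.decode("utf-8").encode(codepage)


# "Müller" has six characters; in EBCDIC each of them occupies one byte.
record = "Müller".encode("cp500")
message = ebcdic_to_utf8(record)

print(len(record))   # 6 bytes in EBCDIC
print(len(message))  # 7 bytes in UTF-8, because "ü" needs two bytes

# Converting back restores the original, shorter record.
print(utf8_to_ebcdic(message) == record)
```

A record that completely fills its EBCDIC record length may therefore result in a longer message after conversion, which matters when the same data is later consumed into a data set with the same LRECL on a system that does not convert back.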

Message Length vs. Record Length

In a Kafka topic, each message has its own variable length.

On a z/OS system, MVS sequential data sets have a pre-defined record format (RECFM). There are basically two formats, which have to be handled differently:

Produce Processing - Message Length

During produce processing, RZKCOPY honors the file attributes (DCB) of each single MVS data set and USS file allocated to DD name RZKFILE.

When using RZKCOPY, you may concatenate multiple MVS data sets mixed with multiple USS files all with different DCB attributes (RECFM, LRECL, BLKSIZE) at the same time.

During read processing, RZKCOPY recognizes the change of the data set or file and its DCB attributes and always uses the correct record format (RECFM) and record length (LRECL) for processing:

Consume Processing - Record Length

When RZKCOPY writes the consumed Kafka messages to the target MVS data set or USS file allocated to DD name RZKFILE, it always uses the DCB attributes of this output file. RZKCOPY never uses a built-in default. It is the responsibility of the user to always provide proper DCB attributes.

For RZKCOPY consume processing, at least RECFM, LRECL and BLKSIZE must be specified for the target output file allocated to DD name RZKFILE, either through the DCB attributes of a pre-allocated data set or through JCL parameters.

If the length of a consumed message after conversion is shorter than or equal to the available space in the record, then

If the length of a consumed message is longer than the LRECL and exceeds the available space in the record, then for fixed-length (RECFM=F) and variable-length (RECFM=V) records, the message is truncated at the end to fit into the maximum record length and a warning message is generated.
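
The following Python sketch illustrates how the available space in a record relates to RECFM and LRECL and when truncation occurs. It is not RZKCOPY code. It relies on the general z/OS convention that the LRECL of a variable-length record includes a 4-byte record descriptor word (RDW), and it assumes that short fixed-length records are padded with EBCDIC blanks (X'40'); the padding character actually used by RZKCOPY is not stated here. The function name fit_message is hypothetical.

```python
RDW_LENGTH = 4  # record descriptor word counted in the LRECL of RECFM=V records


def fit_message(message: bytes, recfm: str, lrecl: int,
                pad: bytes = b"\x40"):
    """Return (record_data, truncated) for one consumed message.

    Assumption: fixed-length records are padded with 'pad'
    (EBCDIC blank) up to LRECL.
    """
    if recfm.startswith("F"):
        capacity = lrecl                 # the whole record holds data
    elif recfm.startswith("V"):
        capacity = lrecl - RDW_LENGTH    # 4 bytes are reserved for the RDW
    else:
        raise ValueError(f"unsupported RECFM: {recfm}")

    truncated = len(message) > capacity
    data = message[:capacity]            # cut off at the end if too long
    if recfm.startswith("F"):
        data = data.ljust(capacity, pad)  # fill short records up to LRECL
    return data, truncated


print(fit_message(b"ABCDE", "FB", 3))      # truncated to 3 bytes
print(fit_message(b"ABCDEFGH", "VB", 10))  # only 6 data bytes fit
```

In this model, a data set with RECFM=VB and LRECL=32756 can hold messages of up to 32752 bytes without truncation.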

To avoid message truncation when consuming messages of unknown, varying lengths from a topic, use the DCB attributes RECFM=VB, LRECL=32756, BLKSIZE=32760 for your MVS output data set or USS file allocated to DD name RZKFILE.

The maximum allowed record length and block size for normal (non-spanned) sequential data sets on z/OS is 32760.

Ending Produce Processing

When producing records from MVS data sets and/or USS files to a Kafka topic, produce processing ends when all records have been read (end of file, EOF, reached on all files) and have also been produced to Kafka.

Ending Consume Processing

While consuming messages from Kafka, there is no such "End of Messages" indicator to end consume processing. RZKCOPY would wait "forever" for the next Kafka message, if there were no other methods to end consume processing. RZKCOPY provides two options for this:

If you specify a repeat value that is much larger than the average number of Kafka messages available for consume processing in a given time, RZKCOPY will probably wait much longer than you expected. Example: If you specify a repeat value of -r1000 to consume 1,000 messages, but only 1,000 messages arrive in the whole day, RZKCOPY will also run (and wait) for the whole day.

If you specify a consume timeout value that is longer than the average time between two Kafka messages arriving for consume, the timeout interval does not expire and RZKCOPY will probably run much longer, and consume many more messages, than you expected. Example: If you specify a timeout value of 10 seconds with -t10, but your Kafka cluster is very busy and delivers several messages each second, the timeout interval will almost never expire.

To prevent RZKCOPY from running longer than you want, specify both options to end consume processing, with values that suit your Kafka environment, the number of messages and their arrival interval. Example: Options -r1000 -t10 will end consume processing when either 1,000 messages have been consumed or no new messages have been available from Kafka for the last 10 seconds.

Running RZKCOPY to consume messages without any option to end consume processing properly will allow RZKCOPY to run forever.
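
The interplay of the repeat maximum (-r) and the timeout interval (-t) can be illustrated with a small simulation. This is not RZKCOPY code and makes no claim about its internal logic. The function simulate_consume and the list of arrival times are hypothetical, and a value of zero stands for "option not specified", as in the RZKCOPY defaults.

```python
from typing import Iterable, List, Tuple


def simulate_consume(arrivals: Iterable[Tuple[float, str]],
                     repeat: int = 0,
                     timeout: float = 0) -> Tuple[List[str], str]:
    """Simulate the end of consume processing.

    arrivals: (arrival time in seconds, message), sorted by time.
    repeat:   maximum number of messages, 0 = no maximum.
    timeout:  idle interval in seconds, 0 = no timeout.
    """
    consumed: List[str] = []
    last_activity = 0.0  # start of consume processing
    for arrival_time, message in arrivals:
        # The idle gap before this message is longer than the timeout.
        if timeout and arrival_time - last_activity > timeout:
            return consumed, "timeout expired"
        consumed.append(message)
        last_activity = arrival_time
        if repeat and len(consumed) >= repeat:
            return consumed, "repeat maximum reached"
    # No more messages: only a timeout can end processing now.
    if timeout:
        return consumed, "timeout expired"
    return consumed, "still waiting"


arrivals = [(1.0, "msg1"), (2.0, "msg2"), (30.0, "msg3")]
print(simulate_consume(arrivals, repeat=1000, timeout=10))
print(simulate_consume(arrivals, repeat=2, timeout=10))
print(simulate_consume(arrivals))
```

With both values set, the first call ends on the timeout after two messages and the second ends on the repeat maximum. The third call, without any end condition, consumes all three messages and would keep waiting.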

RZKCOPY Parameters

The major parameters for controlling the RZKCOPY functionality are based on those of the RzK Command Line Interface (CLI). However, due to the different functionality of both utilities, there are some differences in their individual options.

The RZKCOPY parameter format is

RZKCOPY [options] broker[:port] topic

The options may be coded in any sequence, separated by at least one blank (space).

Option | Description
-b | Binary mode. No EBCDIC/UTF-8 conversion takes place during produce or consume processing.
-c | Consume processing. Messages are consumed from the specified Kafka topic and written as records into the MVS data set or USS file allocated to DD name RZKFILE.
-g | Consumer group used during consume processing. For more details see section on Consumer / Consumer Groups in Apache Kafka Core Concepts.
-k | Produce message key. The message key ensures that all messages with the same key are stored in the same partition of the Kafka topic and can later be consumed in the same sequence as they were written. For more details see section on Messages / Message Key in Apache Kafka Core Concepts.
-p | Produce processing. Records are read from all MVS data sets and USS files allocated to DD name RZKFILE and are produced as messages into the specified Kafka topic.
-r# | Repeat maximum. RZKCOPY consume processing ends after # messages have been consumed from the specified Kafka topic. The default value used by RZKCOPY is zero, which means that no maximum for the number of messages to be consumed is defined.
-t# | Timeout interval in seconds for consume processing. RZKCOPY consume processing will end if there were no new messages available from Kafka for the last # seconds. The default value used by RZKCOPY is zero, which means that no timeout interval is specified.
-v# | Debug level within the range of 0..15. Issue almost no (#=0) or almost all debug messages (#=15). Please use for error diagnosis purposes only. Avoid using this in a production environment as it generates a large amount of messages written to DD name SYSPRINT. The default debug level used by RZKCOPY is 1, displaying only RZKOPEN and RZKCLSE messages.

Parameter Examples

Produce all records found at DD name RZKFILE in binary format to the Kafka broker myKafka.myDomain.de at the default port 9092 into topic myTopic using message key myKey:

RZKCOPY -b -p -k myKey myKafka.myDomain.de myTopic

Consume messages from the Kafka broker myKafka.myDomain.de at port 9070 from topic myTopic using consumer group myGroup with UTF-8 to EBCDIC conversion and write them to DD name RZKFILE. Consume processing ends when either 1,000 Kafka messages have been received or no new messages have been available from Kafka for the last 60 seconds:

RZKCOPY -c -g myGroup -r1000 -t60 myKafka.myDomain.de:9070 myTopic
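How a broker[:port] operand with an optional port might be interpreted can be sketched in C as follows. This is an assumption-based illustration, not RZKCOPY code: the function name split_broker is hypothetical, the default port 9092 is taken from the example above, and IPv6 literals are not handled.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_KAFKA_PORT 9092   /* default port named in the example above */

/* Hypothetical helper: splits "host[:port]" into host and port.
 * Returns 0 on success, -1 if the operand is malformed or too long. */
int split_broker(const char *broker, char *host, size_t host_size, int *port)
{
    assert(broker != NULL && host != NULL && port != NULL);

    const char *colon = strrchr(broker, ':');
    size_t host_len = colon ? (size_t)(colon - broker) : strlen(broker);

    if (host_len == 0 || host_len >= host_size)
        return -1;                    /* empty host or buffer too small */
    memcpy(host, broker, host_len);
    host[host_len] = '\0';

    if (colon == NULL) {              /* no port given: use the default */
        *port = DEFAULT_KAFKA_PORT;
        return 0;
    }

    char *end;
    long value = strtol(colon + 1, &end, 10);
    if (end == colon + 1 || *end != '\0' || value < 1 || value > 65535)
        return -1;                    /* port missing, non-numeric or out of range */
    *port = (int)value;
    return 0;
}
```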

RZKCOPY Examples

The following table contains an overview of all examples available in the SAMPLIB related to the RZKCOPY utility:

Member Description
RZKCPYAJ JCL to assemble, link/bind and run the RZKCPYAP Assembler example program.
RZKCPYAP Assembler example program to call RZKCOPY to produce messages from a pre-allocated MVS data set.
RZKCPYBK JCL to compile, link/bind and run the RZKCPYBP Cobol example program.
RZKCPYBP Cobol example program to call RZKCOPY to produce messages from a pre-allocated MVS data set.
RZKCPYJC RZKCOPY JCL to consume messages from a Kafka topic and write them into an MVS data set.
RZKCPYJD RZKCOPY JCL to consume messages from a Kafka topic and write them into a USS file.
RZKCPYJP RZKCOPY JCL to produce messages from an MVS data set into a Kafka topic.
RZKCPYJQ RZKCOPY JCL to produce messages from a USS file into a Kafka topic.
RZKCPYRC REXX example program to call RZKCOPY to consume messages from a Kafka topic into an MVS data set.
RZKCPYRP REXX example program to call RZKCOPY to produce messages from an MVS data set into a Kafka topic.

To give a first impression, a few shortened examples follow. For the complete examples, please refer to the SAMPLIB members described in the table above.

Produce MVS Data Set JCL Example

This example has been shortened. For the complete code please see SAMPLIB(RZKCPYJP).

//jobname  JOB
//*
//RZKCOPY  EXEC PGM=RZKCOPY,
// PARM='-p -k myKey myKafka.myDomain.de:9092 myTopic'
//*
//STEPLIB  DD  DISP=SHR,DSN=myHlq.RZKV120.LOADLIB
//RZKFILE  DD  DISP=SHR,DSN=my.data.set.tobe.produced
//SYSPRINT DD  SYSOUT=*
//

Consume MVS Data Set JCL Example

This example has been shortened. For the complete code please see SAMPLIB(RZKCPYJC).

//jobname  JOB
//*
//RZKCOPY  EXEC PGM=RZKCOPY,
// PARM='-c -g myGroup -t10 myKafka.myDomain.de:9092 myTopic'
//*
//STEPLIB  DD  DISP=SHR,DSN=myHlq.RZKV120.LOADLIB
//RZKFILE  DD  DSN=my.data.set.for.consume,
//             DISP=(NEW,CATLG,DELETE),
//             UNIT=SYSDA,SPACE=(CYL,(30,15),RLSE),
//             DCB=(RECFM=VB,LRECL=255,BLKSIZE=27998)
//SYSPRINT DD  SYSOUT=*
//

Produce MVS Data Set REXX Example

This example has been shortened. For the complete code please see SAMPLIB(RZKCPYRP).

/* TSO REXX */

  RzkDsn = "my.data.set.tobe.produced"; 

  Broker = "myKafka.myDomain.de:9092";
   Topic = "myTopic";
    Func = "-p -k myKey";
    Opts = "-v1";

  JclPrm = Func Opts Broker Topic;

  "ALLOCATE FILE(RZKFILE) DATASET('"RzkDsn"') SHR REUSE";              

  ADDRESS "LINKMVS" "RZKCOPY JclPrm";                                  

  "FREE FILE(RZKFILE)";                                                

RETURN 0;

Produce MVS Data Set Cobol Example

This example has been shortened. For the complete code please see SAMPLIB(RZKCPYBP).

Please note that this example assumes the files to be produced are already allocated to DD name RZKFILE.

*  Required RZKCOPY EXEC PARM= String ---------------------------*
   01 JclParm.                                                    
      05 Len  PIC S9(4) COMP-5 VALUE 48.                          
      05 Parm.                                                    
         10 Pro PIC X(11) VALUE "-p -k myKey".                 
         10 Fi1 PIC X(01) VALUE " ".                              
         10 Opt PIC X(03) VALUE "-v1".                            
         10 Fi2 PIC X(01) VALUE " ".                              
         10 Srv PIC X(24) VALUE "myKafka.myDomain.de:9092".
         10 Fi3 PIC X(01) VALUE " ".                              
         10 Tpc PIC X(07) VALUE "myTopic".                       
 
   01 RzkRc   PIC S9(9)  COMP-5 VALUE  0.                         
 
CALL "RZKCOPY" USING BY CONTENT JclParm RETURNING RzkRc.
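The Len field of JclParm has to match the number of characters in Parm (11 + 1 + 3 + 1 + 24 + 1 + 7 = 48). The same layout, a halfword length followed by the parameter text, can be sketched in C as follows. The type JclParm and the function jclparm_set are hypothetical names chosen for this illustration; the 100-byte text area mirrors the classic JCL PARM limit.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Illustrative layout of an EXEC PARM= style parameter:
 * a halfword length followed by the parameter text. */
typedef struct {
    int16_t len;         /* number of bytes used in text          */
    char    text[100];   /* parameter string, no terminating NUL  */
} JclParm;

/* Hypothetical helper: fills a JclParm from a C string.
 * Returns 0 on success, -1 if the string does not fit. */
int jclparm_set(JclParm *p, const char *s)
{
    assert(p != NULL && s != NULL);

    size_t n = strlen(s);
    if (n > sizeof p->text)
        return -1;
    memcpy(p->text, s, n);
    p->len = (int16_t)n;   /* computed, so it cannot drift from the text */
    return 0;
}
```

Computing the length from the string avoids the mismatch that can occur when a hard-coded length is not updated together with the parameter text.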

Produce MVS Data Set Assembler Example

This example has been shortened. For the complete code please see SAMPLIB(RZKCPYAP).

Please note that this example assumes the files to be produced are already allocated to DD name RZKFILE.

Using the RzK API

You may use the zKafka Connector API in almost any environment and programming language.

To achieve this, the RzK API is provided by two modules, which serve different purposes depending on your calling program environment and needs:

Depending on whether the calling programs run within the z/OS Language Environment or not, there are a few things to consider for optimal use of the RzK API in your programs.

You also need to decide whether you want to link/bind RzK statically into your program during build, call the RzK API dynamically at program runtime, or use the provided RzK stub as a hybrid solution that combines the advantages of both variants.

In the following sections, you will find some more details that may help you in making these decisions.

LE API vs. CALL API

In order to provide all API functions you need, the RZK DLL requires an active z/OS Language Environment.

LE API

LE-callers (C/C++, Cobol, Fortran, PL/I, GoLang, Swift) may use the RZK DLL directly. RzK will use and run within the existing Language Environment of the calling program.

CALL API

Typically, LE programs that call other LE programs or DLLs pass their parameters differently than non-LE programs do.

Non-LE callers (e.g. Assembler, REXX), which pass parameters through the standard z/OS CALL API, have to use the provided RzK CALL API stub module "RZKCALL". RZKCALL provides the required Language Environment, loads the RZK DLL (only on the first call), transforms the passed CALL API parameters into LE format and calls the requested RzK DLL API function.

Mixed API

Some non-LE languages (e.g. Natural Programming Language, NPL) support using the standard z/OS CALL API or, after some additional setup, the LE API. Depending on which API is used, these languages may use the RZK DLL directly or have to use the RzK stub RZKCALL.

POSIX

The POSIX(ON) LE runtime option is not required, but allowed. This allows RzK to be used from CICS, where multithreading is not allowed and the POSIX(ON) LE runtime option is ignored.

Static Bind vs. Dynamic Call

To use RzK from an LE program, you have four choices for invoking the RzK API functions:

  1. Include module RZK during link/bind processing and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICK) JCL member to compile, link/bind and run C programs that include module RZK.

  2. Include module RZKSTUB during link/bind processing and call RZKSTUB() to access the RzK functions. Use the SAMPLIB(RZKAPICJ) JCL member to compile, link/bind and run C programs that include module RZKSTUB.

  3. Load the RZK LE Dynamic Link Library (DLL) implicitly and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICI) JCL member to compile, link/bind and run your C programs without including any RzK modules.

  4. Load the RZK LE Dynamic Link Library (DLL) explicitly and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICL) JCL member to compile, link/bind and run your C programs without including any RzK modules.

To help you find the best choice for your needs, please also read the language-specific details in chapter RzK API Example Code.

RzK API Function Call Reference

RzK is an advanced client API to address the two basic Kafka clients "Producer" and "Consumer". Some additional functionality is provided to further support the developer in using the producer and consumer functions, and the handling of the Kafka cluster. This chapter provides an overview of the multiple RzK function calls.

Terms and Conventions

The following terms and conventions are used for the zKafka Connector and its API:

RzK

zKafka Connector API Function Naming Convention

zKafka Connector Function Codes

Parameters


RZKOPEN - Open

The RZKOPEN API function initializes the RzK environment for one connection to a Kafka broker and returns a handle for all subsequent API calls to this broker.

An application can connect to multiple Kafka clusters or to the same Kafka cluster using different topics. RZKOPEN is required for each of these connections/sessions.

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTOPEN


Return Codes

The function returns a handle to be used in subsequent API calls. In case of an error, -1 is returned.

Value | Description
>= 0 | Handle
-1 | Error


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTOPEN RETURNING Handle.

IF Handle >= 0 THEN
   .
   .
ELSE                                                   
  DISPLAY "Error in RZKOPEN, Handle=x'" FUNCTION HEX-OF(Handle) "'"
END-IF.

C

Handle = RZKSTUB(FCTOPEN);

if (Handle >= 0)
{
   .
   .
}                                                                    
else                                                                 
  printf("Error in RZKOPEN, Handle=x'%08X'\n", (unsigned)Handle);

Assembler

         CALL  (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)
*
         IF    (LTR,R15,R15,NM)        ;Handle >= Zero?
*
           ST    R15,WaHandle          ;Handle returned by RZKOPEN
           .
           .
         ELSE  ,
           WTO   'Error in RZKOPEN',ROUTCDE=(2,11)
         ENDIF ,

REXX

Handle = RzkOpen();                                                
                                                                   
IF Handle >= 0                                                     
THEN                                                               
DO                                                                 
  .
  .
END      /* of IF Handle >= 0 THEN DO */                           
ELSE                                                               
  SAY "Error in RZKOPEN, Handle=x'"D2X(Handle, 8)"'";

Natural

CALL 'RZKCALL' #FCTOPEN                  
*                                        
#Handle := RET('RZKCALL')                
*                                        
IF #Handle >= 0
THEN
  .
  .
ELSE                                     
  WRITE 'Error in RZKOPEN'
END-IF

RZKCONN - Connect

The Connect function takes the connection information (hostname or IP address and port) of one broker of the Kafka cluster and the requested topic.
RzK then connects to this broker and retrieves the layout of the cluster and the information about which brokers handle the partitions and replicas of the specified topic.
When RZKCONN returns without error, the Kafka cluster can be reached, the optionally provided authentication information has been accepted, and the topic is known and available in the Kafka cluster.

RzK requires that topics are defined before they are used. Dynamic topic creation is not supported.

The information required to connect to the Kafka cluster:

Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTCONN
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | Broker Name | 32-bit String Pointer |  | In | Hostname or IP address followed by ':' and the port of one of the brokers/instances of the desired Kafka cluster.
4 | Broker Name Length | 32-bit Signed Integer | 1..256 | In | Kafka Broker Name Length
5 | Topic Name | 32-bit String Pointer |  | In | Kafka Topic Name of the topic that the application wants to produce to or consume from.
6 | Topic Name Length | 32-bit Signed Integer | 0..255 | In | Kafka Topic Name Length


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value | Description
0 | Successful completion
>0 | Error. Please find further return code information in chapter Messages and Codes.


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTCONN Handle          
                  BY CONTENT  BrokerS BY VALUE BrokerL
                  BY CONTENT  TopicS  BY VALUE TopicL 
               RETURNING RzkRc.

C

RzkRc = RZKSTUB(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);

Assembler

         CALL  (RZK),(=A(FCTCONN),WaHandle,                            +
               BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)

REXX

RzkRc = RzkConn(Handle, BrokerS, TopicS);

Natural

CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL

RZKPROD - Produce

The Kafka producer (client), or produce function, creates new messages. In other systems, a producer is often called a publisher or writer. Producing requires a successful connection to the Kafka cluster. This connection determines the destination for the message, i.e. the Kafka cluster and the topic.

Use the RzK produce function to store a message in the Kafka cluster.

Message Conversion

You can request a message conversion from EBCDIC to UTF-8 (see "Parameters" below) or just send a binary message.
Messages are treated as opaque data in the Kafka cluster; the Kafka cluster provides no codepage conversion. If the data that you want to write into the Kafka cluster is in a readable format (e.g. normal text strings, JSON or XML) and you want UNIX or Windows systems to be able to read (consume) these messages from the Kafka cluster, you need to set the conversion flag to '1'.

Message Key

Message key is an optional parameter.


Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTPROD
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | MessageKey | 32-bit String Pointer |  | In | Pointer to a string with the message key
4 | MessageKey Length | 32-bit Signed Integer | 1..256 | In | Length of the message key. Specify 0 to use no message key.
5 | Message | 32-bit String Pointer |  | In | Pointer to the message.
6 | Message Length | 32-bit Signed Integer | 0..MaxInt | In | Length of the message.
7 | UTF-8 Conversion | 32-bit Signed Integer | 0,1 | In | Flag to request conversion of Message to UTF-8 Codeset


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value | Description
0 | Successful completion
>0 | Please find further information about the return values in chapter Messages and Codes


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTPROD Handle       
                  BY CONTENT  KeyS    BY VALUE KeyL
                  BY CONTENT  MsgS    BY VALUE MsgL
                  BY VALUE    Conv                 
               RETURNING RzkRc.

C

RzkRc = RZKSTUB(FCTPROD, Handle, KeyS, KeyL, MsgS, MsgL, Conv);

Assembler

         CALL  (RZK),(=A(FCTPROD),WaHandle,                            +
               KeyS,KeyL,MsgS,MsgL,Conv),VL,MF=(E,WaCall)

REXX

RzkRc = RzkProd(Handle, KeyS, MsgS, Conv);

Natural

CALL 'RZKCALL' #FCTPROD #Handle #KeyS #KeyL #MsgS #MsgL #Conv

RZKCONS - Consume

A Consume is the operation to read or fetch data from a Kafka cluster.

In the same way that multiple producers can write messages to a Kafka cluster, multiple consumers can read those messages.

Message Conversion

You can optionally request a message conversion from UTF-8 to EBCDIC (see "Parameters" below).

Consumer Groups

Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTCONS
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | Group Name | 32-bit String Pointer |  | In | Consumer group name
4 | Group Name Length | 32-bit Signed Integer | 1..256 | In | Consumer group name length
5 | Message Buffer | 32-bit String Pointer |  | In | Pointer to output message buffer for consumed message. Buffer - not address - will be updated with consumed message.
6 | Message Buffer Length | 32-bit Signed Integer | 0..MaxInt | In/Out | On input: Size of provided output buffer. On output: Size of used output buffer, i.e. the length of the returned consumed message.
7 | UTF-8 Conversion | 32-bit Signed Integer | 0,1 | In | Flag to request conversion of Message from UTF-8 Codeset to EBCDIC
8 | Timeout | 32-bit Signed Integer |  | In | Number of seconds to wait for a message. Specify 0 to wait forever.
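Because Message Buffer Length is an In/Out parameter, it has to be reset to the buffer size before every consume call. The following C sketch shows why. It does not call RzK: fake_consume is a hypothetical stand-in that only mimics the in/out length behaviour, and its truncation of messages that do not fit is an assumption made for this illustration.

```c
#include <assert.h>
#include <string.h>

#define BUF_SIZE 64

/* Stand-in for a consume call (NOT the RzK API): copies msg into buf,
 * limited to the capacity passed in *len, and returns the used length
 * in *len, like an In/Out length parameter. */
static int fake_consume(const char *msg, char *buf, int *len)
{
    assert(*len >= 0);

    int n = (int)strlen(msg);
    if (n > *len)
        n = *len;                     /* assumed: message is cut to fit */
    memcpy(buf, msg, (size_t)n);
    *len = n;
    return 0;
}

/* Consumes two messages and returns the length reported for the second.
 * reset_each_time selects whether the length is reset in between. */
int second_length(int reset_each_time)
{
    char buf[BUF_SIZE];
    int  len = BUF_SIZE;

    fake_consume("short", buf, &len);         /* len is now 5 */
    if (reset_each_time)
        len = BUF_SIZE;                       /* (re)set maximum buffer size */
    fake_consume("a much longer message", buf, &len);
    return len;
}
```

Without the reset, the second call sees a capacity of only 5 bytes, left over from the first message.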


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value | Description
0 | Successful completion
>0 | Please find further information about the return codes in chapter Messages and Codes


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

MOVE BufSize TO BufferL       *> (Re)Set maximum buffer size

CALL "RZKSTUB" USING BY VALUE  FCTCONS Handle         
                  BY CONTENT   GroupS  BY VALUE GroupL
                  BY REFERENCE BufferS BufferL        
                  BY VALUE     Conv    TimeOut        
               RETURNING RzkRc.                        

IF RzkRc = 0 THEN
  DISPLAY "(" BufferL ") " BufferS(1:BufferL)
ELSE
  DISPLAY "Error in RZKCONS, RzkRc=x'" FUNCTION HEX-OF(RzkRc) "'"
END-IF.

C

BufferL = BUF_SIZE;              // (Re)Set maximum buffer size

RzkRc = RZKSTUB(FCTCONS, Handle,
                 GroupS, GroupL, &BufferS, &BufferL, Conv, TimeOut);

if (RzkRc == 0)
  printf("(%05d) %.*s\n", BufferL, BufferL, BufferS);
else
  printf("Error in RZKCONS, RzkRc=x'%08X'\n", (unsigned)RzkRc);

Assembler

         IILF  R5,BufSize$
         ST    R5,WaBuffL              ;(Re)Set maximum buffer size
*                                                                  
         CALL  (RZK),(=A(FCTCONS),WaHandle,                            +
               GroupS,GroupL,WaBuffS,WaBuffL,Conv,TimeOut),VL,         +
               MF=(E,WaCall)                                            
*
         IF    (LTR,R15,R15,Z)                ;RzkRc = 0
           WTO   TEXT=WaBuffL+2,MF=(E,WaWto)  ;Show consumed message
         ELSE  ,
           WTO   'Error in RZKCONS',ROUTCDE=(2,11)
         ENDIF ,

REXX

RzkRc = RzkCons(Handle, GroupS, Conv, TimeOut);

IF RzkRc == 0
THEN
  SAY Rzk.Handle.#MSG
ELSE
  SAY "Error in RZKCONS, RC=x'"D2X(RzkRc,8)"'";

Natural

MOVE #BufMaxL TO #BufferL            /* (Re)Set maximum buffer size
*                                                                  
CALL 'RZKCALL' #FCTCONS #Handle                                    
               #GroupS  #GroupL #BufferS #BufferL #Conv #TimeOut
#RzkRc := RET('RZKCALL')
*                                                                  
IF #RzkRc = 0
THEN
  WRITE '(' #BufferL ') ' SUBSTRING(#BufferS,1,#BufferL)
ELSE                                     
  WRITE 'Error in RZKCONS, RC=' #RzkRc
END-IF

RZKCLSE - Close

The Close function ends the connection to the Kafka cluster associated with the provided handle.
If no other connections are open, Close also cleans up the RzK environment by freeing all memory, stopping all threads and preparing for the Language Environment (LE) to be unloaded.

The RzK Handle is no longer valid after returning from Close.


Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTCLSE
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.


Return Codes

The function returns 4 if other connections are still open. If there are no further open connections, the Close function cleans up the RzK environment, frees all memory and stops all threads so that the LE can be ended.

Value | Description
0 | Successful completion: No more open connections.
4 | Successful completion: This session to a Kafka cluster is closed, other sessions are still active.
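Since both documented return codes indicate success, a caller should not treat 4 as an error. A minimal C sketch of such a check follows; the enum and the function classify_close_rc are hypothetical names, and the handling of any other value as unexpected is an assumption.

```c
/* Hypothetical classification of the RZKCLSE return code. */
typedef enum {
    CLOSE_LAST,          /* 0: closed, no more open connections     */
    CLOSE_OTHERS_OPEN,   /* 4: closed, other sessions still active  */
    CLOSE_UNEXPECTED     /* any other value (assumed to be an error) */
} CloseResult;

CloseResult classify_close_rc(int rc)
{
    switch (rc) {
    case 0:  return CLOSE_LAST;
    case 4:  return CLOSE_OTHERS_OPEN;
    default: return CLOSE_UNEXPECTED;
    }
}
```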


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTCLSE Handle
               RETURNING RzkRc.

C

RzkRc = RZKSTUB(FCTCLSE, Handle);

Assembler

         CALL  (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)

REXX

RzkRc = RzkClse(Handle);

Natural

CALL 'RZKCALL' #FCTCLSE #Handle

RZKAUTH - Authentication

RzK supports the SASL PLAIN authentication mechanism. Use RZKAUTH to set UserId and Password/Secret for the authentication.

If your Kafka cluster requires authentication, you must call this function before issuing an RZKCONN call. If no credentials have been provided, or if they are wrong, RZKCONN will fail.


Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTAUTH
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | User Name | 32-bit String Pointer |  | In | User Name to be used for authentication
4 | User Name Length | 32-bit Signed Integer | 1..256 | In | User Name Length
5 | Password | 32-bit String Pointer |  | In | Password to be used for authentication
6 | Password Length | 32-bit Signed Integer | 1..256 | In | Password Length


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value | Description
0 | Successful completion
>0 | Error. Please find further return code information in chapter Messages and Codes.


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTAUTH Handle        
                  BY CONTENT  UserS   BY VALUE UserL
                  BY CONTENT  PwrdS   BY VALUE PwrdL
               RETURNING RzkRc.                     

C

RzkRc = RZKSTUB(FCTAUTH, Handle, UserS, UserL, PwrdS, PwrdL);

Assembler

         CALL  (RZK),(=A(FCTAUTH),WaHandle,                            +
               UserS,UserL,PwrdS,PwrdL),VL,MF=(E,WaCall)                

REXX

RzkRc = RzkAuth(Handle, UserS, PwrdS);

Natural

CALL 'RZKCALL' #FCTAUTH #Handle #UserS #UserL #PwrdS #PwrdL

RZKDBUG - Debug Level

RzK has the ability to display detailed debugging messages, including a data trace of the communication with the Kafka Broker. The RZKDBUG API call is used to control the type of messages being displayed.

These debug messages are needed to analyze connection and communication issues. In case of an issue, the RzK support team might ask you to set a specific debug level and provide the resulting output.

It is not recommended to set this to anything but '0' unless RzK Support asks you to do so.
Using a high debug level creates a large number of messages, which in turn reduces performance and increases CPU load and memory usage.

These debug messages can be enabled with the following input values

Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTDBUG
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | DebugLevel | 32-bit Signed Integer | 0..15 | In | The requested debug level.


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value | Description
0 | Successful completion
>0 | Error. Please find further return code information in chapter Messages and Codes.


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTDBUG Handle DbugLvl
               RETURNING RzkRc.               

C

RzkRc = RZKSTUB(FCTDBUG, Handle, DbugLvl);

Assembler

         CALL  (RZK),(=A(FCTDBUG),WaHandle,DbugLvl),VL,MF=(E,WaCall)

REXX

RzkRc = RzkDbug(Handle, DbugLvl);

Natural

CALL 'RZKCALL' #FCTDBUG #Handle #DbugLvl

RZKGERR - Get Last Error

Get Last Error can be used to retrieve the last error code that was reported by RzK and a detailed message for it.


Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTGERR
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | Message Buffer | 32-bit String Pointer |  | In | Pointer to output message buffer for detailed error message. Buffer - not address - will be updated with the error message.
4 | Message Buffer Length | 32-bit Signed Integer | 0..MaxInt | In/Out | On input: Size of provided output buffer. On output: Size of used output buffer, i.e. the length of the returned error message.


Return Codes

The function returns the last error code. If it returns 0, there was no error recorded in this connection.

Value | Description
0,7500... | Last Error code


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

MOVE BufSize TO BufferL       *> (Re)Set maximum buffer size

CALL "RZKSTUB" USING BY VALUE  FCTGERR Handle 
                  BY REFERENCE BufferS BufferL
               RETURNING RzkRc.               
               
DISPLAY RzkRc "(" BufferL ") " BufferS(1:BufferL)

C

BufferL = BUF_SIZE;              // (Re)Set maximum buffer size

RzkRc = RZKSTUB(FCTGERR, Handle, &BufferS, &BufferL);

printf("%05d %.*s\n", RzkRc, BufferL, BufferS);

Assembler

         IILF  R5,BufSize$ 
         ST    R5,WaBuffL              ;(Re)Set maximum buffer size     
*                                                                       
         CALL  (RZK),(=A(FCTGERR),WaHandle,WaBuffS,WaBuffL),VL,        +
               MF=(E,WaCall)                                            
*                                                                       
         WTO   TEXT=WaBuffL+2,MF=(E,WaWto)  ;Show error message         

REXX

RzkRc = RzkGerr(Handle);

SAY RzkRc Rzk.Handle.#ERR;

Natural

MOVE #BufMaxL TO #BufferL            /* (Re)Set maximum buffer size
*                                                                  
CALL 'RZKCALL' #FCTGERR #Handle #BufferS #BufferL
*     
WRITE RET('RZKCALL') SUBSTRING(#BufferS,1,#BufferL)

RZKCLDB - Load Balancing

RzK supports the load balancing protocol of Kafka clusters. With this protocol, multiple applications can read from the same topic and each message is only received by one of the applications.

E.g. if two applications are connected to the same Kafka cluster, consuming the same topic with the same group name (for more details on this see RZKCONS - Consume) and both have load balancing enabled, the available messages will be distributed between the two applications.

If the Flag is set to:


Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTCLDB
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | Flag | 32-bit Signed Integer | 0,1 | In | Enable (1) or disable (0) the consume load balancing.


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value | Description
0 | Successful completion
>0 | Please find further information about the return values in chapter Messages and Codes


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTCLDB Handle LoadBal
               RETURNING RzkRc.                     

C

RzkRc = RZKSTUB(FCTCLDB, Handle, LoadBal);

Assembler

         CALL  (RZK),(=A(FCTCLDB),WaHandle,LoadBal),VL,MF=(E,WaCall)

REXX

RzkRc = RzkCldb(Handle, LoadBal);

Natural

CALL 'RZKCALL' #FCTCLDB #Handle #LoadBal

RZKSYNC - Set Synchronous Mode

Depending on network load, network latency and the Kafka cluster load/performance, it takes some time between a request being sent to the Kafka cluster and the return of the reply for the request.

Use the "Set Synchronous Mode" function for produce and consume:

Offsets are stored in and managed by the Kafka cluster for each group name and topic.
This allows RzK to resume reading from the last committed offset after a reconnect.

Parameters

Seq. # | Parameter | Type | Range | In/Out | Description
1 | Function Code | 32-bit Signed Integer |  | In | FCTSYNC
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call.
3 | Mode | 32-bit Signed Integer | 0,1,2 | In | Sync mode (default = 0)
4 | Send Window | 32-bit Unsigned Integer | 0..10000 | In | Window size for messages sent to the Kafka server (default = 100)
5 | Receive Window | 32-bit Unsigned Integer | 0..10000 | In | Window size for messages received/consumed before the offset is committed to the Kafka server (default = 100)


Mode

Send Window
Receive Window


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value  Description
0      Successful completion
>0     Please find further information about the return values in chapter Messages and Codes


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, see chapter RzK API Example Code and the SAMPLIB library delivered with RzK.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTSYNC Handle       
                  BY VALUE    SyncLvl SndWin RecWin
               RETURNING RzkRc.                    

C

RzkRc = RZKSTUB(FCTSYNC, Handle, SyncLvl, SndWin, RecWin);

Assembler

         CALL  (RZK),(=A(FCTSYNC),WaHandle,                            +
               SyncLvl,SndWin,RecWin),VL,MF=(E,WaCall)                  

REXX

RzkRc = RzkSync(Handle, SyncLvl, SndWin, RecWin);

Natural

CALL 'RZKCALL' #FCTSYNC #Handle #SyncLvl #SndWin #RecWin

RZKTERM - Terminate

Use this function when RzK must be cleaned up after an error (e.g. in a recovery routine) and you do not know which RzK handles are still open.

The Terminate function closes all open "connections" to Kafka clusters. All handles retrieved previously by an Open function call are invalidated. All memory is returned to the system, all threads are stopped.

Parameters

Seq. #  Parameter      Type                   Range  In/Out  Description
1       Function Code  32-bit Signed Integer         In      FCTTERM


Return Codes

RZKTERM always returns zero.

Value  Description
0


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, see chapter RzK API Example Code and the SAMPLIB library delivered with RzK.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTTERM RETURNING RzkRc.

C

RzkRc = RZKSTUB(FCTTERM);

Assembler

         CALL  (RZK),(=A(FCTTERM)),VL,MF=(E,WaCall)

REXX

RzkRc = RzkTerm();

Natural

CALL 'RZKCALL' #FCTTERM

RZKTIMO - Group Member Time Out

Kafka requires a separate connection for storing and retrieving consume offsets and consume load balancing. This connection is based on the consumer group name (that is specified when the RzK consume function is called). This function is used to change the timeout value for this connection.

RzK must send a "heartbeat" to the Kafka server so that this timeout does not expire. For this reason, the application using RzK needs to issue a consume or produce call at least once within the specified timeout value so that a heartbeat can be sent to the Kafka server.

A higher timeout value gives the application more time before the next call to RzK is required.

On the other hand, this value also defines the time after which the Kafka server assumes the application is dead and releases the group so that the application can reconnect. The value therefore determines how long it takes until an application that abended can resume consuming messages from the Kafka server after a restart.

The RzK default is 30 seconds.
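
An application with long processing phases can check whether the gap since its last produce or consume call has reached the configured timeout. The following C fragment is only a sketch of that check: rzk_call_overdue and RZK_DEFAULT_GROUP_TIMEOUT are hypothetical names that are not part of the RzK API, and the application is assumed to record the time of its last RzK call itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* RzK default for the group member timeout, in seconds (see above). */
#define RZK_DEFAULT_GROUP_TIMEOUT 30

/* Hypothetical helper: returns true when the time elapsed since the last
   produce or consume call has reached the group member timeout, i.e. no
   heartbeat could have been sent in time. */
bool rzk_call_overdue(time_t last_call, time_t now, int timeout_seconds)
{
    assert(timeout_seconds > 0);
    return difftime(now, last_call) >= (double)timeout_seconds;
}
```

If such a check reports true regularly, either the processing between two calls has to become shorter or a higher value has to be set with RZKTIMO, at the cost of a longer wait before an abended application can resume consuming.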


Parameters

Seq. #  Parameter      Type                   Range     In/Out  Description
1       Function Code  32-bit Signed Integer            In      FCTTIMO
2       Handle         32-bit Signed Integer  internal  In      RzK connection handle returned by a previous RZKOPEN call.
3       ConnTim        32-bit Signed Integer            In      Timeout for the Group Member Connection in seconds


Return Codes

The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.

Value  Description
0      Successful completion
>0     Please find further information about the return values in chapter Messages and Codes


Code Examples

For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, see chapter RzK API Example Code and the SAMPLIB library delivered with RzK.

Cobol

CALL "RZKSTUB" USING BY VALUE FCTTIMO Handle ConnTim
               RETURNING RzkRc.                     

C

RzkRc = RZKSTUB(FCTTIMO, Handle, ConnTim);

Assembler

         CALL  (RZK),(=A(FCTTIMO),WaHandle,ConnTim),VL,MF=(E,WaCall)

REXX

RzkRc = RzkTimo(Handle, ConnTim);

Natural

CALL 'RZKCALL' #FCTTIMO #Handle #ConnTim

RzK API Example Code

If you need sample program source code to get started, you may proceed with one of the examples below. You will also find the JCL required to compile, link/bind and run these samples out of the box.

Overview

All example programs and JCL can be found in the provided RzK SAMPLIB. The API-related member names have the format RZKAPI#%, where # stands for the programming language and % describes the content:

# Language
A Assembler
B Cobol
C C
J JCL
K Configuration
N Natural (NPL)
R REXX

With our program examples, we have tried to represent the most important and typical use cases. The required JCL is also included. All programs are executable out-of-the-box. If the standard z/OS JCL compile procedures are fully customized on your system, then the sample programs and the compile JCL should run immediately without any changes. You only need to adjust the names of the data sets you are using, as well as the Kafka-specific configuration such as broker IP address & port, and the topic name to use. Please also note the additional information in the respective example members.

% Member Description
0 Language-specific RzK API include member or header file (if available).
J Language-specific JCL to compile, link/bind and run the example programs.
A All available RzK API calls in one program as a quick reference.
PQ Produce a single message to a Kafka topic (using different call techniques).
CD Consume a single message from a Kafka topic (using different call techniques).
MN Consume all (new) topic messages until an end-marker message is found (using different call techniques).
TU Consume all (new) topic messages until a timeout occurs (using different call techniques).
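
The naming scheme can be expressed as a small function. The following C fragment is a sketch for illustration only: rzk_member_name is a hypothetical helper that is not shipped with RzK, and it validates only the language letter, because the set of content letters differs between languages.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build a SAMPLIB member name of the form RZKAPI#%
   from a language letter (#) and a content letter (%).
   Returns 0 on success, -1 if the language letter is not in the table. */
int rzk_member_name(char lang, char content, char out[9])
{
    /* Language letters from the table above: Assembler, Cobol, C, JCL,
       Configuration, Natural, REXX. */
    static const char languages[] = "ABCJKNR";

    assert(out != NULL);

    if (lang == '\0' || strchr(languages, lang) == NULL)
        return -1;

    /* Six fixed characters, two letters and the terminating NUL. */
    snprintf(out, 9, "RZKAPI%c%c", lang, content);
    return 0;
}
```

For example, the letters C and P yield RZKAPICP, the C sample that produces a single message.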

For a first impression, you will find a few shortened examples on the following language-specific pages. For complete examples, always refer to the SAMPLIB members themselves.

To keep the sample code short, it checks only a few return codes and error messages.
Production code should check the return code of every call.

Assembler

The following example members are available for the Assembler (HLASM) programming language:

Member Description
RZKAPIA0 Assembler RzK API COPY include member.
Member Including & calling RZKCALL
RZKAPIAJ JCL to assemble, link/bind and run the Assembler example programs.
RZKAPIAA All available RzK API calls in one program as a quick reference.
RZKAPIAP Produce a single message to a Kafka topic.
RZKAPIAC Consume a single message from a Kafka topic.
RZKAPIAM Consume all (new) topic messages until an end-marker message is found.
RZKAPIAT Consume all (new) topic messages until a timeout occurs.

Remarks

RZKCALL, which is an ALIAS of RZKSTUB, and module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.

To keep the sample code short, it checks only a few return codes and error messages.
Production code should check the return code of every call.

Examples

Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.

Includes

You will find the required RzK constants definitions for Assembler programs in SAMPLIB(RZKAPIA0). Include this member with

COPY RZKAPIA0

into your program via DD name SYSLIB during assembly.

***********************************************************************
*                                                                     *
*    Product: RVS z/OS Kafka Connector (RzK)                          *
*                                                                     *
*     Member: SAMPLIB(RZKAPIA0)                                       *
*                                                                     *
*       Type: HLASM Assembler Source Include                          *
*                                                                     *
*   Function: Define required RzK constants for Assembler programs.   *
*                                                                     *
*      Notes: o Use COPY RZKAPIA0 to include this member into your    *
*               Assembler program source code.                        *
*                                                                     *
*                  (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
***********************************************************************
*                                                                      
*---------------------------------------------------------------------*
* RVS z/OS Kafka Connector Function Codes.                            *
*---------------------------------------------------------------------*
FCTOPEN  EQU    1                      ;RZKOPEN                        
FCTCONN  EQU    2                      ;RZKCONN                        
FCTCLSE  EQU    3                      ;RZKCLSE                        
FCTPROD  EQU    4                      ;RZKPROD                        
FCTCONS  EQU    5                      ;RZKCONS                        
FCTDBUG  EQU    6                      ;RZKDBUG                        
FCTSYNC  EQU    7                      ;RZKSYNC                        
FCTAUTH  EQU    8                      ;RZKAUTH                        
FCTGERR  EQU    9                      ;RZKGERR                        
FCTCLDB  EQU   10                      ;RZKCLDB                        
FCTTERM  EQU   11                      ;RZKTERM                        
FCTTIMO  EQU   12                      ;RZKTIMO                        
*                                                                      

Function Overview

The RzK Assembler API provides the following functions. See SAMPLIB(RZKAPIAA) member for the complete code example.

         CALL  (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)                     
*                                                                       
         CALL  (RZK),(=A(FCTDBUG),WaHandle,DbugLvl),VL,MF=(E,WaCall)    
*                                                                       
         CALL  (RZK),(=A(FCTAUTH),WaHandle,                            +
               UserS,UserL,PwrdS,PwrdL),VL,MF=(E,WaCall)                
*                                                                       
         CALL  (RZK),(=A(FCTSYNC),WaHandle,                            +
               SyncLvl,SndWin,RecWin),VL,MF=(E,WaCall)                  
*                                                                       
         CALL  (RZK),(=A(FCTCLDB),WaHandle,LoadBal),VL,MF=(E,WaCall)    
*                                                                       
         CALL  (RZK),(=A(FCTTIMO),WaHandle,ConnTim),VL,MF=(E,WaCall)    
*                                                                       
         CALL  (RZK),(=A(FCTCONN),WaHandle,                            +
               BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)          
*                                                                       
         CALL  (RZK),(=A(FCTPROD),WaHandle,                            +
               KeyS,KeyL,MsgS,MsgL,Conv),VL,MF=(E,WaCall)               
*                                                                       
         CALL  (RZK),(=A(FCTCONS),WaHandle,                            +
               GroupS,GroupL,WaBuffS,WaBuffL,Conv,ConsTim),VL,         +
               MF=(E,WaCall)                                            
*                                                                       
         CALL  (RZK),(=A(FCTGERR),WaHandle,WaBuffS,WaBuffL),VL,        +
               MF=(E,WaCall)                                            
*                                                                       
         CALL  (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)            
*                                                                       
         CALL  (RZK),(=A(FCTTERM)),VL,MF=(E,WaCall)                     


Produce

The essential part of an Assembler program that produces a message to a Kafka topic:

         CALL  (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)                     
*                                                                       
         ST    R15,WaHandle            ;Handle returned by RZKOPEN      
         LTR   R15,R15                 ;Handle >= Zero?  
         JM    ErrOpen                 ;No, RZKOPEN Error
*                                                                       
         CALL  (RZK),(=A(FCTCONN),WaHandle,                            +
               BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)          
*                                                                       
         CALL  (RZK),(=A(FCTPROD),WaHandle,                            +
               KeyS,KeyL,MsgS,MsgL,Conv),VL,MF=(E,WaCall)               
*                                                                       
         CALL  (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)            


Consume

The essential part of an Assembler program that consumes a message from a Kafka topic:

         CALL  (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)                     
*                                                                       
         ST    R15,WaHandle            ;Handle returned by RZKOPEN      
         LTR   R15,R15                 ;Handle >= Zero?  
         JM    ErrOpen                 ;No, RZKOPEN Error
*                                                                       
         CALL  (RZK),(=A(FCTCONN),WaHandle,                            +
               BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)          
*                                                                       
         IILF  R5,BufSize$             ;Get size of defined buffer...   
         ST    R5,WaBuffL              ;...and init msg buf length parm 
*                                                                       
         CALL  (RZK),(=A(FCTCONS),WaHandle,                            +
               GroupS,GroupL,WaBuffS,WaBuffL,Conv,ConsTim),VL,         +
               MF=(E,WaCall)                                            
*                                                                       
         CALL  (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)            


Cobol

The following example members are available for the Cobol programming language:

Member Description
RZKAPIB0 Cobol RzK API COPY member.
Member Including & calling RZKSTUB
RZKAPIBJ JCL to compile, link/bind and run the Cobol example programs.
RZKAPIBP Produce a single message to a Kafka topic.
RZKAPIBC Consume a single message from a Kafka topic.
RZKAPIBM Consume all (new) topic messages until an end-marker message is found.
RZKAPIBT Consume all (new) topic messages until a timeout occurs.
Member Including RZK & calling RZKFUNC
RZKAPIBK JCL to compile, link/bind and run the Cobol example programs.
RZKAPIBA All available RzK API calls in one program as a quick reference.
RZKAPIBQ Produce a single message to a Kafka topic.
RZKAPIBD Consume a single message from a Kafka topic.
RZKAPIBN Consume all (new) topic messages until an end-marker message is found.
RZKAPIBU Consume all (new) topic messages until a timeout occurs.

Remarks

When using RZKSTUB, module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.

To keep the sample code short, it checks only a few return codes and error messages.
Production code should check the return code of every call.

Examples

Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.

Includes

You will find the required RzK constants definitions for Cobol programs in SAMPLIB(RZKAPIB0). Include this member with

COPY RZKAPIB0.

into the WORKING-STORAGE SECTION of your program via DD name SYSLIB during compile.

******************************************************************
*                                                                *
*    Product: RVS z/OS Kafka Connector (RzK)                     *
*                                                                *
*     Member: SAMPLIB(RZKAPIB0)                                  *
*                                                                *
*       Type: COBOL Source Code                                  *
*                                                                *
*   Function: RzK constants for Cobol programs.                  *
*                                                                *
*      Notes: o Include with COPY into WORKING-STORAGE SECTION.  *
*                                                                *
*             (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
******************************************************************
                                                                  
*  RzK Function Codes                                             
   01 FCTOPEN PIC S9(9)  COMP-5 VALUE  1.                         
   01 FCTCONN PIC S9(9)  COMP-5 VALUE  2.                         
   01 FCTCLSE PIC S9(9)  COMP-5 VALUE  3.                         
   01 FCTPROD PIC S9(9)  COMP-5 VALUE  4.                         
   01 FCTCONS PIC S9(9)  COMP-5 VALUE  5.                         
   01 FCTDBUG PIC S9(9)  COMP-5 VALUE  6.                         
   01 FCTSYNC PIC S9(9)  COMP-5 VALUE  7.                         
   01 FCTAUTH PIC S9(9)  COMP-5 VALUE  8.                         
   01 FCTGERR PIC S9(9)  COMP-5 VALUE  9.                         
   01 FCTCLDB PIC S9(9)  COMP-5 VALUE 10.                         
   01 FCTTERM PIC S9(9)  COMP-5 VALUE 11.                         
   01 FCTTIMO PIC S9(9)  COMP-5 VALUE 12.                         

Function Overview

The RzK Cobol API provides the following functions. See SAMPLIB(RZKAPIBA) member for the complete code example.

CALL "RZKFUNC" USING BY VALUE FCTOPEN RETURNING Handle.     
                                                            
CALL "RZKFUNC" USING BY VALUE FCTDBUG Handle DbugLvl        
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTAUTH Handle                
                  BY CONTENT  UserS   BY VALUE UserL        
                  BY CONTENT  PwrdS   BY VALUE PwrdL        
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTSYNC Handle                
                  BY VALUE    SyncLvl SndWin RecWin         
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTCLDB Handle LoadBal        
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTTIMO Handle ConnTim        
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTCONN Handle                
                  BY CONTENT  BrokerS BY VALUE BrokerL      
                  BY CONTENT  TopicS  BY VALUE TopicL       
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTPROD Handle                
                  BY CONTENT  KeyS    BY VALUE KeyL         
                  BY CONTENT  MsgS    BY VALUE MsgL         
                  BY VALUE    Conv                          
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE  FCTCONS Handle               
                  BY CONTENT   GroupS  BY VALUE GroupL      
                  BY REFERENCE BufferS BufferL              
                  BY VALUE     Conv    ConsTim              
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE  FCTGERR Handle               
                  BY REFERENCE BufferS BufferL              
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTCLSE Handle                
               RETURNING RzkRc.                             
                                                            
CALL "RZKFUNC" USING BY VALUE FCTTERM RETURNING RzkRc.      

Produce

The essential part of a Cobol program that produces a message to a Kafka topic:

CALL "RZKFUNC" USING BY VALUE FCTOPEN RETURNING Handle. 
                                                        
IF Handle >= 0 THEN                                     
                                                        
  CALL "RZKFUNC" USING BY VALUE FCTCONN Handle          
                    BY CONTENT  BrokerS BY VALUE BrokerL
                    BY CONTENT  TopicS  BY VALUE TopicL 
                 RETURNING RzkRc                        
                                                        
  CALL "RZKFUNC" USING BY VALUE FCTPROD Handle          
                    BY CONTENT  KeyS    BY VALUE KeyL   
                    BY CONTENT  MsgS    BY VALUE MsgL   
                    BY VALUE    Conv                    
                 RETURNING RzkRc                        
                                                        
  CALL "RZKFUNC" USING BY VALUE FCTCLSE Handle          
                 RETURNING RzkRc                        
END-IF.                                                 


Consume

The essential part of a Cobol program that consumes a message from a Kafka topic:

CALL "RZKFUNC" USING BY VALUE FCTOPEN RETURNING Handle.     
                                                            
IF Handle >= 0 THEN                                         
                                                            
  CALL "RZKFUNC" USING BY VALUE FCTCONN Handle              
                    BY CONTENT  BrokerS BY VALUE BrokerL    
                    BY CONTENT  TopicS  BY VALUE TopicL     
                 RETURNING RzkRc                            
                                                            
  MOVE BufMaxL TO BufferL            *> (Re)Set max buf size
                                                            
  CALL "RZKFUNC" USING BY VALUE  FCTCONS Handle             
                    BY CONTENT   GroupS  BY VALUE GroupL    
                    BY REFERENCE BufferS BufferL            
                    BY VALUE     Conv    ConsTim            
                 RETURNING RzkRc                            
                                                            
  CALL "RZKFUNC" USING BY VALUE FCTCLSE Handle              
                 RETURNING RzkRc                            
END-IF.                                                     


C

The following example members are available for the C programming language:

Member Description
RZKAPIC0 C RzK API header file #include member.
Member Including & calling RZKSTUB
RZKAPICJ JCL to compile, link/bind and run the C example programs.
RZKAPICP Produce a single message to a Kafka topic.
RZKAPICC Consume a single message from a Kafka topic.
RZKAPICM Consume all (new) topic messages until an end-marker message is found.
RZKAPICT Consume all (new) topic messages until a timeout occurs.
Member Including RZK & calling RZKFUNC
RZKAPICK JCL to compile, link/bind and run the C example programs.
RZKAPICA All available RzK API calls in one program as a quick reference.
RZKAPICQ Produce a single message to a Kafka topic.
RZKAPICD Consume a single message from a Kafka topic.
RZKAPICN Consume all (new) topic messages until an end-marker message is found.
RZKAPICU Consume all (new) topic messages until a timeout occurs.
Member Loading the RZK DLL & calling RZKFUNC
RZKAPICL JCL to compile, link/bind and run the C example programs.
RZKAPICR Produce a single message to a Kafka topic.
RZKAPICE Consume a single message from a Kafka topic.
RZKAPICO Consume all (new) topic messages until an end-marker message is found.
RZKAPICV Consume all (new) topic messages until a timeout occurs.

Remarks

When using the RZK DLL or RZKSTUB, module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.

To keep the sample code short, it checks only a few return codes and error messages.
Production code should check the return code of every call.

Examples

Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.

Includes

You will find the required RzK constants and prototype definitions for C programs in SAMPLIB(RZKAPIC0). Include this member as C header file with

#include "rzkapic0.h"

into your C program via DD name USERLIB during compile.            

/*********************************************************************/
/*                                                                   */
/*    Product: RVS z/OS Kafka Connector (RzK)                        */
/*                                                                   */
/*     Member: SAMPLIB(RZKAPIC0)                                     */
/*                                                                   */
/*       Type: C Pgm Header File                                     */
/*                                                                   */
/*   Function: RzK constants & prototype definitions for C programs. */
/*                                                                   */
/*      Notes: o #include "rzkapic0.h" into your C program via       */
/*               DD name USERLIB during compile.                     */
/*                                                                   */
/*                (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 */
/*********************************************************************/
                                                                       
  /*-----------------------------------------------------------------*/
  /* RZKFUNC Function Codes.                                         */
  /*-----------------------------------------------------------------*/
  #define FCTOPEN  1                                                   
  #define FCTCONN  2                                                   
  #define FCTCLSE  3                                                   
  #define FCTPROD  4                                                   
  #define FCTCONS  5                                                   
  #define FCTDBUG  6                                                   
  #define FCTSYNC  7                                                   
  #define FCTAUTH  8                                                   
  #define FCTGERR  9                                                   
  #define FCTCLDB 10                                                   
  #define FCTTERM 11                                                   
  #define FCTTIMO 12                                                   
                                                                       
  /*-----------------------------------------------------------------*/
  /* RzK Function Prototypes.                                        */
  /*-----------------------------------------------------------------*/
  int RZKSTUB(int, ...);                                               
  int RZKFUNC(int, ...);                                               

Function Overview

The RzK C API provides the following functions. See SAMPLIB(RZKAPICA) member for the complete code example.

Handle = RZKFUNC(FCTOPEN);                                              
                                                                        
 RzkRc = RZKFUNC(FCTDBUG, Handle, DbugLvl);                             
                                                                        
 RzkRc = RZKFUNC(FCTAUTH, Handle, UserS, UserL, PwrdS, PwrdL);          
                                                                        
 RzkRc = RZKFUNC(FCTSYNC, Handle, SyncLvl, SndWin, RecWin);             
                                                                        
 RzkRc = RZKFUNC(FCTCLDB, Handle, LoadBal);                             
                                                                        
 RzkRc = RZKFUNC(FCTTIMO, Handle, ConnTim);                             
                                                                        
 RzkRc = RZKFUNC(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);    
                                                                        
 RzkRc = RZKFUNC(FCTPROD, Handle, KeyS, KeyL, MsgS, MsgL, Conv);        
                                                                        
 RzkRc = RZKFUNC(FCTCONS, Handle,      
                  GroupS, GroupL, &BufferS, &BufferL, Conv, ConsTim);
                                                                        
 RzkRc = RZKFUNC(FCTGERR, Handle, &BufferS, &BufferL);                  
                                                                        
 RzkRc = RZKFUNC(FCTCLSE, Handle);                                      
                                                                        
 RzkRc = RZKFUNC(FCTTERM);                                              

Produce

The essential part of a C program that produces a message to a Kafka topic:

Handle = RZKFUNC(FCTOPEN);                                           
                                                                     
if (Handle >= 0)                                                     
{                                                                    
                                                                     
  RzkRc = RZKFUNC(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);
                                                                     
  RzkRc = RZKFUNC(FCTPROD, Handle, KeyS, KeyL, MsgS, MsgL, Conv);    
                                                                     
  RzkRc = RZKFUNC(FCTCLSE, Handle);                                  
                                                                     
}                                                                    


Consume

The essential part of a C program that consumes a message from a Kafka topic:

Handle = RZKFUNC(FCTOPEN);                                            
                                                                      
if (Handle >= 0)                                                      
{                                                                     
                                                                      
  RzkRc = RZKFUNC(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL); 

  BufferL = BUF_SIZE;              // (Re)Set maximum buffer size
                                                                 
  RzkRc = RZKFUNC(FCTCONS, Handle,                                    
                  GroupS,  GroupL, &BufferS, &BufferL, Conv, ConsTim);
                                                                      
  RzkRc = RZKFUNC(FCTCLSE, Handle);                                   
                                                                      
}                                                                     


RzK API Example Code

REXX

REXX programs can also use the RzK functionality through the CALL API. REXX on z/OS provides the ADDRESS LINKPGM environment to call programs via a standard CALL API. To save you from mapping the entire API yourself, the RzK package provides an RzK REXX Wrapper API in SAMPLIB(RZKAPIR0), which you can include in your REXX code.

Example Members

The following example members are available for the REXX programming language:

Member Description
RZKAPIR0 RzK REXX Wrapper API include member.
RZKAPIRJ JCL to optionally include, preprocess, compile, link/bind and run the REXX example programs.
Member Using the RZK REXX Wrapper API and RZKCALL
RZKAPIRA All available RzK API calls in one program as a quick reference.
RZKAPIRP Produce a single message to a Kafka topic.
RZKAPIRC Consume a single message from a Kafka topic.
RZKAPIRM Consume all (new) topic messages until an end-marker message is found.
RZKAPIRT Consume all (new) topic messages until a timeout occurs.

Remarks

RZKCALL, which is an ALIAS of RZKSTUB, and module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.

To keep the sample code short, it checks only a few return codes and error messages.
Production code should check the result of every call.

Example Code

Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.

REXX Wrapper API Include

You will find the provided REXX Wrapper API to use RZKCALL with REXX programs in SAMPLIB(RZKAPIR0). Include this member with

/*%INCLUDE RZKAPIR0 */

after your own REXX code and use the compile JCL in SAMPLIB(RZKAPIRJ) to do the include processing for you (recommended), or copy RZKAPIR0 manually.

Function Overview

The RzK REXX Wrapper API provides the following functions. See SAMPLIB(RZKAPIRA) member for the complete code example.

Handle = RzkOpen();                               
                                                  
 RzkRc = RzkDbug(Handle, DbugLvl);                
                                                  
 RzkRc = RzkAuth(Handle, UserS, PwrdS);           
                                                  
 RzkRc = RzkSync(Handle, SyncLvl, SndWin, RecWin);
                                                  
 RzkRc = RzkCldb(Handle, LoadBal);                
                                                  
 RzkRc = RzkTimo(Handle, ConnTim);                
                                                  
 RzkRc = RzkConn(Handle, BrokerS, TopicS);        
                                                  
 RzkRc = RzkProd(Handle, KeyS, MsgS, Conv);       
                                                  
 RzkRc = RzkCons(Handle, GroupS, Conv, ConsTim);  
                                                                                                  
 RzkRc = RzkGerr(Handle);                         
                                                  
 RzkRc = RzkClse(Handle);                         
                                                  
 RzkRc = RzkTerm();                               

Produce

The essential part of a REXX program that produces a message to a Kafka topic:

Handle = RzkOpen();                         
                                            
IF Handle >= 0                              
THEN                                        
DO                                          
                                            
  RzkRc = RzkConn(Handle, BrokerS, TopicS); 
                                            
  RzkRc = RzkProd(Handle, KeyS, MsgS, Conv);
                                            
  RzkRc = RzkClse(Handle);                  
                                            
END;      /* of IF Handle >= 0 THEN DO */    


Consume

The essential part of a REXX program that consumes a message from a Kafka topic:

Handle = RzkOpen();                           
                                              
IF Handle >= 0                                
THEN                                          
DO                                            
                                              
  RzkRc = RzkConn(Handle,BrokerS,TopicS);     
                                              
  RzkRc = RzkCons(Handle,GroupS,Conv,ConsTim);
                                              
  IF RzkRc == 0                               
  THEN                                        
    SAY Rzk.Handle.#MSG;
                                              
  RzkRc = RzkClse(Handle);                    
                                              
END;      /* of IF Handle >= 0 THEN DO */      


RzK API Example Code

Natural (NPL)

The following example members are available for the Natural Programming Language (NPL):

Member Description
RZKAPIN0 Natural RzK API include member.
Member Calling RZKCALL
RZKAPINA All available RzK API calls in one program as a quick reference.
RZKAPINP Produce a single message to a Kafka topic.
RZKAPINC Consume a single message from a Kafka topic.

z/OS Call API vs LE API

Natural supports using the standard z/OS CALL API or - after some additional setup - using the LE API. Depending on your current NPL setup, you may use the RZK DLL directly or have to use the RzK stub RZKCALL.

For more information see Software AG - Natural for Mainframes - Support of IBM LE Subprograms.

Remarks

RZKCALL, which is an ALIAS of RZKSTUB, and module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.

To keep the sample code short, it checks only a few return codes and error messages.
Production code should check the result of every call.

The Natural (NPL) code has been provided as an example to the best of our knowledge. Since we do not have a Natural development environment available, the sample code is not tested (yet).

 

Examples

Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.

Includes

You will find the required RzK constants definitions for Natural programs in SAMPLIB(RZKAPIN0). Include this member with

INCLUDE RZKAPIN0

into the DEFINE DATA section of your program.

***********************************************************************
*                                                                     *
*    Product: RVS z/OS Kafka Connector (RzK)                          *
*                                                                     *
*     Member: SAMPLIB(RZKAPIN0)                                       *
*                                                                     *
*       Type: Natural (NPL) Source Code                               *
*                                                                     *
*   Function: Define required RzK constants for Natural programs.     *
*                                                                     *
*      Notes: o Use INCLUDE RZKAPIN0 to include this member into your *
*               Natural program source code.                          *
*                                                                     *
*                  (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
***********************************************************************
*                                                                      
*---------------------------------------------------------------------*
* RVS z/OS Kafka Connector Function Codes.                            *
*---------------------------------------------------------------------*
1 #FCTOPEN  (I4)   CONST <01>                                          
1 #FCTCONN  (I4)   CONST <02>                                          
1 #FCTCLSE  (I4)   CONST <03>                                          
1 #FCTPROD  (I4)   CONST <04>                                          
1 #FCTCONS  (I4)   CONST <05>                                          
1 #FCTDBUG  (I4)   CONST <06>                                          
1 #FCTSYNC  (I4)   CONST <07>                                          
1 #FCTAUTH  (I4)   CONST <08>                                          
1 #FCTGERR  (I4)   CONST <09>                                          
1 #FCTCLDB  (I4)   CONST <10>                                          
1 #FCTTERM  (I4)   CONST <11>                                          
1 #FCTTIMO  (I4)   CONST <12>
*                                                                      

Function Overview

The RzK Natural API provides the following functions. See SAMPLIB(RZKAPINA) member for the complete code example.

CALL 'RZKCALL' #FCTOPEN                                            
*                                                                  
CALL 'RZKCALL' #FCTDBUG #Handle #DbugLvl                           
*                                                                  
CALL 'RZKCALL' #FCTAUTH #Handle #UserS #UserL #PwrdS #PwrdL        
*                                                                  
CALL 'RZKCALL' #FCTSYNC #Handle #SyncLvl #SndWin #RecWin           
*                                                                  
CALL 'RZKCALL' #FCTCLDB #Handle #LoadBal                           
*                                                                  
CALL 'RZKCALL' #FCTTIMO #Handle #ConnTim                           
*                                                                  
CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL  
*                                                                  
CALL 'RZKCALL' #FCTPROD #Handle #KeyS #KeyL #MsgS #MsgL #Conv      
*                                                                  
CALL 'RZKCALL' #FCTCONS #Handle                                    
               #GroupS  #GroupL #BufferS #BufferL #Conv #ConsTim    
*                                                                  
CALL 'RZKCALL' #FCTGERR #Handle #BufferS #BufferL                  
*                                                                  
CALL 'RZKCALL' #FCTCLSE #Handle                                    
*                                                                  
CALL 'RZKCALL' #FCTTERM                                            

Produce

The essential part of a Natural program that produces a message to a Kafka topic:

CALL 'RZKCALL' #FCTOPEN 
*                       
#Handle := RET('RZKCALL')                                            
*                                                                    
IF #Handle >= 0                                                      
THEN                                                                 
*                                                                    
  CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL  
*                                                                    
  CALL 'RZKCALL' #FCTPROD #Handle #KeyS #KeyL #MsgS #MsgL #Conv      
*                                                                    
  CALL 'RZKCALL' #FCTCLSE #Handle                                    
*                                                                    
END-IF                                                               


Consume

The essential part of a Natural program that consumes a message from a Kafka topic:

CALL 'RZKCALL' #FCTOPEN 
*                       
#Handle := RET('RZKCALL')                                            
*                                                                    
IF #Handle >= 0                                                      
THEN                                                                 
*                                                                    
  CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL  
*                                                                    
  MOVE #BufMaxL TO #BufferL            /* (Re)Set maximum buffer size
*                                                                    
  CALL 'RZKCALL' #FCTCONS #Handle                                    
                 #GroupS  #GroupL #BufferS #BufferL #Conv #ConsTim    
*                                                                    
  CALL 'RZKCALL' #FCTCLSE #Handle                                    
*                                                                    
END-IF                                                               


RzK Messages and Codes

RZK Trace Messages

Trace Messages for a detailed analysis can be activated using DD-Name RZKSTUB in one of the following ways:

//RZKSTUB DD DUMMY

or 

TSO ALLOC FILE(RZKSTUB) DUMMY

 

The following sample trace messages were created for a PRODUCE of 1,000,000 messages.

RZKAPIAS - RzK CALL API Assembler Sample Program 
RZKSTUB0001T RVS z/OS Kafka Connector CALL API Stub Routine v1.0.0j (31.03.2023) started...
RZKSTUB0003T ChkTrc() DDName RZKSTUB found ALLOCATEd, message trace mode activated. 
RZKSTUB0008T TstApf() Running non-APF authorized. 
RZKSTUB0601T SysInf() Sys=RS09 Plex=RSYPLEX Lpar= Vm=RSYZOS Hw=RSYHOST Fmid=HBB77C0 CPUs=03/03. 
RZKSTUB0602T SysInf() >CPU Id=1 Ser=0000013907 
RZKSTUB0602T SysInf() >CPU Id=2 Ser=0000023907 
RZKSTUB0602T SysInf() >CPU Id=3 Ser=0000033907 (zIIP) 
RZKSTUB0009T IniSub() R15=x'00000000' Tkn=x'0D6EEDB8' Pip=x'80095330' Rtn=x'8D6AFA20'. 
03/31/2023 01:13:47.334 RZK0007I RZKOPEN -- -- RzK - RVS Kafka Connector for COBOL/C/..., z/OS Version 1.1 Build Mar 27 2023 12:37:27 
03/31/2023 01:13:47.334 RZK0017I RZKDBUG -- -- DebugLevel is set to 1 
03/31/2023 01:13:47.335 RZK0038I RZKSYNC -- -- Synch-Mode is set to 0: Produce Msgs are sent and control is returned imm, OffsetCo after reading X msgs
03/31/2023 01:17:03.212 RZK0018I RZKCLSE -- -- Statistics: #Msgs produce: 1000000 consume: 0 ConnectionTime: 195877 msec 
03/31/2023 01:17:03.212 RZK0019I RZKCLSE -- -- Statistics: snd=107524 rcv=18295 wait=19915 marshall=17449 unmarshall=0 msec, SendWinExh=0 
03/31/2023 01:17:03.212 RZK0022I RZKCLSE -- -- Statistics: sends=1000003 sendBytes=145000083 recvs=2000033 recvBytes=57001304 selectWait=7479, selectNoWait=1992522
RZKSTUB0010T TrmEnv() R15=x'00000000' Tkn=x'0D6EEDB8' Env=x'00000000'.
RZKSTUB0099T RVS z/OS Kafka Connector CALL API Stub Routine ended. 
***
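
The statistics line RZK0018I in the trace above is enough to estimate throughput. The sketch below parses that line and divides the number of produced messages by the connection time. The helper names parse_rzk0018 and produced_per_second are made up for this example and are not part of RzK. It also assumes that the message layout stays exactly as printed above.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Values carried by one RZK0018I statistics line. */
typedef struct {
    long produced;      /* number of messages produced  */
    long consumed;      /* number of messages consumed  */
    long connect_msec;  /* connection time in msec      */
} rzk_stats;

/* Returns 1 if the line holds an RZK0018I statistics record, else 0. */
int parse_rzk0018(const char *line, rzk_stats *out)
{
    const char *p;

    assert(line != NULL && out != NULL);

    p = strstr(line, "RZK0018I");
    if (p == NULL)
        return 0;
    p = strstr(p, "Statistics:");
    if (p == NULL)
        return 0;
    /* White space in the format matches any run of blanks in the message. */
    return sscanf(p,
                  "Statistics: #Msgs produce: %ld consume: %ld ConnectionTime: %ld msec",
                  &out->produced, &out->consumed, &out->connect_msec) == 3;
}

/* Average number of produced messages per second of connection time. */
double produced_per_second(const rzk_stats *s)
{
    if (s->connect_msec <= 0)
        return 0.0;
    return (double)s->produced * 1000.0 / (double)s->connect_msec;
}
```

For the sample run above (1000000 messages in 195877 msec) this gives roughly 5,105 messages per second. The connection time also covers connect and close processing, so the figure is an approximation.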



RzK Messages and Codes

Informational Messages

Codes for informational messages are 0-4999.

Code

Description

RZK0000I

Successful completion


RZK0001I

Broker <ip address/hostname>:<port>


RZK0002I

HEXDump for <description> len=<len of dump area>


RZK0003I

HEXDump offset: data text


RZK0004I

Request sent CorrelID=<id>


RZK0005I

Reply CorrID = <id> #Responses = <num> Error = <error code> <error text>


RZK0006I

Request sent to Broker id = <id> hostname = <hostname> port = <port> Partition = <partition> topic = <topic> offs = <offset>

RZK0007I

<Version String> Build <date> <time>


RZK0008I

Server <hostname>:<port> supports <protocol> Versions <min>-<max>


RZK0009I

Parameters are bootstrap server<hostname>:<port>  Topic=<topic>


RZK0010I

BrokerList id <id> = <hostname>:<port> rack=<rack description>


RZK0011I

Partition <id> Leader=<id> brokeridx=<index> isr=<id> err=<error#> <error string>


RZK0012I

MsgKeyHash for <MessageKey> results in Partition <id>


RZK0013I

Topic <topic> (Broker ID=<id>, <hostname>:<port>)


RZK0014I

PartitionListIdx=<index> Partition=<id> Leader=<id>


RZK0015I

MessageKey=<MessageKey> Encoding=<UTF8/Binary>


RZK0016I

Msg=<message to be sent to kafka> (len=<message len>)


RZK0017I

DebugLevel is set to <level>


RZK0018I

Statistics: #Msgs produce: <num>  consume: <num> ConnectionTime: <num> msec


RZK0019I

Statistics: snd=<num> rcv=<num> wait=<num> marshall=<num> unmarshall=<num> msec, SendWinExh=<num>

RZK0020I

Pending Replys from Broker <hostname>:<port>


RZK0021I

Reply received for CorrelID <id> from Broker <hostname>:<port>


RZK0022I

Statistics: sends=<num> sendBytes=<num> recvs=<num> recvBytes=<num> selectWait=<num>, selectNoWait=<num>

RZK0023I

Find Coordinator (type=<type>) for Group: <group>


RZK0024I

Connect to BrokerID=<broker id> <hostname>:<port>


RZK0025I

Found Coordinator for Group : <group>  BrokerID=<broker id> <hostname>:<port>


RZK0026I

Received Message from Kafka: BrokerID=<broker id>,   Partition=<id>, Offset=<offset>, MsgLen=<message len>

RZK0027I

Offset=<offset>


RZK0028I

Resend with Group MemberID: <member id>


RZK0029I

The group is rebalancing, so a rejoin is needed


RZK0030I

HexDump for <description> len=<len of dump area>, (only 256 bytes displayed)

 

RZK0031I

Information from kafka server: <error code> <error message>


RZK0032I

<description> Reply from Kafka: <hostname>:<port>  BrokerID=<broker id>  Partition=<id> CorrelID=<id>

RZK0033I

Debugging message that displays key/value pairs.  


RZK0034I

Debugging message that displays key/value pairs.  


RZK0035I

Skipping Message Partition=<id> offs=<offset> KeyLen=<key length> ValueLen=<value length>


RZK0036I

Receive and scratch left over data


RZK0037I

Shutting Down: Ignoring CorrelID <id> from Broker <hostname>:<port>  


RZK0038I

Synch-Mode is set to <mode>


RZK0039I

#Mechanisms = <number of supported SASL authentication mechanisms>, Supported Mechanisms = <list of supported mechanism>, err = <error code>

RZK0040I

Error Code = <error code>, Reply Error = <reply error>

RZK0041I

General Mutex: <Locked | Unlocked>


RZK0042I

HeartBeat Thread: <Starting | Already Started | …>


RZK0043I

Consumer load balancing <enabled/disabled>


RZK0044I

Connection closed on remote side: Broker ID=<broker id> <hostname>:<port>

 

 

RzK Messages and Codes

Warning Messages

Codes for warning messages are 5000-7499.

Code

Description

RZK5000W

Received Message truncated from <len> to <len> bytes


RZK5001W

License expires on <expiration date>. Pls contact support.


RZK5002W

Error with length of partition section in reply buffer


RZK5003W

Message is not complete: CorrID=<id> MessageOffset=<base offset> Offset in ReplyData=<marshall offset>

RZK5004W

Invalid window size. <send/receive> window size = <Size> , it will be reset to default size <size>


RZK5005W

No Reply received for <keyID>/<keyName>, CorrelID <id> from Broker <hostname>:<port>


RZK5006W

All open Sessions are closed 


 

RzK Messages and Codes

Error Messages

Codes for error messages are 7500-9999.

Code

Description

RZK7500E

TCPIP Error from Function <function>: <error code> (<error message>)


RZK7501E

Connection closed by Server. Most likely there is authentication required


RZK7502E

Invalid RzK handle


RZK7503E

Cannot allocate Memory for <internal ID>


RZK7504E

------- rzk_log_vf is called with MessageNum (<msg#>) and provides <num> arguments, but <num> arguments are required for this Message

RZK7505E

------- rzk_log_vf is called with MessageNum (<msg#>). Array has only <num> elements, <num> Element of array cannot be accessed

RZK7506E

Reply CorrID = <id> #Responses = <1> Error = <code> <message>


RZK7507E

No BrokerList available. Potential Sequence error in Application


RZK7508E

No PartitionList available. This is strange! There is a BrokerList but no PartitionList ...


RZK7509E

No Leader available for Partition <id>, Topic <topic>


RZK7510E

Error from kafka server: <error> <error message>


RZK7511E

No Broker (BrokerID=<id>) found for Partition <num>, Topic <topic>


RZK7512E

Could not connect to Broker <hostname>:<port> ID <brokerID> Error=<code> <msg>


RZK7513E

Could not connect to any Broker for Topic <topic>


RZK7514E

marshall RequestHeader failed


RZK7515E

Timeout waiting for Reply from Kafka, Broker <hostname>:<port>


RZK7516E

Unexpected Reply: No Queued Request for CorrelID <id> from Broker <hostname>:<port>


RZK7518E

Reply from kafka is <num> bytes, only processing <num> bytes


RZK7519E

No valid IP address or hostname is provided for BrokerID <broker id> (<hostname>)


RZK7520E

Find Coordinator for Group <groupname> failed: Could not find BrokerID <broker id> in Brokerlist

RZK7521E

unable to connect to any broker of this kafka cluster <…>


RZK7522E

License expired on <expiration date>. Pls contact support.


RZK7523E

No Coordinator available. Potential Sequence error in Application


RZK7524E

Could not Join Group <group> for topic <topic> on Broker ID=<id>  <hostname>:<port> rc=<rc> (<error text>)

RZK7525E

There is no Broker available for Partition <id> of Topic <topic>


RZK7526E

Processing Buffer with Wrong CorrelID: <hostname>:<port> BrokerID=<id> Partition=<id> CorrelID expected=<id> actual=<id>

RZK7527E

Synch-Mode <synch mode> is not supported


RZK7528E

No Connection to Coordinator. Broker ID=<broker id> <hostname>:<port>


RZK7530E

Cannot get local CodePage, String conversion to UTF-8 not possible


RZK7531E

The limit of max <x> concurrent sessions is reached!


RZK7532E

<system call> failed: Error=<system error #/errno>, <system error text>


RZK7533E

Broker <hostname>:<port>  - received reply for invalid Partition Index (only <id> Partitions known)


RZK7534E

Reply for unsupported Api Request: <id> /Version=<API-Version> (APIKey-Name) CorrelID = <id>


RZK7535E

There is already a consumer active for Topic:<topic> in Group:<group> and consume load balancing is not enabled

RZK7536E

Kafka Protocol Error: <kafka protocol error messages>


RZK7537E

Invalid Function Code <i>. Only Function Codes from 0 to 9 are supported 


 

RZK7538E 

iconv_open() failed for <function> with error <errorcode> 


 

RZK7539E 

iconv() failed with error <errorcode> 


 

RZK7540E 

CRC does not match in message from Broker <hostname>:<port> 


 

RZK7541E 

Topic is too long (<length> bytes), max 255 bytes are supported
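
The code ranges stated for the three message tables (informational 0-4999, warning 5000-7499, error 7500-9999) can be used to classify a message programmatically, for example when scanning job output. The following is a minimal sketch. The names rzk_severity, rzk_parse_message_id and rzk_severity_of are made up for this example and are not part of the RzK API. It assumes message identifiers of the form RZKnnnnX as listed in the tables. The RZKSTUB trace messages use a different layout and are deliberately rejected.

```c
#include <assert.h>
#include <ctype.h>
#include <string.h>

typedef enum {
    RZK_SEV_INFO,
    RZK_SEV_WARNING,
    RZK_SEV_ERROR,
    RZK_SEV_UNKNOWN
} rzk_severity;

/* Extract the numeric part of "RZKnnnnX"; returns -1 for any other layout. */
int rzk_parse_message_id(const char *id)
{
    int i, n = 0;

    if (id == NULL || strlen(id) != 8 || strncmp(id, "RZK", 3) != 0)
        return -1;
    for (i = 3; i < 7; i++) {
        if (!isdigit((unsigned char)id[i]))
            return -1;
        n = n * 10 + (id[i] - '0');
    }
    return n;
}

/* Map a message number to its severity using the ranges from the tables. */
rzk_severity rzk_severity_of(int code)
{
    if (code >= 0 && code <= 4999)
        return RZK_SEV_INFO;
    if (code >= 5000 && code <= 7499)
        return RZK_SEV_WARNING;
    if (code >= 7500 && code <= 9999)
        return RZK_SEV_ERROR;
    return RZK_SEV_UNKNOWN;
}
```

With these helpers, rzk_severity_of(rzk_parse_message_id("RZK7512E")) yields RZK_SEV_ERROR, and an identifier such as RZKSTUB0001T is reported as RZK_SEV_UNKNOWN.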


 

Additional Information

Apache Kafka Core Concepts

Apache Kafka is an event-streaming platform optimized for high throughput. It is a fault-tolerant and scalable platform for handling real-time data feeds.

Kafka Cluster and Brokers

Kafka works as a cluster of one or more nodes in one or more data centers. Kafka refers to a single Kafka server as a broker. The brokers are designed to operate as part of a Kafka cluster.

When you initially connect to Kafka, you perform a so-called bootstrap connection, handled by the RZKCONN API call of RzK. This connection can be made to any of the brokers in the cluster. It is required to request all the information you need to produce (write, send, publish) a message to a Kafka cluster or to consume (read, receive, fetch) a message from it.

Zookeeper

Kafka clusters traditionally rely on Apache ZooKeeper as the central configuration and management system (newer Kafka releases can also run without it, in KRaft mode). ZooKeeper keeps track of the brokers, the topics, the partition assignment, and the leader election. In short, it manages the metadata of the Kafka cluster.

Messages

A message is the unit of data for Kafka. Kafka saves messages just as a sequence of bytes (byte array, buffer). It may contain any data, which may be text (like JSON) or arbitrary binary objects.

Messages can have a message key which is metadata that is used to determine the destination partition.
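
How a key can determine the partition is easiest to see in code. The sketch below follows the approach of the default partitioner in Kafka's Java client: a murmur2 hash over the key bytes, made non-negative, modulo the number of partitions. Whether RzK uses exactly this hash is not stated in this manual, so treat it as an illustration of the principle only. The function name partition_for_key is made up for this example. Note that the hash works on the key bytes as they are sent, so an EBCDIC key and the same key converted to UTF-8 are different keys.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* 32-bit murmur2 hash with the seed and constants used by Kafka's Java client. */
uint32_t murmur2(const unsigned char *data, size_t length)
{
    const uint32_t m = 0x5bd1e995u;
    const int r = 24;
    uint32_t h = 0x9747b28cu ^ (uint32_t)length;
    size_t tail = length & ~(size_t)3;   /* start of the last 1..3 bytes */
    size_t i;

    for (i = 0; i + 4 <= length; i += 4) {
        /* Read four bytes as a little-endian 32-bit value. */
        uint32_t k = (uint32_t)data[i]
                   | ((uint32_t)data[i + 1] << 8)
                   | ((uint32_t)data[i + 2] << 16)
                   | ((uint32_t)data[i + 3] << 24);
        k *= m;
        k ^= k >> r;
        k *= m;
        h *= m;
        h ^= k;
    }

    switch (length & 3) {
    case 3: h ^= (uint32_t)data[tail + 2] << 16;  /* fall through */
    case 2: h ^= (uint32_t)data[tail + 1] << 8;   /* fall through */
    case 1: h ^= (uint32_t)data[tail];
            h *= m;
    }

    h ^= h >> 13;
    h *= m;
    h ^= h >> 15;
    return h;
}

/* Same key bytes and same partition count always give the same partition. */
int partition_for_key(const unsigned char *key, size_t key_len, int num_partitions)
{
    assert(num_partitions > 0);
    return (int)((murmur2(key, key_len) & 0x7fffffffu) % (uint32_t)num_partitions);
}
```

Because the result depends only on the key bytes and the partition count, all messages with the same key end up in the same partition and therefore keep their relative order.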

Message Key

Topics and Partitions

Topics are logical categories of messages. Messages are organized in topics, and a topic generally contains messages with the same type of data.  

A topic is organized into one or more partitions. Partitions are the core concept behind Kafka’s scaling capabilities: they split the data of a topic into multiple streams, which allows for more throughput and better storage usage.

The messages on a partition are ordered by a sequential number called the offset.

Replication

A topic can be replicated onto multiple brokers to improve the availability. When replication is set up, one of the brokers is the leader for one partition, and the other brokers are the followers.

When the leading broker goes offline, another broker automatically becomes the leader for the partition.

Producer

The Kafka Producer (client) writes or publishes messages to a Kafka cluster. Depending on its configuration and parameters (e.g. the message key), the producer chooses the destination partition.

A producer can publish to one or more topics. 

Remember the bootstrap connection that was mentioned above? For a producer, the bootstrap connection returns the information required to publish to a topic, i.e. the broker and partition to use. The producer can then establish the connection to the partition leader and start publishing messages. These actions happen automatically when a producer connects to the Kafka cluster.

Consumer

A Kafka Consumer (client) reads or fetches data from a Kafka cluster.

Just as multiple producers can write messages to a cluster, multiple consumers can read those messages.

When a consumer processes a message, the message is not removed from its topic. Consumers remember and store within Kafka which messages have been processed. They commit the offset, i.e. store the position of the message they have last read.
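
The following toy model illustrates the two points made above: reading does not remove a message, and each consumer group keeps its own committed position. It is a simplified sketch and does not show how Kafka or RzK store offsets internally. All type and function names are made up for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of one partition: an append-only list of messages. */
typedef struct {
    const char *messages[16];
    long        count;            /* offsets 0 .. count-1 are in use */
} partition_log;

/* Committed position of one consumer group on that partition. */
typedef struct {
    long next_offset;             /* offset of the next message to read */
} group_position;

/* Return the next unread message for this group, or NULL if there is none. */
const char *consume_next(const partition_log *log, group_position *pos)
{
    assert(log != NULL && pos != NULL);

    if (pos->next_offset >= log->count)
        return NULL;                             /* nothing new for this group */

    /* Read the message and advance ("commit") the group's position.
       The log itself is not modified. */
    return log->messages[pos->next_offset++];
}
```

Two groups with separate group_position values read the same messages independently, and a group that has reached the end of the log simply gets NULL until new messages arrive.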

Consumer Groups

Consumer groups exist to scale reading, i.e. consuming messages. Multiple consumers can subscribe to one topic and belong to the same consumer group. Kafka ensures that the consumers in a group receive messages from different partitions of the topic, and thereby balances the load.

The group name is used in Kafka to manage the current consume offset:

Groups also identify which consumers are load balanced (if enabled).
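
How partitions can be spread over the members of a group is sketched below. The calculation is modeled on Kafka's range assignment strategy for a single topic: every consumer gets a contiguous block of partitions, and the first consumers get one extra partition when the division leaves a remainder. Which strategy is in effect when RzK consumer load balancing is enabled is not stated in this manual, so this is an assumption for illustration. The function name range_assignment is made up for this example.

```c
#include <assert.h>

/* Compute the block of partitions for one group member (0-based index).
   On return, *first is the first partition id and *count the block size. */
void range_assignment(int num_partitions, int num_consumers, int member,
                      int *first, int *count)
{
    int base, extra;

    assert(num_partitions >= 0 && num_consumers > 0);
    assert(member >= 0 && member < num_consumers);
    assert(first != 0 && count != 0);

    base  = num_partitions / num_consumers;   /* partitions everyone gets      */
    extra = num_partitions % num_consumers;   /* members that get one more     */

    *count = base + (member < extra ? 1 : 0);
    *first = member * base + (member < extra ? member : extra);
}
```

With 7 partitions and 3 consumers, the members receive 3, 2 and 2 partitions (0-2, 3-4 and 5-6). With more consumers than partitions, the surplus consumers receive a count of 0 and stay idle.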

The zKafka Connector

Now that you are familiar with the key Kafka terms, continue with the chapter What is the zKafka Connector? to learn how to use the zKafka Connector in your process flows and programs and take full advantage of Kafka clusters.

Additional Information

AT-TLS Configuration for RzK

Using IBM z/OS Application Transparent Transport Layer Security (AT-TLS) is the best and most secure way to implement TLS security for a z/OS application. Some of the features provided with an AT-TLS implementation are:

RzK and AT-TLS

The zKafka Connector uses AT-TLS in the following way:

Requirements

AT-TLS requires the Policy Agent to be configured and started, and the TCP/IP stack to be enabled for AT-TLS.

Configuration

The Kafka server and the RzK client have to be configured in AT-TLS as non-controlling AT-TLS applications:

Choosing the right Cipher Algorithm

RzK and AT-TLS support TLS through System SSL, the cryptographic services base element of z/OS. System SSL supports different cipher algorithms that provide both encryption and data authentication for data integrity.

Encryption scrambles the data so it is transferred confidentially and cannot be interpreted without a special key.

Data authentication algorithms ensure that the data was not modified during the transfer.

Some of the supplied cipher algorithms provide only data authentication, and some provide both encryption and authentication.

Be aware that the cipher algorithm actually used for a session is negotiated between the Kafka server and the RzK client, so both sides, server and client, have to support the selected cipher algorithm.

For example, if you configure an RzK client to use the Triple DES encryption with SHA authentication algorithm, but the server does not support that cipher algorithm, Triple DES encryption & SHA authentication will not be used for sessions between the RzK client and the Kafka server.
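
The effect of the negotiation can be pictured as an intersection of two lists. The sketch below is a deliberately simplified model: it returns the first cipher suite from the client list that the server also supports, and NULL if there is none. In a real TLS handshake the server makes the choice and may apply its own preference order, and the handshake itself is performed by AT-TLS, not by application code. The function name negotiate_cipher is made up for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Return the first client cipher suite that the server supports as well,
   or NULL if the two lists have nothing in common. */
const char *negotiate_cipher(const char *const client[], size_t n_client,
                             const char *const server[], size_t n_server)
{
    size_t i, j;

    assert(client != NULL && server != NULL);

    for (i = 0; i < n_client; i++)
        for (j = 0; j < n_server; j++)
            if (strcmp(client[i], server[j]) == 0)
                return client[i];

    return NULL;   /* no common cipher suite: the session cannot be set up */
}
```

If the client list contains only a Triple DES suite and the server list does not, the result is NULL. This corresponds to the case described above, where the configured cipher algorithm is not used for the session.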

AT-TLS Sample Policy for RzK Clients

The following example AT-TLS policy for RzK clients may also be found in SAMPLIB(RZKTLSK0).

#*********************************************************************
#                                                                    *
#    Product: RVS z/OS Kafka Connector (RzK)                         *
#                                                                    *
#     Member: SAMPLIB(RZKTLSK0)                                      *
#                                                                    *
#       Type: AT-TLS Policy                                          *
#                                                                    *
#   Function: AT-TLS Sample Policy for Rzk Clients                   *
#                                                                    *
#      Notes: o Adapt the required AT-TLS parms below to your needs. *
#                                                                    *
#                 (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
#*********************************************************************
TTLSRule                           RZK-Client                       
{                                                                   
   LocalAddr                       ALL                              
   RemoteAddr                      <KAFKA Server IP>                    
   RemotePortRange                 xxxx-xxxx                        
   Direction                       Both                             
   Priority                        251                              
   TTLSGroupActionRef              gAct-RZKClientx                  
   TTLSEnvironmentActionRef        eAct-RZKClientx                  
   TTLSConnectionActionRef         cAct-RZKClientx                  
}                                                                   
TTLSGroupAction                    gAct-RZKClientx                  
{                                                                   
   TTLSEnabled                     On                               
   GroupUserInstance               1                                
   Trace                           6                               
}                                                                   
TTLSEnvironmentAction              eAct-RZKClientx                  
{                                                                   
   HandshakeRole                   Client                           
   TTLSEnvironmentAdvancedParms                                     
   {                                                                
      ClientAuthType               PassThru                         
   }                                                                
   EnvironmentUserInstance         0                                
   TTLSKeyringParmsRef             kyRingParms-RZKClientx           
   Trace                           6                               
}                                                                   
TTLSConnectionAction               cAct-RZKClientx                  
{                                                                   
   HandshakeRole                   Client                           
   TTLSCipherParmsRef              cipher_RZKClientx                
   TTLSConnectionAdvancedParmsRef  cAdv-RZKClientx                  
   CtraceClearText                 On                               
   Trace                           6                               
}                                                                   
TTLSConnectionAdvancedParms        cAdv-RZKClientx                  
{                                                                   
   ApplicationControlled           Off                              
   SecondaryMap                    On                               
   SSLv3                           Off                              
   TLSv1                           Off                              
   TLSv1.1                         On                               
   TLSv1.2                         On                               
}                                                                   
TTLSKeyringParms                   kyRingParms-RZKClientx           
{                                                                   
   Keyring                         RZK_Keyring                      
}                                                                   
TTLSCipherParms                    cipher_RZKClientx                
{                                                                   
#  AES 256                                                             
   V3CipherSuites                  TLS_DHE_RSA_WITH_AES_256_CBC_SHA    
   V3CipherSuites                  TLS_RSA_WITH_AES_256_CBC_SHA        
   V3CipherSuites                  TLS_DHE_DSS_WITH_AES_256_CBC_SHA    
}