rvs-Systems GmbH presents
rvs-Systems zKafka Connector User Manual for Release v1.2
Alexander-Fleming-Ring 19
CEO: Carl-Heinz Benecke
Registration: Amtsgericht Lübeck HRB 14223 HL
© Copyright 2024 by rvs-Systems GmbH
Content
- Apache Kafka and the z/OS
- What is the zKafka Connector?
- Installation & Quickstart
- RzK Command Line Interface (CLI)
- RZKCOPY Utility
- Using the RzK API
- RzK API Function Call Reference
- Terms and Conventions
- RZKOPEN - Open
- RZKCONN - Connect
- RZKPROD - Produce
- RZKCONS - Consume
- RZKCLSE - Close
- RZKAUTH - Authentication
- RZKDBUG - Debug Level
- RZKGERR - Get Last Error
- RZKCLDB - Load Balancing
- RZKSYNC - Set Synchronous Mode
- RZKTERM - Terminate
- RZKTIMO - Group Member Time Out
- RzK API Example Code
- RzK Messages and Codes
- Additional Information
Apache Kafka and the z/OS
Apache Kafka has become one of the best-known platforms for event-based processing and streaming of data. A Kafka cluster shares the mainframe's core capabilities: high availability (resilience), scalability and permanent storage. Kafka client libraries let you connect to your Kafka cluster from almost anywhere and anything.
On z/OS, though, connecting to a Kafka cluster in the open world is quite complex. There is no easy way to integrate Kafka client libraries. Still, many of the most important business processes and a huge amount of business data reside on the mainframe and need to be integrated into your cloud infrastructure.
We developed the rvs-Systems zKafka Connector (RzK) as an interface from your z/OS applications to any Kafka cluster that is easy to implement and use. Our first intention was to provide a secure and direct way for mainframe (z/OS) data into open world Kafka clusters. Further functions have been added over the last months.
There are two ways to continue reading:
- If you are new to Kafka, you may want to read the basic information in the section Apache Kafka Core Concepts first.
- If you are already familiar with Kafka, as we expect most readers are, just continue on the next pages to find more information about the zKafka Connector.
What is the zKafka Connector?
The zKafka Connector (RzK) is a smart link between mainframe z/OS applications and your Kafka clusters.
RzK offers an easy way to produce (write) and consume (read) data from any z/OS application to and from your Kafka clusters. It does not require any further program product, nor middleware. The zKafka Connector significantly simplifies your mainframe data integration with the cloud.
RzK itself is a z/OS (MVS) middleware, which may be used in any program to communicate directly with your Kafka cluster. RzK requires no additional components, no OMVS/USS and no Java.
An API is available for IBM Language Environment (LE) programs in
- COBOL
- C/C++
- PL/I
- Fortran
as well as a standard z/OS CALL API to be used from non-LE languages like
- Assembler (HLASM)
- REXX (LINKPGM)
- NPL (NATURAL)
- etc
and there are further easy and fast integration methods available:
- The RZKCOPY Utility and
- the RzK Command Line Interface (CLI).
If you can't wait to test the zKafka Connector, start with chapter z/OS.
As a software developer you will find numerous ready-to-run source code examples in the SAMPLIB library. A description of all these samples can be found in chapter RzK API Example Code.
Installation & Quickstart
The zKafka Connector distribution file that you received through email or downloaded from the rvs-Systems Software Download Portal is usually in zip format. Please extract (unzip) the contained files first. You will find an archive for each platform: z/OS (xmi), Linux (tar) and Windows (zip), as well as the user manual.
z/OS
Currently RzK consists of only a few z/OS modules with some ALIASes and a lot of code examples. The RzK programs do not run APF-authorized and may, but do not have to, be placed in a z/OS system library. RzK can also be used directly from a user library (via STEPLIB). An installation in the classical sense with SMP/E is therefore not necessary.
Only a few steps are required for installation and a first quick successful z/OS to Kafka communication:
- Transfer the zKafka Connector distribution file to your z/OS target system.
- Unpack the distribution file on z/OS to create the contained product libraries.
- Run the provided RzK Command Line Interface (CLI) sample job to produce and/or consume some test messages into/from your Kafka cluster.
For more details, see the following sections below.
Transfer the distribution file to the z/OS target system
The zKafka Connector distribution file that you received through email or downloaded from the rvs-Systems Software Download Portal is usually in zip format. Please extract (unzip) the contained XMI file first before transferring it to your z/OS host.
Now use any file transfer program of your choice, like FTP or TSO IND$FILE, to BINARY transfer the XMI distribution file to the z/OS system.
XMI data sets on z/OS are files in the TSO TRANSMIT format. They have to be allocated with DCB attributes RECFM=FB and LRECL=80; otherwise, they cannot be extracted. Depending on your z/OS installation defaults, it may be helpful to pre-allocate the target data set with sufficient space and the attributes above.
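Because the TRANSMIT format consists of fixed 80-byte records, a quick local check before the upload can reveal a file that was damaged by a text-mode download or transfer. The following is a minimal sketch in Python; the helper name is hypothetical and not part of RzK, and a passing check does not prove that the file is a valid XMI file.

```python
import os

XMI_LRECL = 80  # TSO TRANSMIT files consist of fixed 80-byte records


def is_multiple_of_lrecl(path, lrecl=XMI_LRECL):
    """Return True if the file is non-empty and holds a whole number of records."""
    size = os.path.getsize(path)
    return size > 0 and size % lrecl == 0
```

If the check fails, repeat the download or transfer in BINARY mode before allocating the data set on z/OS.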
Unpack the distribution file on z/OS
Unpacking the sequential distribution file is a two-stage process:
- The XMI data set is TSO RECEIVEd to create a single self-extracting distribution library.
- The resulting library is TSO EXECuted to unpack itself and create all required zKafka Connector product libraries.
You may do this online within a TSO session or just copy and use the following batch job. If you have previously installed a version of the zKafka Connector, you should also find this JCL in SAMPLIB($RZKUNPK).
If you are using the HTML version of the RzK documentation, you may hover with the mouse over the code boxes and use the copy button in the top right corner to copy the code box content into your clipboard.
//jobname JOB
//* *
//* Product: RVS z/OS Kafka Connector (RzK) *
//* *
//* Member: SAMPLIB($RZKUNPK) *
//* *
//* Function: Unpack the sequential RzK distribution XMI file and *
//* create a self-extracting distribution library. *
//* Execute this library to create the contained RzK *
//* product libraries. *
//* *
//* Notes: o Adapt the input (XMI) and output (OUT) data set *
//* names to your needs. *
//* *
//* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*
// SET XMI=myHlq.RZKV120.XMI
// SET OUT=myHlq.RZKV120
//*
//UNPACK EXEC PGM=IKJEFT01,DYNAMNBR=50
//*
//SYSTSPRT DD SYSOUT=*
//SYSTSIN DD *,SYMBOLS=JCLONLY
RECEIVE INDATASET('&XMI')
DATASET('&OUT')
EXEC '&OUT'
/*
//
Once the unpacking is completed, you find the following libraries as part of the zKafka Connector distribution:
- LOADLIB: A PDSE library containing the required program objects and load modules with their aliases.
- SAMPLIB: A PDSE library containing sample jobs and sample program source code.
To keep the installation as simple as possible, the zKafka Connector contains only a few non-authorized modules. No APF-authorization is required. This has the advantage that you may test or use it now out-of-the-box without any risk.
Run the RzK Command Line Interface (CLI)
Use the RzK Command Line Interface to test your zKafka Connector installation immediately. The RzK CLI can produce (or consume) one or more test messages directly to (or from) your Kafka cluster without any programming efforts.
You need the following information to proceed:
Name | Description | JCL Symbol Name |
Broker | Kafka broker host name or IP address and port. RzK CLI default is 127.0.0.1:9092 | BRK |
Topic | Kafka topic name to be used. RzK CLI default is "quickstart-events". | TPC |
STEPLIB | Fully qualified RzK LOADLIB data set name from the unpack step above. | RZK |
Message Key | Optional. Message key for the Kafka Produce. RzK CLI default is no key "". | PRO |
Consumer Group | Optional. Consumer group name for Kafka Consume. RzK CLI default is "RzkGroup". | CON |
Message | Optional. Test message to be produced into your Kafka cluster. RzK CLI default is a message containing the current RzK version information. | MSG |
RZK Options | Optional. -r Repeat messages, -v Debug level [0..15] | OPT |
For more details, see chapter RzK Command Line Interface (CLI).
The sample batch job in SAMPLIB(RZKCLIJM) contains the JCL listed below. Update at least the jobcard and the JCL symbols BRK, TPC and RZK before submitting the job.
//jobname JOB
//* *
//* Product: RVS z/OS Kafka Connector (RzK) *
//* *
//* Member: SAMPLIB(RZKCLIJM) *
//* *
//* Function: Use the RZK Command Line Interface to produce and *
//* consume one or more test messages. *
//* *
//* Notes: o Adapt the JCL symbol values below to your needs. *
//* *
//* o The STEPLIB DD statements are only required, *
//* if the RZK* modules do not (yet) reside in the *
//* z/OS LNKLST or LPALST concatenation. *
//* *
//* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*
// SET OPT='-r5 -v1' Repeat DbgLvl
// SET PRO='-p -k myMsg' PRODUCE MsgKey
// SET CON='-c -g myGrp -t10' CONSUME ConGrp Tout
// SET TPC='myTopic' Kafka Topic
// SET BRK='myKafka.myDomain.de:9092' Kafka Broker:Port
//*
// SET MSG='"Test msg sent via RZK CLI."'
//*
// SET RZK=myHlq.RZKV120.LOADLIB RZK PgmObj PDSE
//*
//RZKPROD EXEC PGM=RZK,PARM='&PRO &OPT &BRK &TPC &MSG'
//STEPLIB DD DISP=SHR,DSN=&RZK RZK PgmObj PDSE
//SYSPRINT DD SYSOUT=* RZK Messages
//*
//RZKCONS EXEC PGM=RZK,PARM='&CON &OPT &BRK &TPC'
//STEPLIB DD DISP=SHR,DSN=&RZK RZK PgmObj PDSE
//SYSPRINT DD SYSOUT=* RZK Messages
//
Using RzK with TLS/SSL Encrypted Connections
If you have the requirement to use RzK with TLS/SSL encrypted connections, please refer to section AT-TLS Configuration for RzK.
Next Steps
If you have successfully done the steps above and are keen for more, you have the following choices to proceed:
- Continue with chapter RzK API Example Code and use some of our sample programs contained in the SAMPLIB to get more familiar with the zKafka Connector API functions.
- Immediately start integrating zKafka Connector function calls into your existing programs using the RzK API Function Call Reference.
- Use the RZKCOPY Utility to integrate the RzK connectivity into your existing processes without any program development at all.
LINUX
The zKafka Connector LINUX package is intended for development and testing of your application.
For example, if you develop your application in C, you can do this in a LINUX environment and use LINUX tools for easy development and debugging. The application can then be transferred to z/OS and compiled using the z/OS load library of the zKafka Connector.
The LINUX package contains:
• librzk.so: the LINUX DLL of zKafka
• rzk.h: the include for your C-Application
• rzk_sample.c: sample
• rzk_sample: executable version of the sample
• rzk_linux: LINUX version of the RzK Command Line Interface (CLI).
Only a few steps are required for installation and a first quick successful Kafka communication:
- Unpack the LINUX package.
- Run the RzK Command Line Interface (CLI) to produce and/or consume some test messages into/from your Kafka cluster.
Run the RzK Command Line Interface (CLI)
Use the RzK Command Line Interface to test your zKafka Connector installation immediately. The RzK CLI can produce (or consume) one or more test messages directly to (or from) your Kafka cluster without any programming efforts.
You need the following information to proceed:
Name | Description |
Broker | Kafka broker host name or IP address and port. RzK CLI default is 127.0.0.1:9092 |
Topic | Kafka topic name to be used. RzK CLI default is "quickstart-events". |
Message Key | Optional. Message key for the Kafka Produce. RzK CLI default is no key "". |
Consumer Group | Optional. Consumer group name for Kafka Consume. RzK CLI default is "RzkGroup". |
Message | Optional. Test message to be produced into your Kafka cluster. RzK CLI default is a message containing the current RzK version information. |
RZK Options | Optional. -r Repeat messages, -v Debug level [0..15] |
For more details, see chapter RzK Command Line Interface (CLI).
Sample for producing 5 test messages to your Kafka cluster into topic "myTopic":
rzk_linux -r5 -v1 -p mykafka.mydomain.com:9092 myTopic "Test msg sent via LINUX RzK CLI"
Windows
The zKafka Connector Windows package is intended for development and testing of your application.
As an example, if you develop your application in C, you can do this in a Windows environment and do easy development and debugging using Windows tools. The application can then be transferred to z/OS and compiled using the z/OS load library of the zKafka Connector.
The Windows package contains:
• rzk.dll: the Windows DLL of zKafka
• rzk.h: the include for your C-Application
• rzk.lib: the link library
• rzk_sample.c: sample
• rzk_sample.exe: executable version of the sample
• rzk_win.exe: Windows version of the RzK Command Line Interface (CLI).
Only a few steps are required for installation and a first quick successful Kafka communication:
- Unpack the Windows package.
- Run the RzK Command Line Interface (CLI) to produce and/or consume some test messages into/from your Kafka cluster.
Run the RzK Command Line Interface (CLI)
Use the RzK Command Line Interface to test your zKafka Connector installation immediately. The RzK CLI can produce (or consume) one or more test messages directly to (or from) your Kafka cluster without any programming efforts.
You need the following information to proceed:
Name | Description |
Broker | Kafka broker host name or IP address and port. RzK CLI default is 127.0.0.1:9092 |
Topic | Kafka topic name to be used. RzK CLI default is "quickstart-events". |
Message Key | Optional. Message key for the Kafka Produce. RzK CLI default is no key "". |
Consumer Group | Optional. Consumer group name for Kafka Consume. RzK CLI default is "RzkGroup". |
Message | Optional. Test message to be produced into your Kafka cluster. RzK CLI default is a message containing the current RzK version information. |
RZK Options | Optional. -r Repeat messages, -v Debug level [0..15] |
For more details, see chapter RzK Command Line Interface (CLI).
Sample for producing 5 test messages to your Kafka cluster into topic "myTopic":
rzk_win -r5 -v1 -p mykafka.mydomain.com:9092 myTopic "Test msg sent via windows RzK CLI"
RzK Command Line Interface (CLI)
Overview
The RzK Command Line Interface (CLI) provides an easy-to-use utility to execute and/or test RzK functions.
You require the following information about the Kafka cluster to start the RzK CLI:
- broker - host name or IP address of the Kafka cluster to connect to
- port - the listening port of one of the respective Kafka brokers. A default value of "9092" is used in case the parameter is omitted.
- topic - Kafka topic name to be used for produce or consume. A default value of "quickstart-events" is used by the RzK CLI in case no topic is provided.
- message - the message to be produced, i.e. published into the above topic of the above cluster
RzK CLI Syntax
RZK [options] [broker[:port] [topic [message]]]
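The way the positional parameters and their defaults combine can be sketched in a few lines of Python. This is an illustration of the documented defaults only, not the actual RzK parser; the function name is hypothetical, and the simple split on ":" assumes host names or IPv4 addresses.

```python
DEFAULT_BROKER = "127.0.0.1"
DEFAULT_PORT = 9092
DEFAULT_TOPIC = "quickstart-events"


def resolve_target(positionals):
    """Apply the documented defaults to [broker[:port] [topic [message]]]."""
    broker = positionals[0] if len(positionals) > 0 else DEFAULT_BROKER
    topic = positionals[1] if len(positionals) > 1 else DEFAULT_TOPIC
    message = positionals[2] if len(positionals) > 2 else None
    # Split "host:port"; without a port the default 9092 applies.
    host, separator, port = broker.partition(":")
    return host, (int(port) if separator else DEFAULT_PORT), topic, message
```

Because the parameters are positional, a message can only be given together with a broker and a topic.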
Options
Options are the different RzK functions that can be called via the following parameter set.
For ease of use, the input parameters of the options are provided here. A detailed explanation of the functions and their parameter set is provided in the respective chapters (referenced for each option below).
- -v[x] - generate debug output, values are 0-15, the default is 0. With 0, only error messages are issued. With 1-15, RzK provides tracing and debugging information; the higher the number, the more details are displayed.
- -u - specify username and password for authentication, e.g. -u kafkabroker1:kafkabroker1-secret.
- -p - produce/publish a message.
- -k - msgkey, provide a message key for the produce function. The default is no message key, i.e. Kafka will distribute the messages evenly across the partitions of the topic.
- -b - binary produce the message, no EBCDIC to UTF-8 conversion is done.
- -c - consume/fetch messages.
- -g - group, define the consumer group. The default is "RzkGroup".
- -b - binary consume the messages, no UTF-8 to EBCDIC conversion is done.
- -t - set a timeout for consume to # seconds. The default is 0 seconds, which means no timeout. If no timeout has been specified and no more messages are currently available, the CLI waits until more messages arrive and the specified retry count (-r) is met.
- -q - quiet mode, the CLI will not display the messages returned by consume. This is helpful for doing high volume tests.
- -l - enable consumer load balancing.
- -s - use the synchronous mode for produce and consume, see RZKSYNC - Set Synchronous Mode.
- -R - set the "receiving window size" for asynchronous consume. The default value is 100.
- -S - set the "sending window size" for asynchronous produce. The default value is 100.
Specific CLI-only options
- -r[#] - repeat this request/action # times. The default is 1. In combination with -p, the specified number of messages will be sent to the Kafka cluster. In combination with -c, the specified number of messages will be received from the Kafka cluster.
- -m[#] - make produce message # bytes. If the message text specified on the command line is longer than # bytes, the message is truncated. Otherwise, the message text from the command line is repeated until the total size of # bytes is reached.
- -? - show this message
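When the CLI is driven from a script, the options above can be assembled programmatically. The following Python sketch builds an argument list that follows the syntax RZK [options] [broker[:port] [topic [message]]]. The function and its parameter names are hypothetical; it covers only a subset of the documented options and writes numeric values directly behind the option letter, as the sample jobs do (-r5, -v1, -t10).

```python
def build_rzk_args(broker, topic=None, message=None, *, produce=False,
                   consume=False, key=None, group=None, repeat=None,
                   debug=None, timeout=None):
    """Return the RzK CLI arguments as a list: options first, then positionals."""
    if message is not None and topic is None:
        raise ValueError("a message requires a topic (parameters are positional)")
    args = []
    if produce:
        args.append("-p")
        if key is not None:
            args += ["-k", key]
    if consume:
        args.append("-c")
        if group is not None:
            args += ["-g", group]
        if timeout is not None:
            args.append("-t%d" % timeout)
    if repeat is not None:
        args.append("-r%d" % repeat)
    if debug is not None:
        if not 0 <= debug <= 15:
            raise ValueError("debug level must be within 0..15")
        args.append("-v%d" % debug)
    args.append(broker)
    if topic is not None:
        args.append(topic)
        if message is not None:
            args.append(message)
    return args
```

The resulting list can be passed to a process launcher on LINUX or Windows. For a JCL PARM string, a message containing blanks additionally has to be enclosed in double quotes, as in the sample jobs.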
RzK CLI Examples
In the following sections you find some examples of how to use the RzK CLI:
- Use RZK as TSO Command
- Produce Messages with the RzK CLI, see SAMPLIB(RZKCLIJP)
- Consume Messages with the RzK CLI, see SAMPLIB(RZKCLIJC)
To be able to use the RzK CLI as TSO command, the RzK* modules must reside in a STEPLIB used during TSO LOGON, or in the ISPF ISPLLIB concatenation, or in the z/OS LPALST or LNKLST concatenation.
The STEPLIB DD statements to the RzK load library in all sample jobs are only required, if the RZK* modules do not (yet) reside in the z/OS LPALST or LNKLST concatenation.
Use RZK as TSO Command
TSO RZK -?
RzK, the rvs-Systems zKafka Connector
z/OS CLI, Version 1.2 Build Jul 3 2023 18:35:41
All rights reserved. Copyright 2021-2023 rvs-Systems GmbH, Ruesselsheim
Invocation syntax is rzk [options] [broker[:port] [topic [message]]]
broker hostname or IP-address of one of the kafka brokers, default is 127.0.0.1
port the listen port of one of the kafka brokers, default is 9092
topic the name of the Topic that should be used for produce and consume.
Default is "quickstart-events"
message will be sent to Kafka as value to be published (produced) in the Topic
options are:
-v[x] to generate debug output (0-15, default is 0)
-u specify username and password as e.g. -u kafkabroker1:kafkabroker1-secret
-p produce a message
-c consume/fetch messages
-k msgkey (for produce, default is no MsgKey)
-g group (for consume, default is "RzkGroup")
-q do not display messages returned by consume
-l enable consumer load balancing
-b binary, do not use EBCDIC to UTF-8 conversion for produce and consume Data
-r[#] repeat action # times, default is 1
-m[#] make produce message # bytes
-s synchronous mode
-R[#] set Receiving window size for asynchronous consume
-S[#] set Sending window size for asynchronous produce
-t[#] set timeout for consume to # seconds
-? to show this message
Produce Messages with the RzK CLI
The sample JCL below can also be found in SAMPLIB(RZKCLIJP).
//jobname JOB
//* *
//* Product: RVS z/OS Kafka Connector (RzK) *
//* *
//* Member: SAMPLIB(RZKCLIJP) *
//* *
//* Function: Use the RzK Command Line Interface to produce one or *
//* more test messages. *
//* *
//* Notes: o Adapt the JCL symbol values below to your needs: *
//* *
//* - OPT: RzK CLI Options *
//* - PRO: Produce Parameters *
//* - TPC: Kafka Topic *
//* - BRK: Kafka Broker & Port *
//* - MSG: Message text to be produced. *
//* *
//* o The STEPLIB DD statements are only required, *
//* if the RZK* modules do not (yet) reside in the *
//* z/OS LNKLST or LPALST concatenation. *
//* *
//* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*
// SET OPT='-r5 -v1' Repeat DbgLvl
// SET PRO='-p -k myKey' PRODUCE MsgKey
// SET TPC='myTopic' Kafka Topic
// SET BRK='myKafka.myDomain.de:9092' Kafka Broker:Port
//*
// SET MSG='"Test msg sent via RZK CLI."'
//*
// SET RZK=myHlq.RZKV120.LOADLIB RZK PgmObj PDSE
//*
//RZKCLI EXEC PGM=RZK,PARM='&PRO &OPT &BRK &TPC &MSG'
//*
//STEPLIB DD DISP=SHR,DSN=&RZK RZK PgmObj PDSE
//SYSPRINT DD SYSOUT=* RZK Messages
//
Consume Messages with the RzK CLI
The sample JCL below can also be found in SAMPLIB(RZKCLIJC).
//jobname JOB
//* *
//* Product: RVS z/OS Kafka Connector (RzK) *
//* *
//* Member: SAMPLIB(RZKCLIJC) *
//* *
//* Function: Use the RzK Command Line Interface to consume one or *
//* more test messages. *
//* *
//* Notes: o Adapt the JCL symbol values below to your needs: *
//* *
//* - OPT: RzK CLI Options *
//* - CON: Consume Parameters *
//* - TPC: Kafka Topic *
//* - BRK: Kafka Broker & Port *
//* *
//* o The STEPLIB DD statements are only required, *
//* if the RZK* modules do not (yet) reside in the *
//* z/OS LNKLST or LPALST concatenation. *
//* *
//* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
//********************************************************************
// EXPORT SYMLIST=*
// SET OPT='-r5 -v1' Repeat DbgLvl
// SET CON='-c -g myGrp -t10' CONSUME ConGrp Tout
// SET TPC='myTopic' Kafka Topic
// SET BRK='myKafka.myDomain.de:9092' Kafka Broker:Port
//*
// SET RZK=myHlq.RZKV120.LOADLIB RZK PgmObj PDSE
//*
//RZKCLI EXEC PGM=RZK,PARM='&CON &OPT &BRK &TPC'
//*
//STEPLIB DD DISP=SHR,DSN=&RZK RZK PgmObj PDSE
//SYSPRINT DD SYSOUT=* RZK Messages
//
RZKCOPY Utility
The zKafka Connector (RzK) allows you to easily use the full Kafka capabilities without any program changes or programming knowledge.
With the RZKCOPY utility you can
- immediately produce records from one or more z/OS MVS data sets and/or USS files directly as messages into a Kafka topic, or
- consume messages directly from a Kafka topic and write them into a z/OS MVS data set or a USS file for further processing.
RZKCOPY works similar to a standard z/OS copy utility (e.g. IEBGENER, IEBCOPY) and may be used via JCL in batch, as TSO command in foreground or within REXX or USS shell scripts. If required, it may also be called from any program in any language.
For a produce operation, RZKCOPY reads all records from all MVS data sets and/or USS files found allocated to DD name RZKFILE and produces them as messages directly to the specified Kafka topic.
For a consume operation, RZKCOPY consumes messages directly from the specified Kafka topic and writes them as records into the MVS data set or USS file allocated to DD name RZKFILE. The consume processing ends when the specified number of messages has been consumed or when a timeout interval has expired in which no new messages were received.
Message Processing Details
Due to the unique aspects of z/OS MVS data sets and USS files on one side and Unicode UTF-8 on the other side, a few conversions related to message encoding and message length have to be made prior to producing records read from a data set or file. Similar conversions must be applied, when messages have been consumed from a Kafka topic and before they can be written into an MVS data set or a USS file.
Message EBCDIC/UTF-8 Conversion
Most z/OS data is encoded in EBCDIC, not in ASCII or UTF-8. As long as you produce and consume your data only from z/OS systems, no EBCDIC/UTF-8 conversion is required. As soon as other platforms are involved, you probably would like to convert EBCDIC-encoded data to UTF-8 and vice versa.
RzK supports this conversion automatically during produce and consume processing. By default, RZKCOPY will convert all records read from an MVS data set or USS file to UTF-8 during produce processing. During consume, all messages consumed will be converted by default from UTF-8 to EBCDIC before they are written into the target MVS data set or USS file.
If you do not need or want automatic EBCDIC/UTF-8 conversion during RZKCOPY produce or consume processing, specify the binary option -b in the RZKCOPY PARM string.
Unicode encoded as UTF-8 is a variable-length multi-byte code. While standard characters (such as A..Z, a..z, 0..9) always occupy exactly one byte, other special characters from the EBCDIC character set (such as ÄÖÜ, äöü) are translated into two-byte codes. For this reason, the length of some records converted to UTF-8 messages will increase. During consume processing, the reverse effect occurs and the length of records converted from received messages decreases because some UTF-8 two-byte codes will be converted (back) to only one byte.
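The length effect can be reproduced with a few lines of Python. The sketch uses Python's built-in EBCDIC codec cp500 as a stand-in; this section does not say which EBCDIC code page RzK uses, so treat the codec as an assumption. The function name is illustrative only.

```python
def encoded_lengths(text, ebcdic_codec="cp500"):
    """Return (EBCDIC length, UTF-8 length) in bytes for the same text."""
    ebcdic_record = text.encode(ebcdic_codec)  # single-byte code page
    utf8_message = ebcdic_record.decode(ebcdic_codec).encode("utf-8")
    return len(ebcdic_record), len(utf8_message)


print(encoded_lengths("ORDER 4711"))  # (10, 10): letters, digits and blank stay one byte
print(encoded_lengths("Größe"))       # (5, 7): ö and ß need two bytes each in UTF-8
```

A fixed-length 80-byte record that contains such characters therefore results in a message longer than 80 bytes after conversion.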
Message Length vs. Record Length
In a Kafka topic, each message has its own variable length.
On a z/OS system, MVS sequential data sets have a pre-defined record format (RECFM). There are basically two formats, which have to be handled differently:
- RECFM=F: All records in the data set have the same fixed length (LRECL). Shorter records are filled during file creation with a fill (pad) character, typically - but not necessarily - a blank (space). Example: RECFM=F, LRECL=80 means, that all records in the file have a length of 80 bytes.
- RECFM=V: Each record within the data set has also its own variable length, but not longer than the maximum length (LRECL) specified during file allocation minus four for the Record Descriptor Word (RDW). Example: RECFM=V, LRECL=84 means, that the record may contain data with a length of 1..80 bytes.
Produce Processing - Message Length
During produce processing, RZKCOPY honors the file attributes (DCB) of each single MVS data set and USS file allocated to DD name RZKFILE.
When using RZKCOPY, you may concatenate multiple MVS data sets mixed with multiple USS files all with different DCB attributes (RECFM, LRECL, BLKSIZE) at the same time.
During read processing, RZKCOPY recognizes the change of the data set or file and its DCB attributes and always uses the correct record format (RECFM) and record length (LRECL) for processing:
- Records with a fixed-length (RECFM=F) will always be produced with the LRECL of the data set. No stripping of trailing pad characters is done. Example: If you produce a RECFM=F, LRECL=80 data set, each message produced to Kafka has a length of 80 bytes before the optional EBCDIC to UTF-8 conversion.
- Records with a variable-length (RECFM=V) will always be produced as messages with an individual length. The length of each message is derived from the record's RDW. Example: If you produce a RECFM=V, LRECL=84 data set, each message produced to Kafka may have an individual length between 1..80 bytes before the optional EBCDIC to UTF-8 conversion.
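The two rules above can be illustrated with a small Python sketch that splits a byte string into messages. It assumes the variable-length data still carries its 4-byte RDWs (two bytes of big-endian length including the RDW, followed by two zero bytes). The function names are hypothetical, and the sketch models the record semantics only, not how RZKCOPY reads data sets.

```python
import struct


def fixed_records(data, lrecl):
    """Split RECFM=F data into messages of exactly lrecl bytes (padding is kept)."""
    if len(data) % lrecl:
        raise ValueError("data length is not a multiple of LRECL")
    return [data[i:i + lrecl] for i in range(0, len(data), lrecl)]


def variable_records(data):
    """Split RECFM=V data into messages using the length stored in each RDW."""
    messages, pos = [], 0
    while pos < len(data):
        if pos + 4 > len(data):
            raise ValueError("incomplete RDW at offset %d" % pos)
        (rdw_length,) = struct.unpack_from(">H", data, pos)
        if rdw_length < 4 or pos + rdw_length > len(data):
            raise ValueError("invalid RDW at offset %d" % pos)
        messages.append(data[pos + 4:pos + rdw_length])  # RDW itself is not sent
        pos += rdw_length
    return messages
```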
Consume Processing - Record Length
When RZKCOPY writes the consumed Kafka messages to the target MVS data set or USS file allocated to DD name RZKFILE, it always uses the DCB attributes of this output file. RZKCOPY never uses a built-in default. It is the responsibility of the user to always provide proper DCB attributes.
For RZKCOPY consume processing, at least RECFM, LRECL and a BLKSIZE must be specified for the target output file allocated to DD name RZKFILE, either through the DCB attributes of a pre-allocated data set or through JCL parameters.
If the length of a consumed message after conversion is shorter than or equal to the available space in the record, then
- for records with a fixed-length (RECFM=F), the message will be moved into the record. If the message is shorter than the LRECL of the data set, an EBCDIC blank (space) x'40' is used as pad character to fill the record up to its target (LRECL) length. Example: If you consume into a RECFM=F, LRECL=80 data set, a message with a length of 60 bytes will be padded with 20 blanks to the right for a target LRECL of 80 bytes.
- for records with a variable-length (RECFM=V), the message will be moved into the record. The Record Descriptor Word (RDW) of this record is set to the length of the consumed message (plus four). No pad characters are applied. Example: If you consume into a RECFM=V, LRECL=84 data set, a message with a length of 60 bytes results in a record with 60 bytes of data and an RDW value of 64.
If the length of a consumed message is longer than the LRECL and exceeds the available space in the record, then for fixed-length (RECFM=F) and variable-length (RECFM=V) records, the message is truncated at the end to fit into the maximum record length and a warning message is generated.
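The padding and truncation rules can be summarized in a short Python sketch. It is a model of the behavior described above, not RzK code; the function names are hypothetical, and the returned flag stands in for the warning message that is generated on truncation.

```python
import struct

EBCDIC_BLANK = b"\x40"


def to_fixed_record(message, lrecl):
    """Fit a message into a RECFM=F record: pad with x'40' or truncate at the end."""
    truncated = len(message) > lrecl
    record = message[:lrecl].ljust(lrecl, EBCDIC_BLANK)
    return record, truncated


def to_variable_record(message, lrecl):
    """Fit a message into a RECFM=V record: RDW plus data, truncated if too long."""
    max_data = lrecl - 4  # LRECL includes the 4-byte RDW
    truncated = len(message) > max_data
    data = message[:max_data]
    rdw = struct.pack(">HH", len(data) + 4, 0)  # length includes the RDW itself
    return rdw + data, truncated
```

With a 60-byte message, to_fixed_record(message, 80) returns an 80-byte record ending in 20 blanks, and to_variable_record(message, 84) returns a 64-byte record whose RDW holds the value 64.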
To avoid message truncation when consuming messages with unknown different lengths from a topic, use DCB attributes of RECFM=VB, LRECL=32756, BLKSIZE=32760 for your MVS output data set or USS file allocated to DD name RZKFILE.
The maximum allowed record length and block size for normal (non-spanned) sequential data sets on z/OS is 32760.
Ending Produce Processing
When producing records from MVS data sets and/or USS files to a Kafka topic, the produce processing ends, when all records have been read (end of files reached, EOF) and also have been produced to Kafka.
Ending Consume Processing
While consuming messages from Kafka, there is no such "End of Messages" indicator to end consume processing. RZKCOPY would wait "forever" for the next Kafka message, if there were no other methods to end consume processing. RZKCOPY provides two options for this:
- Specify the maximum number of messages to be consumed with RZKCOPY PARM string option -r (repeat consume counter). After the specified number of messages has been consumed, RZKCOPY processing ends.
- Specify a timeout interval value (in seconds) with RZKCOPY PARM string option -t (timeout). After the specified timeout interval has expired in which no new messages were received from Kafka, RZKCOPY processing ends.
If you specify a repeat value which is much larger than the average number of Kafka messages available for consume processing in a given time, RZKCOPY probably waits much longer than you expected. Example: If you specify a repeat value of -r1000 to consume 1,000 messages, but only 1,000 messages arrive during the whole day, RZKCOPY will also run and wait for the whole day.
If you specify a consume timeout value which is longer than the average time between two Kafka messages arriving for consume, the timeout interval does not expire and RZKCOPY probably runs much longer and consumes many more messages than you expected. Example: If you specify a timeout value of 10 seconds with -t10 but your Kafka is very busy and delivers several messages each second, the timeout interval will almost never expire.
To avoid RZKCOPY running longer than you are willing to accept, specify both options to end consume processing, with values that suit your Kafka environment, the number of messages and their arrival interval. Example: Options -r1000 -t10 will end consume processing when either 1,000 messages have been consumed or no new messages have been available from Kafka for the last 10 seconds.
Running RZKCOPY to consume messages without any option to end consume processing properly will allow RZKCOPY to run forever.
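The interplay of the two end conditions can be modeled with a small Python loop. This is a sketch of the logic described above, not the RZKCOPY implementation; poll, the function name and the returned reason strings are assumptions made for the illustration. A value of 0 disables the respective limit, matching the defaults of -r and -t.

```python
import time


def consume_until(poll, max_messages=0, idle_timeout=0.0, clock=time.monotonic):
    """Collect messages until the count limit or the idle timeout ends the loop.

    poll() returns the next message, or None if none is currently available.
    With both limits set to 0 the loop never ends.
    """
    messages = []
    last_arrival = clock()
    while True:
        if max_messages and len(messages) >= max_messages:
            return messages, "count"
        message = poll()
        if message is not None:
            messages.append(message)
            last_arrival = clock()  # every arrival restarts the idle interval
        elif idle_timeout and clock() - last_arrival >= idle_timeout:
            return messages, "timeout"
```

The sketch shows why a busy topic keeps a timeout-only run alive: each arriving message resets the idle interval, so only the count limit can end the loop. A real consumer would block or sleep inside poll instead of spinning.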
RZKCOPY Parameters
The major parameters for controlling the RZKCOPY functionality are based on those of the RzK Command Line Interface (CLI). However, due to the different functionality of both utilities, there are some differences in their individual options.
The RZKCOPY parameter format is
RZKCOPY [options] broker[:port] topic
The options may be coded in any sequence, separated by at least one blank (space).
Option | Description |
-b | Binary mode. No EBCDIC/UTF-8 conversion takes place during produce or consume processing. |
-c | Consume processing. Messages are consumed from the specified Kafka topic and written as records into the MVS data set or USS file allocated to DD name RZKFILE. |
-g | Consumer group used during consume processing. For more details see section on Consumer / Consumer Groups in Apache Kafka Core Concepts. |
-k | Produce message key. The message key assures that all messages for the same topic will be stored in the same Kafka partition and can later be consumed in the same sequence as they were written. For more details see section on Messages / Message Key in Apache Kafka Core Concepts. |
-p | Produce processing. Records are read from all MVS data sets and USS files allocated to DD name RZKFILE and are produced as messages into the specified Kafka topic. |
-r# | Repeat maximum. RZKCOPY consume processing ends after # messages have been consumed from the specified Kafka topic. The default value used by RZKCOPY is zero, which means that no maximum for the number of messages to be consumed is defined. |
-t# | Timeout interval in seconds for consume processing. RZKCOPY consume processing ends if no new messages have been available from Kafka for the last # seconds. The default value used by RZKCOPY is zero, which means that no timeout interval is specified. |
-v# | Debug level within the range of 0..15. Issue almost no (#=0) or almost all debug messages (#=15). Please use for error diagnosis purposes only. Avoid using this in a production environment as it generates a large number of messages written to DD name |
Parameter Examples
Produce all records found at DD name RZKFILE in binary format to Kafka myKafka.myDomain.de at default port 9092 into topic myTopic using message key myKey:
RZKCOPY -b -p -k myKey myKafka.myDomain.de myTopic
Consume messages from Kafka myKafka.myDomain.de at port 9070 from topic myTopic using consumer group myGroup with UTF-8 to EBCDIC conversion and write them to DD name RZKFILE. Consume processing ends if either 1,000 Kafka messages have been received or no new messages have been available from Kafka for the last 60 seconds:
RZKCOPY -c -g myGroup -r1000 -t60 myKafka.myDomain.de:9070 myTopic
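The broker[:port] operand used in these examples can be split as in the following C sketch. It is an illustration only, not RZKCOPY's own parser: the function split_broker is hypothetical, the default of 9092 is taken from the first example above, and IPv6 literals are not handled.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_KAFKA_PORT 9092   /* default port named in the example above */

/* Hypothetical helper, not part of RzK: splits "broker[:port]" into a
 * host name and a port number. Returns 0 on success, -1 on bad input. */
int split_broker(const char *arg, char *host, size_t host_size, int *port)
{
    assert(arg != NULL && host != NULL && port != NULL);

    const char *colon = strchr(arg, ':');
    size_t host_len = colon ? (size_t)(colon - arg) : strlen(arg);

    if (host_len == 0 || host_len >= host_size)
        return -1;                      /* empty or too long host name */
    memcpy(host, arg, host_len);
    host[host_len] = '\0';

    if (colon == NULL) {
        *port = DEFAULT_KAFKA_PORT;     /* no ":port" given */
        return 0;
    }

    char *end;
    long value = strtol(colon + 1, &end, 10);
    if (end == colon + 1 || *end != '\0' || value < 1 || value > 65535)
        return -1;                      /* port missing, not numeric or out of range */
    *port = (int)value;
    return 0;
}
```

Called with "myKafka.myDomain.de" the sketch yields port 9092, called with "myKafka.myDomain.de:9070" it yields port 9070.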
RZKCOPY Examples
The following table contains an overview of all examples available in the SAMPLIB related to the RZKCOPY utility:
Member | Description |
RZKCPYAJ | JCL to assemble, link/bind and run the RZKCPYAP Assembler example program. |
RZKCPYAP | Assembler example program to call RZKCOPY to produce messages from a pre-allocated MVS data set. |
RZKCPYBK | JCL to compile, link/bind and run the RZKCPYBP Cobol example program. |
RZKCPYBP | Cobol example program to call RZKCOPY to produce messages from a pre-allocated MVS data set. |
RZKCPYJC | RZKCOPY JCL to consume messages from a Kafka topic and write them into an MVS data set. |
RZKCPYJD | RZKCOPY JCL to consume messages from a Kafka topic and write them into a USS file. |
RZKCPYJP | RZKCOPY JCL to produce messages from MVS data sets into a Kafka topic. |
RZKCPYJQ | RZKCOPY JCL to produce messages from USS files into a Kafka topic. |
RZKCPYRC | REXX example program to call RZKCOPY to consume messages from a Kafka topic into an MVS data set. |
RZKCPYRP | REXX example program to call RZKCOPY to produce messages from an MVS data set into a Kafka topic. |
To give you a first impression, a few shortened examples follow. For the complete examples, please refer to the SAMPLIB members described in the table above.
Produce MVS Data Set JCL Example
This example has been shortened. For the complete code please see SAMPLIB(RZKCPYJP).
//jobname JOB
//*
//RZKCOPY EXEC PGM=RZKCOPY,
// PARM='-p -k myKey myKafka.myDomain.de:9092 myTopic'
//*
//STEPLIB DD DISP=SHR,DSN=myHlq.RZKV120.LOADLIB
//RZKFILE DD DISP=SHR,DSN=my.data.set.tobe.produced
//SYSPRINT DD SYSOUT=*
//
Consume MVS Data Set JCL Example
This example has been shortened. For the complete code please see SAMPLIB(RZKCPYJC).
//jobname JOB
//*
//RZKCOPY EXEC PGM=RZKCOPY,
// PARM='-c -g myGroup -t10 myKafka.myDomain.de:9092 myTopic'
//*
//STEPLIB DD DISP=SHR,DSN=myHlq.RZKV120.LOADLIB
//RZKFILE DD DSN=my.data.set.for.consume,
// DISP=(NEW,CATLG,DELETE),
// UNIT=SYSDA,SPACE=(CYL,(30,15),RLSE),
// DCB=(RECFM=VB,LRECL=255,BLKSIZE=27998)
//SYSPRINT DD SYSOUT=*
//
Produce MVS Data Set REXX Example
This example has been shortened. For the complete code please see SAMPLIB(RZKCPYRP).
/* TSO REXX */
RzkDsn = "my.data.set.tobe.produced";
Broker = "myKafka.myDomain.de:9092";
Topic = "myTopic";
Func = "-p -k myKey";
Opts = "-v1";
JclPrm = Func Opts Broker Topic;
"ALLOCATE FILE(RZKFILE) DATASET('"RzkDsn"') SHR REUSE";
ADDRESS "LINKMVS" "RZKCOPY JclPrm";
"FREE FILE(RZKFILE)";
RETURN 0;
Produce MVS Data Set Cobol Example
This example has been shortened. For the complete code please see SAMPLIB(RZKCPYBP).
Please note that this example assumes the files to be produced are already allocated to DD name RZKFILE.
* Required RZKCOPY EXEC PARM= String ---------------------------*
01 JclParm.
05 Len PIC S9(4) COMP-5 VALUE 48.
05 Parm.
10 Pro PIC X(11) VALUE "-p -k myKey".
10 Fi1 PIC X(01) VALUE " ".
10 Opt PIC X(03) VALUE "-v1".
10 Fi2 PIC X(01) VALUE " ".
10 Srv PIC X(24) VALUE "myKafka.myDomain.de:9092".
10 Fi3 PIC X(01) VALUE " ".
10 Tpc PIC X(07) VALUE "myTopic".
01 RzkRc PIC S9(9) COMP-5 VALUE 0.
CALL "RZKCOPY" USING BY CONTENT JclParm RETURNING RzkRc.
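The Len value of 48 in the Cobol example is the byte length of the PARM string that follows it (11 + 1 + 3 + 1 + 24 + 1 + 7). The following C sketch shows how that length could be derived instead of being hard-coded; the helper build_parm is hypothetical and not part of RzK.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper, not part of RzK: joins the option groups with single
 * blanks, as the Cobol group item above does, and returns the resulting
 * string length (the value for the Len halfword), or -1 if the buffer is
 * too small. */
int build_parm(char *out, size_t out_size, const char *func,
               const char *opts, const char *broker, const char *topic)
{
    assert(out != NULL && out_size > 0);

    int len = snprintf(out, out_size, "%s %s %s %s",
                       func, opts, broker, topic);
    if (len < 0 || (size_t)len >= out_size)
        return -1;      /* formatting error or string truncated */
    return len;
}
```

For the values used in the Cobol example ("-p -k myKey", "-v1", "myKafka.myDomain.de:9092", "myTopic") the sketch returns 48.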
Produce MVS Data Set Assembler Example
This example has been shortened. For the complete code please see SAMPLIB(RZKCPYAP).
Please note that this example assumes the files to be produced are already allocated to DD name RZKFILE.
*
LINK EP=RZKCOPY,PARAM=(JclParm),VL=1
*
*---------------------------------------------------------------------*
* Required RZKCOPY EXEC PARM= String. *
*---------------------------------------------------------------------*
JclParm DC AL2(JclParm$) ;EXEC PARM= String Length
Pro DC C'-p -k myKey' ;Produce, MsgKey
DC C' ' ;
Opt DC C'-v1' ;DbgLvl
DC C' ' ;
Brk DC C'myKafka.myDomain.de:9092'
DC C' ' ;
Tpc DC C'myTopic' ;Topic
JclParm$ EQU *-Pro ;Length of PARM string (without length field)
*--------------------------------------;
Using the RzK API
You may use the zKafka Connector API in almost any environment and programming language.
To achieve this, the RzK API is provided by two modules, which serve different purposes depending on your calling program environment and needs:
- RZK: The RzK core logic is contained in a z/OS Language Environment (LE) Dynamic Link Library (DLL) named "RZK" with the following attributes:
- Reentrant (RENT)
- 31-bit addressing mode (AMODE 31)
- Non-APF-authorized (AC 0)
- Non-XPLINK
- No POSIX(ON) LE runtime option required, but allowed.
- As a format 3 program object, it must reside within a PDSE load library.
- RZKSTUB: A significantly smaller module, designed to be linked/bound into your programs independent of their runtime environment. It has the following attributes:
- Reentrant (RENT)
- 31-bit addressing mode (AMODE 31)
- Non-APF-authorized (AC 0)
- ALIASes RZKCALL, RZKPARM, RZKCOPY
- As a classic load module, it may reside in any load library.
Depending on whether the calling programs run with the z/OS Language Environment or not, there are a few things to consider for optimal use of the RzK API in your programs.
You also need to decide whether you want to link/bind RzK itself statically into your program during build, call the RzK API dynamically at program runtime, or use our provided RzK stub as a hybrid solution that combines the advantages of both variants.
In the following sections, you will find some more details that may help you in making these decisions.
LE API vs. CALL API
In order to provide all API functions you need, the RZK DLL requires an active z/OS Language Environment.
LE API
LE-callers (C/C++, Cobol, Fortran, PL/I, GoLang, Swift) may use the RZK DLL directly. RzK will use and run within the existing Language Environment of the calling program.
CALL API
Typically, LE programs calling other LE programs or DLLs pass their parameters differently than non-LE programs do.
Non-LE callers (e.g. Assembler, REXX), which pass parameters through the standard z/OS CALL API, have to use the provided RzK CALL API stub module "RZKCALL". RZKCALL provides the required Language Environment, loads the RZK DLL (on the first call only), transforms the passed CALL API parameters into an LE format and calls the requested RzK DLL API function.
Mixed API
Some non-LE languages (e.g. Natural Programming Language, NPL) support using either the standard z/OS CALL API or, after some additional setup, the LE API. Depending on which API is used, these languages may use the RZK DLL directly or have to use the RzK stub RZKCALL.
POSIX
The POSIX(ON) LE runtime option is not required, but allowed. This allows RzK to be used from CICS, where multithreading is not allowed and the POSIX(ON) LE runtime option is ignored.
Static Bind vs. Dynamic Call
To use RzK from an LE program, you have four choices for invoking the RzK API functions:
- Include module RZK during link/bind processing and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICK) JCL member to compile, link/bind and run C programs including module RZK.
- Include module RZKSTUB during link/bind processing and call RZKSTUB() to access the RzK functions. Use the SAMPLIB(RZKAPICJ) JCL member to compile, link/bind and run C programs including module RZKSTUB.
- Implicitly load the RZK LE Dynamic Link Library (DLL) dynamically and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICI) JCL member to compile, link/bind and run your C programs without including any RzK modules.
- Explicitly load the RZK LE Dynamic Link Library (DLL) dynamically and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICL) JCL member to compile, link/bind and run your C programs without including any RzK modules.
To assist you in finding the best choice for your needs, please also read the language-specific details in chapter RzK API Example Code.
RzK API Function Call Reference
RzK is an advanced client API to address the two basic Kafka clients "Producer" and "Consumer". Some additional functionality is provided to further support the developer in using the producer and consumer functions, and the handling of the Kafka cluster. This chapter provides an overview of the multiple RzK function calls.
Terms and Conventions
The following terms and conventions are used for the zKafka Connector and its API:
RzK
- We use RzK to refer to the zKafka Connector, short for rvs-Systems zKafka Connector.
zKafka Connector API Function Naming Convention
- All zKafka Connector API functions start with "RZK" and are followed by a four character description of the function itself.
- Each function is invoked by calling "RZKFUNC" (for "function") and providing the corresponding function parameter.
- E.g. the RzK API function to produce / publish a message to your Kafka cluster is "RZKPROD".
zKafka Connector Function Codes
- The function code is always the first input parameter for every standard RzK API CALL, i.e. the first parameter provided to "RZKFUNC".
- All function codes start with "FCT" for function.
- They are followed by a four-character abbreviation that describes the function itself.
- E.g. the function code to produce a message to your Kafka cluster is "FCTPROD".
Parameters
- Parameters need to be provided in the order defined in the following chapters.
RZKOPEN - Open
The RZKOPEN API function initializes the RzK environment for one connection to a Kafka broker and returns a handle for all subsequent API calls to this broker.
An application can connect to multiple Kafka clusters or to the same Kafka Cluster using different topics. RZKOPEN is required for each of the connections/sessions.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTOPEN |
Return Codes
The function returns a handle to be used in subsequent API calls. In case of an error, -1 is returned.
Value | Description |
>= 0 | Handle |
-1 | Error |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTOPEN RETURNING Handle.
IF Handle >= 0 THEN
.
.
ELSE
DISPLAY "Error in RZKOPEN, Handle=x'" FUNCTION HEX-OF(Handle) "'"
END-IF.
C
Handle = RZKSTUB(FCTOPEN);
if (Handle >= 0)
{
.
.
}
else
printf("Error in RZKOPEN, Handle=x'%08X'\n", (unsigned int)Handle);
Assembler
CALL (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)
*
IF (LTR,R15,R15,NM) ;Handle >= Zero?
*
ST R15,WaHandle ;Handle returned by RZKOPEN
.
.
ELSE ,
WTO 'Error in RZKOPEN',ROUTCDE=(2,11)
ENDIF ,
REXX
Handle = RzkOpen();
IF Handle >= 0
THEN
DO
.
.
END /* of IF Handle >= 0 THEN DO */
ELSE
SAY "Error in RZKOPEN, Handle=x'"D2X(Handle, 8)"'";
Natural
CALL 'RZKCALL' #FCTOPEN
*
#Handle := RET('RZKCALL')
*
IF #Handle >= 0
THEN
.
.
ELSE
WRITE 'Error in RZKOPEN'
END-IF
RZKCONN - Connect
The Connect function takes the connection information (hostname or IP address and port) of one broker of the Kafka cluster and the requested topic.
RzK will then connect to this broker and get the layout of the cluster and the information about which brokers are handling the partitions and replicas of the specified topic.
When RZKCONN returns without error, the Kafka cluster can be reached, the optionally provided authentication information was accepted and the topic is known and available in the Kafka cluster.
RzK requires that topics are defined before they are used. Dynamic topic creation is not supported.
The information required to connect to the Kafka cluster:
- BrokerName: Kafka refers to a single Kafka server as a broker. Kafka brokers are designed to operate as part of a Kafka cluster. The BrokerName (IP address or hostname) specified for the connection can be any of the instances (brokers) of the Kafka cluster.
- Port: the TCP/IP port number used for the connection to be established.
- Topic: Messages in Kafka are organized into "topics". A topic is a kind of category under which messages are stored.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTCONN |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | Broker Name | 32-bit String Pointer | | In | Hostname or IP address followed by ':' and the port of one of the brokers/instances of the desired Kafka cluster. |
4 | Broker Name Length | 32-bit Signed Integer | 1..256 | In | Kafka Broker Name Length |
5 | Topic Name | 32-bit String Pointer | | In | Kafka Topic Name of the topic that the application wants to produce to or consume from. |
6 | Topic Name Length | 32-bit Signed Integer | 0..255 | In | Kafka Topic Name Length |
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
0 | Successful completion |
>0 | Error. Please find further return code information in chapter Messages and Codes. |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTCONN Handle
BY CONTENT BrokerS BY VALUE BrokerL
BY CONTENT TopicS BY VALUE TopicL
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);
Assembler
CALL (RZK),(=A(FCTCONN),WaHandle, +
BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)
REXX
RzkRc = RzkConn(Handle, BrokerS, TopicS);
Natural
CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL
RZKPROD - Produce
The Kafka producer (client), or produce function, creates new messages. In other systems, a producer is often called a publisher or writer. A produce requires a successful connection to the Kafka cluster. This connection defines the destination for the message, i.e. the Kafka cluster and the topic.
Use the RzK produce function to store a message in the Kafka cluster.
Message Conversion
You can request a message conversion from EBCDIC to UTF-8 (see "Parameters" below) or just send a binary message.
Kafka treats messages as opaque data; the Kafka cluster provides no code page conversion. If the data that you want to write into the Kafka cluster is in a readable format (e.g. normal text strings, JSON or XML) and you want UNIX or Windows systems to be able to read (consume) these messages from the Kafka cluster, you need to set the conversion flag to '1'.
Message Key
Message key is an optional parameter.
- Using a message key assures that all messages for the same topic with that message key will be stored in one partition only. In that way Kafka ensures that the sequence of messages consumed will be the same sequence that the messages were written.
- Without a message key, the messages sent (produced) to a Kafka cluster will be distributed between multiple partitions to improve throughput and storage usage. In that case, Kafka cannot guarantee the sequence of messages when they are retrieved (consumed).
If no message key is provided, the messages will be sent to a partition according to the round robin rule.
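The effect of the message key on partition selection can be sketched in C as follows. This is a simplified model, not the algorithm used by Kafka or RzK: the FNV-1a hash is an illustrative stand-in for the client's real hash function, and the names key_hash and choose_partition are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative stand-in hash (FNV-1a). Kafka clients use their own hash
 * function; this is NOT the hash used by Kafka or RzK. */
uint32_t key_hash(const char *key, size_t key_len)
{
    uint32_t h = 2166136261u;
    for (size_t i = 0; i < key_len; i++) {
        h ^= (unsigned char)key[i];
        h *= 16777619u;
    }
    return h;
}

/* Hypothetical partition choice:
 * - with a key   : the same key always maps to the same partition
 * - without a key: partitions are used in turn (round robin)        */
int choose_partition(const char *key, size_t key_len,
                     int partition_count, unsigned *round_robin_state)
{
    assert(partition_count > 0 && round_robin_state != NULL);

    if (key_len > 0)
        return (int)(key_hash(key, key_len) % (uint32_t)partition_count);
    return (int)((*round_robin_state)++ % (unsigned)partition_count);
}
```

Because a keyed message always lands in the same partition, its order relative to other messages with that key is preserved. Keyless messages rotate through the partitions, so their relative order across partitions is not defined.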
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTPROD |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | MessageKey | 32-bit String Pointer | | In | Pointer to a string with the message key. |
4 | MessageKey Length | 32-bit Signed Integer | 0..256 | In | Length of the message key. Specify 0 to use no message key. |
5 | Message | 32-bit String Pointer | | In | Pointer to the message. |
6 | Message Length | 32-bit Signed Integer | 0..MaxInt | In | Length of the message. |
7 | UTF-8 Conversion | 32-bit Signed Integer | 0,1 | In | Flag to request conversion of the message from EBCDIC to the UTF-8 codeset. |
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
0 | Successful completion |
>0 | Error. Please find further information about the return values in chapter Messages and Codes. |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTPROD Handle
BY CONTENT KeyS BY VALUE KeyL
BY CONTENT MsgS BY VALUE MsgL
BY VALUE Conv
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTPROD, Handle, KeyS, KeyL, MsgS, MsgL, Conv);
Assembler
CALL (RZK),(=A(FCTPROD),WaHandle, +
KeyS,KeyL,MsgS,MsgL,Conv),VL,MF=(E,WaCall)
REXX
RzkRc = RzkProd(Handle, KeyS, MsgS, Conv);
Natural
CALL 'RZKCALL' #FCTPROD #Handle #KeyS #KeyL #MsgS #MsgL #Conv
RZKCONS - Consume
A Consume is the operation to read or fetch data from a Kafka cluster.
In the same way that multiple producers can write messages to a Kafka cluster, multiple consumers can read those messages.
Message Conversion
You can optionally request a message conversion from UTF-8 to EBCDIC (see "Parameters" below).
Consumer Groups
- Consumer groups serve two purposes:
  - Keep track of which messages are already read and ensure that, if an application is stopped and started again, no messages are read twice.
  - Allow load balancing between multiple applications for consuming messages. For details on load balancing see RZKCLDB - Load Balancing.
- Groups are created automatically on the Kafka cluster when used.
- Group information (e.g. last offset consumed) is held and managed by the Kafka cluster.
- Cleaning/removing a group is done using Kafka administration tools on the Kafka cluster.
- The group name is used in Kafka to manage the current consume offset.
  - To get all messages in a topic from the beginning, use a group name that has not been used before or have the group name removed on the Kafka cluster.
  - To continue getting messages from the point (offset) where you last consumed, use the same group name as before.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTCONS |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | Group Name | 32-bit String Pointer | | In | Consumer group name |
4 | Group Name Length | 32-bit Signed Integer | 1..256 | In | Consumer group name length |
5 | Message Buffer | 32-bit String Pointer | | In | Pointer to the output message buffer for the consumed message. The buffer, not the address, is updated with the consumed message. |
6 | Message Buffer Length | 32-bit Signed Integer | 0..MaxInt | In/Out | On input: Size of the provided output buffer. On output: Size of the used output buffer, i.e. the length of the returned consumed message. |
7 | UTF-8 Conversion | 32-bit Signed Integer | 0,1 | In | Flag to request conversion of the message from the UTF-8 codeset to EBCDIC. |
8 | Timeout | 32-bit Signed Integer | | In | Number of seconds to wait for a message. Specify 0 to wait forever. |
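Because Message Buffer Length is an In/Out parameter, its value must be reset to the buffer size before every consume call. The following C sketch illustrates why, using a mock function with the same in/out length convention. The mock is NOT the RzK API, and its return value of 1 for a buffer that is too small is an assumption made for this example only.

```c
#include <assert.h>
#include <string.h>

/* Mock with the in/out length convention of the table above; it is NOT the
 * RzK API. On input *buf_len is the buffer size, on output it is the length
 * of the returned message. Returns 0 on success, 1 if the buffer is too
 * small (mock behaviour only). */
int mock_consume(const char *msg, char *buf, int *buf_len)
{
    assert(msg != NULL && buf != NULL && buf_len != NULL);

    int msg_len = (int)strlen(msg);
    if (msg_len > *buf_len)
        return 1;                   /* message does not fit */
    memcpy(buf, msg, (size_t)msg_len);
    *buf_len = msg_len;             /* size is overwritten by used length */
    return 0;
}
```

After a 14-byte message has been returned, the length variable holds 14. If it is not reset, the next call offers only 14 bytes of a possibly much larger buffer. This is the reason for the "(Re)Set maximum buffer size" statements in the code examples of this chapter.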
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
0 | Successful completion |
>0 | Error. Please find further information about the return codes in chapter Messages and Codes. |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
MOVE BufSize TO BufferL *> (Re)Set maximum buffer size
CALL "RZKSTUB" USING BY VALUE FCTCONS Handle
BY CONTENT GroupS BY VALUE GroupL
BY REFERENCE BufferS BufferL
BY VALUE Conv TimeOut
RETURNING RzkRc.
IF RzkRc = 0 THEN
DISPLAY "(" BufferL ") " BufferS(1:BufferL)
ELSE
DISPLAY "Error in RZKCONS, RzkRc=x'" FUNCTION HEX-OF(RzkRc) "'"
END-IF.
C
BufferL = BUF_SIZE; // (Re)Set maximum buffer size
RzkRc = RZKSTUB(FCTCONS, Handle,
GroupS, GroupL, &BufferS, &BufferL, Conv, TimeOut);
if (RzkRc == 0)
printf("(%05d) %.*s\n", BufferL, BufferL, BufferS);
else
printf("Error in RZKCONS, RzkRc=x'%08X'\n", (unsigned int)RzkRc);
Assembler
IILF R5,BufSize$
ST R5,WaBuffL ;(Re)Set maximum buffer size
*
CALL (RZK),(=A(FCTCONS),WaHandle, +
GroupS,GroupL,WaBuffS,WaBuffL,Conv,TimeOut),VL, +
MF=(E,WaCall)
*
IF (LTR,R15,R15,Z) ;RzkRc = 0
WTO TEXT=WaBuffL+2,MF=(E,WaWto) ;Show consumed message
ELSE ,
WTO 'Error in RZKCONS',ROUTCDE=(2,11)
ENDIF ,
REXX
RzkRc = RzkCons(Handle, GroupS, Conv, TimeOut);
IF RzkRc == 0
THEN
SAY Rzk.Handle.#MSG
ELSE
SAY "Error in RZKCONS, RC=x'"D2X(RzkRc,8)"'";
Natural
MOVE #BufMaxL TO #BufferL /* (Re)Set maximum buffer size
*
CALL 'RZKCALL' #FCTCONS #Handle
#GroupS #GroupL #BufferS #BufferL #Conv #TimeOut
#RzkRc := RET('RZKCALL')
*
IF #RzkRc = 0
THEN
WRITE '(' #BufferL ') ' SUBSTRING(#BufferS,1,#BufferL)
ELSE
WRITE 'Error in RZKCONS, RC=' #RzkRc
END-IF
RZKCLSE - Close
The Close function will end the connection to the Kafka cluster associated with the handle provided.
If there are no other connections open, Close also cleans up the RzK environment by freeing all memory, stopping all threads and getting ready for the Language Environment (LE) to be unloaded.
The RzK Handle is no longer valid after returning from Close.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTCLSE |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
Return Codes
The function returns 4 if other connections are still open. If there are no further open connections, the Close function returns 0 after cleaning up the RzK environment, freeing all memory and stopping all threads, so that the LE is ready to be ended.
Value | Description |
0 | Successful completion: No more open connections. |
4 | Successful completion: This session to a Kafka cluster is closed, other sessions are still active. |
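The two successful return codes can be modelled with a small session table in C. The sketch below is a mock that only mirrors the documented return values (handle >= 0 or -1 on open, 4 or 0 on close). The names mock_open and mock_close and the limit of 8 sessions are assumptions and say nothing about how RzK manages its sessions internally.

```c
#include <assert.h>

#define MAX_SESSIONS 8              /* arbitrary limit for this mock */
static int session_open[MAX_SESSIONS];

/* Mock open: returns a handle >= 0, or -1 if no slot is free. */
int mock_open(void)
{
    for (int h = 0; h < MAX_SESSIONS; h++) {
        if (!session_open[h]) {
            session_open[h] = 1;
            return h;
        }
    }
    return -1;
}

/* Mock close: returns 4 while other sessions remain open,
 * and 0 after the last session has been closed. */
int mock_close(int handle)
{
    assert(handle >= 0 && handle < MAX_SESSIONS && session_open[handle]);

    session_open[handle] = 0;
    for (int h = 0; h < MAX_SESSIONS; h++) {
        if (session_open[h])
            return 4;               /* other sessions still active */
    }
    return 0;                       /* last session closed */
}
```

A caller that opens two sessions therefore sees 4 on the first close and 0 on the second. Both values indicate success and should not be treated as errors.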
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTCLSE Handle
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTCLSE, Handle);
Assembler
CALL (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)
REXX
RzkRc = RzkClse(Handle);
Natural
CALL 'RZKCALL' #FCTCLSE #Handle
RZKAUTH - Authentication
RzK supports the SASL PLAIN authentication mechanism. Use RZKAUTH to set UserId and Password/Secret for the authentication.
If your Kafka cluster requires authentication, you must use this function before issuing an RZKCONN call. If no credentials have been provided, or if they are wrong, RZKCONN will fail.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTAUTH |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | User Name | 32-bit String Pointer | | In | User Name to be used for authentication |
4 | User Name Length | 32-bit Signed Integer | 1..256 | In | User Name Length |
5 | Password | 32-bit String Pointer | | In | Password to be used for authentication |
6 | Password Length | 32-bit Signed Integer | 1..256 | In | Password Length |
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
0 | Successful completion |
>0 | Error. Please find further return code information in chapter Messages and Codes. |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTAUTH Handle
BY CONTENT UserS BY VALUE UserL
BY CONTENT PwrdS BY VALUE PwrdL
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTAUTH, Handle, UserS, UserL, PwrdS, PwrdL);
Assembler
CALL (RZK),(=A(FCTAUTH),WaHandle, +
UserS,UserL,PwrdS,PwrdL),VL,MF=(E,WaCall)
REXX
RzkRc = RzkAuth(Handle, UserS, PwrdS);
Natural
CALL 'RZKCALL' #FCTAUTH #Handle #UserS #UserL #PwrdS #PwrdL
RZKDBUG - Debug Level
RzK has the ability to display detailed debugging messages, including a data trace of the communication with the Kafka Broker. The RZKDBUG API call is used to control the type of messages being displayed.
These debug messages are needed to analyze connection and communication issues. In case of an issue, you might be asked by the RzK support team to set a specific debug level.
It is not recommended to set this to anything else but '0', unless asked by the RzK Support to do so.
Using a high debug level value will create a lot of messages which in turn will reduce the performance, increase CPU load and memory usage.
These debug messages can be enabled with the following input values:
- 0 Only error messages are issued.
- 1-15 Messages with less (1) or more (15) detailed debugging information are issued.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTDBUG |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | DebugLevel | 32-bit Signed Integer | 0-15 | In | The requested debug level. |
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
0 | Successful completion |
>0 | Error. Please find further return code information in chapter Messages and Codes. |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTDBUG Handle DbugLvl
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTDBUG, Handle, DbugLvl);
Assembler
CALL (RZK),(=A(FCTDBUG),WaHandle,DbugLvl),VL,MF=(E,WaCall)
REXX
RzkRc = RzkDbug(Handle, DbugLvl);
Natural
CALL 'RZKCALL' #FCTDBUG #Handle #DbugLvl
RZKGERR - Get Last Error
Get Last Error can be used to retrieve the last error code that was reported by RzK and a detailed message for it.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
1 | Function Code | 32-bit Signed Integer | | In | FCTGERR |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | Message Buffer | 32-bit String Pointer | | In | Pointer to the output message buffer for the detailed error message. The buffer, not the address, is updated with the error message. |
4 | Message Buffer Length | 32-bit Signed Integer | 0..MaxInt | In/Out | On input: Size of the provided output buffer. On output: Size of the used output buffer, i.e. the length of the returned error message. |
Return Codes
The function returns the last error code. If it returns 0, there was no error recorded in this connection.
Value | Description |
0,7500... | Last Error code |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
MOVE BufSize TO BufferL *> (Re)Set maximum buffer size
CALL "RZKSTUB" USING BY VALUE FCTGERR Handle
BY REFERENCE BufferS BufferL
RETURNING RzkRc.
DISPLAY RzkRc "(" BufferL ") " BufferS(1:BufferL)
C
BufferL = BUF_SIZE; // (Re)Set maximum buffer size
RzkRc = RZKSTUB(FCTGERR, Handle, &BufferS, &BufferL);
printf("%05d %.*s\n", RzkRc, BufferL, BufferS);
Assembler
IILF R5,BufSize$
ST R5,WaBuffL ;(Re)Set maximum buffer size
*
CALL (RZK),(=A(FCTGERR),WaHandle,WaBuffS,WaBuffL),VL, +
MF=(E,WaCall)
*
WTO TEXT=WaBuffL+2,MF=(E,WaWto) ;Show error message
REXX
RzkRc = RzkGerr(Handle);
SAY RzkRc Rzk.Handle.#ERR;
Natural
MOVE #BufMaxL TO #BufferL /* (Re)Set maximum buffer size
*
CALL 'RZKCALL' #FCTGERR #Handle #BufferS #BufferL
*
WRITE RET('RZKCALL') SUBSTRING(#BufferS,1,#BufferL)
RZKCLDB - Load Balancing
RzK supports the load balancing protocol of Kafka clusters. With this protocol, multiple applications can read from the same topic and each message is only received by one of the applications.
E.g. if two applications are connected to the same Kafka cluster, consuming the same topic with the same group name (for more details on this see RZKCONS - Consume) and both have load balancing enabled, the available messages will be distributed between the two applications.
If the Flag is set to:
- 0 - Load balancing is disabled.
- If there is already an application connected with the same consumer group, the consume fails.
- If additional consumers are trying to join the same consumer group, they are rejected.
- 1 - Load balancing is enabled.
- If multiple applications start a consume using the same group, the Kafka consume load balancing is used. This means that, if the topic has multiple partitions, each application gets some of the partitions assigned and then reads only data from those partitions.
- Messages from the other partitions are read by the other applications.
- All of this is handled by RzK.
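To make the effect of enabled load balancing more concrete, the following C sketch models how the partitions of one topic might be split between the members of a consumer group. The round-robin rule and the names `owner_of_partition` and `partitions_for_member` are assumptions made for illustration only; the real assignment is negotiated between RzK and the Kafka cluster and may look different.

```c
#include <assert.h>

/* Hypothetical model, not part of the RzK API: partition p is read by
   the group member with index (p modulo number of members). */
static int owner_of_partition(int partition, int members)
{
    assert(partition >= 0 && members > 0);
    return partition % members;
}

/* Count how many of the topic's partitions one member would read
   under the assumed round-robin rule. */
static int partitions_for_member(int member, int partitions, int members)
{
    int count = 0;
    int p;

    for (p = 0; p < partitions; p++) {
        if (owner_of_partition(p, members) == member)
            count++;
    }
    return count;
}
```

In this model, a topic with five partitions and two applications in the same group gives three partitions to the first application and two to the second, so every message is read by exactly one of them.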
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
---|---|---|---|---|---|
1 | Function Code | 32-bit Signed Integer | | In | FCTCLDB |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | Flag | 32-bit Signed Integer | 0,1 | In | Enable (1) or disable (0) the consume load balancing. |
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
---|---|
0 | Successful completion |
>0 | Please find further information about the return values in chapter Messages and Codes |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTCLDB Handle LoadBal
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTCLDB, Handle, LoadBal);
Assembler
CALL (RZK),(=A(FCTCLDB),WaHandle,LoadBal),VL,MF=(E,WaCall)
REXX
RzkRc = RzkCldb(Handle, LoadBal);
Natural
CALL 'RZKCALL' #FCTCLDB #Handle #LoadBal
RZKSYNC - Set Synchronous Mode
Depending on network load, network latency and the Kafka cluster load/performance, it takes some time between a request being sent to the Kafka cluster and the return of the reply for the request.
Use the "Set Synchronous Mode" function for produce and consume:
- Synchronous mode for the Producer:
RzK has to wait for the reply from the Kafka cluster before control is returned to the application (after a produce function call).
- Synchronous mode for the Consumer:
The "Set Synchronous Mode" ensures that consume messages are returned to the application right after the corresponding offset of the message is committed to the Kafka cluster.
Offsets are stored in the Kafka cluster and managed by the Kafka cluster for each group name and topic.
This allows RzK to resume reading from the last committed offset after a reconnect.
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
---|---|---|---|---|---|
1 | Function Code | 32-bit Signed Integer | | In | FCTSYNC |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | Mode | 32-bit Signed Integer | 0,1,2 | In | Synch Mode (default=0) |
4 | Send Window | 32-bit Unsigned Integer | 0..10000 | In | Window size for messages sent to the Kafka server (default = 100) |
5 | Receive Window | 32-bit Unsigned Integer | 0..10000 | In | Window size for messages received/consumed before the offset is committed to the Kafka server (default = 100) |
Mode
- 0 - asynchronous mode.
- Produce messages will be sent immediately to the Kafka cluster. RzK will not wait for a reply unless there are more than "Send Window Size" messages sent without a reply from the Kafka cluster.
- Consume messages will be returned to the application until the "Receive Window Size" is reached. At this point, the offset of the last message consumed will be committed to the Kafka cluster.
- This is the default.
- 1 - synchronous mode.
- Produce messages will be sent immediately to the Kafka cluster and control is returned only after the reply from the Kafka server is received.
- Consume messages will be returned to the application right after the corresponding offset of the message is committed to the Kafka cluster.
- 2 - force an immediate synchronization.
- RzK waits for all outstanding replies from the Kafka cluster and commits the current consume/receive offset.
- Finally, the mode is set to asynchronous (0).
Send Window
- In asynchronous mode, RzK only waits for a reply if more than "Send Window" size messages are in transit.
- The default value is 100.
- When a value of 0 is specified, the default value is used.
Receive Window
- In asynchronous mode, RzK only sends an offset commit when "Receive Window" size messages have been returned to the application (consume).
- The default value is 100.
- When a value of 0 is specified, the default value is used.
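The window rules for asynchronous mode can be sketched in a few lines of C. This is only a model of the behaviour described above, not RzK code: the function names are hypothetical, and the exact boundary handling ("more than" for the send window, "reached" for the receive window) is taken literally from the text.

```c
#include <assert.h>

#define RZK_DEFAULT_WINDOW 100u   /* default stated in the text above */

/* A window value of 0 selects the default value. */
static unsigned effective_window(unsigned requested)
{
    assert(requested <= 10000u);  /* documented range is 0..10000 */
    return requested == 0u ? RZK_DEFAULT_WINDOW : requested;
}

/* Asynchronous produce: wait for a reply only if more than
   "Send Window" messages are in transit. */
static int must_wait_for_reply(unsigned in_transit, unsigned send_window)
{
    return in_transit > effective_window(send_window);
}

/* Asynchronous consume: commit the offset once "Receive Window"
   messages have been returned to the application. */
static int must_commit_offset(unsigned returned_since_commit,
                              unsigned receive_window)
{
    return returned_since_commit >= effective_window(receive_window);
}
```

With both windows left at 0, the model waits for a reply at the 101st unanswered produce message and commits the consume offset after every 100 messages returned.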
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
---|---|
0 | Successful completion |
>0 | Please find further information about the return values in chapter Messages and Codes |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTSYNC Handle
BY VALUE SyncLvl SndWin RecWin
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTSYNC, Handle, SyncLvl, SndWin, RecWin);
Assembler
CALL (RZK),(=A(FCTSYNC),WaHandle, +
SyncLvl,SndWin,RecWin),VL,MF=(E,WaCall)
REXX
RzkRc = RzkSync(Handle, SyncLvl, SndWin, RecWin);
Natural
CALL 'RZKCALL' #FCTSYNC #Handle #SyncLvl #SndWin #RecWin
RZKTERM - Terminate
Use this function when an RzK cleanup is required after an error (e.g. in a recovery routine) and you no longer know which RzK handles are still open.
The Terminate function closes all open "connections" to Kafka clusters. All handles retrieved previously by an Open function call are invalidated. All memory is returned to the system, all threads are stopped.
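As a sketch of such a recovery path in C: the cleanup routine below needs no handle bookkeeping, because Terminate takes only the function code. The entry point is passed in as a pointer so that the sketch also compiles off z/OS; `rzk_cleanup_all`, `fake_rzk` and `last_fct` are hypothetical names, and `fake_rzk` is only a stand-in for RZKSTUB or RZKFUNC.

```c
#include <assert.h>
#include <stddef.h>

#define FCTTERM 11   /* function code as listed in SAMPLIB(RZKAPIC0) */

/* Same shape as the RZKSTUB/RZKFUNC prototypes: int f(int, ...). */
typedef int (*rzk_entry_t)(int, ...);

/* Hypothetical stand-in for the real entry point; it only records
   which function code was requested. */
static int last_fct = 0;

static int fake_rzk(int fct, ...)
{
    last_fct = fct;
    return 0;
}

/* Recovery-path cleanup: closes all connections and invalidates all
   handles, whichever are still open. */
static int rzk_cleanup_all(rzk_entry_t rzk)
{
    assert(rzk != NULL);
    return rzk(FCTTERM);
}
```

In a real program the call would be `rzk_cleanup_all(RZKSTUB)` or `rzk_cleanup_all(RZKFUNC)`, depending on how RzK was linked.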
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
---|---|---|---|---|---|
1 | Function Code | 32-bit Signed Integer | | In | FCTTERM |
Return Codes
RZKTERM always returns zero.
Value | Description |
---|---|
0 | |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTTERM RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTTERM);
Assembler
CALL (RZK),(=A(FCTTERM)),VL,MF=(E,WaCall)
REXX
RzkRc = RzkTerm();
Natural
CALL 'RZKCALL' #FCTTERM
RZKTIMO - Group Member Time Out
Kafka requires a separate connection for storing and retrieving consume offsets and consume load balancing. This connection is based on the consumer group name (that is specified when the RzK consume function is called). This function is used to change the timeout value for this connection.
RzK is required to send a "Heartbeat" so that this timeout does not expire. The application using RzK therefore needs to issue a consume or produce call at least once within the specified timeout value so that a heartbeat can be sent to the Kafka server.
A higher timeout value allows the application to spend more time before the next call to RzK is required.
On the other hand, this value also defines the time after which the Kafka server assumes that the application is dead and releases the group for a reconnect of the application. The value therefore influences how long it takes until an application that abended can resume consuming messages from the Kafka server after a restart.
It is recommended to use a value between 10 and 30 seconds.
For debugging an application, it might be useful to increase this to several minutes.
The RzK default is 30 seconds.
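The trade-off can be expressed as a small C sketch. It is a simplified model under stated assumptions: the helper names are hypothetical, and the real heartbeat timing is internal to RzK. It only shows how much time an application has left for its own processing before the next consume or produce call is due.

```c
#include <assert.h>

#define RZK_DEFAULT_TIMEOUT_S 30   /* RzK default stated above */

/* Is the value inside the recommended range of 10 to 30 seconds? */
static int within_recommended_range(int timeout_s)
{
    return timeout_s >= 10 && timeout_s <= 30;
}

/* Seconds left until the group member connection would time out,
   given the time spent since the last consume or produce call. */
static int seconds_left(int timeout_s, int seconds_since_last_call)
{
    int left;

    assert(timeout_s > 0 && seconds_since_last_call >= 0);
    left = timeout_s - seconds_since_last_call;
    return left > 0 ? left : 0;
}
```

An application that needs 12 seconds to process a message still has 18 seconds left with the default timeout. A debugging session with a 300-second timeout falls outside the recommended range, which is acceptable for that purpose.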
Parameters
Seq. # | Parameter | Type | Range | In/Out | Description |
---|---|---|---|---|---|
1 | Function Code | 32-bit Signed Integer | | In | FCTTIMO |
2 | Handle | 32-bit Signed Integer | internal | In | RzK connection handle returned by a previous RZKOPEN call. |
3 | ConnTim | 32-bit Signed Integer | | In | Timeout for the Group Member Connection in seconds |
Return Codes
The function returns 0 on successful completion or an error code in case the function failed. The function RZKGERR - Get Last Error can be used to retrieve the error message related to the return code.
Value | Description |
---|---|
0 | Successful completion |
>0 | Please find further information about the return values in chapter Messages and Codes |
Code Examples
For a complete list of ready-to-run sample programs, language-specific include members and compile JCL, please have a closer look at chapter RzK API Example Code and the RzK-delivered SAMPLIB library.
Cobol
CALL "RZKSTUB" USING BY VALUE FCTTIMO Handle ConnTim
RETURNING RzkRc.
C
RzkRc = RZKSTUB(FCTTIMO, Handle, ConnTim);
Assembler
CALL (RZK),(=A(FCTTIMO),WaHandle,ConnTim),VL,MF=(E,WaCall)
REXX
RzkRc = RzkTimo(Handle, ConnTim);
Natural
CALL 'RZKCALL' #FCTTIMO #Handle #ConnTim
RzK API Example Code
If you need some sample program source code to get started, you may proceed with one of the examples below. You will also find the required JCL to compile, link/bind and run these samples out of the box.
Overview
All example programs and JCL can be found in the provided RzK SAMPLIB. The API-related member names are in the format RZKAPI#%, where # represents the programming language and % describes the content:
# | Language |
---|---|
A | Assembler |
B | Cobol |
C | C |
J | JCL |
K | Configuration |
N | Natural (NPL) |
R | REXX |
With our program examples, we have tried to represent the most important and typical use cases. The required JCL is also included. All programs are executable out-of-the-box. If the standard z/OS JCL compile procedures are fully customized on your system, then the sample programs and the compile JCL should run immediately without any changes. You only need to adjust the names of the data sets you are using, as well as the Kafka-specific configuration such as broker IP address & port, and the topic name to use. Please also note the additional information in the respective example members.
% | Member Description |
---|---|
0 | Language-specific RzK API include member or header file (if available). |
J | Language-specific JCL to compile, link/bind and run the example programs. |
A | All available RzK API calls in one program as a quick reference. |
P, Q | Produce a single message to a Kafka topic (using different call techniques). |
C, D | Consume a single message from a Kafka topic (using different call techniques). |
M, N | Consume all (new) topic messages until an end-marker message is found (using different call techniques). |
T, U | Consume all (new) topic messages until a timeout occurs (using different call techniques). |
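The naming scheme from the two tables above can be sketched in C as follows. The helper `rzk_member_name` is hypothetical and not part of the RzK package; it only concatenates the fixed prefix with the language code and the content code and does not check whether such a member actually exists in the SAMPLIB.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build "RZKAPI" + language code + content code into buf.
   buf must hold at least 9 bytes (8 characters plus the terminator).
   Returns 0 on success, -1 if the buffer is too small or a code is
   not a printable character. */
static int rzk_member_name(char lang, char content, char *buf, size_t buflen)
{
    int n;

    assert(buf != NULL);
    n = snprintf(buf, buflen, "RZKAPI%c%c", lang, content);
    if (n < 0 || (size_t)n >= buflen)
        return -1;
    return strlen(buf) == 8 ? 0 : -1;
}
```

For example, language code `C` combined with content code `P` yields the member name `RZKAPICP`, the C sample that produces a single message.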
Just for a first impression, you will find a few shortened examples on the next language-specific pages. For the most complete examples, please always refer to the SAMPLIB members themselves.
To keep the sample code short, there are only a few checks for return codes and error messages.
Avoid doing this in production code.
Assembler
The following example members are available for the Assembler (HLASM) programming language:
Member | Description |
---|---|
RZKAPIA0 | Assembler RzK API COPY include member. |
Member | Including & calling RZKCALL |
---|---|
RZKAPIAJ | JCL to assemble, link/bind and run the Assembler example programs. |
RZKAPIAA | All available RzK API calls in one program as a quick reference. |
RZKAPIAP | Produce a single message to a Kafka topic. |
RZKAPIAC | Consume a single message from a Kafka topic. |
RZKAPIAM | Consume all (new) topic messages until an end-marker message is found. |
RZKAPIAT | Consume all (new) topic messages until a timeout occurs. |
Remarks
- You may use the SAMPLIB(RZKAPIAJ) JCL member to assemble, link/bind and run the Assembler example programs using module RZKCALL.
- It is not required to include RZKCALL during link/bind processing. All Assembler samples dynamically LOAD the RzK CALL API stub RZKCALL once and then CALL it repeatedly.
- All provided Assembler example programs are reenterable (RENT) with AMODE(31) and RMODE(31), and run non-APF authorized.
RZKCALL, which is an ALIAS of RZKSTUB, and module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.
To keep the sample code short, there are only a few checks for return codes and error messages.
Avoid doing this in production code.
Examples
Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.
Includes
You will find the required RzK constant definitions for Assembler programs in SAMPLIB(RZKAPIA0). Include this member with
COPY RZKAPIA0
into your program via DD name SYSLIB during assembly.
***********************************************************************
* *
* Product: RVS z/OS Kafka Connector (RzK) *
* *
* Member: SAMPLIB(RZKAPIA0) *
* *
* Type: HLASM Assembler Source Include *
* *
* Function: Define required RzK constants for Assembler programs. *
* *
* Notes: o Use COPY RZKAPIA0 to include this member into your *
* Assembler program source code. *
* *
* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
***********************************************************************
*
*---------------------------------------------------------------------*
* RVS z/OS Kafka Connector Function Codes. *
*---------------------------------------------------------------------*
FCTOPEN EQU 1 ;RZKOPEN
FCTCONN EQU 2 ;RZKCONN
FCTCLSE EQU 3 ;RZKCLSE
FCTPROD EQU 4 ;RZKPROD
FCTCONS EQU 5 ;RZKCONS
FCTDBUG EQU 6 ;RZKDBUG
FCTSYNC EQU 7 ;RZKSYNC
FCTAUTH EQU 8 ;RZKAUTH
FCTGERR EQU 9 ;RZKGERR
FCTCLDB EQU 10 ;RZKCLDB
FCTTERM EQU 11 ;RZKTERM
FCTTIMO EQU 12 ;RZKTIMO
*
Function Overview
The RzK Assembler API provides the following functions. See the SAMPLIB(RZKAPIAA) member for the complete code example.
CALL (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTDBUG),WaHandle,DbugLvl),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTAUTH),WaHandle, +
UserS,UserL,PwrdS,PwrdL),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTSYNC),WaHandle, +
SyncLvl,SndWin,RecWin),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTCLDB),WaHandle,LoadBal),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTTIMO),WaHandle,ConnTim),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTCONN),WaHandle, +
BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTPROD),WaHandle, +
KeyS,KeyL,MsgS,MsgL,Conv),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTCONS),WaHandle, +
GroupS,GroupL,WaBuffS,WaBuffL,Conv,ConsTim),VL, +
MF=(E,WaCall)
*
CALL (RZK),(=A(FCTGERR),WaHandle,WaBuffS,WaBuffL),VL, +
MF=(E,WaCall)
*
CALL (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTTERM)),VL,MF=(E,WaCall)
Produce
The substantial section of an Assembler program producing a message to a Kafka topic.
CALL (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)
*
ST R15,WaHandle ;Handle returned by RZKOPEN
LTR R15,R15 ;Handle >= Zero?
JM ErrOpen ;No, RZKOPEN Error
*
CALL (RZK),(=A(FCTCONN),WaHandle, +
BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTPROD),WaHandle, +
KeyS,KeyL,MsgS,MsgL,Conv),VL,MF=(E,WaCall)
*
CALL (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)
Consume
The substantial section of an Assembler program consuming a message from a Kafka topic.
CALL (RZK),(=A(FCTOPEN)),VL,MF=(E,WaCall)
*
ST R15,WaHandle ;Handle returned by RZKOPEN
LTR R15,R15 ;Handle >= Zero?
JM ErrOpen ;No, RZKOPEN Error
*
CALL (RZK),(=A(FCTCONN),WaHandle, +
BrokerS,BrokerL,TopicS,TopicL),VL,MF=(E,WaCall)
*
IILF R5,BufSize$ ;Get size of defined buffer...
ST R5,WaBuffL ;...and init msg buf length parm
*
CALL (RZK),(=A(FCTCONS),WaHandle, +
GroupS,GroupL,WaBuffS,WaBuffL,Conv,ConsTim),VL, +
MF=(E,WaCall)
*
CALL (RZK),(=A(FCTCLSE),WaHandle),VL,MF=(E,WaCall)
Cobol
The following example members are available for the Cobol programming language:
Member | Description |
---|---|
RZKAPIB0 | Cobol RzK API COPY member. |
Member | Including & calling RZKSTUB |
---|---|
RZKAPIBJ | JCL to compile, link/bind and run the Cobol example programs. |
RZKAPIBP | Produce a single message to a Kafka topic. |
RZKAPIBC | Consume a single message from a Kafka topic. |
RZKAPIBM | Consume all (new) topic messages until an end-marker message is found. |
RZKAPIBT | Consume all (new) topic messages until a timeout occurs. |
Member | Including RZK & calling RZKFUNC |
---|---|
RZKAPIBK | JCL to compile, link/bind and run the Cobol example programs. |
RZKAPIBA | All available RzK API calls in one program as a quick reference. |
RZKAPIBQ | Produce a single message to a Kafka topic. |
RZKAPIBD | Consume a single message from a Kafka topic. |
RZKAPIBN | Consume all (new) topic messages until an end-marker message is found. |
RZKAPIBU | Consume all (new) topic messages until a timeout occurs. |
Remarks
- To use RzK from a Cobol program, you have two choices how to include and call the RzK functions:
- Include module RZKSTUB during link/bind processing and use CALL RZKSTUB to access the RzK functions. Use the SAMPLIB(RZKAPIBJ) JCL member to compile, link/bind by including RZKSTUB, and run Cobol programs.
Using RZKSTUB is recommended for the Cobol programming language.
- Include module RZK during link/bind processing and use CALL RZKFUNC to access the RzK functions. Use the SAMPLIB(RZKAPIBK) JCL member to compile, link/bind by including module RZK, and run Cobol programs.
- All provided Cobol example programs are reenterable (RENT) with AMODE(31) and RMODE(31), and run non-APF authorized.
When using RZKSTUB, module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.
To keep the sample code short, there are only a few checks for return codes and error messages.
Avoid doing this in production code.
Examples
Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.
Includes
You will find the required RzK constant definitions for Cobol programs in SAMPLIB(RZKAPIB0). Include this member with
COPY RZKAPIB0.
into the WORKING-STORAGE SECTION of your program via DD name SYSLIB during compile.
******************************************************************
* *
* Product: RVS z/OS Kafka Connector (RzK) *
* *
* Member: SAMPLIB(RZKAPIB0) *
* *
* Type: COBOL Source Code *
* *
* Function: RzK constants for Cobol programs. *
* *
* Notes: o Include with COPY into WORKING-STORAGE SECTION. *
* *
* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
******************************************************************
* RzK Function Codes
01 FCTOPEN PIC S9(9) COMP-5 VALUE 1.
01 FCTCONN PIC S9(9) COMP-5 VALUE 2.
01 FCTCLSE PIC S9(9) COMP-5 VALUE 3.
01 FCTPROD PIC S9(9) COMP-5 VALUE 4.
01 FCTCONS PIC S9(9) COMP-5 VALUE 5.
01 FCTDBUG PIC S9(9) COMP-5 VALUE 6.
01 FCTSYNC PIC S9(9) COMP-5 VALUE 7.
01 FCTAUTH PIC S9(9) COMP-5 VALUE 8.
01 FCTGERR PIC S9(9) COMP-5 VALUE 9.
01 FCTCLDB PIC S9(9) COMP-5 VALUE 10.
01 FCTTERM PIC S9(9) COMP-5 VALUE 11.
01 FCTTIMO PIC S9(9) COMP-5 VALUE 12.
Function Overview
The RzK Cobol API provides the following functions. See the SAMPLIB(RZKAPIBA) member for the complete code example.
CALL "RZKFUNC" USING BY VALUE FCTOPEN RETURNING Handle.
CALL "RZKFUNC" USING BY VALUE FCTDBUG Handle DbugLvl
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTAUTH Handle
BY CONTENT UserS BY VALUE UserL
BY CONTENT PwrdS BY VALUE PwrdL
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTSYNC Handle
BY VALUE SyncLvl SndWin RecWin
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTCLDB Handle LoadBal
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTTIMO Handle ConnTim
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTCONN Handle
BY CONTENT BrokerS BY VALUE BrokerL
BY CONTENT TopicS BY VALUE TopicL
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTPROD Handle
BY CONTENT KeyS BY VALUE KeyL
BY CONTENT MsgS BY VALUE MsgL
BY VALUE Conv
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTCONS Handle
BY CONTENT GroupS BY VALUE GroupL
BY REFERENCE BufferS BufferL
BY VALUE Conv ConsTim
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTGERR Handle
BY REFERENCE BufferS BufferL
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTCLSE Handle
RETURNING RzkRc.
CALL "RZKFUNC" USING BY VALUE FCTTERM RETURNING RzkRc.
Produce
The substantial section of a Cobol program producing a message to a Kafka topic.
CALL "RZKFUNC" USING BY VALUE FCTOPEN RETURNING Handle.
IF Handle >= 0 THEN
CALL "RZKFUNC" USING BY VALUE FCTCONN Handle
BY CONTENT BrokerS BY VALUE BrokerL
BY CONTENT TopicS BY VALUE TopicL
RETURNING RzkRc
CALL "RZKFUNC" USING BY VALUE FCTPROD Handle
BY CONTENT KeyS BY VALUE KeyL
BY CONTENT MsgS BY VALUE MsgL
BY VALUE Conv
RETURNING RzkRc
CALL "RZKFUNC" USING BY VALUE FCTCLSE Handle
RETURNING RzkRc
END-IF.
Consume
The substantial section of a Cobol program consuming a message from a Kafka topic.
CALL "RZKFUNC" USING BY VALUE FCTOPEN RETURNING Handle.
IF Handle >= 0 THEN
CALL "RZKFUNC" USING BY VALUE FCTCONN Handle
BY CONTENT BrokerS BY VALUE BrokerL
BY CONTENT TopicS BY VALUE TopicL
RETURNING RzkRc
MOVE BufMaxL TO BufferL *> (Re)Set max buf size
CALL "RZKFUNC" USING BY VALUE FCTCONS Handle
BY CONTENT GroupS BY VALUE GroupL
BY REFERENCE BufferS BufferL
BY VALUE Conv ConsTim
RETURNING RzkRc
CALL "RZKFUNC" USING BY VALUE FCTCLSE Handle
RETURNING RzkRc
END-IF.
C
The following example members are available for the C programming language:
Member | Description |
---|---|
RZKAPIC0 | C RzK API header file #include member. |
Member | Including & calling RZKSTUB |
---|---|
RZKAPICJ | JCL to compile, link/bind and run the C example programs. |
RZKAPICP | Produce a single message to a Kafka topic. |
RZKAPICC | Consume a single message from a Kafka topic. |
RZKAPICM | Consume all (new) topic messages until an end-marker message is found. |
RZKAPICT | Consume all (new) topic messages until a timeout occurs. |
Member | Including RZK & calling RZKFUNC |
---|---|
RZKAPICK | JCL to compile, link/bind and run the C example programs. |
RZKAPICA | All available RzK API calls in one program as a quick reference. |
RZKAPICQ | Produce a single message to a Kafka topic. |
RZKAPICD | Consume a single message from a Kafka topic. |
RZKAPICN | Consume all (new) topic messages until an end-marker message is found. |
RZKAPICU | Consume all (new) topic messages until a timeout occurs. |
Member | Loading the RZK DLL & calling RZKFUNC |
---|---|
RZKAPICL | JCL to compile, link/bind and run the C example programs. |
RZKAPICR | Produce a single message to a Kafka topic. |
RZKAPICE | Consume a single message from a Kafka topic. |
RZKAPICO | Consume all (new) topic messages until an end-marker message is found. |
RZKAPICV | Consume all (new) topic messages until a timeout occurs. |
Remarks
- To use RzK from a C program, you have three choices how to invoke the RzK API functions:
- Dynamically load the RZK LE Dynamic Link Library (DLL) and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICL) JCL member to compile, link/bind without including any RzK modules, and run your C programs.
Using the RZK DLL technique is recommended for the C programming language.
- Include module RZKSTUB during link/bind processing and call RZKSTUB() to access the RzK functions. Use the SAMPLIB(RZKAPICJ) JCL member to compile, link/bind by including module RZKSTUB, and run C programs.
- Include module RZK during link/bind processing and call RZKFUNC() to access the RzK functions. Use the SAMPLIB(RZKAPICK) JCL member to compile, link/bind by including module RZK, and run C programs.
- All provided C example programs are reenterable (RENT) with AMODE(31) and RMODE(31), and run non-APF authorized.
When using the RZK DLL or RZKSTUB, module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.
To keep the sample code short, there are only a few checks for return codes and error messages.
Avoid doing this in production code.
Examples
Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.
Includes
You will find the required RzK constants and prototype definitions for C programs in SAMPLIB(RZKAPIC0). Include this member as C header file with
#include "rzkapic0.h"
into your C program via DD name USERLIB during compile.
/*********************************************************************/
/* */
/* Product: RVS z/OS Kafka Connector (RzK) */
/* */
/* Member: SAMPLIB(RZKAPIC0) */
/* */
/* Type: C Pgm Header File */
/* */
/* Function: RzK constants & prototype definitions for C programs. */
/* */
/* Notes: o #include "rzkapic0.h" into your C program via */
/* DD name USERLIB during compile. */
/* */
/* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 */
/*********************************************************************/
/*-----------------------------------------------------------------*/
/* RZKFUNC Function Codes. */
/*-----------------------------------------------------------------*/
#define FCTOPEN 1
#define FCTCONN 2
#define FCTCLSE 3
#define FCTPROD 4
#define FCTCONS 5
#define FCTDBUG 6
#define FCTSYNC 7
#define FCTAUTH 8
#define FCTGERR 9
#define FCTCLDB 10
#define FCTTERM 11
#define FCTTIMO 12
/*-----------------------------------------------------------------*/
/* RzK Function Prototypes. */
/*-----------------------------------------------------------------*/
int RZKSTUB(int, ...);
int RZKFUNC(int, ...);
Function Overview
The RzK C API provides the following functions. See the SAMPLIB(RZKAPICA) member for the complete code example.
Handle = RZKFUNC(FCTOPEN);
RzkRc = RZKFUNC(FCTDBUG, Handle, DbugLvl);
RzkRc = RZKFUNC(FCTAUTH, Handle, UserS, UserL, PwrdS, PwrdL);
RzkRc = RZKFUNC(FCTSYNC, Handle, SyncLvl, SndWin, RecWin);
RzkRc = RZKFUNC(FCTCLDB, Handle, LoadBal);
RzkRc = RZKFUNC(FCTTIMO, Handle, ConnTim);
RzkRc = RZKFUNC(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);
RzkRc = RZKFUNC(FCTPROD, Handle, KeyS, KeyL, MsgS, MsgL, Conv);
RzkRc = RZKFUNC(FCTCONS, Handle,
GroupS, GroupL, &BufferS, &BufferL, Conv, ConsTim);
RzkRc = RZKFUNC(FCTGERR, Handle, &BufferS, &BufferL);
RzkRc = RZKFUNC(FCTCLSE, Handle);
RzkRc = RZKFUNC(FCTTERM);
Produce
The substantial section of a C program producing a message to a Kafka topic.
Handle = RZKFUNC(FCTOPEN);
if (Handle >= 0)
{
RzkRc = RZKFUNC(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);
RzkRc = RZKFUNC(FCTPROD, Handle, KeyS, KeyL, MsgS, MsgL, Conv);
RzkRc = RZKFUNC(FCTCLSE, Handle);
}
Consume
The substantial section of a C program consuming a message from a Kafka topic.
Handle = RZKFUNC(FCTOPEN);
if (Handle >= 0)
{
RzkRc = RZKFUNC(FCTCONN, Handle, BrokerS, BrokerL, TopicS, TopicL);
BufferL = BUF_SIZE; // (Re)Set maximum buffer size
RzkRc = RZKFUNC(FCTCONS, Handle,
GroupS, GroupL, &BufferS, &BufferL, Conv, ConsTim);
RzkRc = RZKFUNC(FCTCLSE, Handle);
}
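The shortened C samples above skip most return-code checks. One possible way to add them in production code is a small helper that is called after every RzK function call and fetches the error text via FCTGERR when the return code is not zero. This is a sketch under stated assumptions: `rzk_check`, `fake_rzk` and the buffer size of 256 bytes are hypothetical, and `fake_rzk` is only a stand-in for RZKSTUB or RZKFUNC so that the example compiles outside z/OS.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

#define FCTGERR  9      /* function code as listed in SAMPLIB(RZKAPIC0) */
#define BUF_SIZE 256    /* assumed size of the error message buffer */

/* Same shape as the RZKSTUB/RZKFUNC prototypes: int f(int, ...). */
typedef int (*rzk_entry_t)(int, ...);

/* Hypothetical stand-in for the real entry point: for FCTGERR it copies
   a fixed text into the caller's buffer and returns the text length. */
static int fake_rzk(int fct, ...)
{
    va_list ap;

    va_start(ap, fct);
    if (fct == FCTGERR) {
        const char *msg = "simulated RzK error text";
        int handle = va_arg(ap, int);
        char *buf = va_arg(ap, char *);
        int *len = va_arg(ap, int *);
        int n = (int)strlen(msg);

        (void)handle;
        if (n > *len)
            n = *len;               /* never write past the caller's buffer */
        memcpy(buf, msg, (size_t)n);
        *len = n;
    }
    va_end(ap);
    return 0;
}

/* Check one return code; on failure fetch and print the error text.
   Returns rc unchanged so the caller can decide how to continue. */
static int rzk_check(rzk_entry_t rzk, int handle, int rc, const char *what)
{
    char buf[BUF_SIZE];
    int len = BUF_SIZE;             /* (re)set maximum buffer size */

    assert(rzk != NULL && what != NULL);
    if (rc == 0)
        return 0;
    rzk(FCTGERR, handle, buf, &len);
    fprintf(stderr, "%s failed: %05d %.*s\n", what, rc, len, buf);
    return rc;
}
```

In a real program, a call such as `rzk_check(RZKFUNC, Handle, RzkRc, "RZKCONN")` would follow each API call, and the program would skip the remaining calls and close the handle as soon as the helper returns a non-zero value.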
REXX
REXX programs may also use the RzK functionality through the CALL API. REXX on z/OS provides the ADDRESS LINKPGM environment to call programs via a standard CALL API. To save you from having to map the entire API yourself, the RzK package already provides you with an RzK REXX Wrapper API. This can be found in SAMPLIB(RZKAPIR0) to be included by your REXX code.
Example Members
The following example members are available for the REXX programming language:
Member | Description |
---|---|
RZKAPIR0 | RzK REXX Wrapper API include member. |
RZKAPIRJ | JCL to optionally include, preprocess, compile, link/bind and run the REXX example programs. |
Member | Using the RZK REXX Wrapper API and RZKCALL |
---|---|
RZKAPIRA | All available RzK API calls in one program as a quick reference. |
RZKAPIRP | Produce a single message to a Kafka topic. |
RZKAPIRC | Consume a single message from a Kafka topic. |
RZKAPIRM | Consume all (new) topic messages until an end-marker message is found. |
RZKAPIRT | Consume all (new) topic messages until a timeout occurs. |
Remarks
- You may use the SAMPLIB(RZKAPIRJ) JCL member to optionally process your REXX program:
- Automatically include the RzK REXX Wrapper API
- Preprocess the REXX source into the CEXEC format
- Compile & link/bind to create a z/OS load module
- Run the created REXX example program.
- All provided REXX example programs - if compiled into an LMOD - are reenterable (RENT) with AMODE(31) and RMODE(31), and are running non-APF authorized.
- All REXX samples dynamically LOAD and CALL the RzK CALL API stub RZKCALL through ADDRESS LINKPGM. It is not required to include RZKCALL during link/bind processing.
RZKCALL, which is an ALIAS of RZKSTUB, and module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.
To keep the sample code short, there are only a few checks for return codes and error messages.
Avoid doing this in production code.
Example Code
Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.
REXX Wrapper API Include
You will find the provided REXX Wrapper API to use RZKCALL with REXX programs in SAMPLIB(RZKAPIR0). Include this member with
/*%INCLUDE RZKAPIR0 */
after your own REXX code and use the compile JCL in SAMPLIB(RZKAPIRJ) to do the include processing for you (recommended), or copy RZKAPIR0 manually.
Function Overview
The RzK REXX Wrapper API provides the following functions. See the SAMPLIB(RZKAPIRA) member for the complete code example.
Handle = RzkOpen();
RzkRc = RzkDbug(Handle, DbugLvl);
RzkRc = RzkAuth(Handle, UserS, PwrdS);
RzkRc = RzkSync(Handle, SyncLvl, SndWin, RecWin);
RzkRc = RzkCldb(Handle, LoadBal);
RzkRc = RzkTimo(Handle, ConnTim);
RzkRc = RzkConn(Handle, BrokerS, TopicS);
RzkRc = RzkProd(Handle, KeyS, MsgS, Conv);
RzkRc = RzkCons(Handle, GroupS, Conv, ConsTim);
RzkRc = RzkGerr(Handle);
RzkRc = RzkClse(Handle);
RzkRc = RzkTerm();
Produce
The substantial section of a REXX program producing a message to a Kafka topic.
Handle = RzkOpen();
IF Handle >= 0
THEN
DO
RzkRc = RzkConn(Handle, BrokerS, TopicS);
RzkRc = RzkProd(Handle, KeyS, MsgS, Conv);
RzkRc = RzkClse(Handle);
END; /* of IF Handle >= 0 THEN DO */
Consume
The substantial section of a REXX program consuming a message from a Kafka topic.
Handle = RzkOpen();
IF Handle >= 0
THEN
DO
RzkRc = RzkConn(Handle,BrokerS,TopicS);
RzkRc = RzkCons(Handle,GroupS,Conv,ConsTim);
IF RzkRc == 0
THEN
SAY Rzk.Handle.#MSG;
RzkRc = RzkClse(Handle);
END; /* of IF Handle >= 0 THEN DO */
Natural (NPL)
The following example members are available for the Natural Programming Language (NPL):
Member | Description |
---|---|
RZKAPIN0 | Natural RzK API include member. |
Member | Calling RZKCALL |
---|---|
RZKAPINA | All available RzK API calls in one program as a quick reference. |
RZKAPINP | Produce a single message to a Kafka topic. |
RZKAPINC | Consume a single message from a Kafka topic. |
z/OS Call API vs LE API
Natural supports using the standard z/OS CALL API or - after some additional setup - using the LE API. Depending on your current NPL setup, you may use the RZK DLL directly or have to use the RzK stub RZKCALL.
For more information see Software AG - Natural for Mainframes - Support of IBM LE Subprograms.
Remarks
- All Natural samples dynamically LOAD and CALL the RzK CALL API stub RZKCALL. It is not required to include RZKCALL during link/bind processing.
RZKCALL, which is an ALIAS of RZKSTUB, and module RZK must either be found in the STEPLIB, or in the z/OS LPALST or LNKLST concatenation.
To keep the sample code short, there are only a few checks for return codes and error messages.
Avoid doing this in production code.
The Natural (NPL) code has been provided as an example to the best of our knowledge. Since we do not have a Natural development environment available, the sample code is not tested (yet).
Examples
Just for a first impression, you will find a few shortened examples below. For the most complete examples, please always refer to the SAMPLIB members described in the table above.
Includes
You will find the required RzK constant definitions for Natural programs in SAMPLIB(RZKAPIN0). Include this member with INCLUDE RZKAPIN0 in the DEFINE DATA section of your program.
***********************************************************************
* *
* Product: RVS z/OS Kafka Connector (RzK) *
* *
* Member: SAMPLIB(RZKAPIN0) *
* *
* Type: Natural (NPL) Source Code *
* *
* Function: Define required RzK constants for Natural programs. *
* *
* Notes: o Use INCLUDE RZKAPIN0 to include this member into your *
* Natural program source code. *
* *
* (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
***********************************************************************
*
*---------------------------------------------------------------------*
* RVS z/OS Kafka Connector Function Codes. *
*---------------------------------------------------------------------*
1 #FCTOPEN (I4) CONST <01>
1 #FCTCONN (I4) CONST <02>
1 #FCTCLSE (I4) CONST <03>
1 #FCTPROD (I4) CONST <04>
1 #FCTCONS (I4) CONST <05>
1 #FCTDBUG (I4) CONST <06>
1 #FCTSYNC (I4) CONST <07>
1 #FCTAUTH (I4) CONST <08>
1 #FCTGERR (I4) CONST <09>
1 #FCTCLDB (I4) CONST <10>
1 #FCTTERM (I4) CONST <11>
1 #FCTTIMO (I4) CONST <12>
*
Function Overview
The RzK Natural API provides the following functions. See the SAMPLIB(RZKAPINA) member for the complete code example.
CALL 'RZKCALL' #FCTOPEN
*
CALL 'RZKCALL' #FCTDBUG #Handle #DbugLvl
*
CALL 'RZKCALL' #FCTAUTH #Handle #UserS #UserL #PwrdS #PwrdL
*
CALL 'RZKCALL' #FCTSYNC #Handle #SyncLvl #SndWin #RecWin
*
CALL 'RZKCALL' #FCTCLDB #Handle #LoadBal
*
CALL 'RZKCALL' #FCTTIMO #Handle #ConnTim
*
CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL
*
CALL 'RZKCALL' #FCTPROD #Handle #KeyS #KeyL #MsgS #MsgL #Conv
*
CALL 'RZKCALL' #FCTCONS #Handle
#GroupS #GroupL #BufferS #BufferL #Conv #ConsTim
*
CALL 'RZKCALL' #FCTGERR #Handle #BufferS #BufferL
*
CALL 'RZKCALL' #FCTCLSE #Handle
*
CALL 'RZKCALL' #FCTTERM
Produce
The substantial section of a Natural program producing a message to a Kafka topic.
CALL 'RZKCALL' #FCTOPEN
*
#Handle := RET('RZKCALL')
*
IF #Handle >= 0
THEN
*
CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL
*
CALL 'RZKCALL' #FCTPROD #Handle #KeyS #KeyL #MsgS #MsgL #Conv
*
CALL 'RZKCALL' #FCTCLSE #Handle
*
END-IF
Consume
The substantial section of a Natural program consuming a message from a Kafka topic.
CALL 'RZKCALL' #FCTOPEN
*
#Handle := RET('RZKCALL')
*
IF #Handle >= 0
THEN
*
CALL 'RZKCALL' #FCTCONN #Handle #BrokerS #BrokerL #TopicS #TopicL
*
MOVE #BufMaxL TO #BufferL /* (Re)Set maximum buffer size
*
CALL 'RZKCALL' #FCTCONS #Handle
#GroupS #GroupL #BufferS #BufferL #Conv #ConsTim
*
CALL 'RZKCALL' #FCTCLSE #Handle
*
END-IF
RzK Messages and Codes
RZK Trace Messages
Trace Messages for a detailed analysis can be activated using DD-Name RZKSTUB in one of the following ways:
//RZKSTUB DD DUMMY
or
TSO ALLOC FILE(RZKSTUB) DUMMY
The following sample trace messages were created for a PRODUCE of 1,000,000 messages.
RZKAPIAS - RzK CALL API Assembler Sample Program
RZKSTUB0001T RVS z/OS Kafka Connector CALL API Stub Routine v1.0.0j (31.03.2023) started...
RZKSTUB0003T ChkTrc() DDName RZKSTUB found ALLOCATEd, message trace mode activated.
RZKSTUB0008T TstApf() Running non-APF authorized.
RZKSTUB0601T SysInf() Sys=RS09 Plex=RSYPLEX Lpar= Vm=RSYZOS Hw=RSYHOST Fmid=HBB77C0 CPUs=03/03.
RZKSTUB0602T SysInf() >CPU Id=1 Ser=0000013907
RZKSTUB0602T SysInf() >CPU Id=2 Ser=0000023907
RZKSTUB0602T SysInf() >CPU Id=3 Ser=0000033907 (zIIP)
RZKSTUB0009T IniSub() R15=x'00000000' Tkn=x'0D6EEDB8' Pip=x'80095330' Rtn=x'8D6AFA20'.
03/31/2023 01:13:47.334 RZK0007I RZKOPEN -- -- RzK - RVS Kafka Connector for COBOL/C/..., z/OS Version 1.1 Build Mar 27 2023 12:37:27
03/31/2023 01:13:47.334 RZK0017I RZKDBUG -- -- DebugLevel is set to 1
03/31/2023 01:13:47.335 RZK0038I RZKSYNC -- -- Synch-Mode is set to 0: Produce Msgs are sent and control is returned imm, OffsetCo after reading X msgs
03/31/2023 01:17:03.212 RZK0018I RZKCLSE -- -- Statistics: #Msgs produce: 1000000 consume: 0 ConnectionTime: 195877 msec
03/31/2023 01:17:03.212 RZK0019I RZKCLSE -- -- Statistics: snd=107524 rcv=18295 wait=19915 marshall=17449 unmarshall=0 msec, SendWinExh=0
03/31/2023 01:17:03.212 RZK0022I RZKCLSE -- -- Statistics: sends=1000003 sendBytes=145000083 recvs=2000033 recvBytes=57001304 selectWait=7479, selectNoWait=1992522
RZKSTUB0010T TrmEnv() R15=x'00000000' Tkn=x'0D6EEDB8' Env=x'00000000'.
RZKSTUB0099T RVS z/OS Kafka Connector CALL API Stub Routine ended.
***
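The statistics lines in such a trace can be evaluated with a few lines of script. The following is a minimal sketch in Python, assuming the RZK0018I line has exactly the layout shown in the sample above. The function name parse_rzk0018 is hypothetical and not part of RzK.

```python
import re

# Sample RZK0018I line copied from the trace above.
line = ("03/31/2023 01:17:03.212 RZK0018I RZKCLSE -- -- Statistics: "
        "#Msgs produce: 1000000 consume: 0 ConnectionTime: 195877 msec")


def parse_rzk0018(text):
    """Extract produced count, consumed count and connection time (msec)."""
    pattern = r"produce:\s*(\d+)\s+consume:\s*(\d+)\s+ConnectionTime:\s*(\d+)\s*msec"
    match = re.search(pattern, text)
    if match is None:
        raise ValueError("not an RZK0018I statistics line")
    produced, consumed, msec = (int(group) for group in match.groups())
    return produced, consumed, msec


produced, consumed, msec = parse_rzk0018(line)
rate = produced / (msec / 1000.0)  # produced messages per second of connection time
print(f"{produced} messages in {msec} msec = {rate:.0f} msgs/sec")
```

For the sample trace this works out to roughly 5,100 produced messages per second of connection time.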
Informational Messages
Codes for informational messages are 0-4999.
Code | Description |
---|---|
RZK0000I | Successful completion |
RZK0001I | Broker <ip address/hostname>:<port> |
RZK0002I | HEXDump for <description> len=<len of dump area> |
RZK0003I | HEXDump offset: data text |
RZK0004I | Request sent CorrelID=<id> |
RZK0005I | Reply CorrID = <id> #Responses = <num> Error = <error code> <error text> |
RZK0006I | Request sent to Broker id = <id> hostname = <hostname> port = <port> Partition = <partition> topic = <topic> offs = <offset> |
RZK0007I | <Version String> Build <date> <time> |
RZK0008I | Server <hostname>:<port> supports <protocol> Verisons <min>-<max> |
RZK0009I | Parameters are bootstrap server<hostname>:<port> Topic=<topic> |
RZK0010I | BrokerList id <id> = <hostname>:<port> rack=<rack description> |
RZK0011I | Partition <id> Leader=<id> brokeridx=<index> isr=<id> err=<error#> <error string> |
RZK0012I | MsgKeyHash for <MessageKey> results in Partition <id> |
RZK0013I | Topic <topic> (Broker ID=<id>, <hostname>:<port>) |
RZK0014I | PartitionListIdx=<index> Partition=<id> Leader=<id> |
RZK0015I | MessageKey=<MessageKey> Encoding=<UTF8/Binary> |
RZK0016I | Msg=<message to be sent to kafka> (len=<message len>) |
RZK0017I | DebugLevel is set to <level> |
RZK0018I | Statistics: #Msgs produce: <num> consume: <num> ConnectionTime: <num> msec |
RZK0019I | Statistics: snd=<num> rcv=<num> wait=<num> marshall=<num> unmarshall=<num> msec, SendWinExh=<num> |
RZK0020I | Pending Replys from Broker <hostname>:<port> |
RZK0021I | Reply received for CorrelID <id> from Broker <hostname>:<port> |
RZK0022I | Statistics: sends=<num> sendBytes=<num> recvs=<num> recvBytes=<num> selectWait=<num>, selectNoWait=<num> |
RZK0023I | Find Coordinator (type=<type>) for Group: <group> |
RZK0024I | Connect to BrokerID=<broker id> <hostname>:<port> |
RZK0025I | Found Coordinator for Group : <group> BrokerID=<broker id> <hostname>:<port> |
RZK0026I | Received Message from Kafka: BrokerID=<broker id>, Partition=<id>, Offset=<offset>, MsgLen=<message len> |
RZK0027I | Offset=<offset> |
RZK0028I | Resend with Group MemberID: <member id> |
RZK0029I | The group is rebalancing, so a rejoin is needed |
RZK0030I | HexDump for <description> len=<len of dump area>, (only 256 bytes displayed) |
RZK0031I | Information from kafka server: <error code> <error message> |
RZK0032I | <description> Reply from Kafka: <hostname>:<port> BrokerID=<broker id> Partition=<id> CorrelID=<id> |
RZK0033I | Debugging message that displays key/value pairs. |
RZK0034I | Debugging message that displays key/value pairs. |
RZK0035I | Skipping Message Partition=<id> offs=<offset> KeyLen=<key length> ValueLen=<value length> |
RZK0036I | Receive and scratch left over data |
RZK0037I | Shutting Down: Ignoring CorrelID <id> from Broker <hostname>:<port> |
RZK0038I | Synch-Mode is set to <mode> |
RZK0039I | #Mechanisms = <number of supported SASL authentication mechanisms>, Supported Mechanisms = <list of supported mechanism>, err = <error code> |
RZK0040I | Error Code = <error code>, Reply Error = <reply error> |
RZK0041I | General Mutex: <Locked | Unlocked> |
RZK0042I | HeartBeat Thread: <Starting | Already Started | …> |
RZK0043I | Consumer load balancing <enabled/disabled> |
RZK0044I | Connection closed on remote side: Broker ID=<broker id> <hostname>:<port> |
Warning Messages
Codes for warning messages are 5000-7499.
Code | Description |
---|---|
RZK5000W | Received Message truncated from <len> to <len> bytes |
RZK5001W | License expires on <expiration date>. Pls contact support. |
RZK5002W | Error with length of partition section in reply buffer |
RZK5003W | Message is not complete: CorrID=<id> MessageOffset=<base offset> Offset in ReplyData=<marshall offset> |
RZK5004W | Invalid window size. <send/receive> window size = <Size> , it will be reset to default size <size> |
RZK5005W | No Reply received for <keyID>/<keyName>, CorrelID <id> from Broker <hostname>:<port> |
RZK5006W | All open Sessions are closed |
Error Messages
Codes for error messages are 7500-9999.
Code | Description |
---|---|
RZK7500E | TCPIP Error from Function <function>: <error code> (<error message>) |
RZK7501E | Connection closed by Server. Most likely there is authentication required |
RZK7502E | Invalid RzK handle |
RZK7503E | Cannot allocate Memory for <internal ID> |
RZK7504E | ------- rzk_log_vf is called with MessageNum (<msg#>) and provides <num> arguments, but <num> arguments are required for this Message |
RZK7505E | ------- rzk_log_vf is called with MessageNum (<msg#>). Array has only <num> elements, <num> Element of array cannot be accessed |
RZK7506E | Reply CorrID = <id> #Responses = <1> Error = <code> <message> |
RZK7507E | No BrokerList available. Potential Sequence error in Application |
RZK7508E | No PartitionList available. This is strange! There is a BrokerList but no PartitionList ... |
RZK7509E | No Leader available for Partition <id>, Topic <topic> |
RZK7510E | Error from kafka server: <error> <error message> |
RZK7511E | No Broker (BorkerID=<id>) found for Partition <num>, Topic <topic> |
RZK7512E | Could not connect to Broker <hostname>:<port> ID <brokerID> Error=<code> <msg> |
RZK7513E | Could not connect to any Broker for Topic <topic> |
RZK7514E | marshall RequestHeader failed |
RZK7515E | Timeout waiting for Reply from Kakfa, Broker <hostname>:<port> |
RZK7516E | Unexpected Reply: No Queued Request for CorrelID <id> from Broker <hostname>:<port> |
RZK7518E | Reply from kafka is <num> bytes, only processing <num> bytes |
RZK7519E | No valid IP address or hostname is provided for BrokerID <broker id> (<hostname>) |
RZK7520E | Find Coordinator for Group <groupname> failed: Could not find BrokerID <broker id> in Brokerlist |
RZK7521E | unable to connect to any broker of this kafka cluster <…> |
RZK7522E | License expired on <expiration date>. Pls contact support. |
RZK7523E | No Coordinator available. Potential Sequence error in Application |
RZK7524E | Could not Join Group <group> for topic <topic> on Broker ID=<id> <hostname>:<port> rc=<rc> (<error text>) |
RZK7525E | There is no Broker available for Partition <id> of Topic <topic> |
RZK7526E | Processing Buffer with Wrong CorrelID: <hostname>:<port> BrokerID=<id> Partition=<id> CorrelID expected=<id> actual=<id> |
RZK7527E | Synch-Mode <synch mode> is not supported |
RZK7528E | No Connection to Coordinater. Broker ID=<broker id> <hostname>:<port> |
RZK7530E | Cannot get local CodePage, String conversion to UTF-8 not possible |
RZK7531E | The limit of max <x> concurrent sessions is reached! |
RZK7532E | <system call> failed: Error=<system error #/errno>, <system error text> |
RZK7533E | Broker <hostname>:<port> - received reply for invalid Partition Index (only <id> Partitions known) |
RZK7534E | Reply for unsupported Api Request: <id> /Version=<API-Version> (APIKey-Name) CorrelID = <id> |
RZK7535E | There is already a consumer active for Topic:<topic> in Group:<group> and consume load balancing is not enabled |
RZK7536E | Kafka Protocol Error: <kafka protocol error messages> |
RZK7537E | Invalid Function Code <i>. Only Function Codes from 0 to 9 are supported |
RZK7538E | iconv_open() failed for <function> with error <errorcode> |
RZK7539E | iconv() failed with error <errorcode> |
RZK7540E | CRC does not match in message from Broker <hostname>:<port> |
RZK7541E | Topic is too long (<length> bytes), max 255 bytes are supported |
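The number ranges stated for the three message classes can be used to sort RzK message codes when scanning job output. The following Python sketch is an illustration only. It assumes that every code has the form RZK, four digits and a suffix of I, W or E, as in the tables of this chapter. The function name classify is hypothetical.

```python
import re

# Number ranges as stated in the informational, warning and error sections.
RANGES = {"I": range(0, 5000), "W": range(5000, 7500), "E": range(7500, 10000)}
NAMES = {"I": "informational", "W": "warning", "E": "error"}


def classify(code):
    """Return the message class of a code such as 'RZK7512E'."""
    match = re.fullmatch(r"RZK(\d{4})([IWE])", code)
    if match is None:
        raise ValueError(f"not an RzK message code: {code}")
    number, suffix = int(match.group(1)), match.group(2)
    if number not in RANGES[suffix]:
        raise ValueError(f"number {number} does not fit suffix {suffix}")
    return NAMES[suffix]


for code in ("RZK0007I", "RZK5001W", "RZK7512E"):
    print(code, classify(code))
```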
Additional Information
Apache Kafka Core Concepts
Apache Kafka is an event-streaming platform that is optimized for high throughput. It is a fault-tolerant and scalable platform to handle real-time data feeds.
Kafka Cluster and Brokers
Kafka works as a cluster of one or more nodes in one or more data centers. Kafka refers to a single Kafka server as a broker. The brokers are designed to operate as part of a Kafka cluster.
When you initially connect to Kafka, you perform a so-called bootstrap connection, handled by the RZKCONN API call of RzK. This connection can be to any of the brokers in the cluster, and is required to request all the information you need to produce (write, send, publish) a message to or consume (read, receive, fetch) a message from a Kafka cluster.
Zookeeper
Kafka has traditionally used Apache ZooKeeper as its central configuration and management system (newer Kafka releases can run without it). ZooKeeper keeps track of the brokers, topics, partition assignment, and leader election. Basically, it manages the metadata of the Kafka cluster.
Messages
A message is the unit of data for Kafka. Kafka saves messages just as a sequence of bytes (byte array, buffer). It may contain any data, which may be text (like JSON) or arbitrary binary objects.
Messages can have a message key which is metadata that is used to determine the destination partition.
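Because Kafka stores nothing but bytes, text that originates on z/OS usually has to be converted from EBCDIC before open-world consumers can read it. The following Python sketch only illustrates the idea. It assumes EBCDIC code page IBM-037 (Python codec cp037), which may differ from the code page of your system, and it does not show how RzK performs the conversion internally. The sample record is hypothetical.

```python
import json

record = {"customer": 4711, "status": "OPEN"}  # hypothetical business data
text = json.dumps(record)

# The bytes as an EBCDIC program would hold the text (assumption: code page 037).
ebcdic_bytes = text.encode("cp037")

# The same text converted to UTF-8, as an open-world consumer would expect it.
utf8_bytes = ebcdic_bytes.decode("cp037").encode("utf-8")

print(ebcdic_bytes != utf8_bytes)                         # the byte values differ
print(json.loads(utf8_bytes.decode("utf-8")) == record)   # the content is unchanged
```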
Message Key
- A message key ensures that all messages with the same key in a topic are stored in the same partition. In that way Kafka ensures that these messages are consumed in the same sequence in which they were written.
- Messages that are produced without a message key will be distributed between multiple partitions to improve throughput and storage usage. In that case, the sequence of messages cannot be guaranteed by Kafka when the messages are retrieved (consumed).
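The effect of a message key on the partition choice can be sketched as follows. This is a simplified Python model, not the hash function used by Kafka clients or by RzK: zlib.crc32 is only a stand-in, and the partition count and the function name choose_partition are hypothetical.

```python
import itertools
import zlib

NUM_PARTITIONS = 3                 # hypothetical partition count of the topic
_round_robin = itertools.count()   # counter used for messages without a key


def choose_partition(key):
    """Same key -> same partition; no key -> spread over all partitions."""
    if key is None:
        return next(_round_robin) % NUM_PARTITIONS
    return zlib.crc32(key) % NUM_PARTITIONS   # stand-in hash, an assumption


keyed = [choose_partition(b"customer-4711") for _ in range(5)]
unkeyed = [choose_partition(None) for _ in range(5)]
print(keyed)     # five times the same partition number
print(unkeyed)   # [0, 1, 2, 0, 1]
```

All keyed messages land in one partition and therefore keep their order, while the messages without a key are spread across partitions.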
Topics and Partitions
Topics are logical categories of messages. Messages are organized in topics, and a topic generally contains messages with the same type of data.
A topic is organized into one or more partitions. Partitions are the core concept behind Kafka’s scaling capabilities. Partitions allow the data of a topic to be split into multiple streams, which increases throughput and improves storage usage.
The messages on a partition are ordered by a number called the offset.
Replication
A topic can be replicated onto multiple brokers to improve the availability. When replication is set up, one of the brokers is the leader for one partition, and the other brokers are the followers.
When the leading broker goes offline, another broker automatically becomes the leader for the partition.
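The leader and follower roles can be illustrated with a small Python model. It is a deliberately simplified sketch: the replica assignment is hypothetical, and a real Kafka cluster elects the new leader itself from the replicas that are in sync.

```python
# Hypothetical replica assignment: partition -> brokers holding a copy.
# The first online broker in each list acts as leader in this model.
replicas = {0: [1, 2, 3], 1: [2, 3, 1], 2: [3, 1, 2]}
online = {1, 2, 3}


def leader(partition):
    """Return the first online replica, or None if no replica is available."""
    for broker in replicas[partition]:
        if broker in online:
            return broker
    return None


before = {p: leader(p) for p in replicas}
online.discard(2)                  # broker 2 goes offline
after = {p: leader(p) for p in replicas}
print(before)   # {0: 1, 1: 2, 2: 3}
print(after)    # {0: 1, 1: 3, 2: 3}
```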
Producer
The Kafka Producer (client) writes or publishes messages to a Kafka cluster. Depending on its configuration and parameters (e.g. message key), the producer chooses the destination partition.
A producer can publish to one or more topics.
Remember the bootstrap connection that was mentioned above? For a producer the bootstrap connection returns the information required to publish to a topic, i.e. the broker and partition to use. The producer can then establish the connection to the partition leader and start publishing messages. These actions happen automatically when a producer connects to the Kafka cluster.
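As an illustration of what a producer does with the bootstrap information, the following Python sketch resolves the address of a partition leader from a broker list and a partition list. All host names, ports and IDs are hypothetical, and the data layout is an assumption made for this example only.

```python
# Hypothetical metadata as a bootstrap request might return it.
brokers = {1: ("kafka1.example.com", 9092), 2: ("kafka2.example.com", 9092)}
partition_leaders = {0: 1, 1: 2, 2: 1}   # partition id -> leader broker id


def leader_address(partition):
    """Return 'host:port' of the broker that leads the given partition."""
    broker_id = partition_leaders.get(partition)
    if broker_id is None:
        raise LookupError(f"no leader known for partition {partition}")
    if broker_id not in brokers:
        raise LookupError(f"broker {broker_id} is not in the broker list")
    host, port = brokers[broker_id]
    return f"{host}:{port}"


print(leader_address(1))   # kafka2.example.com:9092
```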
Consumer
A Kafka Consumer (client) reads or fetches data from a Kafka cluster.
Just as multiple producers can write messages to a cluster, multiple consumers can read those messages.
When a consumer processes a message, the message is not removed from its topic. Consumers remember and store within Kafka which messages have been processed. They commit the offset, i.e. store the position of the message they have last read.
Consumer Groups
Consumer groups exist to scale reading, i.e. consuming, messages. Multiple consumers can subscribe to one topic and belong to the same consumer group. Kafka ensures that the members of the group receive messages from different partitions of the topic and thereby balances the load.
The group name is used in Kafka to manage the current consume offset:
- Groups are created automatically on the Kafka cluster when used.
- Group information (e.g. last offset consumed) is held and managed by the Kafka cluster.
- Cleaning/removing a group is done using Kafka administration tools on the Kafka cluster.
- To get all messages in a topic from the beginning, use a group name that has not been used before.
- To continue with the next message at the point where you left off with the last consume, simply use the same group name as before.
Groups also identify which consumers are load balanced (if enabled).
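How the group name determines the starting point of a consume can be modelled in a few lines of Python. This is an in-memory sketch with one partition and hypothetical group names. In a real cluster the committed offsets are held and managed by Kafka.

```python
log = ["msg-0", "msg-1", "msg-2", "msg-3"]   # one partition; list index = offset
committed = {}                               # group name -> next offset to read


def consume(group):
    """Return the next message for the group, or None at the end of the log."""
    offset = committed.get(group, 0)   # a group not used before starts at the beginning
    if offset >= len(log):
        return None
    committed[group] = offset + 1      # commit: remember the position for this group
    return log[offset]                 # the message itself stays in the log


first = [consume("billing"), consume("billing")]
other = consume("audit")      # new group name: reads from the beginning
resumed = consume("billing")  # same group name: continues where it left off
print(first, other, resumed)  # ['msg-0', 'msg-1'] msg-0 msg-2
```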
The zKafka Connector
Now that you are familiar with the key Kafka terms, continue with the section What is the zKafka Connector? to learn how you can use the zKafka Connector in your process flows and programs to take full advantage of the power of Kafka clusters.
AT-TLS Configuration for RzK
Using IBM z/OS Application Transparent Transport Layer Security (AT-TLS) is the best and most secure way to implement TLS security for a z/OS application. Some of the features provided with an AT-TLS implementation are:
- Allows you to specify the label of the certificate to be used for authentication instead of using the default certificate.
- Supports SSL Session Key Refresh.
- Supports SSL Sysplex Session Id Caching.
- Allows a decrypted SSL data trace for an application.
- Provides more detailed diagnostic messages in syslogd.
RzK and AT-TLS
The zKafka Connector uses AT-TLS in the following way:
- The RzK client can be enabled to use AT-TLS.
- By using AT-TLS for the RzK clients, the Kafka server always provides server certificate authentication to all clients, so that each client can verify that the server is really who it claims to be. Therefore, a RACF (SAF) user key ring is required that contains at least the certificate of the CA that signed the server certificate and the client certificate (or the server certificate itself if it is self-signed).
- The RzK client has to be configured to use AT-TLS as a non-controlling AT-TLS application. For an AT-TLS policy sample, please see below. For more information on AT-TLS, please see IBM z/OS Communications Server: IP Configuration Guide, Server applications, Application Transparent Transport Layer Security data protection.
Requirements
AT-TLS requires the Policy Agent to be configured and started, and the TCP/IP stack to be enabled for AT-TLS.
Configuration
The Kafka server and the RzK client have to be configured in AT-TLS as non-controlling AT-TLS applications:
- The TTLSRule statement for the RzK client requires a Direction parameter with the value Both.
- The RzK client requires the HandshakeRole parameter with the value Client to be coded on the TTLSEnvironmentAction statement.
- Code a TTLSEnvironmentAdvancedParms statement with the ApplicationControlled Off and SecondaryMap On parameters.
- The SecondaryMap parameter enables active or passive data connections to use the AT-TLS policy that is used for the controlled connection.
Choosing the right Cipher Algorithm
RzK and AT-TLS support TLS through the system SSL cryptographic services base element of z/OS. System SSL supports different cipher algorithms that provide both encryption and data authentication for data integrity.
Encryption scrambles the data so it is transferred confidentially and cannot be interpreted without a special key.
Data authentication algorithms ensure that the data was not modified during the transfer.
Some of the supplied cipher algorithms provide only data authentication, and some provide both encryption and authentication.
The cipher algorithm actually used for a session is determined by negotiation between the Kafka server and the RzK client. Both sides, server and client, have to support the selected cipher algorithm.
For example, if you configure an RzK client to use the Triple DES encryption with SHA authentication algorithm, but the server does not support that cipher algorithm, Triple DES encryption & SHA authentication will not be used for sessions between the RzK client and the Kafka server.
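The outcome of such a negotiation can be pictured as picking the first cipher suite from a preference list that the other side also supports. The Python sketch below is a simplified model under that assumption. The real TLS handshake is more involved, and which side's preference order wins depends on the configuration. The function name negotiate and both lists are hypothetical.

```python
def negotiate(client_preference, server_supported):
    """Return the first client-preferred suite the server supports, else None."""
    for suite in client_preference:
        if suite in server_supported:
            return suite
    return None


# Client prefers Triple DES with SHA, but the server does not support it.
client = ["TLS_RSA_WITH_3DES_EDE_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
server = {"TLS_RSA_WITH_AES_256_CBC_SHA", "TLS_DHE_RSA_WITH_AES_256_CBC_SHA"}

chosen = negotiate(client, server)
print(chosen)   # TLS_RSA_WITH_AES_256_CBC_SHA
```

If the two sides have no suite in common, the function returns None, which corresponds to a session that cannot be established.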
AT-TLS Sample Policy for RzK Clients
The following example AT-TLS policy for RzK clients may also be found in SAMPLIB(RZKTLSK0).
#*********************************************************************
# *
# Product: RVS z/OS Kafka Connector (RzK) *
# *
# Member: SAMPLIB(RZKTLSK0) *
# *
# Type: AT-TLS Policy *
# *
# Function: AT-TLS Sample Policy for Rzk Clients *
# *
# Notes: o Adapt the required AT-TLS parms below to your needs. *
# *
# (C) Copyright rvs-Systems GmbH, Germany, 2021-2023 *
#*********************************************************************
TTLSRule RZK-Client
{
LocalAddr ALL
RemoteAddr <KAFKA Server IP>
RemotePortRange xxxx-xxxx
Direction Both
Priority 251
TTLSGroupActionRef gAct-RZKClientx
TTLSEnvironmentActionRef eAct-RZKClientx
TTLSConnectionActionRef cAct-RZKClientx
}
TTLSGroupAction gAct-RZKClientx
{
TTLSEnabled On
GroupUserInstance 1
Trace 6
}
TTLSEnvironmentAction eAct-RZKClientx
{
HandshakeRole Client
TTLSEnvironmentAdvancedParms
{
ClientAuthType PassThru
}
EnvironmentUserInstance 0
TTLSKeyringParmsRef kyRingParms-RZKClientx
Trace 6
}
TTLSConnectionAction cAct-RZKClientx
{
HandshakeRole Client
TTLSCipherParmsRef cipher_RZKClientx
TTLSConnectionAdvancedParmsRef cAdv-RZKClientx
CtraceClearText On
Trace 6
}
TTLSConnectionAdvancedParms cAdv-RZKClientx
{
ApplicationControlled Off
SecondaryMap On
SSLv3 Off
TLSv1 Off
TLSv1.1 On
TLSv1.2 On
}
TTLSKeyringParms kyRingParms-RZKClientx
{
Keyring RZK_Keyring
}
TTLSCipherParms cipher_RZKClientx
{
# AES 256
V3CipherSuites TLS_DHE_RSA_WITH_AES_256_CBC_SHA
V3CipherSuites TLS_RSA_WITH_AES_256_CBC_SHA
V3CipherSuites TLS_DHE_DSS_WITH_AES_256_CBC_SHA
}