Error YBExtractNewRecordState transform not working in CDC

Hi Yuga Team,

I am trying to setup cdc on yugabyte(2.25) using debezium connector.

When I try without YBExtractNewRecordState, the transformer connector setup works fine, but connector creation gets timed out when I try with the transformer. I need this transformer so that sink connector can read data properly.

mentioned connector config below.

        "tasks.max": "1",
        "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBgRPCConnectorctor",
        "database.hostname": "localhost",
        "database.master.addresses": "localhost:7100",
        "database.port": "5433",
        "database.user": "yugabyte",
        "database.password": "yugabyte",
        "database.dbname": "vip",
        "topic.prefix": "dbserver1",
        "snapshot.mode": "never",
        "database.streamid": "7924ce26d87c02bf464d2833d41d4b9a",
        "table.include.list": "public.Users",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.connector.yugabytedb.transforms.YBExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"

Error Logs

[2025-03-21 20:00:52,214] INFO [ybconnector|worker] Instantiated connector ybconnector with version 1.9.5.y.220.4 of type class io.debezium.connector.yugabytedb.YugabyteDBConnector (org.apache.kafka.connect.runtime.Worker:331)
[2025-03-21 20:00:52,215] INFO [ybconnector|worker] Finished creating connector ybconnector (org.apache.kafka.connect.runtime.Worker:352)
[2025-03-21 20:00:52,219] INFO [ybconnector|worker] Discovered tablet YB Master for table YB Master with partition ["", "") (org.yb.client.AsyncYBClient:2728)
[2025-03-21 20:00:52,227] INFO [ybconnector|worker] Discovered tablet YB Master for table YB Master with partition ["", "") (org.yb.client.AsyncYBClient:2728)
[2025-03-21 20:00:52,231] INFO [ybconnector|worker] Query Language used for tables is ysql (io.debezium.connector.yugabytedb.YBClientUtils:384)
[2025-03-21 20:00:52,235] INFO [ybconnector|worker] Discovered tablet YB Master for table YB Master with partition ["", "") (org.yb.client.AsyncYBClient:2728)
[2025-03-21 20:00:52,239] INFO [ybconnector|worker] Query Language used for tables is ysql (io.debezium.connector.yugabytedb.YBClientUtils:384)
[2025-03-21 20:00:52,242] INFO [ybconnector|worker] Starting thread to monitor the tables (io.debezium.connector.yugabytedb.YugabyteDBTablePoller:58)
[2025-03-21 20:00:52,244] INFO SourceConnectorConfig values:
	config.action.reload = restart
	connector.class = io.debezium.connector.yugabytedb.YugabyteDBConnector
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	exactly.once.support = requested
	header.converter = null
	key.converter = null
	name = ybconnector
	offsets.storage.topic = null
	predicates = []
	tasks.max = 1
	topic.creation.groups = []
	transaction.boundary = poll
	transaction.boundary.interval.ms = null
	transforms = [unwrap]
	value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:370)
[2025-03-21 20:00:52,244] INFO [ybconnector|worker] Discovered tablet YB Master for table YB Master with partition ["", "") (org.yb.client.AsyncYBClient:2728)

Also I couldn’t find jar which has below connector class.

“connector.class”: “io.debezium.connector.yugabytedb.YugabyteDBConnector”

both postgres and GRPC connector classes getting timeout while creating connector with YBExtractNewRecordState transform

Postgres Connector Jar → https://github.com/yugabyte/debezium/releases/tag/dz.2.5.2.yb.2024.1.SNAPSHOT.1

GRPC connector Jar →
https://github.com/yugabyte/debezium-connector-yugabytedb/releases/tag/vdz.1.9.5.yb.grpc.2024.2.2

Hi @vivek
From the connector config shared above, it appears that you are trying to set the YBExtractNewRecordState transformer in the source YugabyteDB connector. This transformer should be set in the configs of your sink connector. For example, this is a sample config for JDBC sink connector:

  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
     "tasks.max": "1",
      "topics": "<topic-name>",
      "dialect.name": "PostgreSqlDatabaseDialect",
      "table.name.format": "<table-name>",
      "connection.url": "<connection-url>",
      "auto.create": "true",
      "auto.evolve":"true",
      "insert.mode": "upsert",
      "pk.fields": "<pk-column-name>",
      "pk.mode": "record_key",
      "delete.enabled":"true",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.connector.yugabytedb.transforms.YBExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false"
    }