I have the following configuration:
- Oracle GoldenGate version 19.21.0.0.231017 (Extract side) on Oracle Database version 19.23.0.0.0
- Oracle GoldenGate for Big Data version 21.9.0.0.3 (Replicat side)
I tried to create a Cassandra connector and a Kafka Connect connector, but in both cases I run into the same problem:
The Oracle type DATE is converted to a String.
This is the Cassandra Handler log:
Cassandra column [date_col] maps to GG column [DATE_COL] Cassandra data type [TEXT] GG index [3].
And this is the schema of the Kafka message:
{
  "type": "struct",
  "fields": [
    { "type": "int64", "optional": true, "field": "ID" },
    { "type": "string", "optional": true, "field": "VARCHAR_COL" },
    { "type": "string", "optional": true, "field": "DATE_COL" },
    { "type": "double", "optional": true, "field": "DOUBLE_COL" },
    { "type": "int64", "optional": true, "field": "NUMBER_COL" }
  ],
  "optional": true,
  "name": "row",
  "field": "before"
}
As we can see, DATE_COL has the type string.
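To make the mismatch easy to check, here is a minimal Python sketch that reads a Connect JSON schema like the one above and reports the type of each field. The helper name `field_types` is hypothetical, and the schema literal is copied from the message above. The last part shows how I would expect DATE_COL to look if it were emitted as the Kafka Connect Timestamp logical type; that is an assumption about the desired output, not something the handler currently produces.

```python
import json

# Schema copied from the Kafka message shown above.
SCHEMA_JSON = """
{
  "type": "struct",
  "fields": [
    {"type": "int64", "optional": true, "field": "ID"},
    {"type": "string", "optional": true, "field": "VARCHAR_COL"},
    {"type": "string", "optional": true, "field": "DATE_COL"},
    {"type": "double", "optional": true, "field": "DOUBLE_COL"},
    {"type": "int64", "optional": true, "field": "NUMBER_COL"}
  ],
  "optional": true,
  "name": "row",
  "field": "before"
}
"""


def field_types(schema):
    """Map each field name to its logical type name if present, else its primitive type."""
    return {f["field"]: f.get("name", f["type"]) for f in schema["fields"]}


schema = json.loads(SCHEMA_JSON)
types = field_types(schema)
for name, type_name in types.items():
    print(f"{name}: {type_name}")

# Assumed desired form of the field: an int64 carrying the Connect
# Timestamp logical type (epoch milliseconds) instead of a plain string.
DESIRED_DATE_FIELD = {
    "type": "int64",
    "optional": True,
    "name": "org.apache.kafka.connect.data.Timestamp",
    "version": 1,
    "field": "DATE_COL",
}
```

Run against the schema above, the loop reports DATE_COL as `string`, which is the behaviour I would like to change.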
This is the Oracle DDL of the table:
CREATE TABLE u_l0_dwh.test_table (
    ID          NUMBER(10) NOT NULL PRIMARY KEY,
    VARCHAR_COL VARCHAR2(200),
    DATE_COL    DATE,
    DOUBLE_COL  NUMBER(8,2),
    NUMBER_COL  NUMBER(2)
);
With the Cassandra connector, I solved the issue by defining the table in Cassandra myself with the column types I need, and the data is now replicated to Cassandra as a timestamp, as desired.
With the Kafka Connect connector, I can't find a solution.
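For context, converting the value downstream is possible, but it leaves the schema itself typed as string, so it does not solve the problem. A minimal sketch of such a consumer-side conversion follows. It assumes the DATE arrives as a `yyyy-MM-dd HH:mm:ss` string in UTC; both the format and the function name `date_string_to_epoch_millis` are assumptions for illustration, not taken from the handler documentation.

```python
from datetime import datetime, timedelta, timezone

# Assumed format of the DATE string in the message; adjust it to
# whatever the handler actually emits.
ASSUMED_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def date_string_to_epoch_millis(value, fmt=ASSUMED_DATE_FORMAT):
    """Parse a DATE string as UTC and return epoch milliseconds (None stays None)."""
    if value is None:
        return None
    parsed = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
    # Integer division by a timedelta avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(date_string_to_epoch_millis("2024-01-01 00:00:00"))
```

Epoch milliseconds are used here because that is the representation of the Kafka Connect Timestamp logical type.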
These are the configuration files for the Kafka Connect Replicat:
- KC.prm file:
REPLICAT kc
TARGETDB LIBFILE libggjava.so SET property=dirprm/kc.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP U_L0_DWH.TEST_TABLE, TARGET U_L0_DWH.TEST_TABLE;
- KC.prop file:
gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=oggtopic
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${null}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}
gg.handler.kafkaconnect.pkUpdateHandling=delete-insert
#Regex properties
gg.schemareplaceregex=[$]
gg.schemareplacestring=_
#Apache Kafka Classpath
gg.classpath=/shared/Middleware/GGPREXT5/ggjava/lib/kafka_confluent_5.4.11/*
javawriter.bootoptions=-Xmx1G -Xms1G -Duser.timezone=UTC -Djava.class.path=.:ggjava/ggjava.jar:./dirprm
gg.log=log4j
gg.log.level=DEBUG
- kafkaconnect.properties file:
bootstrap.servers=isprex7.gruppoitas.local:49092
acks=1
#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
#Avro Converter Settings
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081
#Protobuf Converter Settings (Added in Confluent 5.5.0)
#key.converter=io.confluent.connect.protobuf.ProtobufConverter
#value.converter=io.confluent.connect.protobuf.ProtobufConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081
#Adjust for performance
buffer.memory=33554432
batch.size=16384
linger.ms=0
converter.type=key
converter.type=value
converter.type=header