Sink Configuration Options
Reference of the sink connector options.
Connection
couchbase.seed.nodes
Addresses of Couchbase Server nodes, delimited by commas.
If a custom port is specified, it must be the KV port (which is normally 11210 for insecure connections, or 11207 for secure connections).
- Type: list
- Importance: high
couchbase.password
Password of the Couchbase user.
May be overridden with the KAFKA_COUCHBASE_PASSWORD environment variable.
- Type: password
- Importance: high
couchbase.bucket
Name of the Couchbase bucket to use.
This property is required unless using the experimental AnalyticsSinkHandler.
- Type: string
- Default: ""
- Importance: high
couchbase.network
The network selection strategy for connecting to a Couchbase Server cluster that advertises alternate addresses.
A Couchbase node running inside a container environment (like Docker or Kubernetes) might be configured to advertise both its address within the container environment (known as its "default" address) as well as an "external" address for use by clients connecting from outside the environment.
Setting the 'couchbase.network' config property to 'default' or 'external' forces the selection of the respective addresses. Setting the value to 'auto' tells the connector to select whichever network contains the addresses specified in the 'couchbase.seed.nodes' config property.
- Type: string
- Default: auto
- Importance: medium
couchbase.bootstrap.timeout
On startup, the connector will wait this long for a Couchbase connection to be established. If a connection is not established before the timeout expires, the connector will terminate.
- Type: string
- Default: 30s
- Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
- Importance: medium
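For illustration, a minimal connection configuration in a sink connector properties file might look like the following sketch (hostnames, credentials, and the bucket name are placeholders):
# Comma-delimited seed nodes; any custom port must be the KV port (11210, or 11207 for TLS).
couchbase.seed.nodes=cb-node-1.example.com,cb-node-2.example.com
# May instead be supplied via the KAFKA_COUCHBASE_PASSWORD environment variable.
couchbase.password=secret
couchbase.bucket=travel-sample
# Let the connector pick whichever advertised network matches the seed node addresses.
couchbase.network=auto
# Wait up to one minute for the initial connection before the connector terminates.
couchbase.bootstrap.timeout=1m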
Security
couchbase.enable.tls
Use secure connection to Couchbase Server.
If true, you must also tell the connector which certificate to trust. Specify a certificate file with 'couchbase.trust.certificate.path', or a Java keystore file with 'couchbase.trust.store.path' and 'couchbase.trust.store.password'.
- Type: boolean
- Default: false
- Importance: medium
- Dependents: couchbase.trust.certificate.path, couchbase.trust.store.path, couchbase.trust.store.password, couchbase.enable.hostname.verification, couchbase.client.certificate.path, couchbase.client.certificate.password
couchbase.enable.hostname.verification
Set this to false to disable TLS hostname verification for Couchbase connections.
Less secure, but might be required if for some reason you can’t issue a certificate whose Subject Alternative Names match the hostname used to connect to the server.
Only disable if you understand the impact and can accept the risks.
- Type: boolean
- Default: true
- Importance: medium
couchbase.trust.store.path
Absolute filesystem path to the Java keystore holding the CA certificate used by Couchbase Server.
If you want to use a PEM file instead of a Java keystore, specify couchbase.trust.certificate.path instead.
- Type: string
- Default: ""
- Importance: medium
couchbase.trust.store.password
Password for accessing the trust store.
May be overridden with the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable.
- Type: password
- Default: [hidden]
- Importance: medium
couchbase.trust.certificate.path
Absolute filesystem path to the PEM file containing the CA certificate used by Couchbase Server.
If you want to use a Java keystore instead of a PEM file, specify couchbase.trust.store.path instead.
- Type: string
- Default: ""
- Importance: medium
couchbase.client.certificate.path
Absolute filesystem path to a Java keystore or PKCS12 bundle holding the private key and certificate chain to use for client certificate authentication (mutual TLS).
If you supply a value for this config property, the couchbase.username and couchbase.password properties will be ignored.
- Type: string
- Default: ""
- Importance: medium
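For illustration, a TLS configuration that trusts the cluster’s CA certificate from a PEM file might look like this sketch (file paths are placeholders):
couchbase.enable.tls=true
# Trust the cluster's CA certificate from a PEM file...
couchbase.trust.certificate.path=/etc/kafka-connect/couchbase-ca.pem
# ...or, alternatively, from a Java keystore:
# couchbase.trust.store.path=/etc/kafka-connect/couchbase-truststore.jks
# couchbase.trust.store.password=changeit
# Leave hostname verification enabled unless the server certificate cannot match the hostname.
couchbase.enable.hostname.verification=true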
Logging
couchbase.log.redaction
Determines which kinds of sensitive log messages from the Couchbase connector will be tagged for later redaction by the Couchbase log redaction tool. NONE = no tagging; PARTIAL = user data is tagged; FULL = user, meta, and system data is tagged.
- Type: string
- Default: NONE
- Valid Values: One of [NONE, PARTIAL, FULL]
- Importance: medium
couchbase.log.document.lifecycle
If true, document lifecycle milestones will be logged at INFO level instead of DEBUG. Enabling this feature lets you watch documents flow through the connector. Disabled by default because it generates many log messages.
- Type: boolean
- Default: false
- Importance: medium
couchbase.metrics.interval
The connector writes metrics to the log at this interval.
Disable metric logging by setting this to 0.
UNCOMMITTED; this feature may change in a patch release without notice.
- Since: 4.2.3
- Type: string
- Default: 10m
- Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
- Importance: medium
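For illustration, a logging configuration that tags user data for redaction, logs document lifecycle milestones at INFO, and writes metrics every half hour might be:
couchbase.log.redaction=PARTIAL
couchbase.log.document.lifecycle=true
# Metric logging can be disabled entirely with a value of 0.
couchbase.metrics.interval=30m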
Sink Behavior
couchbase.default.collection
Qualified name (scope.collection or bucket.scope.collection) of the destination collection for messages from topics that don’t have an entry in the couchbase.topic.to.collection map.
If the bucket component contains a dot, escape it by enclosing it in backticks.
If the bucket component is omitted, it defaults to the value of the couchbase.bucket property.
- Type: string
- Default: _default._default
- Valid Values: A qualified collection name like 'scope.collection' or 'bucket.scope.collection'. If the bucket component contains a dot, escape it by enclosing it in backticks.
- Importance: medium
couchbase.topic.to.collection
A map from Kafka topic to Couchbase collection.
Topic and collection are joined by an equals sign. Map entries are delimited by commas.
A collection name is of the form bucket.scope.collection or scope.collection.
If the bucket component is omitted, it defaults to the value of the couchbase.bucket property.
If the bucket component contains a dot, escape it by enclosing it in backticks.
For example, if you want to write messages from topic "topic1" to collection "scope-a.invoices" in the default bucket, and messages from topic "topic2" to collection "scope-a.widgets" in bucket "other-bucket", you would write: "topic1=scope-a.invoices,topic2=other-bucket.scope-a.widgets".
Defaults to an empty map, with all documents going to the collection specified by couchbase.default.collection.
- Type: list
- Default: ""
- Valid Values: topic1=scope.collection,topic2=other-bucket.scope.collection,… If a bucket component contains a dot, escape it by enclosing it in backticks.
- Importance: medium
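Expressed as connector properties, the routing described above might look like this sketch (topic, bucket, scope, and collection names are placeholders):
# Messages from unmapped topics land in this collection of couchbase.bucket.
couchbase.default.collection=scope-a.misc
# topic1 -> scope-a.invoices in the default bucket; topic2 -> scope-a.widgets in other-bucket.
couchbase.topic.to.collection=topic1=scope-a.invoices,topic2=other-bucket.scope-a.widgets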
couchbase.topic.to.document.id
A map of per-topic overrides for the couchbase.document.id configuration property.
Topic and document ID format are joined by an equals sign. Map entries are delimited by commas.
For example, if documents from topic "topic1" should be assigned document IDs that match their "id" field, and documents from topic "topic2" should be assigned document IDs that match their "identifier" field, you would write: "topic1=${/id},topic2=${/identifier}".
Defaults to an empty map, which means the value of the couchbase.document.id configuration property is applied to documents from all topics.
UNCOMMITTED; this feature may change in a patch release without notice.
- Since: 4.2.4
- Type: list
- Default: ""
- Valid Values: topic1=${/id},topic2=${/some/other/path},…
- Importance: medium
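Expressed as a connector property, the example above would be (topic and field names are illustrative):
# topic1 keys on its "id" field, topic2 on its "identifier" field; other topics use couchbase.document.id.
couchbase.topic.to.document.id=topic1=${/id},topic2=${/identifier}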
couchbase.sink.handler
The fully-qualified class name of the sink handler to use. The sink handler determines how the Kafka record is translated into actions on Couchbase documents.
The built-in handlers are: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler, com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler, and com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.
You can customize the sink connector’s behavior by implementing your own SinkHandler.
- Type: class
- Default: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler
- Importance: medium
couchbase.document.mode
Overrides the couchbase.sink.handler property.
A value of N1QL forces the handler to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler.
A value of SUBDOCUMENT forces the handler to com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.
DEPRECATED. Please set the couchbase.sink.handler property instead.
- Type: string
- Default: DOCUMENT
- Valid Values: One of [DOCUMENT, SUBDOCUMENT, N1QL]
- Importance: medium
couchbase.document.id
Format string to use for the Couchbase document ID (overriding the message key). May refer to document fields via placeholders like ${/path/to/field}.
- Type: string
- Default: ""
- Importance: medium
couchbase.remove.document.id
Whether to remove the ID identified by 'couchbase.document.id' from the document before storing in Couchbase.
- Type: boolean
- Default: false
- Importance: medium
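For illustration, to key documents on a field of the message body and drop that field before the document is stored (the field name is a placeholder):
# Use the message body's "id" field as the Couchbase document ID.
couchbase.document.id=${/id}
# Remove that ID field from the document before it is stored.
couchbase.remove.document.id=true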
couchbase.document.expiration
Document expiration time specified as an integer followed by a time unit (s = seconds, m = minutes, h = hours, d = days). For example, to have documents expire after 30 minutes, set this value to "30m".
A value of "0" (the default) means documents never expire.
- Type: string
- Default: 0
- Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
- Importance: medium
couchbase.retry.timeout
Retry failed writes to Couchbase until this deadline is reached. If time runs out, the connector terminates.
A value of 0 (the default) means the connector will terminate immediately when a write fails.
This retry timeout is distinct from the KV timeout (which you can set via couchbase.env.*). The KV timeout affects an individual write attempt, while the retry timeout spans multiple attempts and makes the connector resilient to more kinds of transient failures.
Try not to confuse this with the Kafka Connect framework’s built-in errors.retry.timeout config property, which applies only to failures occurring before the framework delivers the record to the Couchbase connector.
- Since: 4.1.4
- Type: string
- Default: 0
- Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
- Importance: medium
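For illustration, a configuration that expires documents after a day and retries failed writes for up to five minutes might be (the durations are arbitrary):
# Documents expire 24 hours after being written; 0 would mean no expiration.
couchbase.document.expiration=24h
# Keep retrying failed writes for up to 5 minutes before the connector terminates.
couchbase.retry.timeout=5m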
Durability
couchbase.durability
The preferred way to specify an enhanced durability requirement when using Couchbase Server 6.5 or later.
The default value of NONE means a write is considered successful as soon as it reaches the memory of the active node.
If you set this to anything other than NONE, then you must not set couchbase.persist.to or couchbase.replicate.to.
- Type: string
- Default: NONE
- Valid Values: One of [NONE, MAJORITY, MAJORITY_AND_PERSIST_TO_ACTIVE, PERSIST_TO_MAJORITY]
- Importance: medium
couchbase.persist.to
For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write is persisted to disk on a certain number of replicas before considering the write successful.
If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.
- Type: string
- Default: NONE
- Valid Values: One of [NONE, ACTIVE, ONE, TWO, THREE, FOUR]
- Importance: medium
couchbase.replicate.to
For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write has reached the memory of a certain number of replicas before considering the write successful.
If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.
- Type: string
- Default: NONE
- Valid Values: One of [NONE, ONE, TWO, THREE]
- Importance: medium
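For illustration, with Couchbase Server 6.5 or later you would typically rely on the enhanced durability setting on its own:
# Require a majority of replicas to acknowledge each write.
couchbase.durability=MAJORITY
# Do not combine couchbase.durability with the older couchbase.persist.to / couchbase.replicate.to settings.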
N1ql Sink Handler
couchbase.n1ql.operation
The type of update to use when couchbase.sink.handler is set to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler.
This property is specific to N1qlSinkHandler.
- Type: string
- Default: UPDATE
- Valid Values: One of [UPDATE, UPDATE_WHERE]
- Importance: medium
couchbase.n1ql.where.fields
When using the UPDATE_WHERE operation, this is the list of document fields that must match the Kafka message in order for the document to be updated with the remaining message fields. To match against a literal value instead of a message field, use a colon to delimit the document field name and the target value. For example, "type:widget,color" matches documents whose 'type' field is 'widget' and whose 'color' field matches the 'color' field of the Kafka message.
This property is specific to N1qlSinkHandler.
- Type: list
- Default: ""
- Importance: medium
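For illustration, a N1QL sink handler configuration using the UPDATE_WHERE operation might look like this sketch (field names and the literal value are placeholders):
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler
couchbase.n1ql.operation=UPDATE_WHERE
# Update documents whose 'type' field is the literal 'widget' and whose 'color' matches the message's 'color' field.
couchbase.n1ql.where.fields=type:widget,color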
Analytics Sink Handler
couchbase.analytics.max.records.in.batch
Every batch consists of either an UPSERT or a DELETE statement, depending on the mutations. This property determines the maximum number of records in the UPSERT or DELETE statement in a batch. Configure this parameter based on the capacity of your Analytics cluster.
This property is specific to AnalyticsSinkHandler.
UNCOMMITTED; this feature may change in a patch release without notice.
- Since: 4.1.14
- Type: int
- Default: 100
- Importance: medium
couchbase.analytics.max.size.in.batch
Every batch consists of either an UPSERT or a DELETE statement, depending on the mutations. This property defines the maximum total size, in bytes, of the documents in a batch’s UPSERT statement. Configure this parameter based on the capacity of your Analytics cluster.
This property is specific to AnalyticsSinkHandler.
UNCOMMITTED; this feature may change in a patch release without notice.
- Since: 4.2.0
- Type: string
- Default: 5m
- Valid Values: An integer followed by a size unit (b = bytes, k = kilobytes, m = megabytes, g = gigabytes). For example, to specify 64 megabytes: 64m
- Importance: medium
couchbase.analytics.query.timeout
This property determines how long the client waits before cancelling an Analytics query request.
This property is specific to AnalyticsSinkHandler.
UNCOMMITTED; this feature may change in a patch release without notice.
- Since: 4.2.0
- Type: string
- Default: 5m
- Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
- Importance: medium
Sub Document Sink Handler
couchbase.subdocument.path
JSON Pointer to the property of the Kafka message whose value is the subdocument path to use when modifying the Couchbase document.
This property is specific to SubDocumentSinkHandler.
- Type: string
- Default: ""
- Importance: medium
couchbase.subdocument.operation
The type of update to apply to the sub-document.
This property is specific to SubDocumentSinkHandler.
- Type: string
- Default: UPSERT
- Valid Values: One of [UPSERT, ARRAY_PREPEND, ARRAY_APPEND]
- Importance: medium
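For illustration, a sub-document sink handler configuration that appends each message to an array inside the target document might look like this sketch (the "/path" pointer is a placeholder for a field of your messages):
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler
# JSON Pointer to the message field whose value is the sub-document path to modify.
couchbase.subdocument.path=/path
couchbase.subdocument.operation=ARRAY_APPEND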
Couchbase Java SDK Settings
couchbase.env.*
Any system property recognized by the Couchbase Java SDK may be specified in the sink connector config if you omit the com. prefix from the system property name.
For example, the Couchbase Java SDK recognizes the system property com.couchbase.env.timeout.kvTimeout. To specify this setting in the connector config, use the property name couchbase.env.timeout.kvTimeout.
If you’re thinking about increasing the KV timeout to handle transient error conditions, consider using the connector’s couchbase.retry.timeout config property instead. The retry timeout is a more robust way to handle all kinds of write failures.
For a list of recognized properties, see Java SDK Client Settings.
UNCOMMITTED; this feature may change in a patch release without notice.
- Since: 4.0.6
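For illustration, passing the KV timeout client setting mentioned above through the connector config (the timeout value is arbitrary):
# Equivalent to setting the Java SDK system property com.couchbase.env.timeout.kvTimeout.
couchbase.env.timeout.kvTimeout=5s
# For transient write failures, prefer couchbase.retry.timeout over a larger KV timeout.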