Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

DarioBalinzo/kafka-connect-elasticsearch-source

Open more actions menu

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Kafka-connect-elasticsearch-source

YourActionName Actions Status

Kafka Connect Elasticsearch Source: fetch data from elastic-search and sends it to kafka. The connector fetches only new data using a strictly incremental / temporal field (like a timestamp or an incrementing id). It supports dynamic schema and nested objects/ arrays.

Requirements:

  • Elasticsearch 6.x and 7.x
  • Java >= 8
  • Maven

Output data serialization format:

The connector uses kafka-connect schema and structs, that are agnostic regarding the user serialization method (e.g. it might be Avro or json, etc...).

Bugs or new Ideas?

Installation:

Compile the project with:

mvn clean package -DskipTests

You can also compile and running both unit and integration tests (docker is mandatory) with:

mvn clean package

Copy the jar with dependencies from the target folder into connect classpath ( e.g /usr/share/java/kafka-connect-elasticsearch ) or set plugin.path parameter appropriately.

Example

Using kafka connect in distributed way, a sample config file to fetch my_awesome_index* indices and to produce output topics with es_ prefix:

{       
  "name": "elastic-source",
   "config": {
             "connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
             "tasks.max": "1",
             "es.host" : "localhost",
             "es.port" : "9200",
             "index.prefix" : "my_awesome_index",
             "topic.prefix" : "es_",
             "incrementing.field.name" : "@timestamp"
        }
}

To start the connector with curl:

curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors | jq

To check the status:

curl localhost:8083/connectors/elastic-source/status | jq

To stop the connector:

curl -X DELETE localhost:8083/connectors/elastic-source | jq

Documentation

Elasticsearch Configuration

es.host ElasticSearch host. Optionally it is possible to specify many hosts using ; as separator (host1;host2;host3)

  • Type: string
  • Importance: high
  • Dependents: index.prefix

es.port ElasticSearch port

  • Type: string
  • Importance: high
  • Dependents: index.prefix

es.scheme ElasticSearch scheme (http/https)

  • Type: string
  • Importance: medium
  • Default: http

es.user Elasticsearch username

  • Type: string
  • Default: null
  • Importance: high

es.password Elasticsearch password

  • Type: password
  • Default: null
  • Importance: high

incrementing.field.name The name of the strictly incrementing field to use to detect new records.

  • Type: any
  • Importance: high

incrementing.secondary.field.name In case the main incrementing field may have duplicates, this secondary field is used as a secondary sort field in order to avoid data losses when paginating (available starting from versions >= 1.4).

  • Type: any
  • Importance: low

es.tls.truststore.location Elastic ssl truststore location

  • Type: string
  • Importance: medium

es.tls.truststore.password Elastic ssl truststore password

  • Type: string
  • Default: ""
  • Importance: medium

es.tls.keystore.location Elasticsearch keystore location

  • Type: string
  • Importance: medium

es.tls.keystore.password Elasticsearch keystore password

  • Type: string
  • Default: ""
  • Importance: medium

connection.attempts Maximum number of attempts to retrieve a valid Elasticsearch connection.

  • Type: int
  • Default: 3
  • Importance: low

connection.backoff.ms Backoff time in milliseconds between connection attempts.

  • Type: long
  • Default: 10000
  • Importance: low

index.prefix Indices prefix to include in copying. Periodically, new indices are discovered if they match the pattern.

  • Type: string
  • Default: ""
  • Importance: medium

index.names List of elasticsearch indices: es1,es2,es3

  • Type: string
  • Default: null
  • Importance: medium

Connector Configuration

poll.interval.ms Frequency in ms to poll for new data in each index.

  • Type: int
  • Default: 5000
  • Importance: high

batch.max.rows Maximum number of documents to include in a single batch when polling for new data.

  • Type: int
  • Default: 10000
  • Importance: low

topic.prefix Prefix to prepend to index names to generate the name of the Kafka topic to publish data

  • Type: string
  • Importance: high

filters.whitelist Whitelist filter for extracting a subset of fields from elastic-search json documents. The whitelist filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).

  • Type: string
  • Importance: medium
  • Default: null

filters.blacklist Blacklist filter for extracting a subset of fields from elastic-search json documents. The blacklist filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).

  • Type: string
  • Importance: medium
  • Default: null

filters.json_cast This filter casts nested fields to json string, avoiding parsing recursively as kafka connect-schema. The json-cast filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).

  • Type: string
  • Importance: medium
  • Default: null

fieldname_converter Configuring which field name converter should be used (allowed values: avro or nop). By default, the avro field name converter renames the json fields non respecting the avro specifications (https://avro.apache.org/docs/current/spec.html#names) in order to be serialized correctly. To disable the field name conversion set this parameter to nop.

  • Type: string
  • Importance: medium
  • Default: avro

About

Kafka Connect Elasticsearch Source

Topics

Resources

License

Stars

Watchers

Forks

Sponsor this project

Packages

No packages published

Contributors 5

Languages

Morty Proxy This is a proxified and sanitized view of the page, visit original site.