How to keep Elasticsearch synchronized with a relational database using Logstash and JDBC

Nowadays, search is one of the primary features needed in almost every application, and Elasticsearch is a popular way to provide it, along with many other capabilities.


To get powerful search capabilities, we often deploy Elasticsearch alongside an existing relational database, so it becomes necessary to keep Elasticsearch synchronized with the data stored in that database. In this blog post, we will explore how to keep Elasticsearch synchronized with a relational database using Logstash.

Logstash helps us copy records efficiently and synchronize updates from a relational database into Elasticsearch.

Logstash

Logstash is a data pipeline that helps collect, parse, and analyze a large variety of structured and unstructured data and events generated across various systems. It provides plugins to connect to many types of input sources and platforms, and it is designed to efficiently process logs, events, and unstructured data and distribute them to a variety of outputs using its output plugins, such as file, stdout (printed to the console running Logstash), or Elasticsearch.
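
To get a feel for the input/filter/output structure of a Logstash pipeline, here is a minimal sketch (unrelated to the synchronization setup itself) that reads lines typed on stdin and prints them back as structured events:

input {
  stdin { }
}
output {
  stdout { codec => "rubydebug" }
}

Each line you type becomes an event, which Logstash decorates with fields such as @timestamp before handing it to the output plugin.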

To get started, we need to install a few things and have some configuration ready, which we'll walk through next.

Requirements

Make sure you have a relational database installed (PostgreSQL, MySQL, SQL Server, etc.); I used PostgreSQL 13 at the time of writing this blog post. We also need Elasticsearch and Logstash, so please install them from their official sites. For Elasticsearch I have used Open Distro for Elasticsearch. As Elasticsearch itself is written in Java, make sure you have Java installed. Finally, download the JDBC connector/driver for your database. For installation instructions, head over to the respective websites.

Start Elasticsearch and make sure it is up and running at port 9200.
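
A quick way to confirm this (assuming a default, unsecured local setup) is to query the root endpoint:

curl http://localhost:9200

This should return a small JSON document with the cluster name and version. If your Open Distro installation has the security plugin enabled, you may need to use HTTPS and pass credentials instead.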

Behind the scenes - synchronization steps

We are going to use Logstash with the JDBC input plugin to keep Elasticsearch synchronized with the relational database. The JDBC input plugin runs a loop that periodically polls the database for records that were inserted or modified since the last iteration of the loop. For this to work, the following points need to be taken care of:

  1. As database records are written into Elasticsearch, there must be some kind of mapping between the two. So, Elasticsearch's "_id" field must be set to the "id" field from the database (PostgreSQL/MySQL). This provides a direct mapping between each PostgreSQL record and its Elasticsearch document. If a particular record is updated in PostgreSQL, the entire associated document in Elasticsearch is overwritten; overwriting here means the old document is effectively deleted and a new document is indexed in its place (see the short example after this list). Your existing table may not have a unique id field; we will discuss later what to do in that scenario.
  2. The database table might have millions of records, and when a record is updated or inserted we don't want the JDBC input plugin to loop through the whole set of records again. If we have a field that records the insertion/modification time, Logstash can use it to request only documents that have been modified or inserted since the last iteration of its polling loop. Each time Logstash polls PostgreSQL, it stores the insertion or update time of the last record that it has read. On its next iteration, Logstash then requests only records with an update or insertion time that is newer than the last record received in the previous iteration.
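
As a quick illustration of point 1, indexing a document under an "_id" that already exists simply replaces the previous document. Using a hypothetical index named test_idx:

curl -X PUT "http://localhost:9200/test_idx/_doc/1" -H 'Content-Type: application/json' -d '{"name": "velocitybytes"}'
curl -X PUT "http://localhost:9200/test_idx/_doc/1" -H 'Content-Type: application/json' -d '{"name": "bytesvelocity"}'

The second request overwrites the first: the document's version increases and only the latest _source remains.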

PostgreSQL setup

Create Database

CREATE DATABASE es_velocitybytes;

Create Table

CREATE TABLE es_vb_table (
    id bigserial primary key,
    name varchar(20) NOT NULL,
    vb_timestamp timestamp default now()
);

The above table should be sufficient to understand the concept. Let's look at what we have written:

  • We have created a database and a table within it.
  • The table has three columns: id, name, and vb_timestamp, a timestamp field.
    • id is unique and is the primary key of the table.
    • name is just user-defined data stored in the record. To keep this blog post simple we only added one field, but more fields can be added. This is the field that we will modify to show that not only are newly inserted PostgreSQL records copied to Elasticsearch, but that updated records are also correctly propagated.
    • vb_timestamp should hold the time at which a record was inserted or last modified, which allows us to request any records that have changed since the last time Logstash polled PostgreSQL. Note that DEFAULT now() only applies when a row is inserted; in PostgreSQL, keeping the column current on updates requires a trigger, as shown below.
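
Here is a minimal sketch of such a trigger for PostgreSQL (the function and trigger names are just illustrative):

CREATE OR REPLACE FUNCTION set_vb_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    -- stamp the row with the time of modification
    NEW.vb_timestamp = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_set_vb_timestamp
    BEFORE UPDATE ON es_vb_table
    FOR EACH ROW
    EXECUTE FUNCTION set_vb_timestamp();

With this in place, every UPDATE automatically bumps vb_timestamp, so modified rows are picked up by the polling query described below. (MySQL users can achieve the same with ON UPDATE CURRENT_TIMESTAMP on the column instead.)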

Insert and update some data in the above table:

INSERT INTO es_vb_table (name) VALUES (<name>);

INSERT INTO es_vb_table(name) VALUES('velocitybytes');

UPDATE es_vb_table SET name = <name> WHERE id= <id>;

UPDATE es_vb_table SET name = 'bytesvelocity' WHERE id = 1;

Synchronization

Create a file named logstash.conf and add the following code to it. This Logstash pipeline implements the synchronization logic described in the previous section:

input {
  jdbc {
    jdbc_driver_library => "/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/es_velocitybytes"
    jdbc_user => "postgres"
    jdbc_password => "your_password"
    jdbc_paging_enabled => true
    jdbc_validate_connection => true
    clean_run => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/1 * * * *"
    statement => "SELECT id, name, extract(epoch from vb_timestamp) AS unix_ts_in_secs FROM es_vb_table WHERE (extract(epoch from vb_timestamp) > :sql_last_value AND vb_timestamp < NOW()) ORDER BY vb_timestamp ASC"
    jdbc_fetch_size => "1000"
    jdbc_page_size => "100000"
    last_run_metadata_path => "/.logstash_jdbc_last_run"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  stdout { codec =>  "rubydebug"}
  elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

There are a few important things to understand about the above pipeline:

  • tracking_column: This setting specifies the field "unix_ts_in_secs" (described below) that is used for tracking the last document read by Logstash from PostgreSQL; its value is stored on disk in .logstash_jdbc_last_run. That value determines the starting point for the documents that Logstash will request in the next iteration of its polling loop, and it can be accessed in the SELECT statement as ":sql_last_value".
  • unix_ts_in_secs: This is a field generated by the above SELECT statement, which contains "vb_timestamp" as a standard Unix timestamp (seconds since the epoch). It is referenced by the "tracking_column" that we just discussed. A Unix timestamp is used for tracking progress rather than a normal timestamp, as a normal timestamp may cause errors due to the complexity of correctly converting back and forth between UTC and the local timezone.
  • sql_last_value: This is a built-in parameter that contains the starting point for the current iteration of Logstash's polling loop, and it is referenced in the SELECT statement of the above jdbc input configuration. It is set to the most recent value of "unix_ts_in_secs", which is read from .logstash_jdbc_last_run, and it serves as the lower bound for documents returned by the PostgreSQL query executed in Logstash's polling loop. Including this variable in the query guarantees that insertions or updates that have already been propagated to Elasticsearch will not be re-sent (see the substituted query sketch after this list).
  • schedule: This uses cron syntax to specify how often Logstash should poll PostgreSQL for changes. The specification "*/1 * * * *" tells Logstash to contact PostgreSQL every minute.
  • filter: In this section we simply copy the value of "id" from the PostgreSQL record into a metadata field called "_id", which we later reference in the output to ensure that each document is written into Elasticsearch with the correct "_id" value. Using a metadata field ensures that this temporary value does not cause a new field to be created. We also remove the "id", "@version", and "unix_ts_in_secs" fields from the document, as we do not wish them to be written to Elasticsearch.
  • output: In this section we specify that each document should be written to Elasticsearch and assigned the "_id" pulled from the metadata field created in the filter section. There is also a rubydebug output to stdout, which helps with debugging.
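
To make this concrete, suppose the tracking value stored in .logstash_jdbc_last_run is 1621343067 (a made-up example). On the next iteration, the statement Logstash effectively runs against PostgreSQL is:

SELECT id, name, extract(epoch from vb_timestamp) AS unix_ts_in_secs
FROM es_vb_table
WHERE (extract(epoch from vb_timestamp) > 1621343067 AND vb_timestamp < NOW())
ORDER BY vb_timestamp ASC

so only rows inserted or modified after that point are read and re-indexed.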

Testing our system

Make sure Elasticsearch is up and running at port 9200, then start Logstash with the following command in a terminal.

logstash -f /<path>/logstash.conf
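
Optionally, you can validate the pipeline syntax first without actually starting it:

logstash -f /<path>/logstash.conf --config.test_and_exit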

Once it runs, an index named "rdbms_sync_idx" is created in Elasticsearch and the documents become available.

To test, we can run a search request against the index as shown:

GET rdbms_sync_idx/_search

which returns something similar to the following response (the @timestamp field is added by Logstash by default and is not important here):

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-05-18T13:04:27.436Z",
          "vb_timestamp" : "2019-05-18T12:58:56.000Z",
          "client_name" : "Velocity Bytes"
        }
      },
    ]
}
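
To check that updates are propagated as well (assuming the vb_timestamp update trigger from the PostgreSQL setup is in place), modify a row and fetch the corresponding document again after the next polling cycle:

UPDATE es_vb_table SET name = 'updated name' WHERE id = 1;

curl http://localhost:9200/rdbms_sync_idx/_doc/1

Within about a minute (the schedule interval), the document's name field should reflect the new value.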

Deleted Records in Table

If a record is deleted from the database table, that deletion will not be propagated to Elasticsearch. To address this issue, the following approaches can be considered.

  • PostgreSQL records could include an "is_deleted" field, which indicates that they are no longer valid. This is known as a "soft delete". As with any other update to a record in PostgreSQL, the "is_deleted" field will be propagated to Elasticsearch through Logstash. If this approach is implemented, Elasticsearch and PostgreSQL queries need to be written so as to exclude records/documents where "is_deleted" is true (a sketch follows this list). Eventually, background jobs can remove such documents from both PostgreSQL and Elasticsearch.
  • Another alternative is to ensure that any system responsible for deleting records in PostgreSQL also subsequently executes a command to delete the corresponding documents directly from Elasticsearch.
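
A minimal sketch of the soft-delete approach for our example table (the column name is illustrative) could look like this:

ALTER TABLE es_vb_table ADD COLUMN is_deleted boolean NOT NULL DEFAULT false;

-- "delete" a record by flagging it instead of removing it
UPDATE es_vb_table SET is_deleted = true WHERE id = 1;

-- application queries must then exclude soft-deleted rows
SELECT id, name FROM es_vb_table WHERE is_deleted = false;

Because the update trigger refreshes vb_timestamp, flagging a row is picked up by the next polling cycle like any other change. The SELECT statement in logstash.conf would also need to include is_deleted so the flag reaches Elasticsearch, and Elasticsearch queries would filter it out in the same way.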

Tables with no unique columns

There might be situations where your table doesn't have a unique column; in those cases, combining multiple fields to generate a unique key can help. If you also want to delete some documents based on certain conditions, the delete action can help. The following pipeline implements such a scenario; it is just an example.

input {
  jdbc {
    jdbc_driver_library => "<path>/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/es_velocitybytes"
    jdbc_user => "postgres"
    jdbc_password => "your_password"
    jdbc_paging_enabled => true
    jdbc_validate_connection => true
    clean_run => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    # events from this input are indexed into Elasticsearch
    tags => "to_index"
    schedule => "*/1 * * * *"
    statement => "SELECT a_id, b_id, c_id, name, extract(epoch from vb_timestamp) AS unix_ts_in_secs FROM es_vb_table WHERE c_id = 99999 AND (extract(epoch from vb_timestamp) > :sql_last_value AND vb_timestamp < NOW()) ORDER BY vb_timestamp ASC"
    jdbc_fetch_size => "5000"
    jdbc_page_size => "100000"
    last_run_metadata_path => "<path>/.logstash_jdbc_last_run"
  }
  jdbc {
    jdbc_driver_library => "<path>/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/es_velocitybytes"
    jdbc_user => "postgres"
    jdbc_password => "your_password"
    jdbc_paging_enabled => true
    jdbc_validate_connection => true
    clean_run => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    # events from this input trigger deletions in Elasticsearch
    tags => "to_delete"
    schedule => "*/2 * * * *"
    statement => "SELECT a_id, b_id, c_id, name, extract(epoch from vb_timestamp) AS unix_ts_in_secs FROM es_vb_table WHERE concat(a_id, b_id) NOT IN (SELECT concat(a_id, b_id) FROM es_vb_table WHERE c_id = 99999) AND extract(epoch from vb_timestamp) > :sql_last_value"
    jdbc_page_size => "100"
    # each jdbc input should track its progress in its own file
    last_run_metadata_path => "<path>/.logstash_jdbc_last_run_delete"
  }
}
filter {
  if "to_index" in [tags] {
    mutate {
      remove_field => ["unix_ts_in_secs", "@version"]
    }
  }
  if "to_delete" in [tags] {
    mutate {
      add_field => {
        "[@metadata][elasticsearch_action]" => "delete"
      }
      remove_field => ["unix_ts_in_secs", "@version"]
    }
  }
  # build a stable document id from the combination of a_id and b_id
  fingerprint {
    source => ["a_id", "b_id"]
    target => "[@metadata][fingerprint]"
    method => "SHA1"
    key => "Log analytics"
    base64encode => true
    concatenate_sources => true
  }
}
output {
  if "to_index" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][fingerprint]}"
    }
  }
  if "to_delete" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][fingerprint]}"
      action => "%{[@metadata][elasticsearch_action]}"
    }
  }
  stdout { codec => "rubydebug" }
}