> ClickPipes allows you to connect ClickHouse to DynamoDB.

# CDC from DynamoDB to ClickHouse

export const Image = ({img, alt, size}) => {
  return <Frame>
      <img src={img} alt={alt} />
    </Frame>;
};

export const CloudNotSupportedBadge = () => {
  return <div className="cloudNotSupportedBadge">
            <div className="cloudNotSupportedIcon">
            <svg width="16" height="16" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
                <path strokeWidth="1.5" d="M6.33366 12.6666L12.3739 12.6667C13.6593 12.6667 14.7073 11.6187 14.7073 10.3334C14.7073 9.04804 13.6593 8.00003 12.3739 8.00003C12.3739 8.00003 12.3337 7.66659 12.0003 7.33325M10.667 5.33322C8.00033 2.33325 4.45395 4.78537 4.14195 6.68203C2.55728 6.7627 1.29395 8.06203 1.29395 9.6667C1.29395 11.3234 2.66699 12.6666 4.00033 12.6666" stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" />
                <path strokeWidth="1.5" d="M2.66699 14L12.0003 4.66663" stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" />
            </svg>

        </div>
            Not supported in ClickHouse Cloud
        </div>;
};

This page covers how to set up CDC from DynamoDB to ClickHouse using ClickPipes. The integration has two components:

1. The initial snapshot via S3 ClickPipes
2. Real-time updates via Kinesis ClickPipes

Data will be ingested into a `ReplacingMergeTree` table. This table engine is commonly used in CDC scenarios because it allows update operations to be applied. More on this pattern can be found in the following blog articles:

* [Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 1](https://clickhouse.com/blog/clickhouse-postgresql-change-data-capture-cdc-part-1?loc=docs-rockest-migrations)
* [Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 2](https://clickhouse.com/blog/clickhouse-postgresql-change-data-capture-cdc-part-2?loc=docs-rockest-migrations)

<h2 id="1-set-up-kinesis-stream">
  1. Set up Kinesis stream
</h2>

First, enable a Kinesis stream on your DynamoDB table to capture changes in real time. Do this before creating the snapshot so that changes made while the export runs aren't missed.
See the [AWS guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html) for instructions.

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/8xU-7NRzcVe16bmG/images/integrations/data-ingestion/dbms/dynamodb/dynamodb-kinesis-stream.png?fit=max&auto=format&n=8xU-7NRzcVe16bmG&q=85&s=584d44ecd96d62005dc815f1c183504b" size="lg" alt="DynamoDB Kinesis Stream" border width="1238" height="215" data-path="images/integrations/data-ingestion/dbms/dynamodb/dynamodb-kinesis-stream.png" />

<h2 id="2-create-the-snapshot">
  2. Create the snapshot
</h2>

Next, create a snapshot of the DynamoDB table. This can be achieved through an AWS export to S3. See the [AWS guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.HowItWorks.html) for instructions.
**You will want to do a "Full export" in the DynamoDB JSON format.**

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/Pn4j-damTUufNfz2/images/integrations/data-ingestion/dbms/dynamodb/dynamodb-s3-export.png?fit=max&auto=format&n=Pn4j-damTUufNfz2&q=85&s=93a0e1c02a09f5a223023187ecf0dc6d" size="md" alt="DynamoDB S3 Export" border width="686" height="929" data-path="images/integrations/data-ingestion/dbms/dynamodb/dynamodb-s3-export.png" />

<h2 id="3-load-the-snapshot-into-clickhouse">
  3. Load the snapshot into ClickHouse
</h2>

<h3 id="create-necessary-tables">
  Create necessary tables
</h3>

The snapshot data from DynamoDB will look something like this:

```json theme={null}
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
```

Observe that the data is in a nested format. We will need to flatten this data before loading it into ClickHouse. This can be done using the `JSONExtract` function in ClickHouse in a materialized view.
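To see how the nested attributes map to flat columns, the extraction can be tried on a literal string before any tables exist. This is a minimal sketch that reuses the sample item above as an inline value; the `item` alias is only an illustration and is not tied to any table.

```sql
-- The sample DynamoDB item from above, inlined as a string for illustration.
WITH '{"age":{"N":"26"},"first_name":{"S":"sally"},"id":{"S":"0A556908-F72B-4BE6-9048-9E60715358D4"}}' AS item
SELECT
    -- Each call walks the path attribute name -> DynamoDB type tag ('S' or 'N').
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name;
```

Running this against your own sample items is a quick way to check attribute names and type tags before wiring the same expressions into a materialized view.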

We will want to create three tables:

1. A table to store the raw data from DynamoDB
2. A table to store the final flattened data (destination table)
3. A materialized view to flatten the data

For the example DynamoDB data above, the ClickHouse tables would look like this:

```sql theme={null}
/* Snapshot table for the raw DynamoDB items */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Table for final flattened data (must exist before the materialized view that writes to it) */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

/* Materialized view that flattens each snapshot row into the destination table */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";
```

There are a few requirements for the destination table:

* This table must be a `ReplacingMergeTree` table
* The table must have a `version` column
  * In later steps, we will be mapping the `ApproximateCreationDateTime` field from the Kinesis stream to the `version` column.
* The table should use the DynamoDB partition key as the sorting key (specified by `ORDER BY`)
  * Rows with the same sorting key will be deduplicated based on the `version` column.
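The deduplication behavior can be observed on a throwaway table. The sketch below assumes a hypothetical scratch table named `destination_demo` with the same schema as the example destination table; the version values are made up for illustration (`0` stands in for a snapshot row, since the example materialized view does not set `version`, and the larger number stands in for a later change).

```sql
-- Hypothetical scratch table mirroring the destination schema; not part of the pipeline.
CREATE TABLE IF NOT EXISTS "default"."destination_demo"
(
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE = ReplacingMergeTree("version")
ORDER BY id;

-- Two versions of the same item (same sorting key, different version).
INSERT INTO "default"."destination_demo" (id, first_name, age, version) VALUES
    ('0A556908-F72B-4BE6-9048-9E60715358D4', 'sally', 26, 0),
    ('0A556908-F72B-4BE6-9048-9E60715358D4', 'sally', 27, 1718000000000);

-- FINAL deduplicates at query time, keeping the row with the highest version per sorting key.
SELECT id, first_name, age, version
FROM "default"."destination_demo" FINAL;

-- Remove the scratch table afterwards.
DROP TABLE IF EXISTS "default"."destination_demo";
```

Without `FINAL`, both rows may be returned until ClickHouse merges the underlying parts in the background.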

<h3 id="create-the-snapshot-clickpipe">
  Create the snapshot ClickPipe
</h3>

Now you can create a ClickPipe to load the snapshot data from S3 into ClickHouse. Follow the S3 ClickPipe guide [here](/integrations/clickpipes/object-storage/amazon-s3/overview), but use the following settings:

* **Ingest path**: You will need to locate the path of the exported JSON files in S3. The path will look something like this:

```text theme={null}
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
```

* **Format**: JSONEachRow
* **Table**: Your snapshot table (e.g. `default.snapshot` in the example above)

Once created, data will begin populating in the snapshot and destination tables. You don't need to wait for the snapshot load to finish before moving on to the next step.
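One way to confirm that the snapshot is flowing through the materialized view is to compare row counts in the two tables. This is a rough sanity check that assumes the example table names used above.

```sql
-- Raw rows loaded by the snapshot ClickPipe.
SELECT count() AS snapshot_rows FROM "default"."snapshot";

-- Flattened rows written by the materialized view.
SELECT count() AS destination_rows FROM "default"."destination";
```

The destination count can differ from the snapshot count, since rows sharing a sorting key are collapsed by background merges and other writers may also insert into the destination table.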

<h2 id="4-create-the-kinesis-clickpipe">
  4. Create the Kinesis ClickPipe
</h2>

Now we can set up the Kinesis ClickPipe to capture real-time changes from the Kinesis stream. Follow the Kinesis ClickPipe guide [here](/integrations/clickpipes/kinesis/overview), but use the following settings:

* **Stream**: The Kinesis stream used in step 1
* **Table**: Your destination table (e.g. `default.destination` in the example above)
* **Flatten object**: true
* **Column mappings**:
  * `ApproximateCreationDateTime`: `version`
  * Map other fields to the appropriate destination columns as shown below

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/Pn4j-damTUufNfz2/images/integrations/data-ingestion/dbms/dynamodb/dynamodb-map-columns.png?fit=max&auto=format&n=Pn4j-damTUufNfz2&q=85&s=8d9cfb6653b30fc7440cabd4019538c4" size="md" alt="DynamoDB Map Columns" border width="1784" height="1630" data-path="images/integrations/data-ingestion/dbms/dynamodb/dynamodb-map-columns.png" />
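Once changes start arriving from the stream, the destination table may temporarily hold several versions of the same item. The queries below are a sketch, assuming the example `default.destination` schema, of how to inspect that state and how to read the current view of the data.

```sql
-- Items that currently have more than one stored version (not yet merged).
SELECT
    id,
    count() AS stored_versions,
    max(version) AS latest_version
FROM "default"."destination"
GROUP BY id
HAVING stored_versions > 1
ORDER BY stored_versions DESC
LIMIT 10;

-- Current state of each item, deduplicated at query time.
SELECT id, first_name, age, version
FROM "default"."destination" FINAL
LIMIT 10;
```

`FINAL` adds query-time work, so it is a trade-off between always-correct reads and query cost on large tables.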

<h2 id="5-cleanup-optional">
  5. Cleanup (optional)
</h2>

Once the snapshot ClickPipe has finished, you can delete the snapshot table and materialized view.

```sql theme={null}
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
```
