> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-fix-nav-issues.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> ClickPipes позволяет подключать DynamoDB к ClickHouse.

# CDC (фиксация изменений данных) из DynamoDB в ClickHouse

export const Image = ({img, alt, size}) => {
  return <Frame>
      <img src={img} alt={alt} />
    </Frame>;
};

export const CloudNotSupportedBadge = () => {
  return <div className="cloudNotSupportedBadge">
            <div className="cloudNotSupportedIcon">
            <svg width="16" height="16" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
                <path strokeWidth="1.5" d="M6.33366 12.6666L12.3739 12.6667C13.6593 12.6667 14.7073 11.6187 14.7073 10.3334C14.7073 9.04804 13.6593 8.00003 12.3739 8.00003C12.3739 8.00003 12.3337 7.66659 12.0003 7.33325M10.667 5.33322C8.00033 2.33325 4.45395 4.78537 4.14195 6.68203C2.55728 6.7627 1.29395 8.06203 1.29395 9.6667C1.29395 11.3234 2.66699 12.6666 4.00033 12.6666" stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" />
                <path strokeWidth="1.5" d="M2.66699 14L12.0003 4.66663" stroke="currentColor" strokeLinecap="round" strokeLinejoin="round" />
            </svg>

        </div>
            Not supported in ClickHouse Cloud
        </div>;
};

На этой странице описано, как настроить CDC (фиксация изменений данных) из DynamoDB в ClickHouse с помощью ClickPipes. Эта интеграция состоит из 2 компонентов:

1. Начальный снимок через S3 ClickPipes
2. Обновления в реальном времени через Kinesis ClickPipes

Данные будут поступать в таблицу с движком `ReplacingMergeTree`. Этот движок таблицы обычно используется в сценариях CDC (фиксация изменений данных), так как позволяет применять операции обновления. Подробнее об этом шаблоне можно прочитать в следующих статьях блога:

* [CDC (фиксация изменений данных) с PostgreSQL и ClickHouse — Часть 1](https://clickhouse.com/blog/clickhouse-postgresql-change-data-capture-cdc-part-1?loc=docs-rockest-migrations)
* [CDC (фиксация изменений данных) с PostgreSQL и ClickHouse — Часть 2](https://clickhouse.com/blog/clickhouse-postgresql-change-data-capture-cdc-part-2?loc=docs-rockest-migrations)

<div id="1-set-up-kinesis-stream">
  ## 1. Настройте поток Kinesis
</div>

Сначала включите поток Kinesis для таблицы DynamoDB, чтобы отслеживать изменения в реальном времени. Это нужно сделать до создания снимка, чтобы не потерять данные.
Руководство AWS доступно [здесь](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html).

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/8xU-7NRzcVe16bmG/images/integrations/data-ingestion/dbms/dynamodb/dynamodb-kinesis-stream.png?fit=max&auto=format&n=8xU-7NRzcVe16bmG&q=85&s=584d44ecd96d62005dc815f1c183504b" size="lg" alt="Поток Kinesis для DynamoDB" border width="1238" height="215" data-path="images/integrations/data-ingestion/dbms/dynamodb/dynamodb-kinesis-stream.png" />

<div id="2-create-the-snapshot">
  ## 2. Создайте снимок
</div>

Далее мы создадим снимок таблицы DynamoDB. Это можно сделать, экспортировав данные из AWS в S3. Руководство AWS доступно [здесь](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.HowItWorks.html).
**Выберите "Full export" в формате DynamoDB JSON.**

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/Pn4j-damTUufNfz2/images/integrations/data-ingestion/dbms/dynamodb/dynamodb-s3-export.png?fit=max&auto=format&n=Pn4j-damTUufNfz2&q=85&s=93a0e1c02a09f5a223023187ecf0dc6d" size="md" alt="Экспорт DynamoDB в S3" border width="686" height="929" data-path="images/integrations/data-ingestion/dbms/dynamodb/dynamodb-s3-export.png" />

<div id="3-load-the-snapshot-into-clickhouse">
  ## 3. Загрузите снимок в ClickHouse
</div>

<div id="create-necessary-tables">
  ### Создайте необходимые таблицы
</div>

Данные из снимка DynamoDB будут выглядеть примерно так:

```json theme={null}
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
```

Обратите внимание, что данные представлены во вложенном формате. Перед загрузкой в ClickHouse их нужно преобразовать в плоскую структуру. Это можно сделать в ClickHouse с помощью функции `JSONExtract` в materialized view.

Нам нужно создать три таблицы:

1. Таблицу для хранения исходных данных из DynamoDB
2. Таблицу для хранения итоговых данных в плоской структуре (целевая таблица)
3. materialized view для преобразования данных в плоскую структуру

Для приведенного выше примера данных DynamoDB таблицы ClickHouse будут выглядеть так:

```sql theme={null}
/* Таблица снимка */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Таблица для итоговых преобразованных данных */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Таблица для итоговых преобразованных данных */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
```

К целевой таблице предъявляется несколько требований:

* Эта таблица должна использовать движок `ReplacingMergeTree`
* В таблице должен быть столбец `version`
  * На следующих шагах мы сопоставим поле `ApproximateCreationDateTime` из потока Kinesis со столбцом `version`.
* В качестве ключа сортировки таблица должна использовать ключ партиционирования (задаётся через `ORDER BY`)
  * Для строк с одинаковым ключом сортировки будет выполняться дедупликация по столбцу `version`.

<div id="create-the-snapshot-clickpipe">
  ### Создайте ClickPipe для загрузки снимка
</div>

Теперь вы можете создать ClickPipe, чтобы загрузить данные снимка из S3 в ClickHouse. Следуйте [руководству по S3 ClickPipe](/ru/integrations/clickpipes/object-storage/amazon-s3/overview), но используйте следующие настройки:

* **Путь приёма**: Вам нужно найти путь к экспортированным JSON‑файлам в S3. Он будет выглядеть примерно так:

```text theme={null}
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
```

* **Формат**: JSONEachRow
* **Таблица**: Ваша таблица снимка (например, `default.snapshot` в примере выше)

После создания данные начнут поступать в таблицу снимка и целевую таблицу. Не нужно ждать завершения загрузки снимка, прежде чем переходить к следующему шагу.

<div id="4-create-the-kinesis-clickpipe">
  ## 4. Создайте Kinesis ClickPipe
</div>

Теперь можно настроить Kinesis ClickPipe, чтобы отслеживать изменения из потока Kinesis в реальном времени. Следуйте руководству по Kinesis ClickPipe [здесь](/ru/integrations/clickpipes/kinesis/overview), но используйте следующие настройки:

* **Поток**: поток Kinesis из шага 1
* **Таблица**: ваша целевая таблица (например, `default.destination` в примере выше)
* **Развернуть объект**: true
* **Сопоставления столбцов**:
  * `ApproximateCreationDateTime`: `version`
  * Сопоставьте остальные поля с соответствующими столбцами целевой таблицы, как показано ниже

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/Pn4j-damTUufNfz2/images/integrations/data-ingestion/dbms/dynamodb/dynamodb-map-columns.png?fit=max&auto=format&n=Pn4j-damTUufNfz2&q=85&s=8d9cfb6653b30fc7440cabd4019538c4" size="md" alt="Сопоставление столбцов DynamoDB" border width="1784" height="1630" data-path="images/integrations/data-ingestion/dbms/dynamodb/dynamodb-map-columns.png" />

<div id="5-cleanup-optional">
  ## 5. Очистка (необязательно)
</div>

После завершения работы ClickPipe со снимком вы можете удалить таблицу снимка и materialized view.

```sql theme={null}
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
```
