> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-fix-nav-issues.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> Документация по формату AvroConfluent

# AvroConfluent

| Вход | Выход | Псевдоним |
| ---- | ----- | --------- |
| ✔    | ✔     |           |

<div id="description">
  ## Описание
</div>

[Apache Avro](https://avro.apache.org/) — это строко-ориентированный формат сериализации, использующий двоичное кодирование для эффективной обработки данных. Формат `AvroConfluent` поддерживает чтение и запись сообщений в формате Avro с использованием [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html) (или сервисов с совместимым API).

Каждое сообщение использует формат передачи данных Confluent: магический байт (`0x00`), затем 4-байтовый идентификатор схемы в порядке байтов big-endian, после чего следует двоичное значение Avro. При чтении ClickHouse определяет идентификатор схемы, обращаясь к реестру. При записи ClickHouse регистрирует схему, полученную из выходных столбцов, и добавляет полученный идентификатор в начало каждой строки. Для оптимальной производительности схемы кэшируются.

<a id="data-types-matching" />

<div id="data-type-mapping">
  ## Соответствие типов данных
</div>

В таблице ниже приведены все типы данных, поддерживаемые форматом Apache Avro, и соответствующие им [типы данных](/ru/reference/data-types) ClickHouse в запросах `INSERT` и `SELECT`.

| Тип данных Avro `INSERT`                    | Тип данных ClickHouse                                                                                 | Тип данных Avro `SELECT`         |
| ------------------------------------------- | ----------------------------------------------------------------------------------------------------- | -------------------------------- |
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\16\32)](/ru/reference/data-types/int-uint), [UInt(8\16\32)](/ru/reference/data-types/int-uint) | `int`                            |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/ru/reference/data-types/int-uint), [UInt64](/ru/reference/data-types/int-uint)               | `long`                           |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/ru/reference/data-types/float)                                                             | `float`                          |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/ru/reference/data-types/float)                                                             | `double`                         |
| `bytes`, `string`, `fixed`, `enum`          | [String](/ru/reference/data-types/string)                                                             | `bytes` или `string` \*          |
| `bytes`, `string`, `fixed`                  | [FixedString(N)](/ru/reference/data-types/fixedstring)                                                | `fixed(N)`                       |
| `enum`                                      | [Enum(8\16)](/ru/reference/data-types/enum)                                                           | `enum`                           |
| `array(T)`                                  | [Array(T)](/ru/reference/data-types/array)                                                            | `array(T)`                       |
| `map(V, K)`                                 | [Map(V, K)](/ru/reference/data-types/map)                                                             | `map(string, K)`                 |
| `union(null, T)`, `union(T, null)`          | [Nullable(T)](/ru/reference/data-types/date)                                                          | `union(null, T)`                 |
| `union(T1, T2, …)` \*\*                     | [Variant(T1, T2, …)](/ru/reference/data-types/variant)                                                | `union(T1, T2, …)` \*\*          |
| `null`                                      | [Nullable(Nothing)](/ru/reference/data-types/special-data-types/nothing)                              | `null`                           |
| `int (date)` \*\*\*                         | [Date](/ru/reference/data-types/date), [Date32](/ru/reference/data-types/date32)                      | `int (date)` \*\*\*              |
| `long (timestamp-millis)` \*\*\*            | [DateTime64(3)](/ru/reference/data-types/datetime)                                                    | `long (timestamp-millis)` \*\*\* |
| `long (timestamp-micros)` \*\*\*            | [DateTime64(6)](/ru/reference/data-types/datetime)                                                    | `long (timestamp-micros)` \*\*\* |
| `bytes (decimal)`  \*\*\*                   | [DateTime64(N)](/ru/reference/data-types/datetime)                                                    | `bytes (decimal)`  \*\*\*        |
| `int`                                       | [IPv4](/ru/reference/data-types/ipv4)                                                                 | `int`                            |
| `fixed(16)`                                 | [IPv6](/ru/reference/data-types/ipv6)                                                                 | `fixed(16)`                      |
| `bytes (decimal)` \*\*\*                    | [Decimal(P, S)](/ru/reference/data-types/decimal)                                                     | `bytes (decimal)` \*\*\*         |
| `string (uuid)` \*\*\*                      | [UUID](/ru/reference/data-types/uuid)                                                                 | `string (uuid)` \*\*\*           |
| `fixed(16)`                                 | [Int128/UInt128](/ru/reference/data-types/int-uint)                                                   | `fixed(16)`                      |
| `fixed(32)`                                 | [Int256/UInt256](/ru/reference/data-types/int-uint)                                                   | `fixed(32)`                      |
| `record`                                    | [Tuple](/ru/reference/data-types/tuple)                                                               | `record`                         |

* `bytes` является значением по умолчанию и задаётся настройкой [`output_format_avro_string_column_pattern`](/ru/reference/settings/formats#output_format_avro_string_column_pattern)

\*\*  [Тип Variant](/ru/reference/data-types/variant) неявно допускает `null` в качестве значения поля, поэтому, например, Avro `union(T1, T2, null)` будет преобразован в `Variant(T1, T2)`.
В результате при генерации Avro из ClickHouse нам всегда нужно включать тип `null` в набор типов Avro `union`, поскольку во время вывода схемы мы не знаем, является ли какое-либо из значений фактически `null`.

\*\*\* [Логические типы Avro](https://avro.apache.org/docs/current/spec.html#Logical+Types)

Неподдерживаемые логические типы данных Avro:

* `time-millis`
* `time-micros`
* `duration`

<div id="format-settings">
  ## Настройки формата
</div>

[//]: # "ПРИМЕЧАНИЕ: эти настройки можно задавать на уровне сеанса, но это встречается нечасто, и слишком заметное упоминание об этом может запутать пользователей."

| Настройка                                        | Описание                                                                                                                                                                     | По умолчанию |
| ------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
| `input_format_avro_allow_missing_fields`         | Следует ли использовать значение по умолчанию вместо выдачи ошибки, если поле не найдено в схеме.                                                                            | `0`          |
| `input_format_avro_null_as_default`              | Следует ли использовать значение по умолчанию вместо выдачи ошибки при вставке значения `null` в столбец, не являющийся Nullable.                                            | `0`          |
| `format_avro_schema_registry_url`                | URL Confluent Schema Registry. Для базовой аутентификации учетные данные в URL-кодировке можно включить непосредственно в URL.                                               |              |
| `format_avro_schema_registry_connection_timeout` | Тайм-аут соединения в секундах для HTTP-клиента Schema Registry (используется как для получения схемы, так и для регистрации). Должен быть больше 0 и меньше 600 (10 минут). | `1`          |
| `format_avro_schema_registry_send_timeout`       | Тайм-аут отправки в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут).                                                                 | `1`          |
| `format_avro_schema_registry_receive_timeout`    | Тайм-аут получения в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут).                                                                | `1`          |
| `output_format_avro_confluent_subject`           | Для вывода: имя subject, под которым схема зарегистрирована в Schema Registry. Обязательно при записи.                                                                       |              |
| `output_format_avro_string_column_pattern`       | Для вывода: шаблон регулярного выражения для столбцов String, которые нужно сериализовать как Avro `string` (по умолчанию — `bytes`).                                        |              |

<div id="examples">
  ## Примеры
</div>

<div id="reading-from-kafka">
  ### Чтение из Kafka
</div>

Чтобы читать Kafka топик в кодировке Avro с помощью [движка таблицы Kafka](/ru/reference/engines/table-engines/integrations/kafka), используйте параметр `format_avro_schema_registry_url`, чтобы указать URL реестра схем.

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;
```

<div id="writing-to-kafka">
  ### Запись в Kafka
</div>

Чтобы записывать сообщения AvroConfluent в топик Kafka, задайте URL реестра схем и имя subject. Схема автоматически регистрируется в реестре при первой записи.

```sql theme={null}
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');
```

<div id="using-basic-authentication">
  #### Использование базовой аутентификации
</div>

Если для вашего реестра схем требуется базовая аутентификация (например, если вы используете Confluent Cloud), вы можете указать URL-кодированные учетные данные в параметре `format_avro_schema_registry_url`.

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
```

<div id="troubleshooting">
  ## Устранение неполадок
</div>

Чтобы отслеживать прогресс ингестии и диагностировать ошибки Kafka-консьюмера, можно выполнить запрос к [системной таблице `system.kafka_consumers`](/ru/reference/system-tables/kafka_consumers). Если в вашем развертывании несколько реплик (например, в ClickHouse Cloud), необходимо использовать табличную функцию [`clusterAllReplicas`](/ru/reference/functions/table-functions/cluster).

```sql theme={null}
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
```

Если вы столкнулись с проблемами с разрешением схемы, для диагностики можно использовать [kafkacat](https://github.com/edenhill/kafkacat) вместе с [clickhouse-local](/ru/concepts/features/tools-and-utilities/clickhouse-local):

```bash theme={null}
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
```
