Почему HTTP-загрузка в Apache Doris такая быстрая: разбор Stream Load по исходникам

Решил я как-то выгрузить пару лет переписки из Telegram в Apache Doris на своем компе. Зачем? Я тестирую Doris как единую систему хранения и поиска по всем личным данным: сообщениям из мессенджеров, ChatGPT, записям встреч и почте.

Первый запуск был болезненным: загрузка занимала почти 2 часа. После небольшой оптимизации пайплайна тот же сценарий дал другой результат: 206 400 сообщений за 5 секунд, то есть примерно 41 280 сообщений в секунду. Для контекста: каждое сообщение я грузил как JSON-массив.

В этот момент я подумал: «Окей, а что именно внутри Doris делает такую разницу между “2 часа” и “5 секунд”?».

И полез в исходники.

Но моего знания C++ не хватило, поэтому ассистировал мне GPT-5.3 Codex, Gemini и Opus 4.6

Часть 1: А что вообще такое Stream Load?

HTTP PUT вместо SQL, и почему это принципиально

В моей школьности нас учили загружать данные в базу через SQL.
Пишешь INSERT INTO ... VALUES(...), жмёшь Enter, ждёшь. Или, если данных много, используешь LOAD DATA INFILE или какой-нибудь bulk insert.

В Apache Doris есть другой путь: Stream Load. Это когда вы отправляете данные прямо через HTTP PUT как будто загружаете файл на сервер. Без SQL. Без парсинга запросов. Без оптимизатора.

curl --location-trusted -u root: \
-H "format:csv" \
-H "column_separator:," \
-T data.csv \
http://fe_host:8030/api/my_db/my_table/_stream_load

Один HTTP-запрос и данные в базе. Синхронно. С JSON-ответом о результате.

Три способа загрузки: кто есть кто

Прежде чем нырять в исходники, давайте разберёмся, какие вообще есть способы загрузки данных в Doris и чем они отличаются:

Stream Load

Broker Load

INSERT INTO

Протокол

HTTP PUT

Thrift RPC (через Broker)

MySQL Protocol

Режим

Синхронный

Асинхронный

Синхронный

Источник

Локальные файлы, потоки

HDFS, S3, облачное хранилище

SQL-запрос, подзапрос

Рекомендуемый объём

До 10 GB

До сотен GB

Мелкие батчи

SQL parsing

Нет

Нет

Да, полный цикл

Основной use case

Real-time загрузка, ETL

Массовый batch import

Интерактивные вставки

Ключевая мысль: Stream Load убирает SQL parsing из критического пути. Никакого лексера, парсера, анализатора запросов, оптимизатора. Данные идут напрямую из HTTP в движок записи.

Это как разница между тем, чтобы отправить посылку через приёмное отделение почты (заполни бланк, встань в очередь, покажи паспорт) и просто закинуть её в грузовик, который уже стоит у двери.

Часть 2: 14 шагов HTTP-запроса

Что происходит, когда вы нажимаете Enter после curl

Вот полная схема того, что происходит от момента отправки HTTP-запроса до получения ответа:

Клиент ──HTTP PUT──> FE ──307 Redirect──> Coordinator BE
                                              │
                                     Begin Transaction (Thrift RPC → FE)
                                              │
                                     Get Import Plan (Thrift RPC → FE)
                                              │
                                     StreamLoadPipe (буферизованное чтение)
                                              │
                                     ┌────────┴────────┐
                                     ▼                  ▼
                               Executor BE #1     Executor BE #2
                                     │                  │
                                  MemTable           MemTable
                                     │                  │
                                  Segment            Segment
                                  (на диск)          (на диск)
                                     │                  │
                                     └────────┬─────────┘
                                              │
                                     Commit Transaction (→ FE)
                                              │
                                     Publish Version (FE → Executor BE)
                                              │
                                     Result JSON ──> Клиент

Давайте пройдёмся по шагам. Открываем be/src/http/action/stream_load.cpp:

Шаг 1-2: Клиент → FE.

Клиент отправляет HTTP PUT на Frontend. FE парсит заголовки (база, таблица, label, формат), проверяет аутентификацию. Если всё ок — делает HTTP 307 Redirect на один из Backend-узлов. Этот BE становится Coordinator данной загрузки.

Шаг 3-4: Coordinator BE парсит заголовки. В методе on_header создаётся StreamLoadContext — объект, который будет жить на протяжении всей загрузки:

// stream_load.cpp — on_header()
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;

url_decode(req->param(HTTP_DB_KEY), &ctx->db);
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);

Обратите внимание: StreamLoadContext это не какой-то легковесный DTO. Это полноценный объект с таймингами каждого этапа: begin_txn_cost_nanos, stream_load_put_cost_nanos, write_data_cost_nanos, commit_and_publish_txn_cost_nanos.
Doris измеряет каждый шаг загрузки с точностью до наносекунды.

Шаг 5-7: Транзакция и план.

Coordinator BE отправляет два Thrift RPC запроса к FE:

  1. Begin Transaction FE открывает транзакцию, присваивает Transaction ID

  2. Get Import Plan FE генерирует план импорта и возвращает его

// stream_load.cpp — _process_put()
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, ctx](FrontendServiceConnection& client) {
client->streamLoadPut(ctx->put_result, request);
}));

Шаг 8: StreamLoadPipe.

Вот тут начинается самое интересное. Coordinator создаёт StreamLoadPipe — буферизированный канал для потоковой передачи данных. Подробный разбор в Часть 3: Streaming Pipeline — данные не ждут (ниже), а вот минимальный фрагмент из исходников:

// be/src/io/fs/stream_load_pipe.h
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // 4 MB

class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024,
int64_t total_length = -1,
bool use_proto = false);
};

Шаг 9-11: Распределение и запись.

Coordinator BE режет поток на батчи и отправляет их по BRPC (нутренний RPC-фреймворк) на Executor BE. Там батчи сначала пишутся в MemTable (RAM), а затем фоново сбрасываются на диск в Segment-файлы.

Шаг 12-14: Коммит.

После записи всех данных Coordinator отправляет Commit Transaction на FE. FE проверяет, что большинство реплик записано успешно, и отправляет Publish Version на все Executor BE. Данные становятся видимыми.

FE — это диспетчер аэропорта. Он решает, куда направить самолёт (данные), проверяет документы (аутентификация), выдаёт разрешение на посадку (транзакция).

А BE — это грузчики на лётном поле: они принимают груз и раскладывают его по складам (таблетам).

Три метрики, которые расскажут всё

В самом начале stream_load.cpp определяются три метрики:

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);

Количество запросов, длительность, текущая параллельная нагрузка.

Если у вас в продакшене Doris следите за streaming_load_current_processing. Рекомендация из документации: не более 128 одновременных Stream Load на один BE (параметр webserver_num_workers).

Больше и BE может начать подвисать.

Часть 3: Streaming Pipeline — данные не ждут

Почему Stream Load не "disk-first"

Классический подход к загрузке данных «сначала сохрани на диск, потом обработай».

В batch-сценариях Broker Load обычно даёт более высокий latency до видимости данных и менее «поточную» модель, чем Stream Load (особенно на небольших частых батчах).

Stream Load работает иначе.

Данные обрабатываются по мере поступления.

Пока HTTP-клиент отправляет следующую порцию CSV, предыдущая уже распарсена и летит на Executor BE.

Сердце этого механизма StreamLoadPipe. Откроем io/fs/stream_load_pipe.h:

static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;  // 4 MB

class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, // 64 KB
int64_t total_length = -1,
bool use_proto = false);
// ...
private:
size_t _buffered_bytes;
size_t _max_buffered_bytes; // 4 MB по умолчанию
size_t _min_chunk_size; // 64 KB по умолчанию

std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
};

Что здесь происходит?

  1. kMaxPipeBufferedBytes = 4MB максимальный размер буфера. Когда буфер заполнен, HTTP-поток блокируется (backpressure) через _put_cond.

  2. min_chunk_size = 64KB минимальный размер чанка. Данные не передаются дальше, пока не накопится хотя бы 64 КБ. Это снижает overhead от слишком частых переключений.

  3. _buf_queue очередь из ByteBuffer'ов. Классический producer-consumer паттерн с condition variables.

Поток данных внутри Coordinator BE

HTTP chunks ──→ StreamLoadPipe ──→ BrokerScanNode ──→ OlapTableSink ──→ BRPC ──→ Executor BE
                 (4 MB буфер)      (парсинг CSV/     (распределение    (сетевая
                                    JSON → Block)     по таблетам)      передача)

BrokerScanNode читает данные из StreamLoadPipe батчами и парсит их в формат vectorized::Block — колоночное представление данных в Doris.

OlapTableSink определяет, в какой Partition и Tablet попадает каждая строка (по PartitionKey и DistributionKey), и отправляет данные на соответствующие Executor BE через BRPC.

Ключевой инсайт: пока Coordinator BE читает HTTP-поток и парсит CSV, Executor BE уже пишут предыдущие батчи в свои MemTable'ы.

Это конвейер, а не последовательная обработка.

Представьте завод по разливу воды: пока одна бутылка наполняется, предыдущая уже закупоривается, а та, что перед ней, уже едет по конвейеру к упаковке. Никто не ждёт, пока все бутылки наполнятся, чтобы начать закупоривать.

Часть 4: Иерархия записи — от LoadChannel до MemTable

Самое интересное: как данные попадают на диск

(Если вы дочитали до сюда — вы точно GPT или крутой человек с остствуем деффицита внимания. Наливайте чай, дальше будет ещё интереснее.)

Когда данные прилетают на Executor BE через BRPC, начинается многоуровневая запись.

Здесь архитектура Doris хорошо декомпозирована: каждый слой в цепочке записи отвечает за одну задачу, а вместе они дают потоковую запись с backpressure и асинхронным flush без блокировки ingest (практически это значит: при пике входящих данных система «притормаживает» приём, а не падает; при этом новые батчи продолжают приниматься, пока предыдущие фоново сбрасываются на диск).

Откроем исходники и посмотрим на иерархию:

LoadChannelMgr
  └── LoadChannel           (1 на Stream Load задачу на данном BE)
        └── TabletsChannel  (1 на Index — таблица + её Materialized View)
              └── DeltaWriter   (1 на каждый Tablet)
                    └── MemTable        (in-memory буфер для записи)
                          └── Segment   (файл на диске, ≤256 MB)

Если сжать до одного абзаца:

  • LoadChannel держит запись текущего load на конкретном BE;

  • TabletsChannel раскладывает строки по нужным tablet/index;

  • DeltaWriter пишет в конкретный tablet и управляет жизненным циклом MemTable.

Ключевой фрагмент из olap/delta_writer.h:

// Writer for a particular (load, index, tablet).
class BaseDeltaWriter {
public:
virtual Status write(const vectorized::Block* block,
const DorisVector<uint32_t>& row_idxs) = 0;
virtual Status close() = 0;
virtual Status build_rowset();

int64_t mem_consumption(MemType mem);
Status wait_flush();

int64_t tablet_id() const { return _req.tablet_id; }
int64_t txn_id() const { return _req.txn_id; }

protected:
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<MemTableWriter> _memtable_writer;
};

Важно: метод write принимает vectorized::Block (батч колонок), а не по одной строке. Это один из источников высокой скорости записи.

MemTable: где живут данные до flush

MemTable — это in-memory буфер перед диском. Из olap/memtable.h:

class MemTable {
public:
// Вставка батча строк (векторизованная!)
Status insert(const vectorized::Block* block,
const DorisVector<uint32_t>& row_idxs);

void shrink_memtable_by_agg(); // Агрегация прямо в памяти
bool need_flush() const;
Status to_block(std::unique_ptr<vectorized::Block>* res);

size_t memory_usage() const { return _mem_tracker->consumption(); }

private:
vectorized::MutableBlock _input_mutable_block; // Входные данные
vectorized::MutableBlock _output_mutable_block; // Отсортированный результат
vectorized::Arena _arena; // Аллокатор для строковых данных

size_t _sort(); // Сортировка по ключу
template <bool is_final>
void _aggregate(); // Агрегация для Aggregate/Unique моделей

std::unique_ptr<DorisVector<std::shared_ptr<RowInBlock>>> _row_in_blocks;
};

Три вещи, которые важны для понимания производительности:

  1. Колоночный батч в памяти (vectorized::MutableBlock) вместо row-by-row.

  2. Сортировка и агрегация в RAM (_sort, _aggregate) до записи на диск.

  3. Специализированный аллокатор (vectorized::Arena) + статистика (MemTableStat) для контроля памяти и таймингов.

Async Flush: конвейер не останавливается

Ключевая оптимизация: когда MemTable заполняется (в типичных конфигурациях около 200 MB), запись не останавливается.

Старая MemTable уходит в очередь на flush, новая сразу принимает данные, а MemtableFlushExecutor фоново пишет на диск через RowsetWriter -> SegmentWriter.

// Три состояния MemTable:
enum MemType {
ACTIVE = 0, // Принимает данные
WRITE_FINISHED = 1, // Ждёт в очереди на flush
FLUSH = 2 // Сбрасывается на диск
};

Практический итог: Segment ограничен ~256 MB, а один Stream Load на tablet формирует Rowset из одного или нескольких Segment-файлов.

Это как конвейер на заводе: пока один ящик упаковывают (flush на диск), следующий уже наполняется (write в MemTable), а третий уже запечатан и едет на склад (завершённый Segment). Никто не стоит и не ждёт.

Практические выводы: как получить скорость Stream Load и не устроить себе пожар

Мы разобрали 4 уровня архитектуры от HTTP-запроса до Segment-файла на диске.
Теперь давайте переведём это в конкретные рекомендации.

1. Убедитесь, что вы реально в streaming path

Stream Load может быть потоковым, но если формат или режим не поддерживает стриминг, BE сначала сохранит тело запроса во временный файл и только потом начнёт выполнение это уже disk-first и совсем другая латентность.

В коде это буквально одна строка:

// stream_load.cpp — _process_put()
ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);

Если скорость «не похожа на Doris», проверьте формат и заголовки.
Каждый Stream Load возвращает JSON с разбивкой по этапам — это не логи и не C-код, а обычный HTTP-ответ:

{
"Status": "Success",
"NumberTotalRows": 1000000,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106
}

Посмотрите на ReadDataTimeMs, WriteDataTimeMs, CommitAndPublishTimeMs они покажут, где именно время уходит.

2. Для JSON не верьте в магию — выбирайте режим парсинга осознанно

  • NDJSON (по одному объекту на строку): используйте read_json_by_line — это даёт самую предсказуемую потоковую обработку.

  • JSON-массив: нужен strip_outer_array, иначе можно получить не тот режим парсинга.

  • Chunked transfer + JSON: внутри StreamLoadPipe есть сценарии, где JSON нужно прочитать целиком до парсинга. Это явно отмечено в коде:

// stream_load_pipe.h
// When importing JSON data and using chunked transfer encoding,
// the data needs to be completely read before it can be parsed.
bool _is_chunked_transfer = false;

3. Конкурентность: держите под контролем

В best practices прямо рекомендуют держать Stream Load concurrency на один BE < 128 (это упирается в webserver_num_workers).

Высокая конкурентность «съедает» webserver-потоки и роняет производительность; на совсем больших значениях возможны подвисания процесса.

Стартуйте с 16–64 параллельных загрузок (или меньше на ноутбуке) и увеличивайте, пока не упрётесь в CPU/IO.

4. Размер батча важнее красивых слов про HTTP

Большие батчи уменьшают транзакционный и метаданный overhead и дают более крупные сегменты. Stream Load официально позиционируется как метод для файлов до 10 GB (больше — дробить).

Ориентир «несколько MB и выше» почти всегда лучше, чем «много запросов по 50 KB».

5. Настройка MemTable = баланс между мелкими файлами и таймаутами

Порог flush управляется write_buffer_size (дефолт в Doris 3.x — 100 MB).

В документации прямо говорится: маленький порог — много мелких файлов; слишком большой — риск RPC timeout.

Как это увидеть на практике (без исходников, обычным SQL и метриками):

-- Compaction Score по tablet'ам конкретной таблицы
SHOW TABLET FROM your_table;
-- Смотреть колонки: VersionCount, RowCount, DataSize
-- Если VersionCount растёт и не снижается — compaction не справляется

-- Максимальный compaction score по кластеру
SHOW PROC '/statistic';
-- Столбец MaxCompactionScore — если > 100, уже стоит разбираться

Плюс метрики для Prometheus/Grafana:

  • doris_fe_max_tablet_compaction_score — максимальный compaction score по всем BE (P0-метрика)

  • doris_be_tablet_base_max_compaction_score — то же самое, но с каждого BE

  • doris_be_compaction_bytes_total — объём данных, прошедших через compaction

Если видите тонны мелких сегментов и давление compaction, то увеличивайте write_buffer_size аккуратно (NetEase для лог-сценариев ставят до 1 GB) и следите за таймаутами и IO.

6. Что мерить, чтобы не гадать

Stream Load на BE измеряет этапы и возвращает их в JSON-ответе, плюс есть метрики текущей нагрузки (streaming_load_current_processing).

Практика: если «медленно», первым делом разделите проблему на три зоны:

  • Сеть / приём данныхReceiveDataCostMs, ReadDataCostMs

  • ЗаписьWriteDataCostMs (flush в MemTable и на диск)

  • Commit / PublishCommitAndPublishCostMs (метаданные, реплики)

Это разделение сразу покажет, виноват ли сетевой слой, движок записи или координация транзакций.


Что дальше

В этой статье мы разобрали путь данных от curl до Segment-файла: HTTP-пайплайн, StreamLoadPipe с backpressure, иерархию записи LoadChannel -> DeltaWriter -> MemTable и async flush.

Но есть ещё три механизма, которые радикально влияют на скорость в конкретных сценариях:

  • Group Commit — как Doris объединяет тысячи мелких загрузок в одну транзакцию (и почему без этого на high-frequency ingest вы получите ошибку -235 вместо результата).

  • Pipeline Execution Engine — как push-модель и векторизация ускоряют путь записи.

  • Memtable Forwarding — оптимизация, которая убирает лишние encode/decode и даёт ускорение до 2.8x.

Разберём их во второй части, если меня выпустят из Песочницы.

Короче, если тема зашла дайте знать в комментариях, какой из трёх механизмов интересен больше всего.


Внимание!

Официальный сайт бота по ссылке ниже.

Официальный сайт