Решил я как-то выгрузить пару лет переписки из Telegram в Apache Doris на своем компе. Зачем? Я тестирую Doris как единую систему хранения и поиска по всем личным данным: сообщениям из мессенджеров, ChatGPT, записям встреч и почте.
Первый запуск был болезненным: загрузка занимала почти 2 часа. После небольшой оптимизации пайплайна тот же сценарий дал другой результат: 206 400 сообщений за 5 секунд, то есть примерно 41 280 сообщений в секунду. Для контекста: каждое сообщение я грузил как JSON-массив.
В этот момент я подумал: «Окей, а что именно внутри Doris делает такую разницу между “2 часа” и “5 секунд”?».
И полез в исходники.
Но моего знания C++ не хватило, поэтому ассистировал мне GPT-5.3 Codex, Gemini и Opus 4.6
В моей школьности нас учили загружать данные в базу через SQL.
Пишешь INSERT INTO ... VALUES(...), жмёшь Enter, ждёшь. Или, если данных много, используешь LOAD DATA INFILE или какой-нибудь bulk insert.
В Apache Doris есть другой путь: Stream Load. Это когда вы отправляете данные прямо через HTTP PUT как будто загружаете файл на сервер. Без SQL. Без парсинга запросов. Без оптимизатора.
curl --location-trusted -u root: \
-H "format:csv" \
-H "column_separator:," \
-T data.csv \
http://fe_host:8030/api/my_db/my_table/_stream_load
Один HTTP-запрос и данные в базе. Синхронно. С JSON-ответом о результате.
Прежде чем нырять в исходники, давайте разберёмся, какие вообще есть способы загрузки данных в Doris и чем они отличаются:
|
Stream Load |
Broker Load |
INSERT INTO |
|
|---|---|---|---|
|
Протокол |
HTTP PUT |
Thrift RPC (через Broker) |
MySQL Protocol |
|
Режим |
Синхронный |
Асинхронный |
Синхронный |
|
Источник |
Локальные файлы, потоки |
HDFS, S3, облачное хранилище |
SQL-запрос, подзапрос |
|
Рекомендуемый объём |
До 10 GB |
До сотен GB |
Мелкие батчи |
|
SQL parsing |
Нет |
Нет |
Да, полный цикл |
|
Основной use case |
Real-time загрузка, ETL |
Массовый batch import |
Интерактивные вставки |
Ключевая мысль: Stream Load убирает SQL parsing из критического пути. Никакого лексера, парсера, анализатора запросов, оптимизатора. Данные идут напрямую из HTTP в движок записи.
Это как разница между тем, чтобы отправить посылку через приёмное отделение почты (заполни бланк, встань в очередь, покажи паспорт) и просто закинуть её в грузовик, который уже стоит у двери.
Вот полная схема того, что происходит от момента отправки HTTP-запроса до получения ответа:
Клиент ──HTTP PUT──> FE ──307 Redirect──> Coordinator BE
│
Begin Transaction (Thrift RPC → FE)
│
Get Import Plan (Thrift RPC → FE)
│
StreamLoadPipe (буферизованное чтение)
│
┌────────┴────────┐
▼ ▼
Executor BE #1 Executor BE #2
│ │
MemTable MemTable
│ │
Segment Segment
(на диск) (на диск)
│ │
└────────┬─────────┘
│
Commit Transaction (→ FE)
│
Publish Version (FE → Executor BE)
│
Result JSON ──> Клиент
Давайте пройдёмся по шагам. Открываем be/src/http/action/stream_load.cpp:
Шаг 1-2: Клиент → FE.
Клиент отправляет HTTP PUT на Frontend. FE парсит заголовки (база, таблица, label, формат), проверяет аутентификацию. Если всё ок — делает HTTP 307 Redirect на один из Backend-узлов. Этот BE становится Coordinator данной загрузки.
Шаг 3-4: Coordinator BE парсит заголовки. В методе on_header создаётся StreamLoadContext — объект, который будет жить на протяжении всей загрузки:
// stream_load.cpp — on_header()
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
url_decode(req->param(HTTP_DB_KEY), &ctx->db);
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);
Обратите внимание: StreamLoadContext это не какой-то легковесный DTO. Это полноценный объект с таймингами каждого этапа: begin_txn_cost_nanos, stream_load_put_cost_nanos, write_data_cost_nanos, commit_and_publish_txn_cost_nanos.
Doris измеряет каждый шаг загрузки с точностью до наносекунды.
Шаг 5-7: Транзакция и план.
Coordinator BE отправляет два Thrift RPC запроса к FE:
Begin Transaction FE открывает транзакцию, присваивает Transaction ID
Get Import Plan FE генерирует план импорта и возвращает его
// stream_load.cpp — _process_put()
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, ctx](FrontendServiceConnection& client) {
client->streamLoadPut(ctx->put_result, request);
}));
Шаг 8: StreamLoadPipe.
Вот тут начинается самое интересное. Coordinator создаёт StreamLoadPipe — буферизированный канал для потоковой передачи данных. Подробный разбор в Часть 3: Streaming Pipeline — данные не ждут (ниже), а вот минимальный фрагмент из исходников:
// be/src/io/fs/stream_load_pipe.h
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // 4 MB
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024,
int64_t total_length = -1,
bool use_proto = false);
};
Шаг 9-11: Распределение и запись.
Coordinator BE режет поток на батчи и отправляет их по BRPC (нутренний RPC-фреймворк) на Executor BE. Там батчи сначала пишутся в MemTable (RAM), а затем фоново сбрасываются на диск в Segment-файлы.
Шаг 12-14: Коммит.
После записи всех данных Coordinator отправляет Commit Transaction на FE. FE проверяет, что большинство реплик записано успешно, и отправляет Publish Version на все Executor BE. Данные становятся видимыми.
FE — это диспетчер аэропорта. Он решает, куда направить самолёт (данные), проверяет документы (аутентификация), выдаёт разрешение на посадку (транзакция).
А BE — это грузчики на лётном поле: они принимают груз и раскладывают его по складам (таблетам).
В самом начале stream_load.cpp определяются три метрики:
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
Количество запросов, длительность, текущая параллельная нагрузка.
Если у вас в продакшене Doris следите за streaming_load_current_processing. Рекомендация из документации: не более 128 одновременных Stream Load на один BE (параметр webserver_num_workers).
Больше и BE может начать подвисать.
Классический подход к загрузке данных «сначала сохрани на диск, потом обработай».
В batch-сценариях Broker Load обычно даёт более высокий latency до видимости данных и менее «поточную» модель, чем Stream Load (особенно на небольших частых батчах).
Stream Load работает иначе.
Данные обрабатываются по мере поступления.
Пока HTTP-клиент отправляет следующую порцию CSV, предыдущая уже распарсена и летит на Executor BE.
Сердце этого механизма StreamLoadPipe. Откроем io/fs/stream_load_pipe.h:
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // 4 MB
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, // 64 KB
int64_t total_length = -1,
bool use_proto = false);
// ...
private:
size_t _buffered_bytes;
size_t _max_buffered_bytes; // 4 MB по умолчанию
size_t _min_chunk_size; // 64 KB по умолчанию
std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
};
Что здесь происходит?
kMaxPipeBufferedBytes = 4MB максимальный размер буфера. Когда буфер заполнен, HTTP-поток блокируется (backpressure) через _put_cond.
min_chunk_size = 64KB минимальный размер чанка. Данные не передаются дальше, пока не накопится хотя бы 64 КБ. Это снижает overhead от слишком частых переключений.
_buf_queue очередь из ByteBuffer'ов. Классический producer-consumer паттерн с condition variables.
HTTP chunks ──→ StreamLoadPipe ──→ BrokerScanNode ──→ OlapTableSink ──→ BRPC ──→ Executor BE
(4 MB буфер) (парсинг CSV/ (распределение (сетевая
JSON → Block) по таблетам) передача)
BrokerScanNode читает данные из StreamLoadPipe батчами и парсит их в формат vectorized::Block — колоночное представление данных в Doris.
OlapTableSink определяет, в какой Partition и Tablet попадает каждая строка (по PartitionKey и DistributionKey), и отправляет данные на соответствующие Executor BE через BRPC.
Ключевой инсайт: пока Coordinator BE читает HTTP-поток и парсит CSV, Executor BE уже пишут предыдущие батчи в свои MemTable'ы.
Это конвейер, а не последовательная обработка.
Представьте завод по разливу воды: пока одна бутылка наполняется, предыдущая уже закупоривается, а та, что перед ней, уже едет по конвейеру к упаковке. Никто не ждёт, пока все бутылки наполнятся, чтобы начать закупоривать.
(Если вы дочитали до сюда — вы точно GPT или крутой человек с остствуем деффицита внимания. Наливайте чай, дальше будет ещё интереснее.)
Когда данные прилетают на Executor BE через BRPC, начинается многоуровневая запись.
Здесь архитектура Doris хорошо декомпозирована: каждый слой в цепочке записи отвечает за одну задачу, а вместе они дают потоковую запись с backpressure и асинхронным flush без блокировки ingest (практически это значит: при пике входящих данных система «притормаживает» приём, а не падает; при этом новые батчи продолжают приниматься, пока предыдущие фоново сбрасываются на диск).
Откроем исходники и посмотрим на иерархию:
LoadChannelMgr
└── LoadChannel (1 на Stream Load задачу на данном BE)
└── TabletsChannel (1 на Index — таблица + её Materialized View)
└── DeltaWriter (1 на каждый Tablet)
└── MemTable (in-memory буфер для записи)
└── Segment (файл на диске, ≤256 MB)
Если сжать до одного абзаца:
LoadChannel держит запись текущего load на конкретном BE;
TabletsChannel раскладывает строки по нужным tablet/index;
DeltaWriter пишет в конкретный tablet и управляет жизненным циклом MemTable.
Ключевой фрагмент из olap/delta_writer.h:
// Writer for a particular (load, index, tablet).
class BaseDeltaWriter {
public:
virtual Status write(const vectorized::Block* block,
const DorisVector<uint32_t>& row_idxs) = 0;
virtual Status close() = 0;
virtual Status build_rowset();
int64_t mem_consumption(MemType mem);
Status wait_flush();
int64_t tablet_id() const { return _req.tablet_id; }
int64_t txn_id() const { return _req.txn_id; }
protected:
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<MemTableWriter> _memtable_writer;
};
Важно: метод write принимает vectorized::Block (батч колонок), а не по одной строке. Это один из источников высокой скорости записи.
MemTable — это in-memory буфер перед диском. Из olap/memtable.h:
class MemTable {
public:
// Вставка батча строк (векторизованная!)
Status insert(const vectorized::Block* block,
const DorisVector<uint32_t>& row_idxs);
void shrink_memtable_by_agg(); // Агрегация прямо в памяти
bool need_flush() const;
Status to_block(std::unique_ptr<vectorized::Block>* res);
size_t memory_usage() const { return _mem_tracker->consumption(); }
private:
vectorized::MutableBlock _input_mutable_block; // Входные данные
vectorized::MutableBlock _output_mutable_block; // Отсортированный результат
vectorized::Arena _arena; // Аллокатор для строковых данных
size_t _sort(); // Сортировка по ключу
template <bool is_final>
void _aggregate(); // Агрегация для Aggregate/Unique моделей
std::unique_ptr<DorisVector<std::shared_ptr<RowInBlock>>> _row_in_blocks;
};
Три вещи, которые важны для понимания производительности:
Колоночный батч в памяти (vectorized::MutableBlock) вместо row-by-row.
Сортировка и агрегация в RAM (_sort, _aggregate) до записи на диск.
Специализированный аллокатор (vectorized::Arena) + статистика (MemTableStat) для контроля памяти и таймингов.
Ключевая оптимизация: когда MemTable заполняется (в типичных конфигурациях около 200 MB), запись не останавливается.
Старая MemTable уходит в очередь на flush, новая сразу принимает данные, а MemtableFlushExecutor фоново пишет на диск через RowsetWriter -> SegmentWriter.
// Три состояния MemTable:
enum MemType {
ACTIVE = 0, // Принимает данные
WRITE_FINISHED = 1, // Ждёт в очереди на flush
FLUSH = 2 // Сбрасывается на диск
};
Практический итог: Segment ограничен ~256 MB, а один Stream Load на tablet формирует Rowset из одного или нескольких Segment-файлов.
Это как конвейер на заводе: пока один ящик упаковывают (flush на диск), следующий уже наполняется (write в MemTable), а третий уже запечатан и едет на склад (завершённый Segment). Никто не стоит и не ждёт.
Мы разобрали 4 уровня архитектуры от HTTP-запроса до Segment-файла на диске.
Теперь давайте переведём это в конкретные рекомендации.
Stream Load может быть потоковым, но если формат или режим не поддерживает стриминг, BE сначала сохранит тело запроса во временный файл и только потом начнёт выполнение это уже disk-first и совсем другая латентность.
В коде это буквально одна строка:
// stream_load.cpp — _process_put()
ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);
Если скорость «не похожа на Doris», проверьте формат и заголовки.
Каждый Stream Load возвращает JSON с разбивкой по этапам — это не логи и не C-код, а обычный HTTP-ответ:
{
"Status": "Success",
"NumberTotalRows": 1000000,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106
}
Посмотрите на ReadDataTimeMs, WriteDataTimeMs, CommitAndPublishTimeMs они покажут, где именно время уходит.
NDJSON (по одному объекту на строку): используйте read_json_by_line — это даёт самую предсказуемую потоковую обработку.
JSON-массив: нужен strip_outer_array, иначе можно получить не тот режим парсинга.
Chunked transfer + JSON: внутри StreamLoadPipe есть сценарии, где JSON нужно прочитать целиком до парсинга. Это явно отмечено в коде:
// stream_load_pipe.h
// When importing JSON data and using chunked transfer encoding,
// the data needs to be completely read before it can be parsed.
bool _is_chunked_transfer = false;
В best practices прямо рекомендуют держать Stream Load concurrency на один BE < 128 (это упирается в webserver_num_workers).
Высокая конкурентность «съедает» webserver-потоки и роняет производительность; на совсем больших значениях возможны подвисания процесса.
Стартуйте с 16–64 параллельных загрузок (или меньше на ноутбуке) и увеличивайте, пока не упрётесь в CPU/IO.
Большие батчи уменьшают транзакционный и метаданный overhead и дают более крупные сегменты. Stream Load официально позиционируется как метод для файлов до 10 GB (больше — дробить).
Ориентир «несколько MB и выше» почти всегда лучше, чем «много запросов по 50 KB».
Порог flush управляется write_buffer_size (дефолт в Doris 3.x — 100 MB).
В документации прямо говорится: маленький порог — много мелких файлов; слишком большой — риск RPC timeout.
Как это увидеть на практике (без исходников, обычным SQL и метриками):
-- Compaction Score по tablet'ам конкретной таблицы
SHOW TABLET FROM your_table;
-- Смотреть колонки: VersionCount, RowCount, DataSize
-- Если VersionCount растёт и не снижается — compaction не справляется
-- Максимальный compaction score по кластеру
SHOW PROC '/statistic';
-- Столбец MaxCompactionScore — если > 100, уже стоит разбираться
Плюс метрики для Prometheus/Grafana:
doris_fe_max_tablet_compaction_score — максимальный compaction score по всем BE (P0-метрика)
doris_be_tablet_base_max_compaction_score — то же самое, но с каждого BE
doris_be_compaction_bytes_total — объём данных, прошедших через compaction
Если видите тонны мелких сегментов и давление compaction, то увеличивайте write_buffer_size аккуратно (NetEase для лог-сценариев ставят до 1 GB) и следите за таймаутами и IO.
Stream Load на BE измеряет этапы и возвращает их в JSON-ответе, плюс есть метрики текущей нагрузки (streaming_load_current_processing).
Практика: если «медленно», первым делом разделите проблему на три зоны:
Сеть / приём данных — ReceiveDataCostMs, ReadDataCostMs
Запись — WriteDataCostMs (flush в MemTable и на диск)
Commit / Publish — CommitAndPublishCostMs (метаданные, реплики)
Это разделение сразу покажет, виноват ли сетевой слой, движок записи или координация транзакций.
В этой статье мы разобрали путь данных от curl до Segment-файла: HTTP-пайплайн, StreamLoadPipe с backpressure, иерархию записи LoadChannel -> DeltaWriter -> MemTable и async flush.
Но есть ещё три механизма, которые радикально влияют на скорость в конкретных сценариях:
Group Commit — как Doris объединяет тысячи мелких загрузок в одну транзакцию (и почему без этого на high-frequency ingest вы получите ошибку -235 вместо результата).
Pipeline Execution Engine — как push-модель и векторизация ускоряют путь записи.
Memtable Forwarding — оптимизация, которая убирает лишние encode/decode и даёт ускорение до 2.8x.
Разберём их во второй части, если меня выпустят из Песочницы.
Короче, если тема зашла дайте знать в комментариях, какой из трёх механизмов интересен больше всего.