Обработка миллионов строк данных потоками на

Обработка миллионов строк данных потоками на

Попробуем решить описанную проблему со следующим стеком технологий:

  • Node.js
  • Sequelize (ORM-библиотека, основанная на промисах)
  • MariaDB

Что на клиенте – не имеет значения. Когда размер данных приближается к 4 Гб, Chrome в любом случае упадет.

Очевидное решение проблемы большого объема данных – потоковая передача. Если вы попробуете отправить их одним большим куском, Node не справится.

Тут возникает первая большая проблема – Sequelize не поддерживает работу с потоками.

Вот так выглядит классический вызов библиотеки:

Конечно, тут кое-что пропущено – вроде конфигурации базы данных и собственно определение вызова метода get() (откуда, например, приходит res ?). Но вы, безусловно, разберетесь в этом самостоятельно.

Результат работы этого кода вполне предсказуем – Node падает. Вы можете, конечно, выделить больше памяти – max-old-space-size=8000 , но вряд ли это можно назвать решением проблемы.

Попробуем вручную реализовать потоковую передачу данных:

В этом примере мы знаем, сколько строк вернется из базы данных, отсюда строка if (i === 5 ). Это просто тест. Чтобы завершить поток, вы должны отправить null . Можно заранее получить значение count (количество строк).

Основная идея состоит в разделении одного большого запроса на несколько маленьких и потоковое получение отдельных чанков. Это работает, Node не падает от перегрузки, но работает очень долго. 3.5 Гб данных вы будете обрабатывать примерно 10 минут!

Обработка миллионов строк данных потоками на

Нет ли другого – более быстрого – решения?

Есть – это неблокирующий клиент MariaDB Node.js connector.

Вот так выглядит обычный запрос:

Он уже достаточно быстрый, однако попробуем потоковый код:

Выглядит загадочно, но тут не происходит ничего сложного. Мы просто создаем пайплайн для данных:

  • Поток queryStream – результат запроса к базе.
  • Поток transformStream – для отправки преобразованных в строки чанков (здесь можно использовать только строки и буферы).
  • Класс stream.PassThrough это реализация трансформирующего стрима.
  • Функция для обработки ошибок.

ps.pipe(res) – отправляет результаты обработки клиенту.

Обработка миллионов строк данных потоками на

Это решение работает гораздо быстрее – те же данные передаются меньше, чем за 4 минуты без перегрузки Node.

Итак, если вам приходится работать с огромным количеством данных, у вас есть три пути: