Data Engineering with C++: High-Performance Pipelines
Data engineering is often associated with languages like Python or Java, but C++ offers a level of performance, control, and memory efficiency that is hard to match for high-throughput, low-latency data systems. In this post, we'll walk through building a fast, efficient data pipeline in C++.
🚀 Why C++ for Data Engineering?
- Blazing Speed: C++ is close to the metal—perfect for performance-critical tasks.
- Deterministic Memory Management: Unlike GC-based languages, C++ gives you fine-grained control.
- System-Level Access: Ideal for custom ingestion, transformation, and storage logic.
🧱 Project Structure
data-pipeline-cpp/
├── include/
│   └── pipeline.hpp
├── src/
│   ├── main.cpp
│   └── pipeline.cpp
├── CMakeLists.txt
└── data/
    └── input.csv
🔧 Step 1: Set Up Your Environment
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(DataPipeline)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_executable(DataPipeline src/main.cpp src/pipeline.cpp)
target_include_directories(DataPipeline PRIVATE include)
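A standard out-of-source build (the build directory name is just a convention):
mkdir -p build && cd build
cmake ..
cmake --build .
Run the binary from the project root so the relative data/ paths used below resolve correctly.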
📥 Step 2: Reading CSV Data
pipeline.hpp
#pragma once
#include <string>
#include <vector>
// One parsed row of the CSV file.
struct Record {
    int id;
    std::string name;
    float value;
};

std::vector<Record> read_csv(const std::string& filename);
void write_csv(const std::string& filename, const std::vector<Record>& records);
pipeline.cpp
#include "pipeline.hpp"
#include <fstream>
#include <sstream>
#include <iostream>
std::vector<Record> read_csv(const std::string& filename) {
    std::ifstream file(filename);
    std::vector<Record> records;
    if (!file) {
        std::cerr << "Failed to open " << filename << std::endl;
        return records;
    }

    std::string line;
    std::getline(file, line); // skip the header row

    while (std::getline(file, line)) {
        std::stringstream ss(line);
        std::string token;
        Record r;
        std::getline(ss, token, ',');
        r.id = std::stoi(token);
        std::getline(ss, token, ',');
        r.name = token;
        std::getline(ss, token, ',');
        r.value = std::stof(token);
        records.push_back(r);
    }
    return records;
}
void write_csv(const std::string& filename, const std::vector<Record>& records) {
    std::ofstream file(filename);
    file << "ID,Name,Value\n";
    for (const auto& r : records) {
        file << r.id << "," << r.name << "," << r.value << "\n";
    }
}
main.cpp
#include "pipeline.hpp"
#include <iostream>
int main() {
    auto records = read_csv("data/input.csv");
    for (auto& r : records) {
        r.value *= 1.1f; // simulate a transformation: scale every value by 10%
    }
    write_csv("data/output.csv", records);
    std::cout << "Pipeline complete!" << std::endl;
    return 0;
}
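For reference, here is a minimal data/input.csv in the shape read_csv expects: a header row followed by id,name,value rows (sample values, purely illustrative):
ID,Name,Value
1,sensor_a,10.5
2,sensor_b,20.25
Running the binary from the project root then writes data/output.csv with every value scaled by 10%.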
⚡ Performance Benchmarking
For a quick wall-clock number, time a full run:
time ./build/DataPipeline
For repeatable micro-benchmarks, integrate Google Benchmark:
sudo apt install libbenchmark-dev
Add to CMakeLists.txt. Since BENCHMARK_MAIN() supplies its own main, the benchmark belongs in its own executable (the target and file names here are illustrative):
find_package(benchmark REQUIRED)
add_executable(PipelineBench bench/bench.cpp src/pipeline.cpp)
target_include_directories(PipelineBench PRIVATE include)
target_link_libraries(PipelineBench benchmark::benchmark)
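A minimal bench/bench.cpp sketch (the measured function and benchmark name are illustrative):
#include <benchmark/benchmark.h>
#include "pipeline.hpp"

// Measures one full CSV read per iteration.
static void BM_ReadCsv(benchmark::State& state) {
    for (auto _ : state) {
        auto records = read_csv("data/input.csv");
        benchmark::DoNotOptimize(records);
    }
}
BENCHMARK(BM_ReadCsv);

BENCHMARK_MAIN();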
🔄 Multi-threading with std::thread
#include <thread>

// Scale every record in [begin, end) in place.
void process_chunk(std::vector<Record>::iterator begin,
                   std::vector<Record>::iterator end) {
    for (auto it = begin; it != end; ++it) {
        it->value *= 1.1f;
    }
}

// Usage (records as loaded in main.cpp): split the vector in half and
// process both halves in parallel. The ranges are disjoint, so no
// locking is needed.
auto mid = records.begin() + records.size() / 2;
std::thread t1(process_chunk, records.begin(), mid);
std::thread t2(process_chunk, mid, records.end());
t1.join();
t2.join();
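For larger inputs, you can generalize this to one chunk per std::thread::hardware_concurrency() core, and for long-running services a thread pool avoids the cost of spawning fresh threads per batch. The property worth preserving is that each thread writes only to its own disjoint range, which keeps the code lock-free.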
🔌 Kafka Integration (Consumer Example)
Use librdkafka, the standard C/C++ Kafka client. Install it with:
sudo apt install librdkafka-dev
#include <rdkafkacpp.h>
#include <iostream>
#include <string>

// Logs client events (errors, stats, broker state changes) to stderr.
class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) override {
        std::cerr << "Event: " << event.str() << std::endl;
    }
};
void consume_kafka() {
    std::string brokers = "localhost:9092";
    std::string topic = "input-topic";
    std::string group_id = "cpp-consumer";
    std::string errstr; // filled with details when a call fails

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);

    ExampleEventCb event_cb;
    conf->set("event_cb", &event_cb, errstr);

    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf; // the consumer keeps its own copy of the configuration
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        return;
    }
    consumer->subscribe({topic});

    while (true) {
        RdKafka::Message *msg = consumer->consume(1000); // poll with a 1 s timeout
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            // payload() returns a void*, so pair it with len() to build a string
            std::string payload(static_cast<const char *>(msg->payload()), msg->len());
            std::cout << "Received: " << payload << std::endl;
        }
        delete msg;
    }
}
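To link the consumer, librdkafka ships pkg-config modules, so a sketch for CMakeLists.txt (using CMake's PkgConfig support) is:
find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED IMPORTED_TARGET rdkafka++)
target_link_libraries(DataPipeline PkgConfig::RDKAFKA)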
🧪 Testing
Use Catch2. Install it with:
sudo apt install catch2
Example (Catch2 v2 single-header style; with Catch2 v3, include <catch2/catch_test_macros.hpp> and <catch2/catch_approx.hpp> and qualify Approx as Catch::Approx):
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
#include "pipeline.hpp"

TEST_CASE("Value Scaling") {
    Record r{1, "Test", 100.0f};
    r.value *= 1.1f;
    REQUIRE(r.value == Approx(110.0f));
}
🌐 Deployment: AWS S3
Use libcurl to upload the output file to S3. Note that S3 requires authenticated requests, so a plain PUT like the sketch below only works against a pre-signed URL (newer libcurl can also sign requests itself via CURLOPT_AWS_SIGV4); the URL shown is a placeholder.
sudo apt install libcurl4-openssl-dev
#include <curl/curl.h>
#include <cstdio>
#include <iostream>
#include <string>

void upload_to_s3(const std::string& filename) {
    CURL *curl = curl_easy_init();
    if (!curl) return;
    FILE *file = fopen(filename.c_str(), "rb");
    if (!file) {
        curl_easy_cleanup(curl);
        return;
    }
    // Placeholder URL: in practice, a pre-signed S3 URL goes here.
    curl_easy_setopt(curl, CURLOPT_URL, "https://s3.amazonaws.com/bucket/output.csv");
    curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); // issues an HTTP PUT
    curl_easy_setopt(curl, CURLOPT_READDATA, file);
    CURLcode res = curl_easy_perform(curl);
    if (res != CURLE_OK) {
        std::cerr << "Upload failed: " << curl_easy_strerror(res) << std::endl;
    }
    curl_easy_cleanup(curl);
    fclose(file);
}
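For production pipelines, the AWS SDK for C++ is usually a better fit than raw libcurl: it handles request signing, retries, and multipart uploads for large files.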
🧠 Final Thoughts
C++ is a powerful but often overlooked option in the data engineering stack. With careful architecture, multi-threading, and integration with modern tools, you can achieve robust and performant pipelines for even the most demanding workloads.