Data Engineering with C++: High-Performance Pipelines

Data engineering is usually associated with languages like Python or Java, but C++ gives you direct control over memory and execution, which pays off in high-throughput, low-latency data systems. In this post, we’ll build a small CSV pipeline in C++ and then look at benchmarking, multi-threading, Kafka ingestion, testing, and uploading results to S3.

🚀 Why C++ for Data Engineering?

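  • Speed: C++ compiles to native code with no interpreter or managed runtime in the way, which suits performance-critical tasks.
  • Deterministic Memory Management: Unlike GC-based languages, C++ frees objects at well-defined points (scope exit or an explicit delete), so there are no garbage-collection pauses.
  • System-Level Access: Direct access to files, sockets, and memory layout makes it a good fit for custom ingestion, transformation, and storage logic.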

🧱 Project Structure

data-pipeline-cpp/
├── include/
│   └── pipeline.hpp
├── src/
│   ├── main.cpp
│   └── pipeline.cpp
├── CMakeLists.txt
└── data/
    └── input.csv

🔧 Step 1: Setup Your Environment

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(DataPipeline)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
 
add_executable(DataPipeline src/main.cpp src/pipeline.cpp)
target_include_directories(DataPipeline PRIVATE include)

📥 Step 2: Reading CSV Data

pipeline.hpp

#pragma once
#include <string>
#include <vector>
 
struct Record {
    int id;
    std::string name;
    float value;
};
 
std::vector<Record> read_csv(const std::string& filename);
void write_csv(const std::string& filename, const std::vector<Record>& records);

pipeline.cpp

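#include "pipeline.hpp"
#include <exception>
#include <fstream>
#include <iostream>
#include <sstream>

std::vector<Record> read_csv(const std::string& filename) {
    std::vector<Record> records;
    std::ifstream file(filename);
    if (!file) {
        std::cerr << "Could not open " << filename << "\n";
        return records; // empty result when the input is missing
    }
    std::string line;
    std::getline(file, line); // skip header
    while (std::getline(file, line)) {
        if (line.empty()) continue; // tolerate blank lines
        std::stringstream ss(line);
        std::string token;
        Record r;
        try {
            std::getline(ss, token, ',');
            r.id = std::stoi(token);
            std::getline(ss, token, ',');
            r.name = token;
            std::getline(ss, token, ',');
            r.value = std::stof(token);
        } catch (const std::exception&) {
            // std::stoi / std::stof throw on non-numeric fields
            std::cerr << "Skipping malformed line: " << line << "\n";
            continue;
        }
        records.push_back(r);
    }
    return records;
}

void write_csv(const std::string& filename, const std::vector<Record>& records) {
    std::ofstream file(filename);
    if (!file) {
        std::cerr << "Could not open " << filename << " for writing\n";
        return;
    }
    file << "ID,Name,Value\n";
    for (const auto& r : records) {
        file << r.id << "," << r.name << "," << r.value << "\n";
    }
}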
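
The per-line parsing in read_csv can also be pulled out into a function that reports failure through its return value, which makes it easy to test in isolation. The following is only a sketch: parse_record is a hypothetical helper that is not part of the original pipeline, and Record is repeated here so the snippet compiles on its own.

```cpp
#include <exception>
#include <optional>
#include <sstream>
#include <string>

// Mirrors the Record struct from pipeline.hpp so the sketch is self-contained.
struct Record {
    int id;
    std::string name;
    float value;
};

// Hypothetical helper: parse one "id,name,value" line.
// Returns std::nullopt when a field is missing or a numeric field cannot be parsed.
std::optional<Record> parse_record(const std::string& line) {
    std::stringstream ss(line);
    std::string id_field, name_field, value_field;
    if (!std::getline(ss, id_field, ',') ||
        !std::getline(ss, name_field, ',') ||
        !std::getline(ss, value_field, ',')) {
        return std::nullopt; // fewer than three fields
    }
    try {
        return Record{std::stoi(id_field), name_field, std::stof(value_field)};
    } catch (const std::exception&) {
        return std::nullopt; // std::stoi or std::stof rejected the field
    }
}
```

With a helper like this, the loop body in read_csv would reduce to calling parse_record(line) and pushing the result when it holds a value. Like the original, it does not handle quoted fields that contain commas.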

main.cpp

#include "pipeline.hpp"
#include <iostream>
 
int main() {
    auto records = read_csv("data/input.csv");
 
    for (auto& r : records) {
        r.value *= 1.1f; // simulate a transformation: scale each value by 10%
    }
 
    write_csv("data/output.csv", records);
    std::cout << "Pipeline complete!" << std::endl;
    return 0;
}
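
As the pipeline grows, the transformation loop in main can be split into small named stages that each take the record vector by reference. This is a minimal sketch under that assumption; scale_values and drop_below are hypothetical names, and Record is repeated so the snippet stands alone.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Mirrors the Record struct from pipeline.hpp so the sketch is self-contained.
struct Record {
    int id;
    std::string name;
    float value;
};

// Hypothetical stage: multiply every value in place by `factor`.
void scale_values(std::vector<Record>& records, float factor) {
    std::for_each(records.begin(), records.end(),
                  [factor](Record& r) { r.value *= factor; });
}

// Hypothetical stage: remove records whose value is below `min_value`.
void drop_below(std::vector<Record>& records, float min_value) {
    records.erase(
        std::remove_if(records.begin(), records.end(),
                       [min_value](const Record& r) { return r.value < min_value; }),
        records.end());
}
```

In main, the loop would then become a sequence of calls such as drop_below(records, 5.0f) followed by scale_values(records, 1.1f), and each stage can be tested separately.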

⚡ Performance Benchmarking

Use time ./DataPipeline for a coarse end-to-end measurement, or integrate with Google Benchmark for micro-benchmarks.

sudo apt install libbenchmark-dev

Add to CMakeLists:

find_package(benchmark REQUIRED)
target_link_libraries(DataPipeline PRIVATE benchmark::benchmark)
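
Between the shell's time command and a full Google Benchmark setup, a single stage can be timed in-process with std::chrono. The helper below is a sketch, not a replacement for a proper benchmark harness; time_ms is a hypothetical name, and it measures one run only, with no warm-up or repetition.

```cpp
#include <chrono>

// Hypothetical helper: run `fn` once and return the elapsed wall-clock time
// in milliseconds, measured with a monotonic clock.
template <typename Fn>
double time_ms(Fn&& fn) {
    const auto start = std::chrono::steady_clock::now();
    fn();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(stop - start).count();
}
```

For example, wrapping the read step as time_ms([&] { records = read_csv("data/input.csv"); }) gives a rough per-stage number to compare against the total reported by time.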

🔄 Multi-threading with std::thread

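#include <functional> // std::ref
#include <thread>
#include <vector>
#include "pipeline.hpp"

// Scale every record in one chunk. Each thread must get its own chunk
// so that no two threads write to the same Record.
void process_chunk(std::vector<Record>& chunk) {
    for (auto& r : chunk) {
        r.value *= 1.1f;
    }
}

// Usage (inside a function; records1 and records2 are two separate std::vector<Record>):
std::thread t1(process_chunk, std::ref(records1));
std::thread t2(process_chunk, std::ref(records2));
t1.join(); // wait for both workers before reading the results
t2.join();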

🔌 Kafka Integration (Consumer Example)

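Use librdkafka:

sudo apt install librdkafka-dev

#include <rdkafkacpp.h>
#include <iostream>
#include <string>

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) override {
        std::cerr << "Event: " << event.str() << std::endl;
    }
};

void consume_kafka() {
    std::string brokers = "localhost:9092";
    std::string topic = "input-topic";
    std::string group_id = "cpp-consumer";
    std::string errstr; // filled in by librdkafka when a call fails

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);

    ExampleEventCb event_cb;
    conf->set("event_cb", &event_cb, errstr);

    auto consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf; // no longer needed once the consumer has been created
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        return;
    }
    consumer->subscribe({topic});

    while (true) {
        auto msg = consumer->consume(1000); // wait up to 1000 ms for a message
        if (!msg->err()) {
            // payload() returns a void* that is not null-terminated,
            // so build a string from exactly len() bytes.
            std::cout << "Received: "
                      << std::string(static_cast<const char *>(msg->payload()), msg->len())
                      << std::endl;
        }
        delete msg;
    }
}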
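
The while (true) loop above never exits, so the consumer is never closed cleanly. A common pattern is to let a signal handler set a flag that the loop checks on every iteration. The sketch below shows only that pattern and uses no Kafka calls; g_stop, handle_signal, and run_until_stopped are hypothetical names, and poll_once stands in for the consume-and-handle step.

```cpp
#include <csignal>

// Written by the signal handler, read by the loop.
// volatile std::sig_atomic_t is the type the standard guarantees is safe here.
volatile std::sig_atomic_t g_stop = 0;

extern "C" void handle_signal(int) { g_stop = 1; }

// Hypothetical loop skeleton: call `poll_once` until SIGINT or SIGTERM arrives.
// Returns the number of completed iterations.
template <typename PollFn>
int run_until_stopped(PollFn&& poll_once) {
    std::signal(SIGINT, handle_signal);
    std::signal(SIGTERM, handle_signal);
    int iterations = 0;
    while (!g_stop) {
        poll_once();
        ++iterations;
    }
    return iterations;
}
```

In consume_kafka, the body passed as poll_once would be the consume, print, and delete steps. Because consume(1000) returns after at most one second, the flag is checked regularly, and consumer->close() followed by delete consumer can run once the loop ends.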

🧪 Testing

Use Catch2:

sudo apt install catch2

Example:

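// Catch2 v2 single-header layout. Catch2 v3 splits this into separate headers
// (for example <catch2/catch_test_macros.hpp>) and provides main via a library.
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
#include "pipeline.hpp" // Record

TEST_CASE("Value Scaling") {
    Record r{1, "Test", 100.0f};
    r.value *= 1.1f;
    // Approx compares with a tolerance instead of exact floating-point equality.
    REQUIRE(r.value == Approx(110.0));
}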
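
Approx exists because floating-point results should be compared with a tolerance rather than with exact equality. The same idea can be written without any test framework. This is a minimal sketch, and approx_equal with its default tolerance is a hypothetical helper, not part of Catch2.

```cpp
#include <algorithm>
#include <cmath>

// Hypothetical helper: true when a and b differ by at most `rel_tol`
// times the larger of their magnitudes.
bool approx_equal(float a, float b, float rel_tol = 1e-5f) {
    const float scale = std::max(std::fabs(a), std::fabs(b));
    return std::fabs(a - b) <= rel_tol * scale;
}
```

A purely relative tolerance like this one is strict near zero, so values that should both be roughly 0 may need an additional absolute tolerance.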

🌐 Deployment: AWS S3

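Use libcurl for uploading to S3:

sudo apt install libcurl4-openssl-dev

#include <curl/curl.h>
#include <cstdio>
#include <iostream>
#include <string>

// S3 normally requires signed requests. An unsigned PUT like this one works only
// with a pre-signed URL or a bucket that allows anonymous writes.
void upload_to_s3(const std::string& filename) {
    FILE *file = fopen(filename.c_str(), "rb");
    if (!file) {
        std::cerr << "Could not open " << filename << "\n";
        return;
    }
    CURL *curl = curl_easy_init();
    if (curl) {
        curl_easy_setopt(curl, CURLOPT_URL, "https://s3.amazonaws.com/bucket/output.csv");
        curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); // send the file body with HTTP PUT
        curl_easy_setopt(curl, CURLOPT_READDATA, file);
        CURLcode res = curl_easy_perform(curl);
        if (res != CURLE_OK) {
            std::cerr << "Upload failed: " << curl_easy_strerror(res) << "\n";
        }
        curl_easy_cleanup(curl);
    }
    fclose(file);
}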
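
The upload function has to remember to call fclose on every exit path. Wrapping the FILE* in a std::unique_ptr with a custom deleter removes that burden, and the same wrapper can report the file size, which libcurl can be told about through CURLOPT_INFILESIZE_LARGE. The sketch below uses only the standard library; FileCloser, FilePtr, open_file, and file_size are hypothetical names.

```cpp
#include <cstdio>
#include <memory>
#include <string>

// Deleter that closes the file when the owning unique_ptr goes out of scope.
struct FileCloser {
    void operator()(std::FILE* f) const { std::fclose(f); }
};
using FilePtr = std::unique_ptr<std::FILE, FileCloser>;

// Hypothetical helper: open a file; the result is null if fopen fails.
FilePtr open_file(const std::string& path, const char* mode) {
    return FilePtr(std::fopen(path.c_str(), mode));
}

// Hypothetical helper: size of the file in bytes, or -1 if it cannot be read.
long file_size(const std::string& path) {
    FilePtr file = open_file(path, "rb");
    if (!file) return -1;
    if (std::fseek(file.get(), 0, SEEK_END) != 0) return -1;
    return std::ftell(file.get()); // the file is closed automatically on return
}
```

unique_ptr does not invoke the deleter on a null pointer, so a failed fopen never reaches fclose.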

🧠 Final Thoughts

C++ is a powerful but often overlooked option in the data engineering stack. With careful architecture, multi-threading, and integration with modern tools, you can achieve robust and performant pipelines for even the most demanding workloads.

Last updated on April 20, 2025