内存序问题排查-5 源码

时间:2025-04-09 08:54:21
#include <bitset>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include "xdebug.h"
#include "chan_split.h"

/* Size in bytes of one read/split buffer (2 MiB). Parenthesised so the macro
 * expands as a single value inside larger expressions such as x % BUffER_SIZE. */
#define BUffER_SIZE (1024 * 1024 * 2)

#if 0
/*
 * Self-test entry point (compiled only when the surrounding #if is enabled).
 *
 * Fills a 2 MiB buffer with 32-byte runs whose byte value is the run index
 * modulo 256, dumps it to "file.bin", then pushes the same buffer through
 * chan_split 150 times with two channels.
 */
int main() {
    constexpr size_t kTotalBytes = 1024 * 1024 * 2;
    constexpr size_t kPatternBytes = 32;

    /* Owning buffer: released automatically on every exit path. It is declared
     * before chan_split_ so that it outlives the splitter and its workers. */
    std::vector<uint8_t> buffer(kTotalBytes);
    chan_split chan_split_(0, kTotalBytes);

    uint8_t* cursor = buffer.data();
    const size_t loop_count = kTotalBytes / kPatternBytes;
    for (size_t i = 0; i < loop_count; i++) {
        memset(cursor, static_cast<int>(i & 255), kPatternBytes);
        cursor += kPatternBytes;
    }

    std::string filename = "file.bin";
    std::ofstream file(filename, std::ios::binary);
    if (file) {
        file.write(reinterpret_cast<const char*>(buffer.data()), static_cast<std::streamsize>(buffer.size()));
    }

    chan_split_.start_split_async();
    for (int i = 0; i < 150; i++) {
        chan_split_.push_data(buffer.data(), buffer.size(), 2);
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));
    chan_split_.stop_split_async();
    xdebug("done.");
    return 0;
}
#else
/*
 * Reads "xdma_test.dat" in BUffER_SIZE chunks and feeds every chunk to
 * chan_split as two-channel data.
 *
 * Returns 1 when the input file cannot be opened, 0 otherwise.
 */
int main() {
    /* Owning buffer: released on every exit path, including the early return.
     * Declared before chan_split_ so it outlives the splitter and its workers. */
    std::vector<uint8_t> buffer(BUffER_SIZE);
    chan_split chan_split_(0, BUffER_SIZE);

    std::ifstream file("xdma_test.dat", std::ios::binary);
    if (!file) {
        xdebug("open file failed.");
        return 1;
    }

    chan_split_.start_split_async();

    int count = 0;
    /* Loop on the stream state rather than on eof(): a hard read error sets
     * badbit without eofbit, which would otherwise spin forever. */
    while (file) {
        file.read(reinterpret_cast<char*>(buffer.data()), static_cast<std::streamsize>(buffer.size()));
        const std::streamsize byte_read_len = file.gcount();
        if (byte_read_len <= 0) {
            /* Nothing was read (file size is an exact multiple of the buffer,
             * or the read failed): do not push an empty chunk. */
            break;
        }
        chan_split_.push_data(buffer.data(), static_cast<size_t>(byte_read_len), 2);
        count++;
    }
    xdebug("count=%d", count);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    chan_split_.stop_split_async();
    return 0;
}

#endif
#pragma once

#include <thread>
#include <shared_mutex>
#include <future>
#include <atomic>
#include "ThreadPool.h"

/* Selects std::atomic<bool> (with explicit acquire/release ordering) for the
 * per-channel is_finished flag instead of a plain bool.
 * NOTE: the code tests this with #ifdef, so the value is irrelevant; defining
 * it as 0 still enables the atomic variant. Remove the define to disable it. */
#define USE_ATIMIC_BOOL_C 1

/*
 * Per-channel work slot shared between push_data() (producer) and the worker
 * task that serves the channel.
 *
 * The producer fills chan_count / pdata / pdata_len and then clears
 * is_finished; the worker extracts its channel into data and sets
 * is_finished again.
 */
struct split_info_t {
    int chan_index{0};         /* index of the channel this slot extracts */
    int dma_index{0};          /* DMA index */
    int chan_count{1};         /* number of interleaved channels in pdata */

    uint8_t* pdata{nullptr};   /* input data to split (the buffer handed to push_data) */
    size_t pdata_len{0};       /* length of the input data in bytes */
    std::vector<uint8_t> data; /* output buffer holding the extracted channel */
    size_t data_len{0};        /* number of valid bytes in data */

    /* True when the slot has no pending work. It must start as true: workers
     * poll this flag as soon as they are started, and a default-constructed
     * flag would be read while still indeterminate. */
#ifdef USE_ATIMIC_BOOL_C
    std::atomic<bool> is_finished{true};
#else
    bool is_finished{true};
#endif
};

/*
 * Splits interleaved multi-channel data into per-channel buffers using two
 * worker tasks that run on an internal two-thread pool.
 *
 * Usage: start_split_async(), then push_data() for every input chunk, then
 * stop_split_async().
 */
class chan_split {
public:
    /* dma_index is stored in every slot; read_dma_len sizes each slot's output buffer. */
    chan_split(int dma_index, size_t read_dma_len);
    ~chan_split();

    /* Hands one chunk to the workers and blocks until they have processed it. */
    void push_data(uint8_t* pdata, size_t len, int chan_count);
    /* Launches the worker tasks. */
    void start_split_async();
    /* Asks the worker tasks to exit and waits for them. */
    void stop_split_async();

private:
    /* Worker-side processing of one slot. */
    void split(int chan_index);
    /* Copies the slot's channel out of the interleaved input. */
    void extract(split_info_t& pinfo);

private:
    split_info_t info_arr_[2];
    /* Written by the controlling thread and polled by the worker tasks, so it
     * has to be atomic; a plain bool here is a data race. */
    std::atomic<bool> is_running_{true};
    /* Declared after info_arr_ and is_running_ on purpose: members are
     * destroyed in reverse order, so the pool joins its threads while the
     * state they access is still alive. */
    ThreadPool pool_;
    std::vector<std::future<int>> results_;
};


#include "chan_split.h"
#include <string>
#include <fstream>
#include "xdebug.h"

/*
 * Creates the two-thread pool and prepares one slot per channel.
 *
 * dma_index    - stored in every slot.
 * read_dma_len - size in bytes given to each slot's output buffer.
 */
chan_split::chan_split(int dma_index, size_t read_dma_len)
    : pool_(2) {
    int slot_index = 0;
    for (split_info_t& slot : info_arr_) {
        slot.chan_index = slot_index++;
        slot.dma_index = dma_index;
        slot.data.resize(read_dma_len);
    }
}

/*
 * Clears the running flag so that any worker task still polling leaves its
 * loop; the pool member's own destructor then joins the threads.
 */
chan_split::~chan_split() {
    this->is_running_ = false;
}

void chan_split::start_split_async() {
    for (int i = 0; i < 2; i++) {
        /* clang-format off */
        results_.emplace_back(
            pool_.enqueue([this, i] {
                while (is_running_) {
#ifdef USE_ATIMIC_BOOL_C
                    if (info_arr_[i].is_finished.load(std::memory_order::memory_order_acquire) == false) {

#else
                    if(info_arr_[i].is_finished == false){
#endif
                        split(i);
                    }
                }
                return 0;
            })
        );
        /* clang-format on */
    }
}

void chan_split::stop_split_async() {
    is_running_ = false;
    for (int i = 0; i < 2; i++) {
        if (results_[i].valid() == true) {
            results_[i].wait();
        }
    }
    results_.clear();
}

void chan_split::push_data(uint8_t* pdata, size_t len, int chan_count) {
    for (int i = 0; i < chan_count; i++) {
        split_info_t& info = info_arr_[i];
        info.chan_count = chan_count;
        info.pdata = pdata;
        info.pdata_len = len;
#ifdef USE_ATIMIC_BOOL_C
        info.is_finished.store(false, std::memory_order::memory_order_release);

#else
        info.is_finished = false;
#endif
    }

    for (int i = 0; i < chan_count; i++) {
#ifdef USE_ATIMIC_BOOL_C
        while (info_arr_[i].is_finished.load(std::memory_order::memory_order_acquire) == false) {}

#else
        while (info_arr_[i].is_finished == false) {}
#endif
    }
}

void chan_split::split(int chan_index) {
    split_info_t& info = info_arr_[chan_index];

    switch (info.chan_count) {
        case 1: {
            /* 直接调用外部函数 */
            break;
        }

        default: {
            /* 拆分数据 */
            extract(info);
            break;
        }
    }

#ifdef USE_ATIMIC_BOOL_C
    info.is_finished.store(true, std::memory_order::memory_order_release);
#else
    info.is_finished = true;

#endif
}

/*
 * Copies this slot's channel out of the interleaved input and appends the
 * result to "xdma_test_<chan_index>.dat".
 *
 * The input is treated as a sequence of frames, each made of chan_count
 * elements of 16 bytes; element number chan_index of every frame is copied to
 * info.data. Only complete frames are processed: trailing bytes that do not
 * form a whole frame are ignored, and the number of frames is limited to what
 * fits in info.data. info.data_len receives the number of bytes copied.
 */
void chan_split::extract(split_info_t& info) {
    const size_t ele_size = 16;
    size_t frame_count = 0;

    /* Guard against a missing buffer, a zero/negative channel count (division
     * by zero) and a channel index that lies outside the frame. */
    const bool valid = info.pdata != nullptr && info.chan_count > 0 &&
                       info.chan_index >= 0 && info.chan_index < info.chan_count;
    if (valid) {
        const size_t frame_size = ele_size * static_cast<size_t>(info.chan_count);
        frame_count = info.pdata_len / frame_size;

        /* Never write past the end of the output buffer. */
        const size_t capacity = info.data.size() / ele_size;
        if (frame_count > capacity) {
            frame_count = capacity;
        }

        const uint8_t* src = info.pdata + static_cast<size_t>(info.chan_index) * ele_size;
        uint8_t* dst = info.data.data();
        for (size_t frame = 0; frame < frame_count; ++frame) {
            memcpy(dst, src, ele_size);
            dst += ele_size;
            src += frame_size;
        }
    }

    info.data_len = frame_count * ele_size;

    /* Placeholder for the user callback: dump the extracted channel to a file. */
    std::string filename = "xdma_test_" + std::to_string(info.chan_index) + ".dat";
    std::ofstream file(filename, std::ios::binary | std::ios::app);
    if (file) {
        file.write(reinterpret_cast<const char*>(info.data.data()), static_cast<std::streamsize>(info.data_len));
    }
}
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

/*
 * Fixed-size thread pool: worker threads take queued tasks in FIFO order.
 * The destructor lets the workers drain the queue and then joins them.
 */
class ThreadPool {
public:
    /* Starts the given number of worker threads. explicit, so that an integer
     * is never converted to a pool implicitly. */
    explicit ThreadPool(size_t);
    ~ThreadPool();

    /*
     * Queues f(args...) for execution on a worker thread.
     * Returns a std::future for the call's result; its type is deduced as
     * std::future<typename std::result_of<F(Args...)>::type>.
     * Throws std::runtime_error when the pool is already stopping.
     */
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    // worker threads, kept so the destructor can join them
    std::vector<std::thread> workers_;
    // pending tasks
    std::queue<std::function<void()>> tasks_;

    // synchronization: queue_mutex_ guards tasks_ and stop_
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

// Starts threads_count workers. Every worker repeats the same cycle: sleep
// until a task is queued or the pool is stopping, pop one task, run it with
// the lock released. A worker exits once the pool is stopping AND the queue
// is empty, so tasks queued before the stop are still executed.
inline ThreadPool::ThreadPool(size_t threads_count)
    : stop_(false) {
    const auto worker_loop = [this] {
        while (true) {
            std::function<void()> job;

            {
                std::unique_lock<std::mutex> guard(queue_mutex_);
                // wait() releases the lock while blocked and re-acquires it
                // before the predicate is evaluated
                condition_.wait(guard, [this] {
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty()) {
                    return;
                }
                job = std::move(tasks_.front());
                tasks_.pop();
            }

            job();
        }
    };

    for (size_t spawned = 0; spawned < threads_count; ++spawned) {
        workers_.emplace_back(worker_loop);
    }
}

// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task =
        std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);

        // don't allow enqueueing after stopping the pool
        if (stop_)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks_.emplace([task]() {
            (*task)();
        });
    }
    condition_.notify_one();
    return res;
}

// Marks the pool as stopping, wakes every worker and joins them all.
inline ThreadPool::~ThreadPool() {
    {
        std::lock_guard<std::mutex> guard(queue_mutex_);
        stop_ = true;
    }
    condition_.notify_all();
    for (std::thread& thread : workers_) {
        thread.join();
    }
}

#endif

相关文章