#include <iostream>
#include <fstream>
#include <vector>
#include <bitset>
#include "xdebug.h"
#include "chan_split.h"
#define BUffER_SIZE 1024 * 1024 * 2
#if 0
int main() {
uint8_t* pdata = new uint8_t[1024 * 1024 * 2];
chan_split chan_split_(0, 1024 * 1024 * 2);
uint8_t* ptmp = pdata;
size_t loop_count = 1024 * 1024 * 2 / 32;
for (size_t i = 0; i < loop_count; i++) {
memset(ptmp, i & 255, 32);
ptmp += 32;
}
std::string filename = "file.bin";
std::ofstream file(filename, std::ios::binary);
if (file) {
file.write((const char*)pdata, 1024 * 1024 * 2);
}
chan_split_.start_split_async();
for (int i = 0; i < 150; i++) {
chan_split_.push_data(pdata, 1024 * 1024 * 2, 2);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
chan_split_.stop_split_async();
xdebug("done.");
return 0;
}
#else
int main() {
chan_split chan_split_(0, BUffER_SIZE);
uint8_t* pdata = new uint8_t[BUffER_SIZE];
std::ifstream file("xdma_test.dat", std::ios::binary);
if (!file) {
xdebug("open file failed.");
return 1;
}
chan_split_.start_split_async();
int count = 0;
while (file.eof() == false) {
file.read((char*)pdata, BUffER_SIZE);
std::streamsize byte_read_len = file.gcount();
chan_split_.push_data(pdata, byte_read_len, 2);
count++;
}
xdebug("count=%d", count);
std::this_thread::sleep_for(std::chrono::seconds(1));
chan_split_.stop_split_async();
delete[] pdata;
return 0;
}
#endif
#pragma once
#include <thread>
#include <shared_mutex>
#include <future>
#include <atomic>
#include "ThreadPool.h"
#define USE_ATIMIC_BOOL_C 1
struct split_info_t {
int chan_index{0};
int dma_index{0};
int chan_count{1};
uint8_t* pdata{nullptr};
size_t pdata_len{0};
std::vector<uint8_t> data;
size_t data_len{0};
#ifdef USE_ATIMIC_BOOL_C
std::atomic<bool> is_finished;
#else
bool is_finished;
#endif
};
class chan_split {
public:
chan_split(int dma_index, size_t read_dma_len);
~chan_split();
void push_data(uint8_t* pdata, size_t len, int chan_count);
void start_split_async();
void stop_split_async();
private:
void split(int chan_index);
void extract(split_info_t& pinfo);
private:
split_info_t info_arr_[2];
bool is_running_{true};
ThreadPool pool_;
std::vector<std::future<int>> results_;
};
#include "chan_split.h"
#include <string>
#include <fstream>
#include "xdebug.h"
chan_split::chan_split(int dma_index, size_t read_dma_len)
: pool_(2) {
for (int i = 0; i < 2; i++) {
split_info_t& info = info_arr_[i];
info.chan_index = i;
info.dma_index = dma_index;
info.data.resize(read_dma_len);
}
}
chan_split::~chan_split() {
is_running_ = false;
}
void chan_split::start_split_async() {
for (int i = 0; i < 2; i++) {
results_.emplace_back(
pool_.enqueue([this, i] {
while (is_running_) {
#ifdef USE_ATIMIC_BOOL_C
if (info_arr_[i].is_finished.load(std::memory_order::memory_order_acquire) == false) {
#else
if(info_arr_[i].is_finished == false){
#endif
split(i);
}
}
return 0;
})
);
}
}
void chan_split::stop_split_async() {
is_running_ = false;
for (int i = 0; i < 2; i++) {
if (results_[i].valid() == true) {
results_[i].wait();
}
}
results_.clear();
}
void chan_split::push_data(uint8_t* pdata, size_t len, int chan_count) {
for (int i = 0; i < chan_count; i++) {
split_info_t& info = info_arr_[i];
info.chan_count = chan_count;
info.pdata = pdata;
info.pdata_len = len;
#ifdef USE_ATIMIC_BOOL_C
info.is_finished.store(false, std::memory_order::memory_order_release);
#else
info.is_finished = false;
#endif
}
for (int i = 0; i < chan_count; i++) {
#ifdef USE_ATIMIC_BOOL_C
while (info_arr_[i].is_finished.load(std::memory_order::memory_order_acquire) == false) {}
#else
while (info_arr_[i].is_finished == false) {}
#endif
}
}
void chan_split::split(int chan_index) {
split_info_t& info = info_arr_[chan_index];
switch (info.chan_count) {
case 1: {
break;
}
default: {
extract(info);
break;
}
}
#ifdef USE_ATIMIC_BOOL_C
info.is_finished.store(true, std::memory_order::memory_order_release);
#else
info.is_finished = true;
#endif
}
void chan_split::extract(split_info_t& info) {
int ele_size = 16;
int loop_size = ele_size * info.chan_count;
size_t loop_count = info.pdata_len / ele_size;
uint8_t* src = info.pdata + info.chan_index * ele_size;
uint8_t* dst = info.data.data();
for (int i = 0; i < loop_count; i++) {
memcpy(dst, src, ele_size);
dst += ele_size;
src += loop_size;
i++;
}
info.data_len = info.pdata_len / info.chan_count;
std::string filename = "xdma_test_" + std::to_string(info.chan_index) + ".dat";
std::ofstream file(filename, std::ios::binary | std::ios::app);
if (file) {
file.write((const char*)info.data.data(), info.data_len);
}
}
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
~ThreadPool();
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
inline ThreadPool::ThreadPool(size_t threads_count)
: stop_(false) {
for (size_t i = 0; i < threads_count; ++i)
workers_.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex_);
this->condition_.wait(lock, [this] {
return this->stop_ || !this->tasks_.empty();
});
if (this->stop_ && this->tasks_.empty())
return;
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
});
}
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task =
std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace([task]() {
(*task)();
});
}
condition_.notify_one();
return res;
}
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& worker : workers_)
worker.join();
}
#endif