Program Listing for File xmodel.cpp
Return to documentation for this file (/workspace/amdinfer/src/amdinfer/workers/xmodel.cpp)
// Copyright 2021 Xilinx, Inc.
// Copyright 2022 Advanced Micro Devices, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cxxabi.h> // for __forced_unwind
#include <algorithm> // for copy, copy_backward
#include <atomic> // for atomic_int32_t
#include <cstddef> // for size_t, byte
#include <cstdint> // for uint64_t, uint32_t
#include <cstdlib> // for getenv
#include <cstring> // for memcpy
#include <ext/alloc_traits.h> // for __alloc_traits<>::...
#include <memory> // for unique_ptr, allocator
#include <queue> // for queue
#include <ratio> // for micro
#include <string> // for string, operator!=
#include <thread> // for thread, sleep_for
#include <utility> // for pair, move
#include <vart/runner.hpp> // for Runner
#include <vart/runner_ext.hpp> // for RunnerExt
#include <vart/tensor_buffer.hpp> // for TensorBuffer
#include <vector> // for vector
#include <vitis/ai/target_factory.hpp> // for target_factory
#include <xir/graph/graph.hpp> // for Graph
#include <xir/graph/subgraph.hpp> // for Subgraph
#include <xir/tensor/tensor.hpp> // for Tensor
#include "amdinfer/batching/batcher.hpp" // for BatchPtr, Batch
#include "amdinfer/buffers/buffer.hpp" // for Buffer
#include "amdinfer/buffers/vart_tensor_buffer.hpp" // for VartTensorBuffer
#include "amdinfer/build_options.hpp" // for AMDINFER_ENABLE_ME...
#include "amdinfer/core/data_types.hpp" // for DataType
#include "amdinfer/core/data_types_internal.hpp" // for mapXirToType
#include "amdinfer/core/exceptions.hpp" // for invalid_argument
#include "amdinfer/core/predict_api.hpp" // for InferenceResponse
#include "amdinfer/declarations.hpp" // for BufferPtrs, Infere...
#include "amdinfer/observation/observer.hpp" // for Loggers, Metrics...
#include "amdinfer/util/containers.hpp" // for containerProduct
#include "amdinfer/util/ctpl.hpp" // for ThreadPool
#include "amdinfer/util/parse_env.hpp" // for autoExpandEnvironm...
#include "amdinfer/util/queue.hpp" // for BufferPtrsQueue
#include "amdinfer/util/thread.hpp" // for setThreadName
#include "amdinfer/util/timer.hpp" // for Timer
#include "amdinfer/workers/worker.hpp" // for Worker, kNumBuffer...
namespace amdinfer::workers {
// Worker that serves a compiled XModel on a Vitis AI DPU through VART.
class XModel : public Worker {
 public:
  XModel() : Worker("XModel", "XModel") {}
  // Starts the worker loop (run()) on a new thread.
  std::thread spawn(BatchPtrQueue* input_queue) override;

 private:
  // Deserializes the xmodel graph and selects its single DPU subgraph.
  void doInit(RequestParameters* parameters) override;
  // Enqueues VART-backed input/output buffer sets; returns the count made.
  size_t doAllocate(size_t num) override;
  // Creates the VART runner and thread pool; records tensor metadata.
  void doAcquire(RequestParameters* parameters) override;
  // Main loop: pulls batches from input_queue and runs them on the DPU.
  void doRun(BatchPtrQueue* input_queue) override;
  void doRelease() override;
  // Stops the worker's thread pool.
  void doDeallocate() override;
  void doDestroy() override;

  // Downcasts runner_ to vart::RunnerExt (nullptr if the cast fails).
  vart::RunnerExt* getRunner();

  std::unique_ptr<xir::Graph> graph_;        // deserialized xmodel graph
  const xir::Subgraph* subgraph_ = nullptr;  // DPU subgraph; points into graph_
  std::string kernel_;                       // DPU kernel/target type name
  std::unique_ptr<vart::Runner> runner_;     // VART runner built on subgraph_
  std::vector<DataType> output_type_;        // per-output data types
  std::vector<uint32_t> output_size_;        // per-output element count, batch dim excluded
  util::ThreadPool pool_;                    // executes per-batch inference tasks
};
// Launch this worker's run loop on its own thread.
std::thread XModel::spawn(BatchPtrQueue* input_queue) {
  return std::thread([this, input_queue] { this->run(input_queue); });
}
// Downcast the stored vart::Runner to the extended interface; dynamic_cast
// yields nullptr if the runner does not implement vart::RunnerExt.
vart::RunnerExt* XModel::getRunner() {
  auto* base = runner_.get();
  return dynamic_cast<vart::RunnerExt*>(base);
}
// Parses worker parameters, deserializes the xmodel graph, and selects its
// single DPU subgraph, resolving the DPU kernel/target type name.
//
// Recognized parameters:
//   - max_buffer_num (int32): overrides the default buffer cap of 50
//   - model (string): path to the .xmodel file (environment variables in the
//     path are expanded); defaults to a resnet50 model under AKS_XMODEL_ROOT
//
// Throws environment_not_set_error if AKS_XMODEL_ROOT is unset and
// invalid_argument if the graph does not contain exactly one DPU subgraph.
void XModel::doInit(RequestParameters* parameters) {
  const auto max_buffer_num = 50;

  const auto* aks_xmodel_root = std::getenv("AKS_XMODEL_ROOT");
  if (aks_xmodel_root == nullptr) {
    throw environment_not_set_error("AKS_XMODEL_ROOT not set");
  }
  const auto default_path =
      std::string{aks_xmodel_root} +
      "/artifacts/u200_u250/resnet_v1_50_tf/resnet_v1_50_tf.xmodel";

  max_buffer_num_ = max_buffer_num;
  if (parameters->has("max_buffer_num")) {
    max_buffer_num_ = parameters->get<int32_t>("max_buffer_num");
  }

  auto path = default_path;
  if (parameters->has("model")) {
    path = parameters->get<std::string>("model");
  }
  util::autoExpandEnvironmentVariables(path);
  graph_ = xir::Graph::deserialize(path);

  // Collect the subgraphs mapped to the DPU device
  auto subgraphs = graph_->get_root_subgraph()->children_topological_sort();
  std::vector<const xir::Subgraph*> dpu_graphs;
  for (const auto* c : subgraphs) {
    // CHECK(c->has_attr("device"));
    auto device = c->get_attr<std::string>("device");
    if (device == "DPU") {
      dpu_graphs.emplace_back(c);
    }
  }
  // TODO(varunsh): we want to eventually support arbitrary numbers of dpu
  // graphs but may need model chaining for that
  if (dpu_graphs.empty()) {
    // Guard: indexing dpu_graphs[0] below was undefined behavior for an
    // xmodel with no DPU subgraph (e.g. a CPU-only model)
    throw invalid_argument("XModel does not contain any DPU subgraph");
  }
  if (dpu_graphs.size() > 1) {
    throw invalid_argument("Unsupported XModel with more than 1 DPU subgraph");
  }
  this->subgraph_ = dpu_graphs[0];

  // Prefer the fingerprint-derived target type; fall back to the kernel attr
  if (this->subgraph_->has_attr("dpu_fingerprint")) {
    const auto fingerprint =
        this->subgraph_->get_attr<std::uint64_t>("dpu_fingerprint");
    this->kernel_ = vitis::ai::target_factory()->create(fingerprint).type();
  } else {
    this->kernel_ = this->subgraph_->get_attr<std::string>("kernel");
  }
}
// Allocates `num` sets of input and output buffers backed by VART tensor
// buffers and enqueues them on the worker's buffer queues. If num is
// kNumBufferAuto, a default of 10 sets is allocated. Returns the number of
// buffer sets actually allocated.
//
// Throws invalid_argument if the subgraph has more than one input tensor
// (rt-engine/vart does not support multiple input tensors).
size_t XModel::doAllocate(size_t num) {
  constexpr auto kBufferNum = 10U;
  const size_t buffer_num =
      static_cast<int>(num) == kNumBufferAuto ? kBufferNum : num;

  // The tensor lists are loop-invariant: query and validate them once
  // instead of on every iteration as before.
  const auto input_tensors = subgraph_->get_sorted_input_tensors();
  if (input_tensors.size() > 1) {
    // rt-engine/vart does not support more than one input tensor
    throw(
      invalid_argument("Unsupported XModel with more than one input tensor"));
  }
  const auto output_tensors = subgraph_->get_sorted_output_tensors();

  for (size_t i = 0; i < buffer_num; i++) {
    BufferPtrs vec;
    for (const auto* tensor : input_tensors) {
      auto input_shape = tensor->get_shape();
      auto input_type = tensor->get_data_type();
      vec.emplace_back(std::make_unique<VartTensorBuffer>(
        tensor->get_name(), input_shape, input_type));
    }
    this->input_buffers_->enqueue(std::move(vec));
  }
  for (size_t i = 0; i < buffer_num; i++) {
    BufferPtrs vec;
    for (const auto* tensor : output_tensors) {
      // renamed from input_shape/input_type: these describe output tensors
      auto output_shape = tensor->get_shape();
      auto output_type = tensor->get_data_type();
      vec.emplace_back(std::make_unique<VartTensorBuffer>(
        tensor->get_name(), output_shape, output_type));
    }
    this->output_buffers_->enqueue(std::move(vec));
  }
  return buffer_num;
}
// Creates the VART runner for the DPU subgraph, sizes the worker thread pool
// (default 3, overridable with the "threads" parameter), and records the
// model's input/output tensor metadata plus cached output types/sizes.
void XModel::doAcquire(RequestParameters* parameters) {
  constexpr auto kDefaultThreads = 3;

  auto thread_count = kDefaultThreads;
  if (parameters->has("threads")) {
    thread_count = parameters->get<int32_t>("threads");
  }
  pool_.resize(thread_count);

  runner_ = vart::Runner::create_runner(subgraph_, "run");

  // assuming only one tensor as in doInit()
  auto model_inputs = runner_->get_input_tensors();
  auto in_shape = model_inputs[0]->get_shape();
  auto in_type = mapXirToType(model_inputs[0]->get_data_type());
  batch_size_ = in_shape[0];
  metadata_.addInputTensor("input", in_type, in_shape);

  for (const auto* tensor : runner_->get_output_tensors()) {
    auto out_shape = tensor->get_shape();
    output_type_.emplace_back(mapXirToType(tensor->get_data_type()));
    // +1 to skip the batch size
    output_size_.emplace_back(
      util::containerProduct(out_shape.begin() + 1, out_shape.end()));
    // TODO(varunsh): what should we return here?
    metadata_.addOutputTensor("output", output_type_.back(), out_shape);
  }
}
// Worker loop: dequeues batches and dispatches each to the thread pool, where
// it is executed on the DPU through VART's async execute/wait API; the
// per-request responses are then assembled and delivered via callbacks.
// The loop exits when a nullptr batch is dequeued (shutdown sentinel).
void XModel::doRun(BatchPtrQueue* input_queue) {
  // Number of batches currently in flight in the pool; used below as soft
  // backpressure rather than a hard cap on the task queue.
  std::atomic_int32_t pool_size = 0;
  const int max_pool_size = this->pool_.getSize() * 4;  // 4 is arbitrary
  util::setThreadName("XModel");
#ifdef AMDINFER_ENABLE_LOGGING
  const auto& logger = this->getLogger();
#endif
  const auto pool_delay = std::chrono::milliseconds(10);
  while (true) {
    BatchPtr batch;
    input_queue->wait_dequeue(batch);
    if (batch == nullptr) {
      // nullptr is the shutdown sentinel for this queue
      break;
    }
    AMDINFER_LOG_INFO(
      logger, "Got request in xmodel: " + std::to_string(batch->size()));
#ifdef AMDINFER_ENABLE_METRICS
    Metrics::getInstance().incrementCounter(
      MetricCounterIDs::PipelineIngressWorker);
#endif
    pool_size++;
    // Soft backpressure: briefly stall before pushing more work when too
    // many batches are already queued (does not strictly bound the queue).
    if (pool_size > max_pool_size) {
      std::this_thread::sleep_for(pool_delay);
    }
    this->pool_.push([this, batch = std::move(batch), &pool_size](int id) {
      (void)id;  // suppress unused variable warning
#ifdef AMDINFER_ENABLE_TRACING
      for (unsigned int j = 0; j < batch->size(); j++) {
        auto& trace = batch->getTrace(j);
        trace->startSpan("xmodel");
      }
#endif
      std::queue<std::pair<uint32_t, int>> futures;
      std::vector<vart::TensorBuffer*> inputs_ptr;
      std::vector<vart::TensorBuffer*> outputs_ptr;
      const auto& input_buffers = batch->getInputBuffers();
      inputs_ptr.reserve(input_buffers.size());
      const auto& output_buffers = batch->getOutputBuffers();
      outputs_ptr.reserve(output_buffers.size());
      for (const auto& buffer : input_buffers) {
        logTraceBuffer(getLogger(), buffer->data(0));
      }
      // Unwrap the worker's buffers into the raw vart::TensorBuffer pointers
      // the VART runner consumes.
      for (const auto& buffer : input_buffers) {
        auto* vart = dynamic_cast<VartTensorBuffer*>(buffer.get());
        inputs_ptr.emplace_back(vart->getTensorBuffer());
      }
      for (const auto& buffer : output_buffers) {
        auto* vart = dynamic_cast<VartTensorBuffer*>(buffer.get());
        outputs_ptr.emplace_back(vart->getTensorBuffer());
      }
      // Sync input buffers before execution; the element count passed is
      // per-sample (total elements / leading batch dimension).
      for (auto* input : inputs_ptr) {
        const auto* tensor = input->get_tensor();
        input->sync_for_write(
          0, tensor->get_element_num() / (tensor->get_shape())[0]);
      }
      futures.push(getRunner()->execute_async(inputs_ptr, outputs_ptr));
      // Pre-create one response per request while the DPU job runs
      std::vector<InferenceResponse> responses;
      responses.reserve(batch->size());
      for (const auto& req : *batch) {
        auto& resp = responses.emplace_back();
        resp.setID(req->getID());
        resp.setModel("xmodel");
      }
      // Block until all submitted jobs complete (-1: no timeout)
      while (!futures.empty()) {
        auto job_id = futures.front();
        futures.pop();
        getRunner()->wait(job_id.first, -1);
      }
      // Sync output buffers back after execution (per-sample element count)
      for (auto* output : outputs_ptr) {
        const auto* tensor = output->get_tensor();
        output->sync_for_read(
          0, tensor->get_element_num() / (tensor->get_shape())[0]);
      }
      // Copy each request's outputs out of the shared batch buffers into
      // per-request response objects and fire the callbacks.
      const auto num_batches = batch->size();
      for (unsigned int k = 0; k < num_batches; k++) {
        const auto& req = batch->getRequest(k);
        auto inputs = req->getInputs();
        auto outputs = req->getOutputs();
        auto& resp = responses[k];
        const auto num_outputs = outputs.size();
        for (unsigned int i = 0; i < num_outputs; i++) {
          auto* output_index = output_buffers[i]->data(0);
          InferenceResponseOutput output;
          auto output_tensors = getRunner()->get_output_tensors();
          auto output_shape = output_tensors[i]->get_shape();
          // Report the shape without the leading batch dimension
          std::vector<uint64_t> new_shape;
          new_shape.reserve(output_shape.size() - 1);
          for (size_t j = 0; j < output_shape.size() - 1; j++) {
            new_shape.push_back(output_shape[j + 1]);
          }
          output.setShape(new_shape);
          output.setDatatype(this->output_type_[i]);
          std::vector<std::byte> buffer;
          buffer.resize(this->output_size_[i] * sizeof(uint8_t));
          // NOTE(review): the copy offset uses the output index i, not the
          // request index k — verify this is correct when a batch contains
          // more than one request.
          memcpy(buffer.data(),
                 reinterpret_cast<int8_t*>(output_index) +
                   (i * this->output_size_[i]),
                 this->output_size_[i] * sizeof(uint8_t));
          output.setData(std::move(buffer));
          // Echo the requested output name; fall back to the first input's
          // name when the request left this output unnamed.
          std::string output_name = outputs[i].getName();
          if (output_name.empty()) {
            output.setName(inputs[0].getName());
          } else {
            output.setName(output_name);
          }
          resp.addOutput(output);
        }
#ifdef AMDINFER_ENABLE_TRACING
        auto context = batch->getTrace(k)->propagate();
        resp.setContext(std::move(context));
#endif
        req->runCallbackOnce(resp);
#ifdef AMDINFER_ENABLE_METRICS
        Metrics::getInstance().incrementCounter(
          MetricCounterIDs::PipelineEgressWorker);
        // Latency measured from the timestamp recorded for this request
        util::Timer timer{batch->getTime(k)};
        timer.stop();
        auto duration = timer.count<std::micro>();
        Metrics::getInstance().observeSummary(MetricSummaryIDs::RequestLatency,
                                              duration);
#endif
      }
      pool_size--;
    });
  }
  AMDINFER_LOG_INFO(logger, "XModel ending");
}
// No per-acquire state to release here; intentionally a no-op.
void XModel::doRelease() {}
// Stop the thread pool; the `true` argument presumably waits for queued
// tasks to drain (ctpl semantics) — confirm against util::ThreadPool.
void XModel::doDeallocate() { this->pool_.stop(true); }
// Nothing to tear down at destroy time; intentionally a no-op.
void XModel::doDestroy() {}
} // namespace amdinfer::workers
extern "C" {
// using smart pointer here may cause problems inside shared object so managing
// manually
// Exported C factory so this worker can be created from the shared library;
// the caller takes ownership of the returned object and must delete it.
amdinfer::workers::Worker* getWorker() {
return new amdinfer::workers::XModel();
}
}  // extern C