Program Listing for File migraphx.cpp

Return to documentation for file (/workspace/amdinfer/src/amdinfer/workers/migraphx.cpp)

// Copyright 2022 Advanced Micro Devices, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <migraphx/migraphx.h>  // for migraphx_shape_datatype_t

#include <algorithm>              // for max
#include <cstddef>                // for byte, size_t
#include <cstring>                // for memcpy
#include <exception>              // for exception
#include <filesystem>             // for path
#include <fstream>                // for ifstream, operator<<
#include <map>                    // for map
#include <memory>                 // for allocator, unique_ptr
#include <migraphx/migraphx.hpp>  // for shape, program, progra...
#include <ratio>                  // for micro
#include <sstream>                // for stringstream
#include <stdexcept>              // for invalid_argument, runt...
#include <string>                 // for string, operator+, to_...
#include <thread>                 // for thread
#include <utility>                // for move
#include <vector>                 // for vector

#include "amdinfer/batching/hard.hpp"          // for BatchPtr, Batch, Batch...
#include "amdinfer/buffers/vector_buffer.hpp"  // for VectorBuffer
#include "amdinfer/build_options.hpp"          // for AMDINFER_ENABLE_LOGGING
#include "amdinfer/core/data_types.hpp"        // for DataType, operator<<
#include "amdinfer/core/exceptions.hpp"        // for invalid_argument, runt...
#include "amdinfer/core/predict_api.hpp"       // for InferenceRequest, Infe...
#include "amdinfer/declarations.hpp"           // for InferenceResponseOutput
#include "amdinfer/observation/logging.hpp"    // for AMDINFER_LOG_INFO, AMD...
#include "amdinfer/observation/metrics.hpp"    // for Metrics, MetricCounterIDs
#include "amdinfer/util/containers.hpp"        // for containerProduct
#include "amdinfer/util/queue.hpp"             // for BufferPtrsQueue
#include "amdinfer/util/thread.hpp"            // for setThreadName
#include "amdinfer/util/timer.hpp"             // for Timer
#include "amdinfer/workers/worker.hpp"         // for Worker, kNumBufferAuto

namespace amdinfer::workers {

/**
 * @brief Worker that serves models through the MIGraphX library.
 *
 * The model is given by the "model" load parameter and may be an ONNX file
 * (*.onnx, compiled on load) or an already-compiled MessagePack file (*.mxr).
 */
class MIGraphXWorker : public Worker {
 public:
  using Worker::Worker;
  /// Starts the worker's run loop on a new thread reading from input_queue.
  std::thread spawn(BatchPtrQueue* input_queue) override;

 private:
  // ---- Worker lifecycle hooks ----
  void doInit(RequestParameters* parameters) override;
  size_t doAllocate(size_t num) override;
  void doAcquire(RequestParameters* parameters) override;
  void doRun(BatchPtrQueue* input_queue) override;
  void doRelease() override;
  void doDeallocate() override;
  void doDestroy() override;

  // ---- State ----
  /// Path of the model file to load (*.onnx or *.mxr).
  std::filesystem::path input_file_;
  /// Program loaded (and, for ONNX, compiled) from the model file. It holds
  /// the number, data types and sizes of the model's inputs and outputs.
  migraphx::program prog_;
  /// When true, request slots left empty in a batch are filled with copies of
  /// the first request's data; running some models on uninitialized data may
  /// crash MIGraphX.
  bool pad_batch_{true};
  /// Size in bytes of one request's slice of each input tensor, keyed by the
  /// model's input name.
  std::map<std::string, size_t> input_sizes_;
};

// Launches this worker's run loop on its own thread. The thread reads batches
// from input_queue until it receives a null batch (see doRun).
std::thread MIGraphXWorker::spawn(BatchPtrQueue* input_queue) {
  auto entry = [this, input_queue] { this->run(input_queue); };
  return std::thread{entry};
}

// Enum-to-enum conversion to let us read data type from migraphx model.
// The definitions are taken from the MIGraphX macro
// MIGRAPHX_SHAPE_VISIT_TYPES
/**
 * @brief Converts a MIGraphX shape data type into the server's DataType.
 *
 * The set of handled types follows MIGraphX's MIGRAPHX_SHAPE_VISIT_TYPES
 * macro. Anything not listed (including the tuple type) maps to
 * DataType::Unknown.
 *
 * @param in data type reported by a migraphx::shape
 * @return the matching DataType, or DataType::Unknown if unsupported
 */
DataType toDataType(migraphx_shape_datatype_t in) {
  DataType converted = DataType::Unknown;
  switch (in) {
    // Floating point
    case migraphx_shape_half_type:
      converted = DataType::Fp16;
      break;
    case migraphx_shape_float_type:
      converted = DataType::Fp32;
      break;
    case migraphx_shape_double_type:
      converted = DataType::Fp64;
      break;
    // Signed integers
    case migraphx_shape_int8_type:
      converted = DataType::Int8;
      break;
    case migraphx_shape_int16_type:
      converted = DataType::Int16;
      break;
    case migraphx_shape_int32_type:
      converted = DataType::Int32;
      break;
    case migraphx_shape_int64_type:
      converted = DataType::Int64;
      break;
    // Unsigned integers
    case migraphx_shape_uint8_type:
      converted = DataType::Uint8;
      break;
    case migraphx_shape_uint16_type:
      converted = DataType::Uint16;
      break;
    case migraphx_shape_uint32_type:
      converted = DataType::Uint32;
      break;
    case migraphx_shape_uint64_type:
      converted = DataType::Uint64;
      break;
    // Boolean
    case migraphx_shape_bool_type:
      converted = DataType::Bool;
      break;
    default:
      // tuple_type and any other type: unsupported here
      break;
  }
  return converted;
}

namespace {

// Builds the message for an exception raised while MIGraphX talks to the GPU.
// The HIP library reports an unreachable GPU with a cryptic "Failed to call
// function" error, so a human-readable hint is appended in that case.
std::string describeGpuError(const std::exception& e) {
  std::string emsg = e.what();
  if (emsg.find("Failed to call function") != std::string::npos) {
    emsg += ".  Server could not connect to a GPU.";
  }
  return emsg;
}

// Returns true if the file at `path` can be opened for reading.
bool isReadable(const std::filesystem::path& path) {
  return std::ifstream(path).good();
}

}  // namespace

/**
 * @brief Loads (and if necessary compiles) the MIGraphX model.
 *
 * Recognized parameters:
 *  - "model" (required): path of the model; its extension is replaced to look
 *    for "<root>_b<batch>.mxr" first and then "<root>.onnx".
 *  - "batch" (optional, default 64): requested batch size; must be positive.
 *  - "pad_batch" (optional, default true): pad partial batches (see doRun).
 *
 * After an ONNX model is compiled, it is saved as *.mxr for future loads; that
 * save is best-effort. On return, batch_size_ holds the model's actual batch
 * dimension (the 0'th dimension of its first input).
 *
 * @throws std::invalid_argument if a parameter is missing/invalid, the model
 *         file cannot be opened, or the model has no usable input
 * @throws std::runtime_error if MIGraphX fails to load or compile the model
 */
void MIGraphXWorker::doInit(RequestParameters* parameters) {
  // default batch size; client may request a change. Arbitrarily set to 64
  const int default_batch_size = 64;

  batch_size_ = default_batch_size;
  pad_batch_ = true;
#ifdef AMDINFER_ENABLE_LOGGING
  const auto& logger = this->getLogger();
#endif

  AMDINFER_LOG_INFO(logger, " MIGraphXWorker::doInit \n");

  if (parameters->has("batch")) {
    const int requested_batch = parameters->get<int>("batch");
    if (requested_batch <= 0) {
      AMDINFER_LOG_ERROR(logger, "MIGraphXWorker batch size must be positive");
      throw std::invalid_argument("batch size must be a positive integer");
    }
    this->batch_size_ = requested_batch;
  }
  if (parameters->has("model")) {
    input_file_ = parameters->get<std::string>("model");
  } else {
    AMDINFER_LOG_ERROR(
      logger, "MIGraphXWorker parameters required:  \"model\": \"<filepath>\"");
    // Throwing an exception causes server to delete this worker instance.
    // Client must try again.
    throw std::invalid_argument(
      "model file argument missing from model load request");
  }
  if (parameters->has("pad_batch")) {
    this->pad_batch_ = parameters->get<bool>("pad_batch");
  }

  // Only load/compile the model once during the lifetime of the worker.
  // This worker does not deallocate or release resources until it's destroyed;
  // if you want to change them, request a new worker.

  // Filename processing. Take the root of the given model file name and look
  // for either an *.mxr or *.onnx file. A *.mxr file has its baked-in batch
  // size tacked onto its name, e.g. resnet50-v2-7_b64.mxr
  std::filesystem::path compiled_path(this->input_file_);
  compiled_path.replace_extension();
  compiled_path += (std::string("_b") + std::to_string(batch_size_));
  compiled_path.replace_extension(".mxr");

  std::filesystem::path onnx_path(this->input_file_);
  onnx_path.replace_extension(".onnx");

  if (isReadable(compiled_path)) {
    // Load the compiled MessagePack (*.mxr) file. It is already compiled, so
    // prog_ needs no compile step.
    AMDINFER_LOG_INFO(
      logger, std::string("migraphx worker loading compiled model file ") +
                compiled_path.c_str());
    migraphx::file_options options;
    options.set_file_format("msgpack");

    // The hip library will throw a cryptic error if unable to connect with a
    // GPU at this point.
    try {
      this->prog_ = migraphx::load(compiled_path.c_str(), options);
    } catch (const std::exception& e) {
      const std::string emsg = describeGpuError(e);
      AMDINFER_LOG_ERROR(logger, emsg);
      throw std::runtime_error(emsg);
    }
  } else if (isReadable(onnx_path)) {
    // Load the onnx file. Using parse_onnx() instead of load() because there's
    // a bug at the time of writing
    AMDINFER_LOG_INFO(
      logger,
      std::string("migraphx worker loading ONNX model file ") +
        onnx_path.c_str());

    migraphx::onnx_options onnx_opts;
    onnx_opts.set_default_dim_value(batch_size_);
    this->prog_ = migraphx::parse_onnx(onnx_path.c_str(), onnx_opts);

    AMDINFER_LOG_INFO(logger,
                      std::string("migraphx worker loaded ONNX model file ") +
                        onnx_path.c_str());

    // Compile the model with offload_copy enabled.
    migraphx::compile_options comp_opts;
    comp_opts.set_offload_copy();

    // migraphx can support a reference (cpu) target as a fallback if GPU is
    // not found; not implemented here
    const bool use_gpu = true;
    const char* target_name = use_gpu ? "gpu" : "ref";

    // The hip library will throw a cryptic error if unable to connect with a
    // GPU while the target is created or the program is compiled, so both
    // happen inside the try.
    try {
      prog_.compile(migraphx::target(target_name), comp_opts);
    } catch (const std::exception& e) {
      const std::string emsg = describeGpuError(e);
      AMDINFER_LOG_ERROR(logger, emsg);
      throw std::runtime_error(emsg);
    }

    // Save the compiled program as a MessagePack (*.mxr) file for future
    // loads. This is only a cache: the compiled program is already usable, so
    // a failed save (e.g. read-only directory) is logged, not fatal.
    if (!isReadable(compiled_path)) {
      try {
        migraphx::file_options options;
        options.set_file_format("msgpack");

        migraphx::save(this->prog_, compiled_path.c_str(), options);
        AMDINFER_LOG_INFO(logger, std::string(" Saved compiled model file ") +
                                    compiled_path.c_str());
      } catch (const std::exception& e) {
        AMDINFER_LOG_ERROR(
          logger,
          std::string("migraphx worker could not save compiled model file ") +
            compiled_path.c_str() + ": " + e.what());
      }
    }
  } else {
    // Not finding the model file makes it impossible to finish initializing
    // this worker
    AMDINFER_LOG_ERROR(
      logger, std::string("migraphx worker cannot open the model file ") +
                onnx_path.c_str() + " or " + compiled_path.c_str() +
                ".  Does this path exist?");
    throw std::invalid_argument(std::string("model file ") +
                                onnx_path.c_str() +
                                " not found or can't be opened");
  }

  // The model's real batch size is the 0'th dimension of its first input.
  migraphx::program_parameter_shapes input_shapes =
    this->prog_.get_parameter_shapes();
  const auto input_names = input_shapes.names();
  if (input_names.empty()) {
    AMDINFER_LOG_ERROR(logger, "MIGraphXWorker model has no input parameters");
    throw std::invalid_argument("model has no input parameters");
  }
  const auto lengths = input_shapes[input_names.front()].lengths();
  if (lengths.empty()) {
    AMDINFER_LOG_ERROR(logger,
                       "MIGraphXWorker model input has no batch dimension");
    throw std::invalid_argument("model input has no batch dimension");
  }
  this->batch_size_ = lengths.front();
}

size_t MIGraphXWorker::doAllocate(size_t num) {
#ifdef AMDINFER_ENABLE_LOGGING
  const auto& logger = this->getLogger();
#endif
  AMDINFER_LOG_INFO(logger, "MIGraphXWorker::doAllocate");
  //
  // Allocate
  //
  constexpr auto kBufferNum = 3U;
  size_t buffer_num =
    static_cast<int>(num) == kNumBufferAuto ? kBufferNum : num;
  // Allocate enough to hold buffer_num batches' worth of input sets.
  // Extra batches allow server to hold more requests at one time
  // todo:  this try/catch was observed to just get stuck when batch size
  // is too big (approx. 56 for Yolov4 model); how to catch the error?

  // Calculate the total number of bytes required for all inputs

  BufferPtrs buffer_vec;

  migraphx::program_parameter_shapes input_shapes =
    this->prog_.get_parameter_shapes();

  // Work out the max. size of any input buffer, in bytes.  We'll allocate all
  // of them the same size in case a request puts them in mixed-up order.
  size_t max_buffer(0);
  for (const auto* aname : input_shapes.names()) {
    migraphx::shape ashape = input_shapes[aname];
    auto llen = ashape.lengths();
    // size of the buffer needed for this input
    auto asize = ashape.bytes();
    // size of a single request input (divide by batch size)
    input_sizes_[aname] = asize / *(ashape.lengths().begin());
    max_buffer = std::max(max_buffer, asize);
  }

  // Now, allocate the input and output buffers.

  try {
    for (const auto* aname : input_shapes.names()) {
      auto ashape = input_shapes[aname];
      auto llen = ashape.lengths();

      // todo: test whether VectorBuffer::allocate() does this in the right
      // order for multiple (kBufferNum) sets of buffers. It wasn't designed to
      // be called in a loop like this.  Using 1 in place of kBufferNum

      buffer_vec.emplace_back(
        std::make_unique<VectorBuffer>(max_buffer, DataType::Uint8));
    }
    this->input_buffers_->enqueue(std::move(buffer_vec));

    // Calculate max. output buffer size
    size_t out_buffer_size{0};
    migraphx::shapes output_shapes = this->prog_.get_output_shapes();
    for (const auto& ash : output_shapes) {
      out_buffer_size = std::max(out_buffer_size, ash.bytes());
    }

    // Output buffers aren't used by the engine at time of writing this,
    // but allocate them anyway. (Use number of outputs for kBufferNum)
    VectorBuffer::allocate(this->output_buffers_, output_shapes.size(),
                           out_buffer_size, amdinfer::DataType::Int8);
  } catch (...) {
    AMDINFER_LOG_ERROR(
      logger,
      std::string("MIGraphXWorker couldn't allocate buffer (batch size ") +
        std::to_string(batch_size_) + ")");
    throw runtime_error{"MIGraphXWorker couldn't allocate buffer"};
  }
  AMDINFER_LOG_INFO(logger, std::string("MIGraphXWorker::doAllocate() added ") +
                              std::to_string(buffer_num) + " buffers");

  return buffer_num;
}

void MIGraphXWorker::doAcquire(RequestParameters* parameters) {
  (void)parameters;
}

/**
 * @brief Worker loop: evaluates batches with MIGraphX and answers requests.
 *
 * Dequeues batches until a null batch is received. For each batch, request 0's
 * input pointers are used as the base addresses of the whole batch's data. If
 * pad_batch_ is set, unused slots are filled with copies of request 0. After
 * eval, each request gets its slice of every output. Errors during evaluation
 * are reported to every request in the batch; errors while building one
 * response are reported to that request only, and the loop keeps running.
 *
 * @param input_queue queue of batches produced by the batcher
 */
void MIGraphXWorker::doRun(BatchPtrQueue* input_queue) {
#ifdef AMDINFER_ENABLE_LOGGING
  const auto& logger = this->getLogger();
#endif
  AMDINFER_LOG_INFO(logger, "beginning of MIGraphXWorker::doRun");

  util::setThreadName("Migraphx");

  // stringstream used for formatting error messages
  std::stringstream smsg;

  //  Wait for requests from the batcher in an infinite loop.  This thread will
  // run, waiting for more input, until the server kills it.  If a bad request
  // causes an exception, the server will return a REST failure message to the
  // client and continue waiting for requests.
  while (true) {
    BatchPtr batch;
    input_queue->wait_dequeue(batch);
    if (batch == nullptr) {
      break;
    }
    AMDINFER_LOG_INFO(logger, "New batch request in migraphx");
    util::Timer timer;
    timer.add("batch_start");
#ifdef AMDINFER_ENABLE_METRICS
    Metrics::getInstance().incrementCounter(
      MetricCounterIDs::PipelineIngressWorker);
#endif

    // We only need to look at the 0'th request to set up evaluation, because
    // its input pointers (one for each input) are the base addresses of the
    // data for the entire batch. The different input tensors are not required
    // to be contiguous with each other.
    const auto& req0 = batch->getRequest(0);
    const auto& inputs0 = req0->getInputs();

    try {
      migraphx::program_parameters params;

      // populate the migraphx parameters with shape read from the model.
      auto param_shapes = prog_.get_parameter_shapes();

      // Look up the model input by name, but if there's only 1 input then the
      // name in the request isn't required to match.
      const auto model_input_name =
        [&](const auto& request_input) -> std::string {
        if (inputs0.size() == 1) {
          return param_shapes.names().front();
        }
        return request_input.getName();
      };

      for (const auto& aninput : inputs0) {  // InferenceRequestInput
        const std::string aname = model_input_name(aninput);
        migraphx::shape modelshape = param_shapes[aname.c_str()];

        if (toDataType(modelshape.type()) != aninput.getDatatype()) {
          smsg.str("");
          smsg << "Migraph worker model and input data types don't match:   "
               << toDataType(modelshape.type()) << " vs "
               << aninput.getDatatype();
          throw(invalid_argument(smsg.str()));
        }

        // TODO(bpickrel): compare each dimension of the request's shape
        // (aninput.getShape()) against modelshape.lengths(), skipping the 0'th
        // (batch size) dimension, and throw invalid_argument on mismatch. An
        // earlier version of this check worked inconsistently between example
        // client scripts: it accepted inputs from the yolo script but rejected
        // hello_migraphx.py inputs, so it is disabled.

        auto* a_data = aninput.getData();  //  void *
        params.add(aname.c_str(), migraphx::argument(modelshape, a_data));
      }

      // If there were fewer requests in the batch than the stated batch size,
      // pad the various input tensors with copies of the 0'th request's data.
      if (pad_batch_) {
        const size_t num_requests = batch->getRequests().size();
        for (const auto& aninput : inputs0) {
          const std::string aname = model_input_name(aninput);
          // at(): the name was validated against the model above and
          // doAllocate records every model input, so this must not insert.
          const size_t input_size = input_sizes_.at(aname);
          auto* a_data = static_cast<char*>(aninput.getData());
          // For each empty slot in buffer, i.e. from end of real requests up
          // to batch size
          for (size_t req_idx = num_requests; req_idx < batch_size_;
               req_idx++) {
            memcpy(a_data + req_idx * input_size, a_data, input_size);
          }
        }
      }

      // Run the inference
      AMDINFER_LOG_INFO(logger, "Beginning migraphx eval");
      timer.add("eval_start");
      migraphx::api::arguments migraphx_output = this->prog_.eval(params);
      timer.add("eval_end");
      auto eval_duration_us = timer.count<std::micro>("eval_start", "eval_end");
      const double eval_duration_s =
        static_cast<double>(eval_duration_us) / std::mega::num;
      AMDINFER_LOG_INFO(
        logger,
        std::string("Finished migraphx eval; batch size: ") +
          std::to_string(batch_size_) + "  elapsed time: " +
          std::to_string(eval_duration_us) + " us.  Images/sec: " +
          std::to_string(eval_duration_s > 0.0
                           ? static_cast<double>(batch_size_) / eval_duration_s
                           : 0.0));

      // The model's output shapes and the number of output tensors are the
      // same for every request in the batch; fetch them once. Resnet models
      // have 1 output; yolo and bert models have 3.
      migraphx::api::shapes output_shapes = prog_.get_output_shapes();
      const size_t result_size = migraphx_output.size();

      // Fetch the results and populate response to each request in the batch
      for (size_t j = 0; j < batch->size(); j++) {
        const auto& req = batch->getRequest(j);
        try {
          InferenceResponse resp;
          resp.setID(req->getID());
          resp.setModel("migraphx");

          // The request outputs are part of the kserve format specification,
          // which the Inference Server is intended to follow. "The
          // $request_output JSON is used to request which output tensors
          // should be returned from the model."
          // https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md
          // They are not enforced here; only their names are used, if present.
          const auto& outputs = req->getOutputs();

          // For each output tensor in the result:
          for (size_t i = 0; i < result_size; i++) {
            // the buffer to populate for return
            InferenceResponseOutput output;

            amdinfer::DataType output_dt =
              toDataType(output_shapes[i].type());
            output.setDatatype(output_dt);

            auto this_output = migraphx_output[i];
            auto lengths = this_output.get_shape().lengths();
            if (lengths.empty()) {
              throw std::runtime_error(
                "migraphx output tensor has no batch dimension");
            }

            auto num_results =
              util::containerProduct(lengths.begin() + 1, lengths.end());

            // remove the 0'th dimension (batch size) from lengths
            lengths.erase(lengths.begin());
            // size of each result array, bytes
            size_t size_of_result = num_results * output_dt.size();

            // this request's slice of the output blob
            const char* results = this_output.data() + j * size_of_result;

            // Give the output a default name if the request didn't name it.
            // NOTE(review): the default is the first input's name, which is
            // ambiguous for multi-output models; kept for compatibility.
            std::string output_name;
            if (i < outputs.size()) {
              output_name = outputs[i].getName();
            }
            if (output_name.empty()) {
              output.setName(inputs0[0].getName());
            } else {
              output.setName(output_name);
            }
            output.setShape(lengths);

            // Copy migraphx results to an owned buffer and add to output
            std::vector<std::byte> buffer(size_of_result);
            memcpy(buffer.data(), results, size_of_result);
            output.setData(std::move(buffer));
            resp.addOutput(output);
          }
          // respond back to the client
          req->runCallbackOnce(resp);
#ifdef AMDINFER_ENABLE_METRICS
          Metrics::getInstance().incrementCounter(
            MetricCounterIDs::PipelineEgressWorker);
          timer.add("batch_time", batch->getTime(j));
          timer.add("request_latency");
          auto duration =
            timer.count<std::micro>("batch_time", "request_latency");
          Metrics::getInstance().observeSummary(
            MetricSummaryIDs::RequestLatency, duration);
#endif
        } catch (const std::exception& e) {
          AMDINFER_LOG_ERROR(logger, e.what());
          // Pass error message back as reply to request; continue processing
          // more inference requests
          req->runCallbackError(
            std::string("Error processing Migraphx request: ") + e.what());
        }
      }  // end j, request
    } catch (const std::exception& e) {
      // This outer catch block catches exceptions in evaluation of the batch.
      AMDINFER_LOG_ERROR(logger, e.what());
      // Pass error message back as reply for each request in the batch
      const auto& requests = batch->getRequests();
      for (const auto& req_e : requests) {
        req_e->runCallbackError(std::string("Migraphx inference error: ") +
                                e.what());
      }
    }

    timer.add("batch_stop");
    auto duration = timer.count<std::micro>("batch_start", "batch_stop");
    AMDINFER_LOG_INFO(
      logger, std::string("Finished migraphx batch processing; batch size: ") +
                std::to_string(batch_size_) +
                "  elapsed time: " + std::to_string(duration) + " us");
  }  // end while (batch)
  AMDINFER_LOG_INFO(logger, "Migraphx::doRun ending");
}

// The worker keeps its program and buffers for its whole lifetime (see
// doInit), so the release, deallocate and destroy hooks have nothing to do.
void MIGraphXWorker::doRelease() {
  // intentionally empty
}

void MIGraphXWorker::doDeallocate() {
  // intentionally empty
}

void MIGraphXWorker::doDestroy() {
  // intentionally empty
}

}  // namespace amdinfer::workers

extern "C" {
// C-linkage factory for this worker's shared object. It returns a raw owning
// pointer rather than a smart pointer, because smart pointers may cause
// problems across the shared-object boundary; the caller must delete it.
amdinfer::workers::Worker* getWorker() {
  auto* worker = new amdinfer::workers::MIGraphXWorker("MIGraphX", "gpu");
  return worker;
}
}  // extern C