Program Listing for File manager.hpp

Return to documentation for file (/workspace/amdinfer/src/amdinfer/core/manager.hpp)

// Copyright 2021 Xilinx, Inc.
// Copyright 2022 Advanced Micro Devices, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GUARD_AMDINFER_CORE_MANAGER
#define GUARD_AMDINFER_CORE_MANAGER

#include <exception>      // for exception_ptr
#include <map>            // for map
#include <memory>         // for allocator, unique_ptr
#include <string>         // for string
#include <thread>         // for thread
#include <unordered_map>  // for unordered_map
#include <utility>        // for move, pair
#include <vector>         // for vector

#include "amdinfer/build_options.hpp"        // for AMDINFER_ENABLE_LOGGING
#include "amdinfer/core/predict_api.hpp"     // for RequestParameters
#include "amdinfer/core/worker_info.hpp"     // for WorkerInfo
#include "amdinfer/observation/logging.hpp"  // for LoggerPtr
#include "amdinfer/util/queue.hpp"           // for BlockingConcurrentQueue

// IWYU pragma: no_forward_declare amdinfer::RequestParameters
// IWYU pragma: no_forward_declare amdinfer::WorkerInfo

namespace amdinfer {

/**
 * @brief Tag stored in UpdateCommand::cmd that says which kind of command an
 * UpdateCommand object represents.
 *
 * NOTE(review): the enumerator names look like they line up with operations
 * declared on Manager (shutdown, workerAllocate, workerReady, ...), but the
 * code that dispatches on them is not part of this header -- confirm the
 * exact meaning of each value against the implementation.
 */
enum class UpdateCommandType : int {  // int is also the implicit default
  Shutdown,
  Allocate,
  Add,
  Delete,
  Ready,
};

struct UpdateCommand {
  explicit UpdateCommand(UpdateCommandType cmd, std::string key = "",
                         void* object = nullptr, void* retval = nullptr)
    : cmd(cmd), key(std::move(key)), object(object), retval(retval) {}
  UpdateCommandType cmd;
  std::string key;
  void* object;
  void* retval = nullptr;
  std::exception_ptr eptr = nullptr;
};
// Queue of UpdateCommand objects. Elements are std::shared_ptr, so a command
// stays alive for as long as any holder (e.g. the party that enqueued it)
// keeps its own copy of the pointer. BlockingQueue is not declared in this
// file; presumably it comes from amdinfer/util/queue.hpp -- confirm.
using UpdateCommandQueue = BlockingQueue<std::shared_ptr<UpdateCommand>>;

/**
 * @brief Singleton that keeps track of workers, addressed by string key.
 *
 * Besides the worker bookkeeping (Endpoints), the object owns a queue of
 * UpdateCommand objects and a std::thread.
 * NOTE(review): presumably the thread runs updateManager() and drains that
 * queue, and the public methods talk to it through UpdateCommand messages --
 * the implementation is not in this header, so confirm there.
 */
class Manager {
 public:
  /**
   * @brief Access the one and only Manager.
   *
   * The object is a function-local static: it is constructed the first time
   * this function is called and destroyed automatically at program exit.
   *
   * @return Manager& reference to the singleton
   */
  static Manager& getInstance() {
    static Manager instance;
    return instance;
  }

  // The singleton can be neither copied nor moved
  Manager(const Manager&) = delete;
  Manager& operator=(const Manager&) = delete;
  Manager(Manager&&) = delete;
  Manager& operator=(Manager&&) = delete;

  /**
   * @brief Load a worker.
   *
   * @param key string identifying the worker to load
   * @param parameters parameters for the load, taken by value
   * @return std::string presumably the endpoint assigned to the worker --
   * TODO confirm against the implementation
   */
  std::string loadWorker(const std::string& key, RequestParameters parameters);

  /// Counterpart of loadWorker() for the worker identified by key
  void unloadWorker(const std::string& key);

  /// Look up the WorkerInfo associated with key.
  /// NOTE(review): behavior for an unknown key (null vs. exception) is not
  /// visible here -- confirm before dereferencing the result.
  WorkerInfo* getWorker(const std::string& key) const;

  /// Returns a list of strings; going by the name, the known endpoints
  std::vector<std::string> getWorkerEndpoints();

  /// Query whether the worker identified by key is ready
  bool workerReady(const std::string& key) const;

  /// Fetch the ModelMetadata of the worker identified by key
  ModelMetadata getWorkerMetadata(const std::string& key) const;

  /// NOTE(review): what exactly is allocated, and the unit of num, are not
  /// visible in this header -- confirm against the implementation.
  void workerAllocate(const std::string& key, int num);

  /// Start-up and tear-down hooks; their bodies live outside this header
  void init();
  void shutdown();

 private:
  // Private so that the only instance is the one created in getInstance()
  Manager();
  ~Manager();

  /**
   * @brief Bookkeeping that ties worker names, load parameters and endpoint
   * strings to the WorkerInfo objects it owns.
   */
  class Endpoints {
   public:
    std::string load(const std::string& worker, RequestParameters* parameters);
    void unload(const std::string& endpoint);

    bool exists(const std::string& endpoint);
    WorkerInfo* get(const std::string& endpoint) const;
    std::vector<std::string> list() const;

    std::string add(const std::string& worker, RequestParameters parameters);

    void shutdown();

   private:
    // worker name -> (parameters -> endpoint)
    std::unordered_map<std::string, std::map<RequestParameters, std::string>>
      worker_endpoints_;
    // worker name -> integer index
    std::unordered_map<std::string, int> worker_indices_;
    // endpoint -> parameters
    std::unordered_map<std::string, RequestParameters> worker_parameters_;
    // endpoint -> WorkerInfo, owned by this map
    std::unordered_map<std::string, std::unique_ptr<WorkerInfo>> workers_;
  };

  /// Takes the queue to read from as a raw, non-owning pointer.
  /// NOTE(review): presumably the entry point of update_thread_ -- confirm.
  void updateManager(UpdateCommandQueue* input_queue);

  // Keep the data members in this order: it fixes the order in which they are
  // constructed and (in reverse) destroyed.
  Endpoints endpoints_;
  std::unique_ptr<UpdateCommandQueue> update_queue_;
  std::thread update_thread_;
#ifdef AMDINFER_ENABLE_LOGGING
  Logger logger_{Loggers::Server};
#endif
};

}  // namespace amdinfer
#endif  // GUARD_AMDINFER_CORE_MANAGER