Extending UCM Store#

📖 Overview#

In the Unified Cache Manager (UCM) architecture, the Store component handles:

  • Space Management: Allocation and scaling of KV Cache storage

  • Persistence: Durable storage and recovery of KV Cache data

  • Tiered Transfer: Efficient data movement between storage hierarchies

  • Data Processing: Quantization, compression, and encoding transformations

Built-in Store Ecosystem#

UCM provides production-ready Store implementations with the following dependency architecture:

[Figure: ucmstore dependency architecture diagram]

Extension Options#

Beyond built-in Stores, UCM supports three extension patterns:

| Method | Implementation | Performance | Use Case | Rating | Reference Implementation |
|---|---|---|---|---|---|
| Pure Python | Python only | ⭐⭐⭐☆☆ | Prototyping, algorithm validation | ⭐⭐ | UcmMooncakeStore |
| Python/C++ Hybrid | Python + C++ | ⭐⭐⭐⭐☆ | Complex logic with performance-critical paths | ⭐⭐⭐ | UcmPCStore |
| Pure C++ | C++ only | ⭐⭐⭐⭐⭐ | Production, high-performance scenarios | ⭐⭐⭐⭐⭐ | UC::CacheStore::CacheStore |


🔧 Pure Python Extension#

Use this approach for rapid prototyping and algorithm validation, where development speed outweighs runtime performance.

Implementation Steps#

  1. Inherit the base class

    from ucm.store.ucmstore_v1 import Task, UcmKVStoreBaseV1
    
    class UcmCustomPythonStore(UcmKVStoreBaseV1):
        def __init__(self, config: dict):
            super().__init__(config)
    
  2. Implement required methods

    from abc import abstractmethod
    from typing import List

    import numpy as np
    import torch

    # Abstract methods declared by UcmKVStoreBaseV1; override each one in your Store.

    @abstractmethod
    def lookup(self, block_ids: List[bytes]) -> List[bool]:
        """Check presence of blocks in external storage."""
        pass

    @abstractmethod
    def lookup_on_prefix(self, block_ids: List[bytes]) -> int:
        """Prefix variant of lookup: report how far the leading run of blocks is present in external storage."""
        pass

    @abstractmethod
    def prefetch(self, block_ids: List[bytes]) -> None:
        """Asynchronously prefetch blocks into high-speed cache."""
        pass

    @abstractmethod
    def load(self, block_ids: List[bytes], shard_index: List[int], dst_tensor: List[List[torch.Tensor]]) -> Task:
        """Initiate transfer of KV cache from storage to device."""
        pass

    @abstractmethod
    def dump(self, block_ids: List[bytes], shard_index: List[int], src_tensor: List[List[torch.Tensor]]) -> Task:
        """Initiate transfer of KV cache from device to storage."""
        pass

    @abstractmethod
    def load_data(self, block_ids: List[bytes], shard_index: List[int], dst_addr: List[List[int]] | np.ndarray) -> Task:
        """Low-level fetch: copy KV data to device pointers."""
        pass

    @abstractmethod
    def dump_data(self, block_ids: List[bytes], shard_index: List[int], src_addr: List[List[int]] | np.ndarray) -> Task:
        """Low-level dump: copy KV data from device pointers."""
        pass

    @abstractmethod
    def wait(self, task: Task) -> None:
        """Block until the given transfer task completes."""
        pass

    @abstractmethod
    def check(self, task: Task) -> bool:
        """Non-blocking poll for task completion."""
        pass
    

    Note: Full interface specifications are in ucm/store/ucmstore_v1.py

  3. Register your Store

    # ucm/store/factory_v1.py
    UcmConnectorFactoryV1.register_connector(
        "UcmCustomPythonStore",
        "ucm.store.custom.connector",
        "UcmCustomPythonStore"
    )
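
The three arguments in the registration call read as a registry name, a module path, and a class name, which suggests that the class is imported lazily when the Store is first requested. The following is a minimal, generic sketch of that pattern and not UCM's actual factory code: `LazyRegistry`, `register`, and `create` are hypothetical names, and a standard-library class stands in for a custom Store so the example runs on its own.

```python
import importlib
from typing import Any, Dict, Tuple


class LazyRegistry:
    """Hypothetical name -> (module path, class name) registry.

    Illustrates lazy registration in general; it is NOT UCM's factory.
    """

    def __init__(self) -> None:
        self._entries: Dict[str, Tuple[str, str]] = {}

    def register(self, name: str, module_path: str, class_name: str) -> None:
        # Only strings are stored, so registering never imports the module.
        if name in self._entries:
            raise ValueError(f"{name!r} is already registered")
        self._entries[name] = (module_path, class_name)

    def create(self, name: str, *args: Any, **kwargs: Any) -> Any:
        # The import happens here, on first use of the registered name.
        try:
            module_path, class_name = self._entries[name]
        except KeyError:
            raise KeyError(f"unknown store {name!r}") from None
        module = importlib.import_module(module_path)
        cls = getattr(module, class_name)
        return cls(*args, **kwargs)


# Stand-in registration: collections.OrderedDict plays the role of a Store class.
registry = LazyRegistry()
registry.register("DemoStore", "collections", "OrderedDict")
store = registry.create("DemoStore")
```

With a design like this, a typo in the module path or class name only surfaces when the Store is first created, so instantiating the registered name once is a cheap way to validate a registration.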
    

⚠️ Performance Warning: Python implementations are GIL-bound and unsuitable for high-throughput scenarios. Use only for development and testing.
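
For development and testing, a toy in-memory Store can make the contract of the Python interface concrete. The sketch below is self-contained and deliberately does not import `ucm` or `torch`: `ToyTask` and `ToyInMemoryStore` are hypothetical stand-ins, payloads are plain `bytes` rather than tensors, shard indices are omitted, and `lookup_on_prefix` is assumed to return the number of leading blocks found (check ucm/store/ucmstore_v1.py for the real convention).

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyTask:
    """Hypothetical stand-in for the Task handle; wraps a Future."""
    future: Future


class ToyInMemoryStore:
    """Toy store keeping blocks in a dict. Not derived from UcmKVStoreBaseV1."""

    def __init__(self) -> None:
        self._blocks: Dict[bytes, bytes] = {}
        # A single worker keeps transfers ordered and off the caller's thread.
        self._pool = ThreadPoolExecutor(max_workers=1)

    def lookup(self, block_ids: List[bytes]) -> List[bool]:
        return [bid in self._blocks for bid in block_ids]

    def lookup_on_prefix(self, block_ids: List[bytes]) -> int:
        # Assumed convention: count of leading blocks that are present.
        hits = 0
        for bid in block_ids:
            if bid not in self._blocks:
                break
            hits += 1
        return hits

    def dump(self, block_ids: List[bytes], src: List[bytes]) -> ToyTask:
        def _copy() -> None:
            for bid, data in zip(block_ids, src):
                self._blocks[bid] = bytes(data)
        return ToyTask(self._pool.submit(_copy))

    def load(self, block_ids: List[bytes], dst: List[bytearray]) -> ToyTask:
        def _copy() -> None:
            for bid, buf in zip(block_ids, dst):
                buf[:] = self._blocks[bid]
        return ToyTask(self._pool.submit(_copy))

    def wait(self, task: ToyTask) -> None:
        task.future.result()  # blocks; re-raises any error from the transfer

    def check(self, task: ToyTask) -> bool:
        return task.future.done()  # non-blocking poll


store = ToyInMemoryStore()
ids = [b"block-0", b"block-1", b"block-2"]
store.wait(store.dump(ids[:2], [b"aaaa", b"bbbb"]))

present = store.lookup(ids)
prefix_hits = store.lookup_on_prefix(ids)

out = [bytearray(4), bytearray(4)]
task = store.load(ids[:2], out)
store.wait(task)
```

Returning a handle from `dump` and `load` and resolving it through `wait` or `check` mirrors the asynchronous shape of the interface, even though this toy does no device transfers.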


⚡ Hybrid Python/C++ Extension#

Best for balancing productivity with performance: implement hot paths in C++ and orchestrate with Python.

Architecture#

┌─────────────────────────┐
│  Python Wrapper Layer   │  ← Business logic, config, API surface
│    (ucm/store/cpy/*)    │
└────────────┬────────────┘
             │ pybind11
             ▼
┌─────────────────────────┐
│    C++ Core Layer       │  ← Performance-critical operations
│    (ucm/store/cc/*)     │    Memory management, compute kernels
└─────────────────────────┘

Implementation Steps#

  1. Implement C++ core (hybrid_store.cc)

    #include "ucm/store/ucmstore_v1.h"
    
    namespace UC::HybridStore {
    
    class HybridStore : public StoreV1 {
    public:
        ~HybridStore() override;
        Status Setup(const Detail::Dictionary& config) override;
        std::string Readme() const override;
    
        // Core operations
        Expected<std::vector<uint8_t>> Lookup(const Detail::BlockId* blocks, size_t num) override;
        Expected<ssize_t> LookupOnPrefix(const Detail::BlockId* blocks, size_t num) override;
        void Prefetch(const Detail::BlockId* blocks, size_t num) override;
        Expected<Detail::TaskHandle> Load(Detail::TaskDesc task) override;
        Expected<Detail::TaskHandle> Dump(Detail::TaskDesc task) override;
        Expected<bool> Check(Detail::TaskHandle taskId) override;
        Status Wait(Detail::TaskHandle taskId) override;
    };
    
    }  // namespace UC::HybridStore
    

    Note: Full interface specifications are in ucm/store/ucmstore_v1.h

  2. Create Python bindings (hybrid_store.cpy.cc)

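    #include <pybind11/pybind11.h>
    // Also include the header that declares UC::HybridStore::HybridStore.

    namespace py = pybind11;

    PYBIND11_MODULE(ucmhybridstore, m) {
        // Illustrative outline: Config is a placeholder type, and pointer-plus-length
        // methods such as Lookup usually need a small adapter (e.g. a lambda) to
        // accept Python objects.
        py::class_<UC::HybridStore::HybridStore>(m, "HybridStore")
            .def(py::init<const Config&>())
            .def("Lookup", &UC::HybridStore::HybridStore::Lookup, py::arg("blocks_ids").noconvert())
            .def("Load", &UC::HybridStore::HybridStore::Load);
        // ... bind the remaining interface methods the same way
    }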
    
  3. Python wrapper layer

    from ucmhybridstore import HybridStore

    class UcmHybridStoreWrapper:
        def __init__(self, config: dict):
            self._store = HybridStore(config)

        def lookup_with_retry(self, block_ids):
            """Add Python-level retry logic"""
            result = self._store.Lookup(block_ids)
            if not result:
                result = self._handle_miss(block_ids)
            return result

        def _handle_miss(self, block_ids):
            """Hypothetical hook (not part of UCM): define how a miss is handled."""
            raise NotImplementedError
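
To illustrate what Python-level retry logic in the wrapper layer might look like in practice, here is a runnable sketch with a fake backend in place of the compiled module. `FlakyBackend`, `RetryingStore`, `max_attempts`, and `backoff_s` are hypothetical names, and the assumption that failures surface as `RuntimeError` is made only for this example; the real `HybridStore.Lookup` binding may report failures differently.

```python
import time
from typing import List, Optional


class FlakyBackend:
    """Fake stand-in for the compiled HybridStore; fails the first N calls."""

    def __init__(self, failures: int) -> None:
        self._failures = failures
        self.calls = 0

    def Lookup(self, block_ids: List[bytes]) -> List[bool]:
        self.calls += 1
        if self.calls <= self._failures:
            raise RuntimeError("transient backend error")
        # Pretend that only IDs starting with b"hit" are stored.
        return [bid.startswith(b"hit") for bid in block_ids]


class RetryingStore:
    """Python-level orchestration: retry transient Lookup failures."""

    def __init__(self, backend, max_attempts: int = 3, backoff_s: float = 0.0) -> None:
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self._backend = backend
        self._max_attempts = max_attempts
        self._backoff_s = backoff_s

    def lookup_with_retry(self, block_ids: List[bytes]) -> List[bool]:
        last_error: Optional[Exception] = None
        for attempt in range(1, self._max_attempts + 1):
            try:
                return self._backend.Lookup(block_ids)
            except RuntimeError as exc:
                last_error = exc
                if attempt < self._max_attempts:
                    time.sleep(self._backoff_s * attempt)  # linear backoff
        raise RuntimeError(
            f"lookup failed after {self._max_attempts} attempts"
        ) from last_error


backend = FlakyBackend(failures=2)
store = RetryingStore(backend, max_attempts=3)
result = store.lookup_with_retry([b"hit-0", b"miss-1"])
```

Keeping retries and backoff in Python leaves the C++ core focused on the hot path, at the cost of one extra Python call per attempt.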
    


🎯 Quick Decision Guide#

Decision Matrix

| Requirement | Pure Python | Hybrid | Pure C++ |
|---|---|---|---|
| Development speed | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Runtime performance | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Threading support | ❌ | ⚠️ | ✅ |
| Production ready | ❌ | ⚠️ | ✅ |


✅ Pre-implementation Checklist#

  • [ ] Reviewed ucm/store/ucmstore_v1.h

  • [ ] Reviewed ucm/store/ucmstore_v1.py

  • [ ] Defined supported data types and compression algorithms

  • [ ] Estimated target QPS and latency SLOs

  • [ ] Prepared unit tests (reference: ucm/store/test/)

  • [ ] Selected extension method based on performance requirements

  • [ ] Created stub implementation and validated registration


🀝 Getting Help#


Next Steps: Once your Store is implemented, see Prefix Cache Guide for pipeline configuration.