Commit 1818b134 by Tianxing Wang

finish LRU cache

parents
.cache/
cmake-build-debug
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module classpath="CMake" type="CPP_MODULE" version="4" />
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CMakeWorkspace" PROJECT_DIR="$PROJECT_DIR$" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/gpucache.iml" filepath="$PROJECT_DIR$/.idea/gpucache.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
cmake_minimum_required(VERSION 3.16)
project(gpucache CXX CUDA)
file(GLOB SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/src/cuda/*
${CMAKE_CURRENT_SOURCE_DIR}/src/hash/*
${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/*.cu
)
message(STATUS "source files:" ${SOURCE_FILES})
add_library(gpucache SHARED ${SOURCE_FILES})
set_target_properties(gpucache PROPERTIES
CUDA_SEPARABLE_COMPILATION ON
CUDA_ARCHITECTURES "86"
)
#include "common.cuh"
namespace gpucache {
struct CacheConfig {
enum CacheEvictStrategy {
FIFO,
LRU,
LFU
};
CacheEvictStrategy strategy;
uint64_t capacity;
uint32_t keySize;
uint32_t valueSize;
uint32_t maxQueryNum;
};
// Cache Interface
template<typename KeyType, typename ElemType>
class Cache {
public:
// using Strategy = CacheConfig::CacheEvictStrategy;
Cache() = default;
virtual ~Cache() = default;
Cache(const Cache &) = delete;
Cache &operator=(const Cache &) = delete;
Cache(Cache &&) = delete;
Cache &operator=(Cache &&) = delete;
virtual uint32_t KeySize() = 0;
virtual uint32_t ValueSize() = 0;
virtual CacheConfig::CacheEvictStrategy Strategy() = 0;
virtual uint64_t Capacity() = 0;
virtual uint32_t NumElemsPerValue() = 0;
virtual void Get(cudaStream_t *stream, uint32_t num_keys, KeyType *keys, ElemType *values) = 0;
virtual void Put(cudaStream_t *stream, uint32_t num_keys, KeyType *keys, ElemType *values) = 0;
virtual void Clear() = 0;
virtual uint32_t MaxQueryNum() = 0;
};
// TODO: 添加其他种类的cache
template<typename KeyType, typename ValueType>
std::unique_ptr<Cache<KeyType, ValueType>> NewCache(const CacheConfig &cfg) {
assert(cfg.keySize > 0);
assert(cfg.valueSize > 0);
assert(cfg.capacity > 0);
return nullptr;
}
}
#pragma once
#include <cstdint>
#include <cuda_runtime.h>
#include <memory>
#include <cassert>
#include <cstdio>
#define CHECK(call) \
{ \
const cudaError_t error = call; \
if (error != cudaSuccess) \
{ \
fprintf(stderr, "Error: %s:%d, ", __FILE__, __LINE__); \
fprintf(stderr, "code: %d, reason: %s\n", error, \
cudaGetErrorString(error)); \
} \
}
#include <cuda_runtime.h>
#include <iostream>
/*
* different
*/
#ifndef GPUCACHE_HASH_FUCNTION_H
#define GPUCACHE_HASH_FUCNTION_H
#include <string>
template<typename T>
class HashFunc;
template<typename T>
class DefaultHashFunc;
template<typename T>
void getHash(const T& obj, std::string type="default"){
if (type == "default") {
return DefaultHashFunc<T>()(obj);
}
}
#endif //GPUCACHE_HASH_FUCNTION_H
#include <functional>
template<typename T>
class HashFunc {
public:
virtual std::size_t operator()(T &obj) = 0;
};
template<typename T>
class DefaultHashFunc : public HashFunc<T> {
virtual std::size_t operator()(T &obj){
return std::hash<T>()(obj);
}
};
#include "common.cuh"
#include "cache.h"
#include "hash/hash_function.cpp"
#include "hash/hash_fucntion.h"
namespace gpucache {
constexpr unsigned int defaultBlockX = 256;
constexpr unsigned int warpsize = 32;
constexpr unsigned int defaultNumWarpsPerBlock = defaultBlockX / warpsize;
struct ThreadCtx {
__device__ ThreadCtx() {
auto global_thread_id = blockIdx.x * blockDim.x + threadIdx.x;
global_warp_idx = global_thread_id / warpsize;
block_warp_idx = threadIdx.x / warpsize;
lane_id = threadIdx.x % warpsize;
num_warps = blockDim.x * gridDim.x / warpsize;
}
uint32_t global_warp_idx;
uint32_t block_warp_idx;
uint32_t num_warps;
uint32_t lane_id;
};
struct WarpMutex {
public:
__device__ WarpMutex() : flag(0) {}
__device__ ~WarpMutex() = default;
WarpMutex(const WarpMutex &) = delete;
WarpMutex &operator=(const WarpMutex &) = delete;
WarpMutex(WarpMutex &&) = delete;
WarpMutex &operator=(WarpMutex &&) = delete;
__device__ void Lock(ThreadCtx &ctx) {
if (ctx.lane_id == 0) {
while (atomicCAS(&flag, 0, 1) != 0) {
}
}
__threadfence();
__syncwarp();
}
__device__ void UnLock(ThreadCtx &ctx) {
__syncwarp();
__threadfence();
if (ctx.lane_id == 0) { atomicExch(&flag, 0); };
}
private:
uint32_t flag;
};
__global__ void initLocks(uint32_t n_bucket, void *bucketMutexes) {
uint32_t global_thread_idx = blockDim.x * blockIdx.x + threadIdx.x;
if (global_thread_idx < n_bucket) {
new(reinterpret_cast<WarpMutex *>(bucketMutexes) + global_thread_idx)WarpMutex();
}
}
template<typename KeyType, typename ElemType>
class LRUCache : public Cache<KeyType, ElemType> {
friend __global__ void
GetInternal(LRUCache<KeyType, ElemType> &cache, cudaStream_t *stream, uint32_t num_query, KeyType *queries,
ElemType *results);
friend __global__ void
PutWithoutEvictInternal(LRUCache<KeyType, ElemType> &cache, uint32_t num_query, KeyType *put_keys,
ElemType *put_values, uint32_t n_missing, KeyType *missing_keys,
uint32_t *missing_indices);
friend __global__ void
EvictInternal(LRUCache<KeyType, ElemType> &cache, ElemType *put_values, uint32_t n_missing, KeyType *missing_keys,
uint32_t *missing_indices);
template<uint32_t ValueSize>
friend void setBucketView(LRUCache<KeyType, ElemType> &cache, uint32_t bucket_id);
public:
explicit LRUCache(const CacheConfig &cfg) : strategy(cfg.strategy), keySize(cfg.keySize),
valueSize(cfg.valueSize),
capacity(cfg.capacity), maxQueryNum(cfg.maxQueryNum) {
numElemPerValue = valueSize / sizeof(ElemType);
nbucket = (capacity + warpsize - 1) / warpSize;
CHECK(cudaMalloc((void **) &keys, capacity * sizeof(KeyType)));
CHECK(cudaMalloc((void **) &values, capacity * sizeof(ElemType) * numElemPerValue));
CHECK(cudaMalloc((void **) &timestamps, capacity * sizeof(uint8_t)));
CHECK(cudaMalloc((void **) &bucketMutexes, capacity * sizeof(WarpMutex)));
dim3 block(defaultBlockX);
dim3 grid((nbucket + defaultBlockX - 1) / defaultBlockX);
initLocks<<<grid, block>>>(nbucket, bucketMutexes);
CHECK(cudaMalloc((void **) &queryKeyBuffer, maxQueryNum * sizeof(KeyType)));
CHECK(cudaMalloc((void **) &queryIndiceBuffer, maxQueryNum * sizeof(uint32_t)));
}
~LRUCache() {
// TODO need to add CudaDeviceGuard
CHECK(cudaFree(keys));
CHECK(cudaFree(values));
CHECK(cudaFree(timestamps));
CHECK(cudaFree(bucketMutexes));
CHECK(cudaFree(queryKeyBuffer));
CHECK(cudaFree(queryIndiceBuffer))
}
uint32_t KeySize() { return keySize; }
uint32_t ValueSize() { return valueSize; }
uint64_t Capacity() { return capacity; }
uint32_t NumElemsPerValue() { return numElemPerValue; }
uint32_t MaxQueryNum() { return maxQueryNum; }
CacheConfig::CacheEvictStrategy Strategy() { return CacheConfig::LRU; }
void Clear() {
CHECK(cudaMemset(keys, 0, capacity * sizeof(KeyType)));
CHECK(cudaMemset(timestamps, 0, capacity * sizeof(uint32_t)));
dim3 block(defaultBlockX);
dim3 grid((nbucket + defaultBlockX - 1) / defaultBlockX);
initLocks<<<grid, block>>>(nbucket, bucketMutexes);
}
void Get(cudaStream_t *stream, uint32_t num_query, KeyType *queries, ElemType *results);
void Put(cudaStream_t *stream, uint32_t num_query, KeyType *putkeys, ElemType *putvalues);
private:
KeyType *keys;
ElemType *values;
uint8_t *timestamps;
uint32_t nbucket; // 32 values for one bucket
void *bucketMutexes;
CacheConfig::CacheEvictStrategy strategy;
uint64_t capacity;
uint32_t keySize;
uint32_t valueSize;
uint32_t numElemPerValue; // embedding dim
// store missing keys and indices for
KeyType *queryKeyBuffer;
uint32_t *queryIndiceBuffer;
uint32_t maxQueryNum;
};
// bucket view for warp put/get
template<typename KeyType, typename ElemType, uint32_t ValueSize>
struct BucketView {
__device__ BucketView(KeyType *k, ElemType *v, uint32_t *ts, WarpMutex *m) : mutex(m), bkeys(k), bvalues(v),
btimestamps(ts) {}
__device__ int Get(const ThreadCtx &ctx, const KeyType key) {
KeyType lane_key = bkeys[ctx.lane_id];
uint32_t ts = btimestamps[ctx.lane_id];
bool exist = (ts > 0 && lane_key == key);
uint32_t exist_mask = __ballot_sync(0xFFFFFFFF, exist);
int slot_num = __ffs(static_cast<int>(exist_mask)) - 1;
if (exist_mask == 0) { // not found
return -1;
} else {
// auto max_ts = __ffs(static_cast<int>(__ballot_sync(0xFFFFFFFF,ts != 0)));
auto slot_ts = __shfl_sync(0xFFFFFFFF, ts, slot_num);
if (ts > slot_ts) {
btimestamps[ctx.lane_id]--;
}
if (ctx.lane_id == slot_num) {
btimestamps[ctx.lane_id] = warpsize;
}
return slot_num; // return exist slot num
};
}
__device__ int TryPut(const ThreadCtx &ctx, const KeyType key) {
KeyType lane_key = bkeys[ctx.lane_id];
uint32_t ts = btimestamps[ctx.lane_id];
bool exist = (ts > 0 && lane_key == key);
uint32_t exist_mask = __ballot_sync(0xFFFFFFFF, exist);
int slot_num = __ffs(static_cast<int>(exist_mask)) - 1;
if (exist_mask != 0) { // find key just update value
uint32_t slot_ts = __shfl_sync(0xFFFFFFFF, ts, slot_num);
if (ts > slot_ts) {
ts--;
} else if (ctx.lane_id == slot_num) {
ts = warpsize;
}
__syncwarp();
}
if (slot_num == -1) { // key don't exist
uint32_t emptyMask = __ballot_sync(0xFFFFFFFF, ts != 0);
if (emptyMask != 0xFFFFFFFF) { // bucket not full
slot_num = __popc(static_cast<int>(emptyMask));
if (ts > 0) {
ts--;
} else if (ctx.lane_id == slot_num) {
bkeys[slot_num] = key; // insert key
ts = warpsize;
}
}
__syncwarp();
}
if (slot_num != -1) { btimestamps[ctx.lane_id] = ts; }
return slot_num;
}
__device__ int Evict(const ThreadCtx &ctx, const KeyType key) {
KeyType lane_key = bkeys[ctx.lane_id];
uint32_t ts = btimestamps[ctx.lane_id];
uint32_t evict_mask = __ballot_sync(0xFFFFFFFF, ts == 1);
int slot_num = __ffs(static_cast<int>(evict_mask)) - 1;
uint32_t slot_ts = __shfl_sync(0xFFFFFFFF,ts,slot_num);
if(ts > slot_ts){
ts--;
}else if(ctx.lane_id == slot_num){
bkeys[ctx.lane_id] = key;
ts = warpsize;
}
btimestamps[ctx.lane_id] = ts;
return slot_num;
}
__device__ void ReadOneValue(const ThreadCtx &ctx, uint8_t slot_num, ElemType *out) {
for (int i = ctx.lane_id; i < ValueSize; i += warpsize) {
out[i] = bvalues[slot_num * ValueSize + i];
}
}
__device__ void WriteOneValue(const ThreadCtx &ctx, uint8_t slot_num, ElemType *v) {
for (int i = ctx.lane_id; i < ValueSize; i += warpsize) {
bvalues[slot_num * ValueSize + i] = v[i];
}
}
WarpMutex *mutex;
KeyType *bkeys;
ElemType *bvalues;
uint32_t *btimestamps;
};
template<typename KeyType, typename ElemType, uint32_t ValueSize>
BucketView<KeyType, ElemType, ValueSize> setBucketView(LRUCache<KeyType, ElemType> &cache, uint32_t bucket_id) {
return BucketView<KeyType, ElemType, ValueSize>(cache.keys + bucket_id * warpsize,
cache.values + bucket_id * warpsize * cache.numElemPerValue,
cache.timestamps + bucket_id * warpsize,
reinterpret_cast<WarpMutex *>(cache.bucketMutexes) + bucket_id);
}
template<typename KeyType, typename ElemType>
__global__ void
GetInternal(LRUCache<KeyType, ElemType> &cache, uint32_t num_query, KeyType *queries,
ElemType *results) {
ThreadCtx ctx{};
__shared__ KeyType blockQueryKeys[defaultNumWarpsPerBlock][warpsize];
__shared__ uint32_t blockBucketIds[defaultNumWarpsPerBlock][warpsize];
for (uint32_t offset = ctx.global_warp_idx * warpsize; offset < num_query; offset += ctx.num_warps * warpsize) {
uint32_t n_query = min(warpsize, num_query - offset);
if (ctx.lane_id < n_query) {
uint32_t idx = offset + ctx.lane_id;
KeyType key = queries[idx];
size_t hash = getHash(key);
uint32_t bucket_id = hash % cache.nbucket;
blockQueryKeys[ctx.block_warp_idx][ctx.lane_id] = key;
blockBucketIds[ctx.block_warp_idx][ctx.lane_id] = bucket_id;
}
__syncwarp();
// 32 threads compare it own slot with key
// if find parallel write to result
for (uint32_t i = 0; i < n_query; i++) {
uint32_t idx = offset + i;
KeyType key = blockQueryKeys[ctx.block_warp_idx][i];
uint32_t bucket_id = blockBucketIds[ctx.block_warp_idx][i];
auto bucket = setBucketView<KeyType, ElemType, cache.valueSize>(cache, bucket_id);
bucket.mutex->Lock();
int slot_num = bucket.Get(ctx, key);
if (slot_num != -1) {
bucket.ReadOneValue(ctx, slot_num, results[idx]);
}
bucket.mutex->UnLock();
}
__syncwarp();
}
}
template<typename KeyType, typename ElemType>
__global__ void
PutWithoutEvictInternal(LRUCache<KeyType, ElemType> &cache, uint32_t num_query, KeyType *put_keys,
ElemType *put_values, uint32_t *n_missing, KeyType *missing_keys,
uint32_t *missing_indices) {
ThreadCtx ctx{};
__shared__ KeyType blockPutKeys[defaultNumWarpsPerBlock][warpsize];
__shared__ uint32_t blockBucketIds[defaultNumWarpsPerBlock][warpsize];
for (uint32_t offset = ctx.global_warp_idx * warpsize; offset < num_query; offset += ctx.num_warps * warpsize) {
uint32_t n_query = min(warpsize, num_query - offset);
if (ctx.lane_id < n_query) {
uint32_t idx = offset + ctx.lane_id;
KeyType key = put_keys[idx];
size_t hash = getHash(key);
uint32_t bucket_id = hash % cache.nbucket;
blockPutKeys[ctx.block_warp_idx][ctx.lane_id] = key;
blockBucketIds[ctx.block_warp_idx][ctx.lane_id] = bucket_id;
}
__syncwarp();
uint32_t n_warp_missing = 0;
KeyType warp_missing_key;
uint32_t warp_missing_index = 0;
// 32 threads handle n_query keys together instead of 1 thread for 1 key
for (uint32_t i = 0; i < n_query; i++) {
uint32_t idx = offset + i;
KeyType key = blockPutKeys[ctx.block_warp_idx][i];
//ElemType* Value = &put_values[idx];
uint32_t bucket_id = blockBucketIds[ctx.block_warp_idx][i];
auto bucket = setBucketView<KeyType, ElemType, cache.valueSize>(cache, bucket_id);
bucket.mutex->Lock();
int slot_num = bucket.TryPut(ctx, key);
if (slot_num != -1) { // insert value
bucket.WriteOneValue(ctx, slot_num, put_values[idx * cache.numElemPerValue]);
} else { // bucket full record missing_key + idx for Evict
if (ctx.lane_id == n_warp_missing) { // i-th thread keep i-th missing key
warp_missing_key = key;
warp_missing_index = idx;
}
n_warp_missing += 1;
}
bucket.mutex->UnLock();
}
// reduce missing_key & idx
if (n_warp_missing > 0) {
uint32_t base_missing_idx = 0;
if (ctx.lane_id == 0) {
base_missing_idx = atomicAdd(n_missing, n_warp_missing);
}
base_missing_idx = __shfl_sync(0xFFFFFFFF, base_missing_idx, 0);
if (ctx.lane_id < n_warp_missing) {
missing_keys[base_missing_idx + ctx.lane_id] = warp_missing_key;
missing_indices[base_missing_idx + ctx.lane_id] = warp_missing_index;
}
__syncwarp();
}
}
}
template<typename KeyType, typename ElemType>
__global__ void
EvictInternal(LRUCache<KeyType, ElemType> &cache, ElemType *put_values, uint32_t n_missing, KeyType *missing_keys,
uint32_t *missing_indices) {
ThreadCtx ctx{};
__shared__ KeyType blockPutKeys[defaultNumWarpsPerBlock][warpsize];
__shared__ uint32_t blockBucketIds[defaultNumWarpsPerBlock][warpsize];
for (uint32_t offset = ctx.global_warp_idx * warpsize; offset < n_missing; offset += ctx.num_warps * warpsize) {
uint32_t n_evict = min(warpsize, n_missing - offset);
if(ctx.lane_id < n_evict){
uint32_t idx = offset + ctx.lane_id;
blockPutKeys[ctx.block_warp_idx][ctx.lane_id] = missing_keys[idx];
blockBucketIds[ctx.block_warp_idx][ctx.lane_id] = missing_indices[idx];
}
__syncwarp();
for (uint32_t i = 0; i < n_evict; i++){
uint32_t idx = offset + i;
KeyType key = blockPutKeys[ctx.block_warp_idx][i];
uint32_t bucket_id = blockBucketIds[ctx.block_warp_idx][i];
auto bucket = setBucketView<KeyType,ElemType,cache.valueSize>(cache,bucket_id);
bucket.mutex->Lock();
int slot_num = bucket.Evict(ctx,key);
bucket.WriteOneValue(ctx,slot_num,put_values, put_values + missing_indices[idx] * cache.valueSize);
bucket.mutex->UnLock();
}
}
}
// TODO switch to cuda stream
template<typename KeyType, typename ElemType>
void LRUCache<KeyType, ElemType>::Put(cudaStream_t *stream, uint32_t num_query, KeyType *put_keys,
ElemType *put_values) {
assert(num_query <= maxQueryNum);
if (num_query == 0) {
return;
}
dim3 block(defaultBlockX);
dim3 grid((num_query + defaultBlockX - 1) / defaultBlockX);
int n_missing = 0;
PutWithoutEvictInternal<<<grid, block>>>(this, num_query, put_keys, put_values, &n_missing, queryKeyBuffer,
queryIndiceBuffer);
EvictInternal<<<grid, block>>>(this,put_values,n_missing,queryKeyBuffer,queryIndiceBuffer);
}
// TODO switch to cuda stream
template<typename KeyType, typename ElemType>
void
LRUCache<KeyType, ElemType>::Get(cudaStream_t *stream, uint32_t num_query, KeyType *queries, ElemType *results) {
assert(num_query <= maxQueryNum);
if (num_query == 0) { return; }
dim3 block(defaultBlockX);
dim3 grid((num_query + defaultBlockX - 1) / defaultBlockX);
GetInternal<<<grid, block>>>(this, num_query, queries, results);
}
}
#include <cuda_runtime.h>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment