Commit 269cc25a by zhlj

add cache and dist edge

parent 2527aab9
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false
}
]
}
\ No newline at end of file
{
"files.associations": {
"__nullptr": "cpp",
"exception": "cpp",
"initializer_list": "cpp",
"new": "cpp",
"stdexcept": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"cctype": "cpp",
"cmath": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"array": "cpp",
"atomic": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"cstdint": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"random": "cpp",
"ratio": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"utility": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"ostream": "cpp",
"streambuf": "cpp",
"bit": "cpp",
"__bit_reference": "cpp",
"__bits": "cpp",
"__config": "cpp",
"__debug": "cpp",
"__errc": "cpp",
"__locale": "cpp",
"__split_buffer": "cpp",
"__threading_support": "cpp",
"__tuple": "cpp",
"__verbose_abort": "cpp",
"ios": "cpp",
"locale": "cpp",
"__tree": "cpp",
"map": "cpp",
"__node_handle": "cpp",
"__hash_table": "cpp",
"__string": "cpp",
"set": "cpp",
"deque": "cpp",
"filesystem": "cpp",
"queue": "cpp",
"stack": "cpp",
"variant": "cpp",
"optional": "cpp",
"shared_mutex": "cpp",
"clocale": "cpp",
"condition_variable": "cpp",
"cwctype": "cpp",
"fstream": "cpp",
"sstream": "cpp",
"thread": "cpp"
},
"svg.preview.background": "editor"
}
\ No newline at end of file
{
"tasks": [
{
"type": "cppbuild",
"label": "C/C++: g++ build active file",
"command": "/usr/bin/g++",
"args": [
"-fdiagnostics-color=always",
"-g",
"${file}",
"-o",
"${fileDirname}/${fileBasenameNoExtension}"
],
"options": {
"cwd": "${fileDirname}"
},
"problemMatcher": [
"$gcc"
],
"group": {
"kind": "build",
"isDefault": true
},
"detail": "Task generated by Debugger."
}
],
"version": "2.0.0"
}
\ No newline at end of file
init distribution
! 4 0
get_neighbors consume: 0.00214444s
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 1}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 2}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 3}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 4}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 6}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 5}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 7}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 8}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 0, 'rpc_worker_rank': 0}
127.0.0.1 10001
load graph /home/sxx/zlj/rpc_ps/part/metis_4/rank_0
data_x tensor([[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
...,
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.]])
init distribution
! 4 1
get_neighbors consume: 0.0020944s
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 9}
127.0.0.1 10001
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 10}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 11}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 12}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 14}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 15}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 16}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 17}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 1, 'rpc_worker_rank': 13}
load graph /home/sxx/zlj/rpc_ps/part/metis_4/rank_1
data_x tensor([[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
...,
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.]])
init distribution
! 4 2
get_neighbors consume: 0.0021429s
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 19}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 18}
127.0.0.1 10001
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 20}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 21}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 22}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 24}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 25}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 26}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 2, 'rpc_worker_rank': 23}
load graph /home/sxx/zlj/rpc_ps/part/metis_4/rank_2
data_x tensor([[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
...,
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.]])
init distribution
! 4 3
get_neighbors consume: 0.00242997s
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 27}
127.0.0.1 10001
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 28}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 29}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 31}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 30}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 32}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 33}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 35}
{'num_worker_threads': 16, 'rpc_name': 'rpcserver{}', 'rpc_world_size': 36, 'worker_rank': 3, 'rpc_worker_rank': 34}
load graph /home/sxx/zlj/rpc_ps/part/metis_4/rank_3
data_x tensor([[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
...,
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 1., ..., 0., 0., 0.]])
from distparser import WORLD_SIZE
import torch

class BatchData:
    '''
    Args:
@@ -23,11 +26,29 @@ class BatchData:
        self.val_mask = val_mask
        self.test_mask = test_mask
    def _check_with_graph(self, graph1, graph2):
        for i, id in enumerate(self.nids):
            if id >= graph1.partptr[0] and id < graph1.partptr[1]:
                real_x = graph1.select_attr(graph1.get_localId_by_partitionId(0, torch.tensor(id)))
            else:
                real_x = graph2.select_attr(graph2.get_localId_by_partitionId(1, torch.tensor(id)))
    def __repr__(self):
        return "BatchData(batch_size = {}, roots = {}, \
            nids = {}, edge_index = {}, x = {}, \
            y = {})".format(self.batch_size, repr(self.roots),
                            repr(self.nids),
                            repr(self.edge_index),
                            repr(self.x), repr(self.y))
    def to(self, device='cpu'):
        if device == 'cpu':
            return
        else:
            # Tensor.to() is not in-place; reassign the moved tensors.
            self.y = self.y.to(device)
            self.edge_index = self.edge_index.to(device)
            self.roots = self.roots.to(device)
            self.nids = self.nids.to(device)
            self.eids = self.eids.to(device)
\ No newline at end of file
import time
import distparser as parser
from graph_store import _get_graph
from share_memory_util import _copy_to_share_memory, _get_existing_share_memory, _get_from_share_memory
import torch
from torch.distributed.rpc import RRef, rpc_async, remote
from os.path import abspath, join, dirname
import sys
sys.path.insert(0, join(abspath(dirname(__file__))))
from cpu_cache_manager import cache_data2mem, get_from_cache

cache_index = {}
cache_mem = {}
sparse_index = {}

#def get_from_cpu(nodeId):
def _get_local_attr(data_name, nodes):
    graph = _get_graph(data_name)
    local_id = graph.get_localId_by_partitionId(parser._get_worker_rank(), nodes)
    return graph.select_attr(local_id)

def _request_remote_attr(rank, data_name, nodes):
    t1 = time.time()
    fut = rpc_async(
        parser._get_RPC_NAME().format(rank),
        _get_local_attr,
        args=(data_name, nodes,)
    )
    #logger.debug('request {}'.format(time.time()-t1))
    return fut

#ThreadPoolExecutor pool
def _request_all_remote_attr(data_name, nodes_list):
    worker_size = parser._get_world_size()
    worker_rank = parser._get_worker_rank()
    futs = []
    for rank in range(worker_size):
        if rank == worker_rank:
            futs.append(None)
            continue
        elif nodes_list[rank].size(0) == 0:
            futs.append(None)
        else:
            futs.append(_request_remote_attr(rank, data_name, nodes_list[rank]))
    return futs

def _split_node_part_and_submit(data_name, node_id_list):
    t0 = time.time()
    graph = _get_graph(data_name)
    worker_size = parser._get_world_size()
    local_rank = parser._get_worker_rank()
    futs = []
    for rank in range(worker_size):
        if rank != local_rank:
            part_mask = (graph.partptr[rank] <= node_id_list) & (node_id_list < graph.partptr[rank+1])
            part_node = torch.masked_select(node_id_list, part_mask)
            sendlen = 0
            totlen = len(part_node)
            # split each remote request into chunks of at most limit_len ids
            limit_len = 500
            if part_node.size(0) != 0:
                while totlen - sendlen >= limit_len:
                    futs.append((part_node[sendlen:sendlen+limit_len],
                                 _request_remote_attr(rank, data_name, part_node[sendlen:sendlen+limit_len])))
                    sendlen += limit_len
                if sendlen < totlen:
                    futs.append((part_node[sendlen:], _request_remote_attr(rank, data_name, part_node[sendlen:])))
    local_mask = (graph.partptr[local_rank] <= node_id_list) & (node_id_list < graph.partptr[local_rank+1])
    local_node = torch.masked_select(node_id_list, local_mask)
    #logger.debug('size {},split {} {} {}'.format(node_id_list.size(0),t2-t1,t3-t2,t4-t3))
    return local_node, futs

def create_empty_cache(data_name, mem_size, x_len):
    # contiguous() is not in-place, so chain it instead of discarding the result
    mem = torch.zeros(mem_size, x_len).contiguous()
    indx = torch.zeros(mem_size).long().contiguous()
    cache_index[data_name] = indx
    cache_mem[data_name] = mem

def load_data2cache(data_name, index):
    local, futs = _split_node_part_and_submit(data_name, index)
    num = 0
    mem = cache_mem[data_name]
    indx = cache_index[data_name]
    for id, fut in futs:
        # wait() blocks until the RPC completes and returns its value,
        # instead of spinning on fut.done()
        f = fut.wait()
        mem[num:num+len(f), :] = f[:, :]
        indx[num:num+len(f)] = id[:]
        num = num + len(f)
    print('len', len(indx), mem, indx)
    #while(True):
    #    continue
    cache_data2mem(data_name, indx, mem)
    #ind = torch.unsqueeze(indx,0)
    #values = torch.arange(1,len(indx)+1)
    #print(len(indx))
    #sparse_index[data_name] = dict(zip(indx.tolist(),torch.arange(len(indx)).tolist()))#torch.sparse_coo_tensor(indices = ind,values=values)
    #print(sparse_index[data_name].size(-1))

def load_cpu_cache_memory(data_name, indx_data, mem_data):
    name, data_shape, data_dtype = indx_data
    shm = _get_existing_share_memory(name)
    idx = _get_from_share_memory(shm, data_shape, data_dtype)
    name, data_shape, data_dtype = mem_data
    shm = _get_existing_share_memory(name)
    mem = _get_from_share_memory(shm, data_shape, data_dtype)
    cache_index[data_name] = idx
    cache_mem[data_name] = mem

all_time = 0
cnt = 0

def get_cpu_cache_data(data_name, index):
    #return None,index,None
    cache_data = get_from_cache(data_name, index)
    global all_time
    all_time = all_time + len(cache_data.cache_index)
    global cnt
    cnt = cnt + len(index)
    print(len(cache_data.cache_index), len(cache_data.uncache_index), len(index), all_time/cnt)
    return cache_data.cache_index, cache_data.uncache_index, cache_data.cache_data
    # earlier pure-Python lookup, kept for reference:
    #t0 = time.time()
    #ind = []
    #sparse = sparse_index[data_name]
    #is_cached = []
    #for i in index:
    #    id = int(i.item())
    #    #print(id)
    #    if id in sparse:
    #        ind.append(sparse[id])
    #        is_cached.append(True)
    #    else:
    #        is_cached.append(False)
    #    #if i < sparse.size(-1):
    #    #    idx = sparse[i]
    #    #    if(idx != 0):
    #    #        ind.append(sparse[i])
    #    #        is_cached.append(True)
    #    #    else:
    #    #        is_cached.append(False)
    #    #else:
    #    #    is_cached.append(False)
    #t1 = time.time()
    #is_cached = torch.tensor(is_cached)
    #cache_index = torch.masked_select(index,is_cached)
    #uncache_index = torch.masked_select(index,~is_cached)
    #if ind:
    #    ind = torch.tensor(ind)
    #    cache_value = torch.index_select(cache_mem[data_name],0,ind)
    #else:
    #    cache_value = None
    #global all_time
    #all_time = all_time + t1-t0 #len(ind)
    #global cnt
    #cnt = cnt+1 #len(index)
    #print(all_time/cnt)
    #return cache_index,uncache_index,cache_value
\ No newline at end of file
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
#include <pybind11/stl.h>
#include <torch/extension.h>
#include <parallel_hashmap/phmap.h>
#include <cstring>
#include <vector>
#include <iostream>
#include <ctime>
#include <map>
using namespace std;
namespace py = pybind11;
namespace th = torch;
typedef int64_t NodeIDType;
map<string,phmap::parallel_flat_hash_map <NodeIDType,NodeIDType> *> cache_index;
map<string,th::Tensor > cache_data;
class DataFromCPUCache
{
public:
th::Tensor data;
th::Tensor cached_index;
th::Tensor uncached_index;
DataFromCPUCache(){}
DataFromCPUCache(th::Tensor & _cached_index, th::Tensor & _uncached_index,
th::Tensor & _data):
data(_data), cached_index(_cached_index), uncached_index(_uncached_index){}
};
void cache_data2mem(string data_name,th::Tensor index,th::Tensor data){
AT_ASSERTM(data.is_contiguous(), "Cache data tensor must be contiguous");
cache_data[data_name] = data;
auto array = index.data_ptr<NodeIDType>();
vector<pair<NodeIDType,NodeIDType>> v;
int mem_size = data.size(0);
for(int i=0;i<mem_size;i++){
v.push_back(make_pair((NodeIDType)array[i],(NodeIDType)i));
//cout<<(NodeIDType)array[i]<<" "<<(NodeIDType)i<<endl;
}
cache_index[data_name] = new phmap::parallel_flat_hash_map<NodeIDType,NodeIDType>(v.begin(),v.end());
}
double tot1 = 0;
double tot2 = 0;
double tot3 = 0;
double tot4 = 0;
int cnt = 0;
DataFromCPUCache get_from_cache(string data_name,th::Tensor index){
int len = index.size(0);
auto array = index.data_ptr<NodeIDType>();
phmap::parallel_flat_hash_map <NodeIDType,NodeIDType> * mp = cache_index[data_name];
th::Tensor data = cache_data[data_name];
vector<NodeIDType> iscached(len);
//cout<<len<<endl;
#pragma omp parallel for num_threads(10)
for(int i=0 ; i < len ; i++){
NodeIDType id = (NodeIDType)array[i];
// single lookup: the mapped value is the row of `id` in the cache memory,
// or -1 if the id is not cached
auto it = mp->find(id);
if(it != mp->end()){
iscached[i] = it->second;
//cout<<i<<" "<<id<<" "<<it->second<<endl;
}
else{
iscached[i] = -1;
// cout<<i<<" "<<id<<" "<<-1<<endl;
}
}
}
clock_t t0 = clock();
th::Tensor is_cache = th::tensor(iscached);
th::Tensor is_select = is_cache >= 0;
clock_t t1 = clock();
//cout<<is_cache.size(0)<<" "<<no_select.size(0)<<endl;
th::Tensor cached_index = index.masked_select(is_select);
th::Tensor uncache_index = index.masked_select(~is_select);
clock_t t4 = clock();
th::Tensor mem_index = is_cache.masked_select(is_select);
clock_t t2 = clock();
th::Tensor cached_data = data.index_select(0,mem_index);
clock_t t3 = clock();
DataFromCPUCache dataFromCache = DataFromCPUCache(cached_index,uncache_index,cached_data);
cnt = cnt+1;
tot1+=(double)(t1-t0)/CLOCKS_PER_SEC;
tot2+=(double)(t4-t1)/CLOCKS_PER_SEC;
tot3+=(double)(t2-t4)/CLOCKS_PER_SEC;
tot4+=(double)(t3-t2)/CLOCKS_PER_SEC;
// cout<<"cache"<<" "<<tot1/cnt<<" "<<tot2/cnt<<" "<<tot3/cnt<<" "<<tot4/cnt<<endl;
return dataFromCache;
}
PYBIND11_MODULE(cpu_cache_manager, m)
{
m
.def("cache_data2mem",
&cache_data2mem,
py::return_value_policy::reference)
.def("get_from_cache",
&get_from_cache,
py::return_value_policy::reference);
py::class_<DataFromCPUCache>(m, "DataFromCPUCache")
.def_readonly("cache_index", &DataFromCPUCache::cached_index, py::return_value_policy::reference)
.def_readonly("uncache_index", &DataFromCPUCache::uncached_index,py::return_value_policy::reference)
.def_readonly("cache_data", &DataFromCPUCache::data, py::return_value_policy::reference);
}
\ No newline at end of file
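# A minimal usage sketch of the pybind11 module above, assuming the extension
# has been built and is importable as `cpu_cache_manager`; the "demo" name and
# the tensors are illustrative only.
import torch
import cpu_cache_manager

feats = torch.randn(2, 4)                        # rows to cache; must be contiguous
ids = torch.tensor([10, 42], dtype=torch.int64)  # NodeIDType is int64
cpu_cache_manager.cache_data2mem("demo", ids, feats)

out = cpu_cache_manager.get_from_cache("demo", torch.tensor([10, 7, 42]))
print(out.cache_index)    # tensor([10, 42]) -- queried ids found in the cache
print(out.uncache_index)  # tensor([7])      -- queried ids that must be fetched remotely
print(out.cache_data)     # the cached feature rows, in cache_index order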
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from conans import ConanFile, tools
import os

class SparseppConan(ConanFile):
    name = "parallel_hashmap"
    version = "1.27"
    description = "A header-only, very fast and memory-friendly hash map"
    # Indicates License type of the packaged library
    license = "https://github.com/greg7mdp/parallel-hashmap/blob/master/LICENSE"
    # Packages the license for the conanfile.py
    exports = ["LICENSE"]
    # Custom attributes for Bincrafters recipe conventions
    source_subfolder = "source_subfolder"

    def source(self):
        source_url = "https://github.com/greg7mdp/parallel-hashmap"
        tools.get("{0}/archive/{1}.tar.gz".format(source_url, self.version))
        extracted_dir = self.name + "-" + self.version
        # Rename to "source_subfolder" is a convention to simplify later steps
        os.rename(extracted_dir, self.source_subfolder)

    def package(self):
        include_folder = os.path.join(self.source_subfolder, "parallel_hashmap")
        self.copy(pattern="LICENSE")
        self.copy(pattern="*", dst="include/parallel_hashmap", src=include_folder)

    def package_id(self):
        self.info.header_only()
#if !defined(spp_memory_h_guard)
#define spp_memory_h_guard
#include <cstdint>
#include <cstring>
#include <cstdlib>
#if defined(_WIN32) || defined( __CYGWIN__)
#define SPP_WIN
#endif
#ifdef SPP_WIN
#include <windows.h>
#include <Psapi.h>
#undef min
#undef max
#elif defined(__linux__)
#include <sys/types.h>
#include <sys/sysinfo.h>
#elif defined(__FreeBSD__)
#include <paths.h>
#include <fcntl.h>
#include <kvm.h>
#include <unistd.h>
#include <sys/sysctl.h>
#include <sys/user.h>
#endif
namespace spp
{
uint64_t GetSystemMemory();
uint64_t GetTotalMemoryUsed();
uint64_t GetProcessMemoryUsed();
uint64_t GetPhysicalMemory();
uint64_t GetSystemMemory()
{
#ifdef SPP_WIN
MEMORYSTATUSEX memInfo;
memInfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&memInfo);
return static_cast<uint64_t>(memInfo.ullTotalPageFile);
#elif defined(__linux__)
struct sysinfo memInfo;
sysinfo (&memInfo);
auto totalVirtualMem = memInfo.totalram;
totalVirtualMem += memInfo.totalswap;
totalVirtualMem *= memInfo.mem_unit;
return static_cast<uint64_t>(totalVirtualMem);
#elif defined(__FreeBSD__)
kvm_t *kd;
u_int pageCnt;
size_t pageCntLen = sizeof(pageCnt);
u_int pageSize;
struct kvm_swap kswap;
uint64_t totalVirtualMem;
pageSize = static_cast<u_int>(getpagesize());
sysctlbyname("vm.stats.vm.v_page_count", &pageCnt, &pageCntLen, NULL, 0);
totalVirtualMem = pageCnt * pageSize;
kd = kvm_open(NULL, _PATH_DEVNULL, NULL, O_RDONLY, "kvm_open");
kvm_getswapinfo(kd, &kswap, 1, 0);
kvm_close(kd);
totalVirtualMem += kswap.ksw_total * pageSize;
return totalVirtualMem;
#else
return 0;
#endif
}
uint64_t GetTotalMemoryUsed()
{
#ifdef SPP_WIN
MEMORYSTATUSEX memInfo;
memInfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&memInfo);
return static_cast<uint64_t>(memInfo.ullTotalPageFile - memInfo.ullAvailPageFile);
#elif defined(__linux__)
struct sysinfo memInfo;
sysinfo(&memInfo);
auto virtualMemUsed = memInfo.totalram - memInfo.freeram;
virtualMemUsed += memInfo.totalswap - memInfo.freeswap;
virtualMemUsed *= memInfo.mem_unit;
return static_cast<uint64_t>(virtualMemUsed);
#elif defined(__FreeBSD__)
kvm_t *kd;
u_int pageSize;
u_int pageCnt, freeCnt;
size_t pageCntLen = sizeof(pageCnt);
size_t freeCntLen = sizeof(freeCnt);
struct kvm_swap kswap;
uint64_t virtualMemUsed;
pageSize = static_cast<u_int>(getpagesize());
sysctlbyname("vm.stats.vm.v_page_count", &pageCnt, &pageCntLen, NULL, 0);
sysctlbyname("vm.stats.vm.v_free_count", &freeCnt, &freeCntLen, NULL, 0);
virtualMemUsed = (pageCnt - freeCnt) * pageSize;
kd = kvm_open(NULL, _PATH_DEVNULL, NULL, O_RDONLY, "kvm_open");
kvm_getswapinfo(kd, &kswap, 1, 0);
kvm_close(kd);
virtualMemUsed += kswap.ksw_used * pageSize;
return virtualMemUsed;
#else
return 0;
#endif
}
uint64_t GetProcessMemoryUsed()
{
#ifdef SPP_WIN
PROCESS_MEMORY_COUNTERS_EX pmc;
GetProcessMemoryInfo(GetCurrentProcess(), reinterpret_cast<PPROCESS_MEMORY_COUNTERS>(&pmc), sizeof(pmc));
return static_cast<uint64_t>(pmc.PrivateUsage);
#elif defined(__linux__)
auto parseLine =
[](char* line)->int
{
auto i = strlen(line);
while(*line < '0' || *line > '9')
{
line++;
}
line[i-3] = '\0';
i = atoi(line);
return i;
};
auto file = fopen("/proc/self/status", "r");
auto result = -1;
char line[128];
while(fgets(line, 128, file) != nullptr)
{
if(strncmp(line, "VmSize:", 7) == 0)
{
result = parseLine(line);
break;
}
}
fclose(file);
return static_cast<uint64_t>(result) * 1024;
#elif defined(__FreeBSD__)
struct kinfo_proc info;
size_t infoLen = sizeof(info);
int mib[] = { CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid() };
sysctl(mib, sizeof(mib) / sizeof(*mib), &info, &infoLen, NULL, 0);
return static_cast<uint64_t>(info.ki_rssize * getpagesize());
#else
return 0;
#endif
}
uint64_t GetPhysicalMemory()
{
#ifdef SPP_WIN
MEMORYSTATUSEX memInfo;
memInfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&memInfo);
return static_cast<uint64_t>(memInfo.ullTotalPhys);
#elif defined(__linux__)
struct sysinfo memInfo;
sysinfo(&memInfo);
auto totalPhysMem = memInfo.totalram;
totalPhysMem *= memInfo.mem_unit;
return static_cast<uint64_t>(totalPhysMem);
#elif defined(__FreeBSD__)
u_long physMem;
size_t physMemLen = sizeof(physMem);
int mib[] = { CTL_HW, HW_PHYSMEM };
sysctl(mib, sizeof(mib) / sizeof(*mib), &physMem, &physMemLen, NULL, 0);
return physMem;
#else
return 0;
#endif
}
}
#endif // spp_memory_h_guard
#if !defined(phmap_dump_h_guard_)
#define phmap_dump_h_guard_
// ---------------------------------------------------------------------------
// Copyright (c) 2019, Gregory Popovitch - greg7mdp@gmail.com
//
// providing dump/load/mmap_load
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ---------------------------------------------------------------------------
#include <iostream>
#include <fstream>
#include <sstream>
#include "phmap.h"
namespace phmap
{
namespace type_traits_internal {
#if defined(__GLIBCXX__) && __GLIBCXX__ < 20150801
template<typename T> struct IsTriviallyCopyable : public std::integral_constant<bool, __has_trivial_copy(T)> {};
#else
template<typename T> struct IsTriviallyCopyable : public std::is_trivially_copyable<T> {};
#endif
template <class T1, class T2>
struct IsTriviallyCopyable<std::pair<T1, T2>> {
static constexpr bool value = IsTriviallyCopyable<T1>::value && IsTriviallyCopyable<T2>::value;
};
}
namespace priv {
// ------------------------------------------------------------------------
// dump/load for raw_hash_set
// ------------------------------------------------------------------------
template <class Policy, class Hash, class Eq, class Alloc>
template<typename OutputArchive>
bool raw_hash_set<Policy, Hash, Eq, Alloc>::dump(OutputArchive& ar) const {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
if (!ar.dump(size_)) {
std::cerr << "Failed to dump size_" << std::endl;
return false;
}
if (size_ == 0) {
return true;
}
if (!ar.dump(capacity_)) {
std::cerr << "Failed to dump capacity_" << std::endl;
return false;
}
if (!ar.dump(reinterpret_cast<char*>(ctrl_),
sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1))) {
std::cerr << "Failed to dump ctrl_" << std::endl;
return false;
}
if (!ar.dump(reinterpret_cast<char*>(slots_),
sizeof(slot_type) * capacity_)) {
std::cerr << "Failed to dump slot_" << std::endl;
return false;
}
return true;
}
template <class Policy, class Hash, class Eq, class Alloc>
template<typename InputArchive>
bool raw_hash_set<Policy, Hash, Eq, Alloc>::load(InputArchive& ar) {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
raw_hash_set<Policy, Hash, Eq, Alloc>().swap(*this); // clear any existing content
if (!ar.load(&size_)) {
std::cerr << "Failed to load size_" << std::endl;
return false;
}
if (size_ == 0) {
return true;
}
if (!ar.load(&capacity_)) {
std::cerr << "Failed to load capacity_" << std::endl;
return false;
}
// allocate memory for ctrl_ and slots_
initialize_slots();
if (!ar.load(reinterpret_cast<char*>(ctrl_),
sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1))) {
std::cerr << "Failed to load ctrl" << std::endl;
return false;
}
if (!ar.load(reinterpret_cast<char*>(slots_),
sizeof(slot_type) * capacity_)) {
std::cerr << "Failed to load slot" << std::endl;
return false;
}
return true;
}
// ------------------------------------------------------------------------
// dump/load for parallel_hash_set
// ------------------------------------------------------------------------
template <size_t N,
template <class, class, class, class> class RefSet,
class Mtx_,
class Policy, class Hash, class Eq, class Alloc>
template<typename OutputArchive>
bool parallel_hash_set<N, RefSet, Mtx_, Policy, Hash, Eq, Alloc>::dump(OutputArchive& ar) const {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
if (! ar.dump(subcnt())) {
std::cerr << "Failed to dump meta!" << std::endl;
return false;
}
for (size_t i = 0; i < sets_.size(); ++i) {
auto& inner = sets_[i];
typename Lockable::UniqueLock m(const_cast<Inner&>(inner));
if (!inner.set_.dump(ar)) {
std::cerr << "Failed to dump submap " << i << std::endl;
return false;
}
}
return true;
}
template <size_t N,
template <class, class, class, class> class RefSet,
class Mtx_,
class Policy, class Hash, class Eq, class Alloc>
template<typename InputArchive>
bool parallel_hash_set<N, RefSet, Mtx_, Policy, Hash, Eq, Alloc>::load(InputArchive& ar) {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
size_t submap_count = 0;
if (!ar.load(&submap_count)) {
std::cerr << "Failed to load submap count!" << std::endl;
return false;
}
if (submap_count != subcnt()) {
std::cerr << "submap count(" << submap_count << ") != N(" << N << ")" << std::endl;
return false;
}
for (size_t i = 0; i < submap_count; ++i) {
auto& inner = sets_[i];
typename Lockable::UniqueLock m(const_cast<Inner&>(inner));
if (!inner.set_.load(ar)) {
std::cerr << "Failed to load submap " << i << std::endl;
return false;
}
}
return true;
}
} // namespace priv
// ------------------------------------------------------------------------
// BinaryArchive
// File is closed when archive object is destroyed
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
class BinaryOutputArchive {
public:
BinaryOutputArchive(const char *file_path) {
ofs_.open(file_path, std::ios_base::binary);
}
bool dump(const char *p, size_t sz) {
ofs_.write(p, sz);
return true;
}
template<typename V>
typename std::enable_if<type_traits_internal::IsTriviallyCopyable<V>::value, bool>::type
dump(const V& v) {
ofs_.write(reinterpret_cast<const char *>(&v), sizeof(V));
return true;
}
private:
std::ofstream ofs_;
};
class BinaryInputArchive {
public:
BinaryInputArchive(const char * file_path) {
ifs_.open(file_path, std::ios_base::binary);
}
bool load(char* p, size_t sz) {
ifs_.read(p, sz);
return true;
}
template<typename V>
typename std::enable_if<type_traits_internal::IsTriviallyCopyable<V>::value, bool>::type
load(V* v) {
ifs_.read(reinterpret_cast<char *>(v), sizeof(V));
return true;
}
private:
std::ifstream ifs_;
};
} // namespace phmap
#endif // phmap_dump_h_guard_
#if !defined(phmap_fwd_decl_h_guard_)
#define phmap_fwd_decl_h_guard_
// ---------------------------------------------------------------------------
// Copyright (c) 2019, Gregory Popovitch - greg7mdp@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
// ---------------------------------------------------------------------------
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4514) // unreferenced inline function has been removed
#pragma warning(disable : 4710) // function not inlined
#pragma warning(disable : 4711) // selected for automatic inline expansion
#endif
#include <memory>
#include <utility>
#if defined(PHMAP_USE_ABSL_HASH) && !defined(ABSL_HASH_HASH_H_)
namespace absl { template <class T> struct Hash; };
#endif
namespace phmap {
#if defined(PHMAP_USE_ABSL_HASH)
template <class T> using Hash = ::absl::Hash<T>;
#else
template <class T> struct Hash;
#endif
template <class T> struct EqualTo;
template <class T> struct Less;
template <class T> using Allocator = typename std::allocator<T>;
template<class T1, class T2> using Pair = typename std::pair<T1, T2>;
class NullMutex;
namespace priv {
// The hash of an object of type T is computed by using phmap::Hash.
template <class T, class E = void>
struct HashEq
{
using Hash = phmap::Hash<T>;
using Eq = phmap::EqualTo<T>;
};
template <class T>
using hash_default_hash = typename priv::HashEq<T>::Hash;
template <class T>
using hash_default_eq = typename priv::HashEq<T>::Eq;
// type alias for std::allocator so we can forward declare without including other headers
template <class T>
using Allocator = typename phmap::Allocator<T>;
// type alias for std::pair so we can forward declare without including other headers
template<class T1, class T2>
using Pair = typename phmap::Pair<T1, T2>;
} // namespace priv
// ------------- forward declarations for hash containers ----------------------------------
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>> // alias for std::allocator
class flat_hash_set;
template <class K, class V,
class Hash = phmap::priv::hash_default_hash<K>,
class Eq = phmap::priv::hash_default_eq<K>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const K, V>>> // alias for std::allocator
class flat_hash_map;
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>> // alias for std::allocator
class node_hash_set;
template <class Key, class Value,
class Hash = phmap::priv::hash_default_hash<Key>,
class Eq = phmap::priv::hash_default_eq<Key>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const Key, Value>>> // alias for std::allocator
class node_hash_map;
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_flat_hash_set;
template <class K, class V,
class Hash = phmap::priv::hash_default_hash<K>,
class Eq = phmap::priv::hash_default_eq<K>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const K, V>>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_flat_hash_map;
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_node_hash_set;
template <class Key, class Value,
class Hash = phmap::priv::hash_default_hash<Key>,
class Eq = phmap::priv::hash_default_eq<Key>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const Key, Value>>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_node_hash_map;
// ------------- forward declarations for btree containers ----------------------------------
template <typename Key, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<Key>>
class btree_set;
template <typename Key, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<Key>>
class btree_multiset;
template <typename Key, typename Value, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<phmap::priv::Pair<const Key, Value>>>
class btree_map;
template <typename Key, typename Value, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<phmap::priv::Pair<const Key, Value>>>
class btree_multimap;
} // namespace phmap
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#endif // phmap_fwd_decl_h_guard_
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
#include <pybind11/stl.h>
#include <torch/extension.h>
#include <parallel_hashmap/phmap.h>
#include <cstring>
#include <vector>
#include <iostream>
#include <map>
#include <algorithm>
using namespace std;
namespace py = pybind11;
namespace th = torch;
typedef int64_t NodeIDType;
//phmap::btree_map<string,phmap::btree_map<string,int>> node_freq;
map<string,map<NodeIDType,double> *> node_freq;
void update_count(string dataname,th::Tensor IdArray,char* device,double weight,NodeIDType l,NodeIDType r){
//phmap::btree_map<NodeIDType,int> freq;
auto array = IdArray.data_ptr<NodeIDType>();
map<NodeIDType,double>* freq = nullptr;
if(node_freq.find(dataname)!=node_freq.end()){
freq = node_freq[dataname];
}
else{
cout<<"no this data "<<dataname<<endl;
freq = new map<NodeIDType,double>();
node_freq[dataname]=freq;
}
if(strcmp(device,"cpu")==0){
for(int i=0;i<IdArray.size(0);i++){
NodeIDType u = NodeIDType(array[i]);
//cout<<u<<" "<<l<<" "<<r<<" "<<device<<endl;
if(u<r and u>=l)continue;
if(freq->find(u) != freq->end()){
//cout<<u<<" ! "<<freq->find(u)->second + 1<<endl;
freq->find(u)->second++;
//freq->insert(std::make_pair(u,(freq->find(u))->second + 1));
}
else freq->insert(std::make_pair(u,1));
};
}
else{
for(int i=0;i<IdArray.size(0);i++){
NodeIDType u = NodeIDType(array[i]);
if(u<r and u>=l){
// std::map::insert is a no-op for existing keys, so increment through
// the iterator instead of re-inserting
if(freq->find(u) != freq->end()) freq->find(u)->second += 1;
else freq->insert(std::make_pair(u,1));
}
else{
if(freq->find(u) != freq->end()) freq->find(u)->second += weight;
else freq->insert(std::make_pair(u,weight));
}
};
}
fflush(stdout);
}
typedef pair<NodeIDType,double> PAIR;   // counts are weighted, so double
bool cmp(const PAIR &A, const PAIR &B){
return A.second > B.second;
}
th::Tensor get_max_rank(string dataname,int size){
map<NodeIDType,double>* nmap = node_freq[dataname];
vector<PAIR> vec(nmap->begin(),nmap->end());
sort(vec.begin(), vec.end(), cmp);
vector<NodeIDType> out;
int vector_size = (int)vec.size();
for(int i = 0; i < min(size,vector_size);i++){
out.push_back(vec[i].first);
//cout<<vec[i].first<<" "<<vec[i].second<<endl;
}
node_freq[dataname]->clear();
delete node_freq[dataname];
node_freq.erase(dataname);
return th::tensor(out);
}
PYBIND11_MODULE(presample_cores, m)
{
m
.def("update_count",
&update_count,
py::return_value_policy::reference)
.def("get_max_rank",
&get_max_rank,
py::return_value_policy::reference);
}
\ No newline at end of file
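# A small usage sketch of the module above, assuming the extension has been
# built via the setup.py below and is importable as `presample_cores`; the
# "demo" name, ids, and ranges are illustrative only.
import torch
import presample_cores

ids = torch.tensor([3, 5, 3, 9, 3, 5], dtype=torch.int64)
# nodes in [l, r) = [0, 4) are local; remote hits are weighted by `weight`
presample_cores.update_count("demo", ids, "gpu", 0.5, 0, 4)
print(presample_cores.get_max_rank("demo", 2))  # the two hottest ids: 3, then 5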
from setuptools import setup
from torch.utils.cpp_extension import BuildExtension, CppExtension

setup(
    name='presample_cores',
    ext_modules=[
        CppExtension(
            name='presample_cores',
            sources=['presample_cores.cpp'],
            extra_compile_args=['-fopenmp', '-Xlinker', ' -export-dynamic'],
            include_dirs=["./"],
        ),
    ],
    cmdclass={
        'build_ext': BuildExtension
    })

#setup(
#    name='cpu_cache_manager',
#    ext_modules=[
#        CppExtension(
#            name='cpu_cache_manager',
#            sources=['cpu_cache_manager.cpp'],
#            extra_compile_args=['-fopenmp', '-Xlinker', ' -export-dynamic'],
#            include_dirs=["./"],
#        ),
#    ],
#    cmdclass={
#        'build_ext': BuildExtension
#    })
\ No newline at end of file
from Cache.cache_memory import create_empty_cache, load_data2cache
from DistCustomPool import get_sampler_pool
import distparser as parser
from message_worker import _split_node_part_and_submit
from share_memory_util import _copy_to_share_memory, _get_existing_share_memory, _get_from_share_memory
import torch

def create_memory(data_name, graph, memory_size, index, device='cpu'):
    if device == 'cpu':
        if parser._get_num_sampler() == 1:
            create_empty_cache(data_name, len(index), graph.data.x.size(1))
            load_data2cache(data_name, index)
        else:
            cache_mem = torch.zeros(len(index), graph.data.x.size(1))
            indx = torch.zeros(len(index))
            mem_data = _copy_to_share_memory(cache_mem)
            idx_data = _copy_to_share_memory(index)
            pool = get_sampler_pool()
            pool.load_share_cache(data_name, idx_data, mem_data)
            pool.initilize_cache(data_name, index)
import os.path as osp
import torch

class GraphData():
    # Python does not support constructor overloading, so a single __init__
    # accepts either the already-loaded components or a checkpoint path
    # (pass path as a keyword argument).
    def __init__(self, id=None, edge_index=None, data=None, partptr=None, path=None):
        if path is not None:
            assert osp.exists(path), 'path does not exist'
            id, edge_index, data, partptr = torch.load(path)
        # index of the current partition
        self.partition_id = id
        # total number of partitions
        self.partitions = partptr.numel() - 1
        # global graph structure
        self.num_nodes = partptr[self.partitions]
        if edge_index is not None:
            self.num_edges = edge_index[0].numel()
        self.edge_index = edge_index
        # data held by this partition (feature vectors plus subgraph
        # structure), as a pyg Data object
        self.data = data
        # partition offset table
        self.partptr = partptr
    def get_part_num(self):
        return self.data.x.size()[0]
    def select_attr(self, index):
        return torch.index_select(self.data.x, 0, index)
    def select_y(self, index):
        return torch.index_select(self.data.y, 0, index)
    # map ids between the global id space and a partition's local id space
    def get_localId_by_partitionId(self, id, index):
        #print(index)
        if id == -1 or id == 0:
            return index
        else:
            return torch.add(index, -self.partptr[id])
    def get_globalId_by_partitionId(self, id, index):
        if id == -1 or id == 0:
            return index
        else:
            return torch.add(index, self.partptr[id])
    def get_node_num(self):
        return self.num_nodes
    def localId_to_globalId(self, id, partitionId: int = -1):
        '''
        Map a node id local to partition partitionId to its global id.
        '''
        if partitionId == -1:
            partitionId = self.partition_id
        global_id = id + self.partptr[partitionId]
        assert global_id < self.partptr[partitionId + 1]
        return global_id
    def get_partitionId_by_globalId(self, id):
        '''
        Get the partition index that contains a global node id.
        '''
        partitionId = -1
        assert id >= 0 and id < self.num_nodes, 'id out of range'
        for i in range(self.partitions):
            if id >= self.partptr[i] and id < self.partptr[i+1]:
                partitionId = i
                break
        assert partitionId >= 0, 'no partition contains this id'
        return partitionId
    def get_nodes_by_partitionId(self, id):
        '''
        Return the number of nodes in partition id.
        '''
        assert id >= 0 and id < self.partitions, 'invalid partitionId'
        return int(self.partptr[id+1] - self.partptr[id])
    def __repr__(self):
        return (f'{self.__class__.__name__}(\n'
                f'  partition_id={self.partition_id}\n'
                f'  data={self.data},\n'
                f'  global_info('
                f'num_nodes={self.num_nodes},'
                f' num_edges={self.num_edges},'
                f' num_parts={self.partitions},'
                f' edge_index=[2,{self.edge_index[0].numel()}])\n'
                f')')
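# A small sketch of the partition bookkeeping above; the partptr values are
# illustrative. partptr = [0, 3, 6] puts global ids 0-2 in partition 0 and
# ids 3-5 in partition 1.
#
#   g = GraphData(1, edge_index=None, data=None, partptr=torch.tensor([0, 3, 6]))
#   g.get_partitionId_by_globalId(4)                       # -> 1
#   g.get_localId_by_partitionId(1, torch.tensor([3, 5]))  # -> tensor([0, 2])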
import torch
from torch import Tensor
from enum import Enum
import math
from abc import ABC
from typing import Optional, Tuple, Union

class NegativeSamplingMode(Enum):
    # 'binary': Randomly sample negative edges in the graph.
    binary = 'binary'
    # 'triplet': Randomly sample negative destination nodes for each positive
    # source node.
    triplet = 'triplet'

class NegativeSampling:
    r"""The negative sampling configuration of a
    :class:`~torch_geometric.sampler.BaseSampler` when calling
    :meth:`~torch_geometric.sampler.BaseSampler.sample_from_edges`.

    Args:
        mode (str): The negative sampling mode
            (:obj:`"binary"` or :obj:`"triplet"`).
            If set to :obj:`"binary"`, will randomly sample negative links
            from the graph.
            If set to :obj:`"triplet"`, will randomly sample negative
            destination nodes for each positive source node.
        amount (int or float, optional): The ratio of sampled negative edges to
            the number of positive edges. (default: :obj:`1`)
        weight (torch.Tensor, optional): A node-level vector determining the
            sampling of nodes. Does not necessarily need to sum up to one.
            If not given, negative nodes will be sampled uniformly.
            (default: :obj:`None`)
    """
    mode: NegativeSamplingMode
    amount: Union[int, float] = 1
    weight: Optional[Tensor] = None

    def __init__(
        self,
        mode: Union[NegativeSamplingMode, str],
        amount: Union[int, float] = 1,
        weight: Optional[Tensor] = None,
    ):
        self.mode = NegativeSamplingMode(mode)
        self.amount = amount
        self.weight = weight

        if self.amount <= 0:
            raise ValueError(f"The attribute 'amount' needs to be positive "
                             f"for '{self.__class__.__name__}' "
                             f"(got {self.amount})")

        if self.is_triplet():
            if self.amount != math.ceil(self.amount):
                raise ValueError(f"The attribute 'amount' needs to be an "
                                 f"integer for '{self.__class__.__name__}' "
                                 f"with 'triplet' negative sampling "
                                 f"(got {self.amount}).")
            self.amount = math.ceil(self.amount)

    def is_binary(self) -> bool:
        return self.mode == NegativeSamplingMode.binary

    def is_triplet(self) -> bool:
        return self.mode == NegativeSamplingMode.triplet

    def sample(self, num_samples: int,
               num_nodes: Optional[int] = None) -> Tensor:
        r"""Generates :obj:`num_samples` negative samples."""
        if self.weight is None:
            if num_nodes is None:
                raise ValueError(
                    f"Cannot sample negatives in '{self.__class__.__name__}' "
                    f"without passing the 'num_nodes' argument")
            return torch.randint(num_nodes, (num_samples, ))

        if num_nodes is not None and self.weight.numel() != num_nodes:
            raise ValueError(
                f"The 'weight' attribute in '{self.__class__.__name__}' "
                f"needs to match the number of nodes {num_nodes} "
                f"(got {self.weight.numel()})")
        return torch.multinomial(self.weight, num_samples, replacement=True)
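# A small usage sketch of the configuration above; the weights are
# illustrative. Without a weight vector the draw is uniform; with one,
# it goes through torch.multinomial.
#
#   neg = NegativeSampling('binary')
#   neg.sample(4, num_nodes=6)                        # 4 ids uniform over [0, 6)
#   w = torch.tensor([0.7, 0.1, 0.1, 0.1, 0.0, 0.0])
#   NegativeSampling('triplet', 2, w).sample(4)       # weighted, with replacement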
class BaseSampler(ABC):
    r"""An abstract base class that initializes a graph sampler and provides
    :meth:`_sample_one_layer_from_nodes`,
    :meth:`_sample_one_layer_from_nodes_parallel`, and
    :meth:`sample_from_nodes` routines.
    """
    def sample_from_nodes(
        self,
        nodes: torch.Tensor,
        **kwargs
    ) -> Tuple[torch.Tensor, list]:
        r"""Performs multilayer sampling from the nodes specified in :obj:`nodes`.
        The number of layers is determined by the parameter :obj:`num_layers`,
        returning a sampled subgraph in the specified output format: Tuple[torch.Tensor, list].

        Args:
            nodes: the list of seed nodes index
            **kwargs: other kwargs
        Returns:
            sampled_nodes: the nodes sampled
            sampled_edge_index_list: the edges sampled
        """
        raise NotImplementedError

    def sample_from_edges(
        self,
        edges: torch.Tensor,
        edge_label: Optional[torch.Tensor] = None,
        neg_sampling: Optional[NegativeSampling] = None
    ) -> Tuple[torch.Tensor, list]:
        r"""Performs sampling from the edges specified in :obj:`edges`,
        returning a sampled subgraph in the specified output format.

        Args:
            edges: the list of seed edges index
            edge_label: the label for the seed edges.
            neg_sampling: The negative sampling configuration
        Returns:
            sampled_nodes: the nodes sampled
            sampled_edge_index_list: the edges sampled
            metadata: other information
        """
        raise NotImplementedError

    # def _sample_one_layer_from_nodes(
    #     self,
    #     nodes: torch.Tensor,
    #     **kwargs
    # ) -> Tuple[torch.Tensor, torch.Tensor]:
    #     r"""Performs sampling from the nodes specified in: nodes,
    #     returning a sampled subgraph in the specified output format: Tuple[torch.Tensor, torch.Tensor].
    #     Args:
    #         nodes: the list of seed nodes index
    #         **kwargs: other kwargs
    #     Returns:
    #         sampled_nodes: the nodes sampled
    #         sampled_edge_index: the edges sampled
    #     """
    #     raise NotImplementedError

    # def _sample_one_layer_from_nodes_parallel(
    #     self,
    #     nodes: torch.Tensor,
    #     **kwargs
    # ) -> Tuple[torch.Tensor, torch.Tensor]:
    #     r"""Performs sampling in parallel from the nodes specified in: nodes,
    #     returning a sampled subgraph in the specified output format: Tuple[torch.Tensor, torch.Tensor].
    #     Args:
    #         nodes: the list of seed nodes index
    #         **kwargs: other kwargs
    #     Returns:
    #         sampled_nodes: the nodes sampled
    #         sampled_edge_index: the edges sampled
    #     """
    #     raise NotImplementedError
import time
import torch
# import sys
# from os.path import abspath, dirname
# sys.path.insert(0, abspath(dirname(__file__)))
# print(sys.path)
from neighbor_sampler import NeighborSampler
# edge_index = torch.tensor([[0, 1, 1, 2, 2, 2, 3], [1, 0, 2, 1, 3, 0, 2]])
# num_nodes = 4
edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5], [1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
num_nodes = 6
num_neighbors = 2
# Run the neighbor sampling
pre = time.time()
sampler=NeighborSampler(edge_index=edge_index, num_nodes=num_nodes, num_layers=2, workers=2, fanout=[2, 1])
end = time.time()
print("neighbor time:", end-pre)
# neighbor_nodes, sampled_edge_index = sampler._sample_one_layer_from_nodes(nodes=torch.tensor([1,3]), fanout=num_neighbors)
neighbor_nodes, sampled_edge_index = sampler.sample_from_nodes(torch.tensor([1,2,3]))
# Print the result
print('neighbor_nodes_id: \n',neighbor_nodes, '\nedge_index: \n',sampled_edge_index)
# import torch_scatter
# nodes=torch.Tensor([1,2])
# row, col = edge_index
# deg = torch_scatter.scatter_add(torch.ones_like(row), row, dim=0, dim_size=num_nodes)
# neighbors1=torch.concat([row[row==nodes[i]] for i in range(0, nodes.shape[0])])
# print(neighbors1)
# neighbors2=torch.concat([col[row==nodes[i]] for i in range(0, nodes.shape[0])])
# print(neighbors2)
# neighbors=torch.stack([neighbors1, neighbors2], dim=0)
# print('neighbors: \n', neighbors[0]==1)
from base import NegativeSampling
edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5],
[1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
num_nodes = 6
# sampler
sampler=NeighborSampler(edge_index=edge_index.clone(), num_nodes=num_nodes, num_layers=2, workers=2, fanout=[2, 1])
# negative
weight = torch.tensor([0.3,0.1,0.1,0.1,0.3,0.1])
negative = NegativeSampling('binary', 2, weight)
# negative = NegativeSampling('triplet', 2, weight)
label=torch.tensor([1,2])
seed_edges = torch.tensor([[0,1],
[1,4]])
# result = sampler.sample_from_edges(edges=seed_edges)
result = sampler.sample_from_edges(edges=seed_edges, edge_label=label, neg_sampling=negative)
# Print the result
print('neighbor_nodes_id: \n',result[0], '\nedge_index: \n',result[1], '\nmetadata: \n',result[2])
\ No newline at end of file
import torch
edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5], [1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
row,col=edge_index
row.numpy().tolist()
from .neighbor_sampler import NeighborSampler
edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5], [1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
num_nodes = 6
num_neighbors = 2
# Run the neighbor sampling
sampler=NeighborSampler
neighbor_nodes, edge_index = sampler.sample_from_node(2, edge_index, num_nodes, num_neighbors)
neighbor_nodes, edge_index = sampler.sample_from_nodes(torch.tensor([1,2]), edge_index, num_nodes, num_neighbors)
neighbor_nodes, edge_index = sampler.sample_from_nodes_parallel(torch.tensor([1,2,3]), edge_index, num_nodes, workers=3, fanout=num_neighbors)
neighbor_nodes, edge_index = sampler.sample_from_nodes_parallel(torch.tensor([1,2,3,4,5]), edge_index, num_nodes, workers=4, fanout=num_neighbors)
import torch
from ogb.nodeproppred import PygNodePropPredDataset
from torch_geometric import datasets
import time
from Utils import GraphData
def load_ogb_dataset(name, data_path):
    dataset = PygNodePropPredDataset(name=name, root=data_path)
    split_idx = dataset.get_idx_split()
    g = dataset[0]
    n_node = g.num_nodes
    node_data = {}
    node_data['train_mask'] = torch.zeros(n_node, dtype=torch.bool)
    node_data['val_mask'] = torch.zeros(n_node, dtype=torch.bool)
    node_data['test_mask'] = torch.zeros(n_node, dtype=torch.bool)
    node_data['train_mask'][split_idx["train"]] = True
    node_data['val_mask'][split_idx["valid"]] = True
    node_data['test_mask'][split_idx["test"]] = True
    return g, node_data
g, node_data = load_ogb_dataset('ogbn-products', "/home/hzq/code/gnn/my_sampler/NewSample/dataset")
print(g)
# for worker in [1,2,3,4,5,6,7,8,9,10,20,30]:
# when sampling with a prebuilt tnb, edge_index must be set to None
g_data = GraphData(1, edge_index=None, data=g, partptr=torch.tensor([0, g.num_nodes//3, g.num_nodes//3*2]))
from neighbor_sampler import NeighborSampler
pre = time.time()
from neighbor_sampler import get_neighbors
from random_walk_sampler import RandomWalkSampler
row, col = g.edge_index
tnb = get_neighbors(row.contiguous(), col.contiguous(), g.num_nodes)
sampler = NeighborSampler(g.num_nodes, num_layers=2, fanout=[100,100], graph_data=g_data, workers=10, tnb=tnb)
# sampler = RandomWalkSampler(g.num_nodes, num_layers=2, graph_data=g_data, workers=10, tnb=tnb)
end = time.time()
print("init time:", end-pre)
# from torch_geometric.sampler import NeighborSampler, NumNeighbors, NodeSamplerInput, SamplerOutput
# pre = time.time()
# num_nei = NumNeighbors([100, 100])
# node_idx = NodeSamplerInput(input_id=None, node=torch.tensor(range(g.num_nodes//3, g.num_nodes//3+800000)))# (input_id=None, node=torch.masked_select(torch.arange(g.num_nodes),node_data['train_mask']))
# sampler = NeighborSampler(g, num_nei)
# end = time.time()
# print("init time:", end-pre)
pre = time.time()
node, edge = sampler.sample_from_nodes(torch.tensor(range(g.num_nodes//3, g.num_nodes//3+800000)))# sampler.sample_from_nodes(torch.masked_select(torch.arange(g.num_nodes),node_data['train_mask']))
# out = sampler.sample_from_nodes(node_idx)
# node = out.node
# edge = [out.row, out.col]
end = time.time()
print('node:', node)
print('edge:', edge)
print("sample time", end-pre)
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from conans import ConanFile, tools
import os
class SparseppConan(ConanFile):
name = "parallel_hashmap"
version = "1.27"
description = "A header-only, very fast and memory-friendly hash map"
# Indicates License type of the packaged library
license = "https://github.com/greg7mdp/parallel-hashmap/blob/master/LICENSE"
# Packages the license for the conanfile.py
exports = ["LICENSE"]
# Custom attributes for Bincrafters recipe conventions
source_subfolder = "source_subfolder"
def source(self):
source_url = "https://github.com/greg7mdp/parallel-hashmap"
tools.get("{0}/archive/{1}.tar.gz".format(source_url, self.version))
extracted_dir = self.name + "-" + self.version
#Rename to "source_folder" is a convention to simplify later steps
os.rename(extracted_dir, self.source_subfolder)
def package(self):
include_folder = os.path.join(self.source_subfolder, "parallel_hashmap")
self.copy(pattern="LICENSE")
self.copy(pattern="*", dst="include/parallel_hashmap", src=include_folder)
def package_id(self):
self.info.header_only()
#if !defined(spp_memory_h_guard)
#define spp_memory_h_guard
#include <cstdint>
#include <cstring>
#include <cstdlib>
#if defined(_WIN32) || defined( __CYGWIN__)
#define SPP_WIN
#endif
#ifdef SPP_WIN
#include <windows.h>
#include <Psapi.h>
#undef min
#undef max
#elif defined(__linux__)
#include <sys/types.h>
#include <sys/sysinfo.h>
#elif defined(__FreeBSD__)
#include <paths.h>
#include <fcntl.h>
#include <kvm.h>
#include <unistd.h>
#include <sys/sysctl.h>
#include <sys/user.h>
#endif
namespace spp
{
uint64_t GetSystemMemory();
uint64_t GetTotalMemoryUsed();
uint64_t GetProcessMemoryUsed();
uint64_t GetPhysicalMemory();
uint64_t GetSystemMemory()
{
#ifdef SPP_WIN
MEMORYSTATUSEX memInfo;
memInfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&memInfo);
return static_cast<uint64_t>(memInfo.ullTotalPageFile);
#elif defined(__linux__)
struct sysinfo memInfo;
sysinfo (&memInfo);
auto totalVirtualMem = memInfo.totalram;
totalVirtualMem += memInfo.totalswap;
totalVirtualMem *= memInfo.mem_unit;
return static_cast<uint64_t>(totalVirtualMem);
#elif defined(__FreeBSD__)
kvm_t *kd;
u_int pageCnt;
size_t pageCntLen = sizeof(pageCnt);
u_int pageSize;
struct kvm_swap kswap;
uint64_t totalVirtualMem;
pageSize = static_cast<u_int>(getpagesize());
sysctlbyname("vm.stats.vm.v_page_count", &pageCnt, &pageCntLen, NULL, 0);
totalVirtualMem = pageCnt * pageSize;
kd = kvm_open(NULL, _PATH_DEVNULL, NULL, O_RDONLY, "kvm_open");
kvm_getswapinfo(kd, &kswap, 1, 0);
kvm_close(kd);
totalVirtualMem += kswap.ksw_total * pageSize;
return totalVirtualMem;
#else
return 0;
#endif
}
uint64_t GetTotalMemoryUsed()
{
#ifdef SPP_WIN
MEMORYSTATUSEX memInfo;
memInfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&memInfo);
return static_cast<uint64_t>(memInfo.ullTotalPageFile - memInfo.ullAvailPageFile);
#elif defined(__linux__)
struct sysinfo memInfo;
sysinfo(&memInfo);
auto virtualMemUsed = memInfo.totalram - memInfo.freeram;
virtualMemUsed += memInfo.totalswap - memInfo.freeswap;
virtualMemUsed *= memInfo.mem_unit;
return static_cast<uint64_t>(virtualMemUsed);
#elif defined(__FreeBSD__)
kvm_t *kd;
u_int pageSize;
u_int pageCnt, freeCnt;
size_t pageCntLen = sizeof(pageCnt);
size_t freeCntLen = sizeof(freeCnt);
struct kvm_swap kswap;
uint64_t virtualMemUsed;
pageSize = static_cast<u_int>(getpagesize());
sysctlbyname("vm.stats.vm.v_page_count", &pageCnt, &pageCntLen, NULL, 0);
sysctlbyname("vm.stats.vm.v_free_count", &freeCnt, &freeCntLen, NULL, 0);
virtualMemUsed = (pageCnt - freeCnt) * pageSize;
kd = kvm_open(NULL, _PATH_DEVNULL, NULL, O_RDONLY, "kvm_open");
kvm_getswapinfo(kd, &kswap, 1, 0);
kvm_close(kd);
virtualMemUsed += kswap.ksw_used * pageSize;
return virtualMemUsed;
#else
return 0;
#endif
}
uint64_t GetProcessMemoryUsed()
{
#ifdef SPP_WIN
PROCESS_MEMORY_COUNTERS_EX pmc;
GetProcessMemoryInfo(GetCurrentProcess(), reinterpret_cast<PPROCESS_MEMORY_COUNTERS>(&pmc), sizeof(pmc));
return static_cast<uint64_t>(pmc.PrivateUsage);
#elif defined(__linux__)
auto parseLine =
[](char* line)->int
{
auto i = strlen(line);
while(*line < '0' || *line > '9')
{
line++;
}
line[i-3] = '\0';
i = atoi(line);
return i;
};
auto file = fopen("/proc/self/status", "r");
auto result = -1;
char line[128];
while(fgets(line, 128, file) != nullptr)
{
if(strncmp(line, "VmSize:", 7) == 0)
{
result = parseLine(line);
break;
}
}
fclose(file);
return static_cast<uint64_t>(result) * 1024;
#elif defined(__FreeBSD__)
struct kinfo_proc info;
size_t infoLen = sizeof(info);
int mib[] = { CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid() };
sysctl(mib, sizeof(mib) / sizeof(*mib), &info, &infoLen, NULL, 0);
return static_cast<uint64_t>(info.ki_rssize * getpagesize());
#else
return 0;
#endif
}
uint64_t GetPhysicalMemory()
{
#ifdef SPP_WIN
MEMORYSTATUSEX memInfo;
memInfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&memInfo);
return static_cast<uint64_t>(memInfo.ullTotalPhys);
#elif defined(__linux__)
struct sysinfo memInfo;
sysinfo(&memInfo);
auto totalPhysMem = memInfo.totalram;
totalPhysMem *= memInfo.mem_unit;
return static_cast<uint64_t>(totalPhysMem);
#elif defined(__FreeBSD__)
u_long physMem;
size_t physMemLen = sizeof(physMem);
int mib[] = { CTL_HW, HW_PHYSMEM };
sysctl(mib, sizeof(mib) / sizeof(*mib), &physMem, &physMemLen, NULL, 0);
return physMem;
#else
return 0;
#endif
}
}
#endif // spp_memory_h_guard
#if !defined(phmap_dump_h_guard_)
#define phmap_dump_h_guard_
// ---------------------------------------------------------------------------
// Copyright (c) 2019, Gregory Popovitch - greg7mdp@gmail.com
//
// providing dump/load/mmap_load
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ---------------------------------------------------------------------------
#include <iostream>
#include <fstream>
#include <sstream>
#include "phmap.h"
namespace phmap
{
namespace type_traits_internal {
#if defined(__GLIBCXX__) && __GLIBCXX__ < 20150801
template<typename T> struct IsTriviallyCopyable : public std::integral_constant<bool, __has_trivial_copy(T)> {};
#else
template<typename T> struct IsTriviallyCopyable : public std::is_trivially_copyable<T> {};
#endif
template <class T1, class T2>
struct IsTriviallyCopyable<std::pair<T1, T2>> {
static constexpr bool value = IsTriviallyCopyable<T1>::value && IsTriviallyCopyable<T2>::value;
};
}
namespace priv {
// ------------------------------------------------------------------------
// dump/load for raw_hash_set
// ------------------------------------------------------------------------
template <class Policy, class Hash, class Eq, class Alloc>
template<typename OutputArchive>
bool raw_hash_set<Policy, Hash, Eq, Alloc>::dump(OutputArchive& ar) const {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
if (!ar.dump(size_)) {
std::cerr << "Failed to dump size_" << std::endl;
return false;
}
if (size_ == 0) {
return true;
}
if (!ar.dump(capacity_)) {
std::cerr << "Failed to dump capacity_" << std::endl;
return false;
}
if (!ar.dump(reinterpret_cast<char*>(ctrl_),
sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1))) {
std::cerr << "Failed to dump ctrl_" << std::endl;
return false;
}
if (!ar.dump(reinterpret_cast<char*>(slots_),
sizeof(slot_type) * capacity_)) {
std::cerr << "Failed to dump slot_" << std::endl;
return false;
}
return true;
}
template <class Policy, class Hash, class Eq, class Alloc>
template<typename InputArchive>
bool raw_hash_set<Policy, Hash, Eq, Alloc>::load(InputArchive& ar) {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
raw_hash_set<Policy, Hash, Eq, Alloc>().swap(*this); // clear any existing content
if (!ar.load(&size_)) {
std::cerr << "Failed to load size_" << std::endl;
return false;
}
if (size_ == 0) {
return true;
}
if (!ar.load(&capacity_)) {
std::cerr << "Failed to load capacity_" << std::endl;
return false;
}
// allocate memory for ctrl_ and slots_
initialize_slots();
if (!ar.load(reinterpret_cast<char*>(ctrl_),
sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1))) {
std::cerr << "Failed to load ctrl" << std::endl;
return false;
}
if (!ar.load(reinterpret_cast<char*>(slots_),
sizeof(slot_type) * capacity_)) {
std::cerr << "Failed to load slot" << std::endl;
return false;
}
return true;
}
// ------------------------------------------------------------------------
// dump/load for parallel_hash_set
// ------------------------------------------------------------------------
template <size_t N,
template <class, class, class, class> class RefSet,
class Mtx_,
class Policy, class Hash, class Eq, class Alloc>
template<typename OutputArchive>
bool parallel_hash_set<N, RefSet, Mtx_, Policy, Hash, Eq, Alloc>::dump(OutputArchive& ar) const {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
if (! ar.dump(subcnt())) {
std::cerr << "Failed to dump meta!" << std::endl;
return false;
}
for (size_t i = 0; i < sets_.size(); ++i) {
auto& inner = sets_[i];
typename Lockable::UniqueLock m(const_cast<Inner&>(inner));
if (!inner.set_.dump(ar)) {
std::cerr << "Failed to dump submap " << i << std::endl;
return false;
}
}
return true;
}
template <size_t N,
template <class, class, class, class> class RefSet,
class Mtx_,
class Policy, class Hash, class Eq, class Alloc>
template<typename InputArchive>
bool parallel_hash_set<N, RefSet, Mtx_, Policy, Hash, Eq, Alloc>::load(InputArchive& ar) {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
size_t submap_count = 0;
if (!ar.load(&submap_count)) {
std::cerr << "Failed to load submap count!" << std::endl;
return false;
}
if (submap_count != subcnt()) {
std::cerr << "submap count(" << submap_count << ") != N(" << N << ")" << std::endl;
return false;
}
for (size_t i = 0; i < submap_count; ++i) {
auto& inner = sets_[i];
typename Lockable::UniqueLock m(const_cast<Inner&>(inner));
if (!inner.set_.load(ar)) {
std::cerr << "Failed to load submap " << i << std::endl;
return false;
}
}
return true;
}
} // namespace priv
// ------------------------------------------------------------------------
// BinaryArchive
// File is closed when archive object is destroyed
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
class BinaryOutputArchive {
public:
BinaryOutputArchive(const char *file_path) {
ofs_.open(file_path, std::ios_base::binary);
}
bool dump(const char *p, size_t sz) {
ofs_.write(p, sz);
return true;
}
template<typename V>
typename std::enable_if<type_traits_internal::IsTriviallyCopyable<V>::value, bool>::type
dump(const V& v) {
ofs_.write(reinterpret_cast<const char *>(&v), sizeof(V));
return true;
}
private:
std::ofstream ofs_;
};
class BinaryInputArchive {
public:
BinaryInputArchive(const char * file_path) {
ifs_.open(file_path, std::ios_base::binary);
}
bool load(char* p, size_t sz) {
ifs_.read(p, sz);
return true;
}
template<typename V>
typename std::enable_if<type_traits_internal::IsTriviallyCopyable<V>::value, bool>::type
load(V* v) {
ifs_.read(reinterpret_cast<char *>(v), sizeof(V));
return true;
}
private:
std::ifstream ifs_;
};
} // namespace phmap
#endif // phmap_dump_h_guard_
#if !defined(phmap_fwd_decl_h_guard_)
#define phmap_fwd_decl_h_guard_
// ---------------------------------------------------------------------------
// Copyright (c) 2019, Gregory Popovitch - greg7mdp@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
// ---------------------------------------------------------------------------
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4514) // unreferenced inline function has been removed
#pragma warning(disable : 4710) // function not inlined
#pragma warning(disable : 4711) // selected for automatic inline expansion
#endif
#include <memory>
#include <utility>
#if defined(PHMAP_USE_ABSL_HASH) && !defined(ABSL_HASH_HASH_H_)
namespace absl { template <class T> struct Hash; };
#endif
namespace phmap {
#if defined(PHMAP_USE_ABSL_HASH)
template <class T> using Hash = ::absl::Hash<T>;
#else
template <class T> struct Hash;
#endif
template <class T> struct EqualTo;
template <class T> struct Less;
template <class T> using Allocator = typename std::allocator<T>;
template<class T1, class T2> using Pair = typename std::pair<T1, T2>;
class NullMutex;
namespace priv {
// The hash of an object of type T is computed by using phmap::Hash.
template <class T, class E = void>
struct HashEq
{
using Hash = phmap::Hash<T>;
using Eq = phmap::EqualTo<T>;
};
template <class T>
using hash_default_hash = typename priv::HashEq<T>::Hash;
template <class T>
using hash_default_eq = typename priv::HashEq<T>::Eq;
// type alias for std::allocator so we can forward declare without including other headers
template <class T>
using Allocator = typename phmap::Allocator<T>;
// type alias for std::pair so we can forward declare without including other headers
template<class T1, class T2>
using Pair = typename phmap::Pair<T1, T2>;
} // namespace priv
// ------------- forward declarations for hash containers ----------------------------------
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>> // alias for std::allocator
class flat_hash_set;
template <class K, class V,
class Hash = phmap::priv::hash_default_hash<K>,
class Eq = phmap::priv::hash_default_eq<K>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const K, V>>> // alias for std::allocator
class flat_hash_map;
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>> // alias for std::allocator
class node_hash_set;
template <class Key, class Value,
class Hash = phmap::priv::hash_default_hash<Key>,
class Eq = phmap::priv::hash_default_eq<Key>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const Key, Value>>> // alias for std::allocator
class node_hash_map;
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_flat_hash_set;
template <class K, class V,
class Hash = phmap::priv::hash_default_hash<K>,
class Eq = phmap::priv::hash_default_eq<K>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const K, V>>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_flat_hash_map;
template <class T,
class Hash = phmap::priv::hash_default_hash<T>,
class Eq = phmap::priv::hash_default_eq<T>,
class Alloc = phmap::priv::Allocator<T>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_node_hash_set;
template <class Key, class Value,
class Hash = phmap::priv::hash_default_hash<Key>,
class Eq = phmap::priv::hash_default_eq<Key>,
class Alloc = phmap::priv::Allocator<
phmap::priv::Pair<const Key, Value>>, // alias for std::allocator
size_t N = 4, // 2**N submaps
class Mutex = phmap::NullMutex> // use std::mutex to enable internal locks
class parallel_node_hash_map;
// ------------- forward declarations for btree containers ----------------------------------
template <typename Key, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<Key>>
class btree_set;
template <typename Key, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<Key>>
class btree_multiset;
template <typename Key, typename Value, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<phmap::priv::Pair<const Key, Value>>>
class btree_map;
template <typename Key, typename Value, typename Compare = phmap::Less<Key>,
typename Alloc = phmap::Allocator<phmap::priv::Pair<const Key, Value>>>
class btree_multimap;
} // namespace phmap
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#endif // phmap_fwd_decl_h_guard_
import torch
import torch.multiprocessing as mp
from typing import Optional, Tuple
from base import BaseSampler, NegativeSampling
from neighbor_sampler import NeighborSampler
class RandomWalkSampler(BaseSampler):
def __init__(
self,
num_nodes: int,
num_layers: int,
graph_data,
workers = 1,
tnb = None
) -> None:
r"""__init__
Args:
num_nodes: the num of all nodes in the graph
num_layers: the num of layers to be sampled
fanout: the list of max neighbors' number chosen for each layer
workers: the number of threads, default value is 1
graph_data: graph data in this partition
tnb: all nodes' neighbors info
"""
super().__init__()
# if(edge_index is not None):
# self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
# workers, edge_index=edge_index)
# elif(tnb is not None):
# self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
# workers, tnb=tnb)
# else:
# raise Exception("Not enough parameters")
if(tnb is not None):
self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
graph_data, workers, tnb=tnb)
else:
self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
graph_data, workers)
self.num_layers = num_layers
# cap the number of worker threads at torch's default OMP thread count
self.workers = min(workers, torch.get_num_threads())
def sample_from_nodes(
self,
nodes: torch.Tensor
) -> Tuple[torch.Tensor, list]:
r"""Performs mutilayer sampling from the nodes specified in: nodes
The specific number of layers is determined by parameter: num_layers
returning a sampled subgraph in the specified output format: Tuple[torch.Tensor, list].
Args:
nodes: the list of seed nodes index
Returns:
sampled_nodes: the node sampled
sampled_edge_index: the edge sampled
"""
return self.sampler.sample_from_nodes(nodes)
def sample_from_edges(
self,
edges: torch.Tensor,
edge_label: Optional[torch.Tensor] = None,
neg_sampling: Optional[NegativeSampling] = None
) -> Tuple[torch.Tensor, list]:
r"""Performs sampling from the edges specified in :obj:`index`,
returning a sampled subgraph in the specified output format.
Args:
edges: the list of seed edges index
edge_label: the label for the seed edges.
neg_sampling: The negative sampling configuration
Returns:
sampled_nodes: the nodes sampled
sampled_edge_index_list: the edges sampled
"""
return self.sampler.sample_from_edges(edges, edge_label, neg_sampling)
if __name__=="__main__":
edge_index1 = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 4, 4, 4, 5], # , 3, 3
[1, 0, 2, 4, 1, 3, 0, 3, 5, 0, 2]])# , 2, 5
num_nodes1 = 6
# Run the random walk sampling
sampler=RandomWalkSampler(edge_index=edge_index1, num_nodes=num_nodes1, num_layers=3, workers=4)
sampled_nodes, sampled_edge_index = sampler.sample_from_nodes(torch.tensor([1,2]))
# Print the result
print('sampled_nodes_id: \n',sampled_nodes, '\nedge_index: \n',sampled_edge_index)
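For intuition, the class above implements a random walk by running NeighborSampler with fanout 1 at every layer: each step of the walk is one sampling layer that picks exactly one neighbor per frontier node. A minimal pure-Python sketch of that equivalence (the adjacency dict and helper below are illustrative toys, not part of the module):

import random
# a walk of length L corresponds to L sampling layers with fanout=1,
# each layer choosing exactly one neighbor of the current node
adj = {0: [1], 1: [0, 2, 4], 2: [0, 1, 3], 3: [2], 4: [1, 3, 5], 5: [0, 2]}
def random_walk(start, length):
    walk = [start]
    for _ in range(length):
        nbrs = adj[walk[-1]]
        if not nbrs:  # dead end: stop early
            break
        walk.append(random.choice(nbrs))
    return walk
print(random_walk(1, 3))  # e.g. [1, 2, 3, 2]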
from setuptools import setup
from torch.utils.cpp_extension import BuildExtension, CppExtension
setup(
name='sample_cores',
ext_modules=[
CppExtension(
name='sample_cores',
sources=['sample_cores.cpp'],
extra_compile_args=['-fopenmp','-Xlinker',' -export-dynamic'],
include_dirs=["/home/hzq/code/gnn/my_sampler/Sample"],
),
],
cmdclass={
'build_ext': BuildExtension
})
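As a rough sketch of how an extension declared this way is usually built and loaded (standard setuptools/torch cpp_extension workflow; the module name comes from the setup() call above):

# build the extension in place, then import it from the same directory:
#   python setup.py build_ext --inplace
import sample_cores
# the compiled module exposes the functions bound in sample_cores.cpp,
# e.g. get_neighbors, neighbor_sample_from_nodes and heads_unique
print([name for name in dir(sample_cores) if not name.startswith('_')])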
import os.path as osp
import torch
class GraphData():
    def __init__(self, id, edge_index, data, partptr):
        # index of the current partition
        self.partition_id = id
        # total number of partitions
        self.partitions = partptr.numel() - 1
        # full-graph structure information
        self.num_nodes = partptr[self.partitions]
        self.num_edges = edge_index[0].numel() if edge_index is not None else None
        self.edge_index = edge_index
        # data held by this partition (feature vectors and subgraph structure), a PyG Data object
        self.data = data
        # partition mapping: node ids in [partptr[i], partptr[i+1]) belong to partition i
        self.partptr = partptr
    @classmethod
    def from_file(cls, path):
        # load the tuple (id, edge_index, data, partptr) saved by the partitioner.
        # (this was a second __init__ overload before, which Python does not
        # support: the later definition silently shadowed this one)
        assert path is not None and osp.exists(path), 'path does not exist'
        id, edge_index, data, partptr = torch.load(path)
        return cls(id, edge_index, data, partptr)
    def select_attr(self, index):
        return torch.index_select(self.data.x, 0, index)
    def select_y(self, index):
        return torch.index_select(self.data.y, 0, index)
    def get_part_num(self):
        # number of nodes stored in this partition
        return self.data.x.size()[0]
    def get_localId_by_partitionId(self, id, index):
        # map global node ids to ids local to partition `id`
        if(id == -1 or id == 0):
            return index
        else:
            return torch.add(index, -self.partptr[id])
    def get_globalId_by_partitionId(self, id, index):
        # map ids local to partition `id` back to global node ids
        if(id == -1 or id == 0):
            return index
        else:
            return torch.add(index, self.partptr[id])
    def get_node_num(self):
        return self.num_nodes
    def localId_to_globalId(self, id, partitionId: int = -1):
        '''
        Map a node id local to partition partitionId to its global id.
        '''
        if partitionId == -1:
            partitionId = self.partition_id
        global_id = id + self.partptr[partitionId]
        assert global_id < self.partptr[partitionId + 1]
        return global_id
    def get_partitionId_by_globalId(self, id):
        '''
        Return the index of the partition that contains the global node id.
        '''
        partitionId = -1
        assert id >= 0 and id < self.num_nodes, 'id out of range'
        for i in range(self.partitions):
            if id >= self.partptr[i] and id < self.partptr[i+1]:
                partitionId = i
                break
        assert partitionId >= 0, 'no partition contains this id'
        return partitionId
    def get_nodes_by_partitionId(self, id):
        '''
        Return the number of nodes in partition partitionId.
        '''
        assert id >= 0 and id < self.partitions, 'invalid partitionId'
        return int(self.partptr[id+1] - self.partptr[id])
    def __repr__(self):
        return (f'{self.__class__.__name__}(\n'
                f'  partition_id={self.partition_id}\n'
                f'  data={self.data},\n'
                f'  global_info('
                f'num_nodes={self.num_nodes},'
                f' num_edges={self.num_edges},'
                f' num_parts={self.partitions},'
                f' edge_index=[2,{self.edge_index[0].numel()}])\n'
                f')')
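A minimal sketch of how the id-mapping helpers above fit together, assuming a made-up ten-node graph split into two partitions of five nodes each:

import torch
# nodes [0, 5) live in partition 0, nodes [5, 10) in partition 1
partptr = torch.tensor([0, 5, 10])
g = GraphData(0, edge_index=None, data=None, partptr=partptr)
local = g.get_localId_by_partitionId(1, torch.tensor([5, 7, 9]))  # tensor([0, 2, 4])
glob = g.get_globalId_by_partitionId(1, local)                    # tensor([5, 7, 9])
assert g.get_partitionId_by_globalId(7) == 1
assert g.get_nodes_by_partitionId(0) == 5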
......@@ -125,6 +125,7 @@ class BaseSampler(ABC):
Returns:
sampled_nodes: the nodes sampled
sampled_edge_index_list: the edges sampled
metadata: other information
"""
raise NotImplementedError
......
import time
import torch
# import sys
# from os.path import abspath, dirname
......@@ -11,7 +12,10 @@ edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5], [1, 0, 2, 4,
num_nodes = 6
num_neighbors = 2
# Run the neighbor sampling
pre = time.time()
sampler=NeighborSampler(edge_index=edge_index, num_nodes=num_nodes, num_layers=2, workers=2, fanout=[2, 1])
end = time.time()
print("neighbor time:", end-pre)
# neighbor_nodes, sampled_edge_index = sampler._sample_one_layer_from_nodes(nodes=torch.tensor([1,3]), fanout=num_neighbors)
neighbor_nodes, sampled_edge_index = sampler.sample_from_nodes(torch.tensor([1,2,3]))
......@@ -31,10 +35,11 @@ print('neighbor_nodes_id: \n',neighbor_nodes, '\nedge_index: \n',sampled_edge_in
# print('neighbors: \n', neighbors[0]==1)
from base import NegativeSampling
edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5], [1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5],
[1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
num_nodes = 6
# sampler
sampler=NeighborSampler(edge_index=edge_index, num_nodes=num_nodes, num_layers=2, workers=2, fanout=[2, 1])
sampler=NeighborSampler(edge_index=edge_index.clone(), num_nodes=num_nodes, num_layers=2, workers=2, fanout=[2, 1])
# negative
weight = torch.tensor([0.3,0.1,0.1,0.1,0.3,0.1])
......
......@@ -7,7 +7,7 @@ import torch.multiprocessing as mp
from typing import Optional, Tuple
from base import BaseSampler, NegativeSampling
from sample_cores import get_neighbors, neighbor_sample_from_nodes, heads_unique
from sample_cores import get_neighbors, neighbor_sample_from_nodes, heads_unique, TemporalNeighborBlock
class NeighborSampler(BaseSampler):
def __init__(
......@@ -15,10 +15,9 @@ class NeighborSampler(BaseSampler):
num_nodes: int,
num_layers: int,
fanout: list,
workers = 1,
edge_index : Optional[torch.Tensor] = None,
deg = None,
neighbors = None
graph_data,
workers = 1,
tnb = None
) -> None:
r"""__init__
Args:
......@@ -26,10 +25,8 @@ class NeighborSampler(BaseSampler):
num_layers: the num of layers to be sampled
fanout: the list of max neighbors' number chosen for each layer
workers: the number of threads, default value is 1
edge_index: all edges in the graph
neighbors: all nodes' neighbors
deg: the degree of all nodes
should provide edge_index or (neighbors, deg)
graph_data: graph data in this partition
tnb: all nodes' neighbors info
"""
super().__init__()
self.num_layers = num_layers
......@@ -37,16 +34,13 @@ class NeighborSampler(BaseSampler):
self.workers = min(workers, torch.get_num_threads())
self.fanout = fanout
self.num_nodes = num_nodes
if(edge_index is not None):
row, col = edge_index
tnb = get_neighbors(row.tolist(), col.tolist(), num_nodes, self.workers)
self.neighbors = tnb.neighbors
self.deg = tnb.deg
self.graph_data=graph_data
if(tnb is None):
row, col = graph_data.edge_index
self.tnb = get_neighbors(row.contiguous(), col.contiguous(), num_nodes)
else:
assert deg is not None
assert neighbors is not None
self.neighbors = neighbors
self.deg = deg
assert tnb is not None
self.tnb = tnb
def _get_sample_info(self):
return self.num_nodes,self.num_layers,self.fanout,self.workers
......@@ -73,16 +67,18 @@ class NeighborSampler(BaseSampler):
sampled_edge_index_list: the edge sampled
"""
sampled_edge_index_list = []
sampled_nodes = torch.IntTensor([])
src_nodes = nodes.tolist()
sampled_nodes = [torch.LongTensor([]), torch.LongTensor([])]
src_nodes = nodes
assert self.workers > 0, 'Workers should be positive integer!!!'
for i in range(0, self.num_layers):
sampled_nodes_i, sampled_edge_index_i = self._sample_one_layer_from_nodes(nodes, self.fanout[i])
sampled_nodes = torch.cat([sampled_nodes, sampled_nodes_i])
nodes = torch.unique(sampled_edge_index_i[1])
sampled_nodes[0] = torch.cat([sampled_nodes[0], sampled_nodes_i[0]])
sampled_nodes[1] = torch.cat([sampled_nodes[1], sampled_nodes_i[1]])
nodes = sampled_nodes_i[0] # for now we only sample one hop into external partitions, so from the second layer on, external-partition nodes are not expanded. TODO: forward requests for external partitions to the remote sampling workers (layer loop sketched after this diff)
sampled_edge_index_list.append(sampled_edge_index_i)
sampled_nodes = heads_unique(sampled_nodes.tolist(), src_nodes)
return torch.tensor(sampled_nodes), sampled_edge_index_list
sampled_nodes[0] = heads_unique(sampled_nodes[0], src_nodes, self.workers)
sampled_nodes[1] = heads_unique(sampled_nodes[1], torch.tensor([]), self.workers)
return sampled_nodes, sampled_edge_index_list
def sample_from_edges(
self,
......@@ -155,10 +151,12 @@ class NeighborSampler(BaseSampler):
sampled_nodes: the nodes sampled
sampled_edge_index: the edges sampled
"""
tgb = neighbor_sample_from_nodes(nodes.tolist(), self.neighbors, self.deg, fanout, self.workers)
row = torch.IntTensor(tgb.row())
col = torch.IntTensor(tgb.col())
sampled_nodes = torch.IntTensor(tgb.nodes())
import time
pre = time.time()
row, col, a, b = neighbor_sample_from_nodes(nodes.contiguous(), self.tnb, self.graph_data.partition_id, self.graph_data.partptr.tolist(), fanout, self.workers)
sampled_nodes=[a,b]
end = time.time()
print("py sample one layer:", (end-pre))
return sampled_nodes, torch.stack([row,col], dim=0)
# def _sample_one_layer_from_nodes_parallel(
......@@ -176,9 +174,9 @@ class NeighborSampler(BaseSampler):
# sampled_nodes: the node sampled
# sampled_edge_index: the edge sampled
# """
# sampled_nodes=torch.IntTensor([])
# row=torch.IntTensor([])
# col=torch.IntTensor([])
# sampled_nodes=torch.LongTensor([])
# row=torch.LongTensor([])
# col=torch.LongTensor([])
# assert self.workers > 0, 'Workers should be positive integer!!!'
# with mp.Pool(processes=self.workers) as p:
# n=len(nodes)
......
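For intuition, the layer loop in sample_from_nodes above follows the pattern sketched below (illustrative only: sample_one_layer is a stand-in name, and the real method additionally keeps separate node lists for the local and external partitions):

# multi-layer sampling expands the frontier layer by layer, drawing at most
# fanout[i] neighbors per frontier node at layer i
def sample_layers(sample_one_layer, seeds, fanouts):
    edges, frontier = [], seeds
    for fanout in fanouts:
        nodes_i, edge_index_i = sample_one_layer(frontier, fanout)
        edges.append(edge_index_i)
        frontier = nodes_i  # the next layer starts from the newly sampled nodes
    return edges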
import torch
from ogb.nodeproppred import PygNodePropPredDataset
from torch_geometric import datasets
import time
from Utils import GraphData
def load_ogb_dataset(name, data_path):
dataset = PygNodePropPredDataset(name=name, root=data_path)
split_idx = dataset.get_idx_split()
g = dataset[0]
n_node = g.num_nodes
node_data={}
node_data['train_mask'] = torch.zeros(n_node, dtype=torch.bool)
node_data['val_mask'] = torch.zeros(n_node, dtype=torch.bool)
node_data['test_mask'] = torch.zeros(n_node, dtype=torch.bool)
node_data['train_mask'][split_idx["train"]] = True
node_data['val_mask'][split_idx["valid"]] = True
node_data['test_mask'][split_idx["test"]] = True
return g, node_data
g, node_data = load_ogb_dataset('ogbn-products', "/home/hzq/code/gnn/my_sampler/NewSample/dataset")
print(g)
# for worker in [1,2,3,4,5,6,7,8,9,10,20,30]:
# when a prebuilt tnb is used, edge_index must be set to None
g_data = GraphData(1, edge_index=None, data=g, partptr=torch.tensor([0, g.num_nodes//3, g.num_nodes//3*2]))
from neighbor_sampler import NeighborSampler
pre = time.time()
from neighbor_sampler import get_neighbors
from random_walk_sampler import RandomWalkSampler
row, col = g.edge_index
tnb = get_neighbors(row.contiguous(), col.contiguous(), g.num_nodes)
sampler = NeighborSampler(g.num_nodes, num_layers=2, fanout=[100,100], graph_data=g_data, workers=10, tnb=tnb)
# sampler = RandomWalkSampler(g.num_nodes, num_layers=2, graph_data=g_data, workers=10, tnb=tnb)
end = time.time()
print("init time:", end-pre)
# from torch_geometric.sampler import NeighborSampler, NumNeighbors, NodeSamplerInput, SamplerOutput
# pre = time.time()
# num_nei = NumNeighbors([100, 100])
# node_idx = NodeSamplerInput(input_id=None, node=torch.tensor(range(g.num_nodes//3, g.num_nodes//3+800000)))# (input_id=None, node=torch.masked_select(torch.arange(g.num_nodes),node_data['train_mask']))
# sampler = NeighborSampler(g, num_nei)
# end = time.time()
# print("init time:", end-pre)
pre = time.time()
node, edge = sampler.sample_from_nodes(torch.tensor(range(g.num_nodes//3, g.num_nodes//3+800000)))# sampler.sample_from_nodes(torch.masked_select(torch.arange(g.num_nodes),node_data['train_mask']))
# out = sampler.sample_from_nodes(node_idx)
# node = out.node
# edge = [out.row, out.col]
end = time.time()
print('node:', node)
print('edge:', edge)
print("sample time", end-pre)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from conans import ConanFile, tools
import os
class SparseppConan(ConanFile):
name = "parallel_hashmap"
version = "1.27"
description = "A header-only, very fast and memory-friendly hash map"
# Indicates License type of the packaged library
license = "https://github.com/greg7mdp/parallel-hashmap/blob/master/LICENSE"
# Packages the license for the conanfile.py
exports = ["LICENSE"]
# Custom attributes for Bincrafters recipe conventions
source_subfolder = "source_subfolder"
def source(self):
source_url = "https://github.com/greg7mdp/parallel-hashmap"
tools.get("{0}/archive/{1}.tar.gz".format(source_url, self.version))
extracted_dir = self.name + "-" + self.version
#Rename to "source_folder" is a convention to simplify later steps
os.rename(extracted_dir, self.source_subfolder)
def package(self):
include_folder = os.path.join(self.source_subfolder, "parallel_hashmap")
self.copy(pattern="LICENSE")
self.copy(pattern="*", dst="include/parallel_hashmap", src=include_folder)
def package_id(self):
self.info.header_only()
pybind11 @ 0e01c243
Subproject commit 0e01c243c7ffae3a2e52f998bacfe82f56aa96d9
......@@ -8,22 +8,37 @@ from neighbor_sampler import NeighborSampler
class RandomWalkSampler(BaseSampler):
def __init__(
self,
edge_index: torch.Tensor,
num_nodes: int,
num_layers: int,
workers = 1
graph_data,
workers = 1,
tnb = None
) -> None:
r"""__init__
Args:
edge_index: all edges in the graph
num_nodes: the num of all nodes in the graph
num_layers: the num of layers to be sampled
fanout: the list of max neighbors' number chosen for each layer
workers: the number of threads, default value is 1
graph_data: graph data in this partition
tnb: all nodes' neighbors info
"""
super().__init__()
self.sampler = NeighborSampler(edge_index, num_nodes, num_layers,
[1 for _ in range(num_layers)], workers)
# if(edge_index is not None):
# self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
# workers, edge_index=edge_index)
# elif(tnb is not None):
# self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
# workers, tnb=tnb)
# else:
# raise Exception("Not enough parameters")
if(tnb is not None):
self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
graph_data, workers, tnb=tnb)
else:
self.sampler = NeighborSampler(num_nodes, num_layers, [1 for _ in range(num_layers)],
graph_data, workers)
self.num_layers = num_layers
# 线程数不超过torch默认的omp线程数
self.workers = min(workers, torch.get_num_threads())
......
#include <iostream>
#include <vector>
#include <set>
#include <unordered_set>
#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <omp.h>
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
#include <pybind11/stl.h>
using namespace std;
namespace py = pybind11;
typedef int NodeIDType;
// typedef int EdgeIDType;
// typedef float TimeStampType;
class TemporalNeighborBlock;
class TemporalGraphBlock;
TemporalNeighborBlock get_neighbors(vector<NodeIDType>& row, vector<NodeIDType>& col, int num_nodes, int threads);
TemporalGraphBlock neighbor_sample_from_node(NodeIDType node, vector<NodeIDType>& neighbors, int deg, int fanout, int threads);
TemporalGraphBlock neighbor_sample_from_nodes(vector<NodeIDType>& nodes, vector<vector<NodeIDType>>& neighbors, vector<NodeIDType>& deg, int fanout, int threads);
vector<NodeIDType> heads_unique(vector<NodeIDType>& array, vector<NodeIDType>& heads);
template<typename T>
inline py::array vec2npy(const std::vector<T> &vec)
{
// need to let python garbage collector handle C++ vector memory
// see https://github.com/pybind/pybind11/issues/1042
// non-copy value transfer
auto v = new std::vector<T>(vec);
auto capsule = py::capsule(v, [](void *v)
{ delete reinterpret_cast<std::vector<T> *>(v); });
return py::array(v->size(), v->data(), capsule);
// return py::array(vec.size(), vec.data());
}
/*
* NeighborSampler Utils
*/
class TemporalNeighborBlock
{
public:
std::vector<vector<NodeIDType>*> neighbors;
std::vector<int> deg;
TemporalNeighborBlock(){}
TemporalNeighborBlock(std::vector<vector<NodeIDType>*>& neighbors,
std::vector<int> &deg):
neighbors(neighbors), deg(deg){}
py::array get_node_neighbor(int node_id){
return vec2npy(*(neighbors[node_id]));
}
int get_node_deg(int node_id){
return deg[node_id];
}
};
class TemporalGraphBlock
{
public:
std::vector<NodeIDType> row;
std::vector<NodeIDType> col;
std::vector<NodeIDType> nodes;
TemporalGraphBlock(){}
TemporalGraphBlock(std::vector<NodeIDType> &_row, std::vector<NodeIDType> &_col,
std::vector<NodeIDType> &_nodes):
row(_row), col(_col), nodes(_nodes){}
};
TemporalNeighborBlock get_neighbors(
        vector<NodeIDType>& row, vector<NodeIDType>& col, int num_nodes, int threads){
    int edge_num = row.size();
    TemporalNeighborBlock tnb = TemporalNeighborBlock();
    tnb.deg.resize(num_nodes, 0);
    double start_time = omp_get_wtime();
    // pre-size the vector so each slot can be filled independently;
    // concurrent push_back on the same vector is a data race
    tnb.neighbors.resize(num_nodes);
#pragma omp parallel for num_threads(threads)
    for(int i=0; i<num_nodes; i++)
        tnb.neighbors[i] = new vector<NodeIDType>();
    // this loop stays sequential: several edges may share a source node,
    // so parallel push_back into the same adjacency list would race
    for(int i=0; i<edge_num; i++){
        // collect each node's neighbors
        tnb.neighbors[row[i]]->push_back(col[i]);
        // accumulate node degrees
        tnb.deg[row[i]]++;
    }
    double end_time = omp_get_wtime();
    cout<<"get_neighbors consume: "<<end_time-start_time<<"s"<<endl;
    return tnb;
}
TemporalGraphBlock neighbor_sample_from_nodes(
        vector<NodeIDType>& nodes, vector<vector<NodeIDType>>& neighbors,
        vector<NodeIDType>& deg, int fanout, int threads){
    TemporalGraphBlock tgb = TemporalGraphBlock();
    srand((unsigned)time(0)); // seed once here, not once per sampled node
    double start_time = omp_get_wtime();
#pragma omp parallel for num_threads(threads)
    for(int i=0; i<(int)nodes.size(); i++){
        NodeIDType node = nodes[i];
        TemporalGraphBlock tgb_i = neighbor_sample_from_node(node, neighbors[node], deg[node], fanout, threads);
        // merging into the shared result must be serialized
        #pragma omp critical
        {
            tgb.row.insert(tgb.row.end(),tgb_i.row.begin(),tgb_i.row.end());
            tgb.col.insert(tgb.col.end(),tgb_i.col.begin(),tgb_i.col.end());
        }
    }
    double end_time = omp_get_wtime();
    cout<<"neighbor_sample_from_nodes consume: "<<end_time-start_time<<"s"<<endl;
    // insert and deduplicate the sampled nodes
    start_time = end_time;
    tgb.nodes.assign(tgb.col.begin(), tgb.col.end());
    heads_unique(tgb.nodes, nodes);
    end_time = omp_get_wtime();
    cout<<"unique consume: "<<end_time-start_time<<"s"<<endl;
    return tgb;
}
/*-------------------------------------------------------------------------------------**
**------------Utils--------------------------------------------------------------------**
**-------------------------------------------------------------------------------------*/
TemporalGraphBlock neighbor_sample_from_node(
        NodeIDType node, vector<NodeIDType>& neighbors,
        int deg, int fanout, int threads){
    TemporalGraphBlock tgb = TemporalGraphBlock();
    if(deg>fanout){
        // degree exceeds fanout: pick fanout distinct neighbors via a partial
        // Fisher-Yates shuffle over a local copy. This keeps the shared
        // adjacency list intact and avoids the original parallel erase, which
        // both raced and consumed the neighbor list it sampled from.
        vector<NodeIDType> candidates(neighbors);
        for(int i=0; i<fanout; i++){
            int j = i + rand() % (deg - i);
            swap(candidates[i], candidates[j]);
            tgb.col.push_back(candidates[i]);
        }
    }
    else
        tgb.col.assign(neighbors.begin(), neighbors.end());
    tgb.row.resize(tgb.col.size(), node);
    // sampled nodes are neither inserted nor deduplicated here; that is done
    // once after all per-node results are merged
    return tgb;
}
vector<NodeIDType> heads_unique(vector<NodeIDType>& array, vector<NodeIDType>& heads){
    unordered_set<NodeIDType> s(array.begin(), array.end());
    // sequential on purpose: erase() on an unordered_set is not thread-safe,
    // and `threads` is not a parameter of this function anyway
    for(size_t i=0; i<heads.size(); i++){
        if(s.count(heads[i])==1)
            s.erase(heads[i]);
    }
    // deduplicated tail, with the seed heads placed at the front
    array.assign(s.begin(), s.end());
    array.insert(array.begin(), heads.begin(), heads.end());
    return array;
}
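In Python terms, heads_unique above deduplicates the sampled tail while forcing the seed heads to the front; roughly (an illustrative equivalent, not the bound function itself):

# drop duplicates and any entry that is itself a head, then prepend the heads
def heads_unique_py(array, heads):
    tail = set(array) - set(heads)
    return list(heads) + list(tail)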
/*------------Python Bind--------------------------------------------------------------*/
PYBIND11_MODULE(sample_cores, m)
{
m
.def("neighbor_sample_from_nodes",
&neighbor_sample_from_nodes)
.def("get_neighbors",
&get_neighbors)
.def("heads_unique",
&heads_unique);
py::class_<TemporalGraphBlock>(m, "TemporalGraphBlock")
.def(py::init<std::vector<NodeIDType> &, std::vector<NodeIDType> &,
std::vector<NodeIDType> &>())
.def("row", [](const TemporalGraphBlock &tgb) { return vec2npy(tgb.row); })
.def("col", [](const TemporalGraphBlock &tgb) { return vec2npy(tgb.col); })
.def("nodes", [](const TemporalGraphBlock &tgb) { return vec2npy(tgb.nodes); });
py::class_<TemporalNeighborBlock>(m, "TemporalNeighborBlock")
.def(py::init<std::vector<vector<NodeIDType>*>&,
std::vector<int> &>())
.def_readonly("neighbors", &TemporalNeighborBlock::neighbors, py::return_value_policy::copy)
.def_readonly("deg", &TemporalNeighborBlock::deg, py::return_value_policy::copy);
}
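A hedged sketch of driving the bindings above from Python once the extension is built, using a toy edge list; the list-based signatures here are the ones bound in this file's PYBIND11_MODULE, mirroring how the pre-refactor NeighborSampler called them:

import sample_cores
# toy directed graph: 0->1, 1->2, 2->0, 2->1
row = [0, 1, 2, 2]
col = [1, 2, 0, 1]
tnb = sample_cores.get_neighbors(row, col, 3, 2)  # num_nodes=3, threads=2
tgb = sample_cores.neighbor_sample_from_nodes(
    [0, 2], tnb.neighbors, tnb.deg, 1, 2)         # seeds, fanout=1, threads=2
print(tgb.row(), tgb.col(), tgb.nodes())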
from setuptools import setup
from torch.utils.cpp_extension import BuildExtension, CppExtension
setup(
name='sample_cores',
ext_modules=[
CppExtension(
name='sample_cores',
sources=['sample_cores_dist.cpp'],
extra_compile_args=['-fopenmp','-Xlinker',' -export-dynamic'],
include_dirs=["./Sample_lib"],
),
],
cmdclass={
'build_ext': BuildExtension
})