Commit aee067ba by XXX

update Sample v4: use C++ to implement single node single layer sample

parent 3e80e491
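The diff replaces the pure-Python neighbor sampling in `sample_from_node` with a call into a new pybind11 extension, `Sample.sample_cores`, implemented in C++ below. For reference only, here is a minimal sketch of a build script for such an extension; this `setup.py`, including the module name and paths, is an assumption for illustration and is not part of the commit:

from setuptools import setup
from pybind11.setup_helpers import Pybind11Extension, build_ext

# Hypothetical build script: compiles the C++ sampler into the module
# imported below as Sample.sample_cores.
setup(
    name="sample_cores",
    ext_modules=[Pybind11Extension("Sample.sample_cores", ["Sample/sample_cores.cpp"])],
    cmdclass={"build_ext": build_ext},
)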
@@ -4,6 +4,8 @@ import torch_scatter
 import torch.multiprocessing as mp
 from abc import ABC
+from Sample.sample_cores import neighbor_sample_from_node, TemporalGraphBlock
+
 class BaseSampler(ABC):
     r"""An abstract base class that initializes a graph sampler and provides
     :meth:`sample_from_node` and :meth:`sample_from_nodes` routines.
@@ -56,7 +58,7 @@ class BaseSampler(ABC):
         edge_index: torch.Tensor,
         num_nodes: int,
         workers: int,
-        fanout: int
+        **kwargs
     ) -> Tuple[torch.Tensor, torch.Tensor]:
         r"""Performs parallel sampling from the nodes specified in :obj:`nodes`,
         returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
@@ -66,7 +68,7 @@ class BaseSampler(ABC):
             edge_index: edges in the graph
             num_nodes: the total number of nodes in the graph
             workers: the number of threads
-            fanout: the number of max neighbor chosen
+            **kwargs: other kwargs
         Returns:
             nodes: the sampled nodes
             edge_index: the sampled edges
@@ -97,17 +99,13 @@ class NeighborSampler(BaseSampler):
             edge_index: the sampled edges
         """
         row, col = edge_index
-        deg = torch_scatter.scatter_add(torch.ones_like(row), row, dim=0, dim_size=num_nodes)
-        neighbors=torch.stack([row[row==node],col[row==node]],dim=0)
-        print('neighbors: \n', neighbors)
-        if deg[node]<=fanout:
-            return torch.unique(neighbors[1], dim=0), neighbors
-        else:
-            random_index = torch.multinomial(torch.ones(deg[node]), fanout, replacement=False)# torch.randperm(neighbors.shape[1])[0:fanout]
-            print("random_index:\n", random_index)
-            edge_index = neighbors.index_select(dim = 1, index=random_index)
-            samples_nodes = torch.unique(edge_index.view(-1), dim=0)
-            return samples_nodes, edge_index
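+        # Convert the tensors to plain Python lists so pybind11 receives them as std::vector<NodeIDType>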
+        row = row.numpy().tolist()
+        col = col.numpy().tolist()
+        tgb = neighbor_sample_from_node(node, row, col, num_nodes, fanout)
+        row = torch.IntTensor(tgb.row())
+        col = torch.IntTensor(tgb.col())
+        samples_nodes = torch.IntTensor(tgb.nodes())
+        return samples_nodes, torch.stack([row, col], dim=0)
     def sample_from_nodes(
         self,
......
from typing import Tuple
import torch
import torch_scatter
import torch.multiprocessing as mp
from abc import ABC
class BaseSampler(ABC):
    r"""An abstract base class that initializes a graph sampler and provides
    :meth:`sample_from_node`, :meth:`sample_from_nodes` and
    :meth:`sample_from_nodes_parallel` routines.
    """
    def sample_from_node(
        self,
        node: int,
        edge_index: torch.Tensor,
        num_nodes: int,
        **kwargs
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""Performs sampling from the node specified in :obj:`node`,
        returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
        Args:
            node: the seed node index
            edge_index: edges in the graph
            num_nodes: the total number of nodes in the graph
            **kwargs: other kwargs
        Returns:
            samples_nodes: the sampled nodes
            edge_index: the sampled edges
        """
        raise NotImplementedError
    def sample_from_nodes(
        self,
        nodes: torch.Tensor,
        edge_index: torch.Tensor,
        num_nodes: int,
        **kwargs
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""Performs sampling from the nodes specified in :obj:`nodes`,
        returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
        Args:
            nodes: the seed node indices
            edge_index: edges in the graph
            num_nodes: the total number of nodes in the graph
            **kwargs: other kwargs
        Returns:
            samples_nodes: the sampled nodes
            edge_index: the sampled edges
        """
        raise NotImplementedError
    def sample_from_nodes_parallel(
        self,
        nodes: torch.Tensor,
        edge_index: torch.Tensor,
        num_nodes: int,
        workers: int,
        **kwargs
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""Performs parallel sampling from the nodes specified in :obj:`nodes`,
        returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
        Args:
            nodes: the seed node indices
            edge_index: edges in the graph
            num_nodes: the total number of nodes in the graph
            workers: the number of threads
            **kwargs: other kwargs
        Returns:
            nodes: the sampled nodes
            edge_index: the sampled edges
        """
        raise NotImplementedError
class NeighborSampler(BaseSampler):
    def __init__(self) -> None:
        super().__init__()
    def sample_from_node(
        self,
        node: int,
        edge_index: torch.Tensor,
        num_nodes: int,
        fanout: int
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""Performs sampling from the node specified in :obj:`node`,
        returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
        Args:
            node: the seed node index
            edge_index: edges in the graph
            num_nodes: the total number of nodes in the graph
            fanout: the maximum number of neighbors to sample
        Returns:
            samples_nodes: the sampled nodes
            edge_index: the sampled edges
        """
        row, col = edge_index
        deg = torch_scatter.scatter_add(torch.ones_like(row), row, dim=0, dim_size=num_nodes)
        neighbors = torch.stack([row[row == node], col[row == node]], dim=0)
        print('neighbors: \n', neighbors)
        if deg[node] <= fanout:
            return torch.unique(neighbors[1], dim=0), neighbors
        else:
            # draw `fanout` distinct positions; equivalent to torch.randperm(neighbors.shape[1])[0:fanout]
            random_index = torch.multinomial(torch.ones(int(deg[node])), fanout, replacement=False)
            print("random_index:\n", random_index)
            edge_index = neighbors.index_select(dim=1, index=random_index)
            samples_nodes = torch.unique(edge_index.view(-1), dim=0)
            return samples_nodes, edge_index
    def sample_from_nodes(
        self,
        nodes: torch.Tensor,
        edge_index: torch.Tensor,
        num_nodes: int,
        fanout: int
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""Performs sampling from the nodes specified in :obj:`nodes`,
        returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
        Args:
            nodes: the seed node indices
            edge_index: edges in the graph
            num_nodes: the total number of nodes in the graph
            fanout: the maximum number of neighbors to sample
        Returns:
            samples_nodes: the sampled nodes
            edge_index: the sampled edges
        """
        if len(nodes) == 1:
            return self.sample_from_node(nodes[0], edge_index, num_nodes, fanout)
        samples_nodes = torch.IntTensor([])
        row = torch.IntTensor([])
        col = torch.IntTensor([])
        # Single-threaded loop over the seed nodes:
        for node in nodes:
            samples_nodes_i, edge_index_i = self.sample_from_node(node, edge_index, num_nodes, fanout)
            samples_nodes = torch.unique(torch.concat([samples_nodes, samples_nodes_i]))
            row = torch.concat([row, edge_index_i[0]])
            col = torch.concat([col, edge_index_i[1]])
        return samples_nodes, torch.stack([row, col], dim=0)
    def sample_from_nodes_parallel(
        self,
        nodes: torch.Tensor,
        edge_index: torch.Tensor,
        num_nodes: int,
        workers: int,
        fanout: int
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""Performs parallel sampling from the nodes specified in :obj:`nodes`,
        returning the sampled subgraph as a Tuple[torch.Tensor, torch.Tensor].
        Args:
            nodes: the seed node indices
            edge_index: edges in the graph
            num_nodes: the total number of nodes in the graph
            workers: the number of threads
            fanout: the maximum number of neighbors to sample
        Returns:
            nodes: the sampled nodes
            edge_index: the sampled edges
        """
        samples_nodes = torch.IntTensor([])
        row = torch.IntTensor([])
        col = torch.IntTensor([])
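        # Note: the pool size follows torch.get_num_threads(); `workers` only
        # controls how the seed nodes are split into batches below.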
        with mp.Pool(processes=torch.get_num_threads()) as p:
            n = len(nodes)
            if workers >= n:
                results = [p.apply_async(self.sample_from_node,
                                         (node, edge_index, num_nodes, fanout))
                           for node in nodes]
            else:
                quotient = n // workers
                remainder = n % workers
                # Give each batch `quotient` nodes first, then spread the `remainder`
                # leftover nodes one apiece over the first `remainder` batches.
                nodes1 = nodes[0:(quotient + 1) * remainder].reshape(remainder, quotient + 1)  # batches that got an extra node
                nodes2 = nodes[(quotient + 1) * remainder:n].reshape(workers - remainder, quotient)  # batches without an extra node
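                # e.g. n=5, workers=4 -> quotient=1, remainder=1:
                # nodes1 is one batch of 2 seeds, nodes2 is three batches of 1 seed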
                results = [p.apply_async(self.sample_from_nodes,
                                         (nodes1[i], edge_index, num_nodes, fanout))
                           for i in range(0, remainder)]
                results.extend([p.apply_async(self.sample_from_nodes,
                                              (nodes2[i], edge_index, num_nodes, fanout))
                                for i in range(0, workers - remainder)])
            for result in results:
                samples_nodes_i, edge_index_i = result.get()
                samples_nodes = torch.unique(torch.cat([samples_nodes, samples_nodes_i]))
                row = torch.cat([row, edge_index_i[0]])
                col = torch.cat([col, edge_index_i[1]])
        return samples_nodes, torch.stack([row, col], dim=0)
        # Alternative that skips sample_from_node and gathers every seed's neighbors directly:
        # row, col = edge_index
        # neighbors1 = torch.concat([row[row == nodes[i]] for i in range(0, nodes.shape[0])])
        # neighbors2 = torch.concat([col[row == nodes[i]] for i in range(0, nodes.shape[0])])
        # neighbors = torch.stack([neighbors1, neighbors2], dim=0)
        # print('neighbors: \n', neighbors)
if __name__ == "__main__":
    edge_index = torch.tensor([[0, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5],
                               [1, 0, 2, 4, 1, 3, 0, 2, 5, 3, 5, 0, 2]])
    num_nodes = 6
    num_neighbors = 2
    # Run the neighbor sampling
    sampler = NeighborSampler()
    # neighbor_nodes, edge_index = sampler.sample_from_node(2, edge_index, num_nodes, num_neighbors)
    # neighbor_nodes, edge_index = sampler.sample_from_nodes(torch.tensor([1, 2]), edge_index, num_nodes, num_neighbors)
    # neighbor_nodes, edge_index = sampler.sample_from_nodes_parallel(torch.tensor([1, 2, 3]), edge_index, num_nodes, workers=3, fanout=num_neighbors)
    neighbor_nodes, edge_index = sampler.sample_from_nodes_parallel(torch.tensor([1, 2, 3, 4, 5]), edge_index, num_nodes, workers=4, fanout=num_neighbors)
    # Print the result
    print('neighbor_nodes_id: \n', neighbor_nodes, '\nedge_index: \n', edge_index)
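    # Each seed contributes at most `fanout` sampled edges, so with five seeds
    # and fanout=2 the printed edge_index has shape [2, <=10]; the concrete
    # sample varies from run to run.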
#include <iostream>
#include <vector>   // std::vector
#include <cstdlib>  // rand, srand
#include <ctime>    // time
#include <pybind11/pybind11.h>  // core pybind11 header
#include <pybind11/numpy.h>
#include <pybind11/stl.h>
using namespace std;
namespace py = pybind11;  // namespace alias to keep the bindings short
typedef int NodeIDType;
// typedef int EdgeIDType;
// typedef float TimeStampType;
template<typename T>
inline py::array vec2npy(const std::vector<T> &vec)
{
    // copy the data into a heap-allocated vector and hand ownership to a
    // py::capsule, so Python's garbage collector frees the C++ memory
    // (see https://github.com/pybind/pybind11/issues/1042);
    // the returned py::array wraps the buffer without copying it again
    auto v = new std::vector<T>(vec);
    auto capsule = py::capsule(v, [](void *v)
                               { delete reinterpret_cast<std::vector<T> *>(v); });
    return py::array(v->size(), v->data(), capsule);
    // return py::array(vec.size(), vec.data());
}
class TemporalGraphBlock
{
    public:
        std::vector<NodeIDType> row;
        std::vector<NodeIDType> col;
        std::vector<NodeIDType> nodes;
        // NodeIDType dim_in, dim_out;
        // double ptr_time = 0;
        // double search_time = 0;
        // double sample_time = 0;
        // double tot_time = 0;
        // double coo_time = 0;
        TemporalGraphBlock(){}
        TemporalGraphBlock(std::vector<NodeIDType> &_row, std::vector<NodeIDType> &_col,
                           std::vector<NodeIDType> &_nodes):
                           row(_row), col(_col), nodes(_nodes){}
};
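// Uniformly samples up to `fanout` of `node`'s outgoing neighbors from a COO
// edge list (row[i] -> col[i]) and returns them as a TemporalGraphBlock.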
TemporalGraphBlock neighbor_sample_from_node(
    NodeIDType node, vector<NodeIDType> &row,
    vector<NodeIDType> &col, int num_nodes, int fanout){
    int edge_num = row.size();
    int deg = 0;
    TemporalGraphBlock tgb = TemporalGraphBlock();
    srand((int)time(0));
    for(int i = 0; i < edge_num; i++){
        // count the degree and collect the neighbors in a single pass
        if(row[i] == node){
            deg++;
            tgb.col.push_back(col[i]);
        }
    }
    if(deg > fanout){
        // the node's degree exceeds fanout, so some neighbors must be dropped
        for(int i = 0; i < deg - fanout; i++){
            // erase one uniformly chosen neighbor per iteration, deg-fanout in total
            auto erase_iter = tgb.col.begin() + rand() % (deg - i);
            tgb.col.erase(erase_iter);
        }
    }
    tgb.row = vector<NodeIDType>(tgb.col.size(), node);
    tgb.nodes = vector<NodeIDType>(tgb.col);
    tgb.nodes.push_back(node);
    return tgb;
}
PYBIND11_MODULE(sample_cores, m)  // defines the Python module sample_cores
{
    m.def("neighbor_sample_from_node",
          &neighbor_sample_from_node
    );  // exposes the sampling routine to Python
    py::class_<TemporalGraphBlock>(m, "TemporalGraphBlock")
        .def(py::init<std::vector<NodeIDType> &, std::vector<NodeIDType> &,
                      std::vector<NodeIDType> &>())
        .def("row", [](const TemporalGraphBlock &tgb) { return vec2npy(tgb.row); })
        .def("col", [](const TemporalGraphBlock &tgb) { return vec2npy(tgb.col); })
        .def("nodes", [](const TemporalGraphBlock &tgb) { return vec2npy(tgb.nodes); });
        // .def("tot_time", [](const TemporalGraphBlock &tgb) { return tgb.tot_time; })
        // .def("ptr_time", [](const TemporalGraphBlock &tgb) { return tgb.ptr_time; })
        // .def("search_time", [](const TemporalGraphBlock &tgb) { return tgb.search_time; })
        // .def("sample_time", [](const TemporalGraphBlock &tgb) { return tgb.sample_time; })
        // .def("coo_time", [](const TemporalGraphBlock &tgb) { return tgb.coo_time; });
}  // end of the Python module definition
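// A typical way to compile this extension (assumed; no build script ships
// with this commit):
//   c++ -O3 -shared -std=c++14 -fPIC $(python3 -m pybind11 --includes) \
//       sample_cores.cpp -o sample_cores$(python3-config --extension-suffix)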