Commit fe6f61b6 by xxx

merge master

parents 21c40ac0 b0f55b7a
install.sh merge=ours
\ No newline at end of file
*.tgz
*.my
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
......
#include <head.h>
#include <sampler.h>
#include <tppr.h>
#include <output.h>
#include <neighbors.h>
#include <temporal_utils.h>
...@@ -88,4 +89,22 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
.def("reset", &ParallelSampler::reset)
.def("get_ret", [](const ParallelSampler &ps) { return ps.ret; });
py::class_<ParallelTppRComputer>(m, "ParallelTppRComputer")
.def(py::init<TemporalNeighborBlock &, NodeIDType, EdgeIDType, int,
int, int, int, vector<float>&, vector<float>& >())
.def_readonly("ret", &ParallelTppRComputer::ret, py::return_value_policy::reference)
.def("reset_ret", &ParallelTppRComputer::reset_ret)
.def("reset_tppr", &ParallelTppRComputer::reset_tppr)
.def("reset_val_tppr", &ParallelTppRComputer::reset_val_tppr)
.def("backup_tppr", &ParallelTppRComputer::backup_tppr)
.def("restore_tppr", &ParallelTppRComputer::restore_tppr)
.def("restore_val_tppr", &ParallelTppRComputer::restore_val_tppr)
.def("get_pruned_topk", &ParallelTppRComputer::get_pruned_topk)
.def("extract_streaming_tppr", &ParallelTppRComputer::extract_streaming_tppr)
.def("streaming_topk", &ParallelTppRComputer::streaming_topk)
.def("single_streaming_topk", &ParallelTppRComputer::single_streaming_topk)
.def("streaming_topk_no_fake", &ParallelTppRComputer::streaming_topk_no_fake)
.def("compute_val_tppr", &ParallelTppRComputer::compute_val_tppr)
.def("get_ret", [](const ParallelTppRComputer &ps) { return ps.ret; });
}
\ No newline at end of file
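For orientation, the bindings above expose the new TPPR computer alongside the existing ParallelSampler in the same compiled extension. A minimal sketch of checking this from Python (the module path is assumed from the `starrygl.lib.libstarrygl_sampler` import used later in this commit; the actual extension name depends on how the .so is built):

    # Minimal sketch: confirm the new binding is exposed by the compiled extension.
    # The module path is an assumption taken from the sampler import elsewhere in this commit.
    from starrygl.lib import libstarrygl_sampler as ext

    assert hasattr(ext, "ParallelTppRComputer")
    # List the bound methods registered above (reset_tppr, streaming_topk, get_ret, ...).
    print([name for name in dir(ext.ParallelTppRComputer) if not name.startswith("_")])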
#pragma once
#include <iostream>
#include <algorithm>
#include <torch/extension.h>
#include <omp.h>
#include <time.h>
...@@ -17,6 +18,12 @@ typedef int64_t NodeIDType;
typedef int64_t EdgeIDType;
typedef float WeightType;
typedef float TimeStampType;
typedef tuple<NodeIDType, EdgeIDType, TimeStampType> PPRKeyType;
typedef double PPRValueType;
typedef phmap::parallel_flat_hash_map<PPRKeyType, PPRValueType> PPRDictType;
typedef vector<PPRDictType> PPRListDictType;
typedef vector<vector<PPRDictType>> PPRListListDictType;
typedef vector<vector<double>> NormListType;
class TemporalNeighborBlock;
class TemporalGraphBlock;
...@@ -28,6 +35,7 @@ int nodeIdToInOut(NodeIDType nid, int pid, const vector<NodeIDType>& part_ptr);
int nodeIdToPartId(NodeIDType nid, const vector<NodeIDType>& part_ptr);
vector<th::Tensor> divide_nodes_to_part(th::Tensor nodes, const vector<NodeIDType>& part_ptr, int threads);
NodeIDType sample_multinomial(const vector<WeightType>& weights, default_random_engine& e);
vector<int64_t> sample_max(const vector<WeightType>& weights, int k);
...@@ -173,3 +181,17 @@ NodeIDType sample_multinomial(const vector<WeightType>& weights, default_random_
sample_indice = distance(cumulative_weights.begin(), it);
return sample_indice;
}
vector<int64_t> sample_max(const vector<WeightType>& weights, int k) {
vector<int64_t> indices(weights.size());
for (int i = 0; i < weights.size(); ++i) {
indices[i] = i;
}
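// Note: this assumes 0 <= k <= weights.size(); partial_sort requires the middle iterator to stay within the range.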
// Use partial_sort (a selection algorithm) to find the indices of the top-k largest weights
partial_sort(indices.begin(), indices.begin() + k, indices.end(),
[&weights](int64_t a, int64_t b) { return weights[a] > weights[b]; });
// Return the indices of the k largest weights
return vector<int64_t>(indices.begin(), indices.begin() + k);
}
\ No newline at end of file
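As a cross-check, the selection that `sample_max` performs can be sketched in a few lines of Python with `torch.topk` (illustration only, not part of this commit; tie ordering may differ from the C++ routine):

    import torch

    def sample_max_py(weights, k):
        # Indices of the k largest weights, mirroring the C++ sample_max above.
        w = torch.as_tensor(weights, dtype=torch.float)
        return torch.topk(w, k).indices.tolist()

    print(sample_max_py([0.1, 0.7, 0.3, 0.9], 2))  # -> [3, 1]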
...@@ -11,6 +11,7 @@ class TemporalGraphBlock
vector<int64_t> src_index;
vector<NodeIDType> sample_nodes;
vector<TimeStampType> sample_nodes_ts;
vector<WeightType> e_weights;
double sample_time = 0;
double tot_time = 0;
int64_t sample_edge_num = 0;
......
Installation
============
.. note:: This document provides step-by-step instructions for deploying and installing StarryGL. Follow the steps below to ensure a successful installation.
Installation instructions
\ No newline at end of file
Prerequisites
=============
Before starting the installation, make sure you meet the following prerequisites (a quick version check is sketched after the list):
- Operating System: Linux only.
- Python Version: Requires Python 3.10.
- CUDA Toolkit Version: Requires CUDA Toolkit 11.8.
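The snippet below is a minimal version check; it assumes PyTorch is already installed and only reports the CUDA toolkit that PyTorch was built against, not the system-wide CUDA installation.

.. code-block:: python

   import sys
   import torch

   # Prerequisite check (sketch): Python 3.10 and a CUDA 11.8 build of PyTorch.
   assert sys.version_info[:2] == (3, 10), f"Python 3.10 required, found {sys.version.split()[0]}"
   print("CUDA toolkit used by PyTorch:", torch.version.cuda)  # expected: 11.8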
Installation Steps
==================
1. Clone the StarryGL Git Repository
-------------------------------------
First, clone the StarryGL Git repository, including the submodule dependencies. Run the following command to clone the repository:
.. code-block:: bash

   $ git clone --recursive http://115.236.33.124:7001/wjie98/starrygl.git
If you have already cloned the repository without the submodules, you can run the following command to fetch them:
.. code-block:: bash

   $ git submodule update --init --recursive
This will ensure that all the necessary submodule dependencies are downloaded.
2. Install Dependencies
-----------------------
Before installing, make sure the required Python dependencies are available on your system. Run the following command to install them:
.. code-block:: bash

   $ pip install -r requirements.txt
3. Execute the Installation Script
----------------------------------
Locate the `install.sh` script in the repository root and run the following command to install StarryGL:
.. code-block:: bash

   $ bash ./install.sh
4. Install StarryGL
------------------------
The installation script runs the `python setup.py install` command for you. To install manually instead, make sure you are in the StarryGL source directory and execute:
.. code-block:: bash

   $ python setup.py install
5. Verify the Installation
--------------------------
After the installation, you can verify that StarryGL was installed successfully by executing the following command:
.. code-block:: bash

   $ python -c "import starrygl; print(starrygl.__version__)"
If the StarryGL version is printed, the installation was successful.
...@@ -2,6 +2,9 @@ StarryGL Documentation
======================
.. toctree::
   :maxdepth: 1
   :caption: Contents

   guide/index
   tutorial/index
   advanced/index
......
...@@ -11,3 +11,8 @@ cmake .. \
&& mkdir ../starrygl/lib \
&& cp lib*.so ../starrygl/lib/ \
&& patchelf --set-rpath '$ORIGIN:$ORIGIN/lib' --force-rpath ../starrygl/lib/*.so
# -DCMAKE_PREFIX_PATH="/home/zlj/.miniconda3/envs/dgnn/lib/python3.10/site-packages" \
# -DPython3_ROOT_DIR="/home/zlj/.miniconda3/envs/dgnn" \
# -DCUDA_TOOLKIT_ROOT_DIR="/home/zlj/local/cuda-12.2" \
\ No newline at end of file
import torch
import dgl
from os.path import abspath, join, dirname
import sys
sys.path.insert(0, join(abspath(dirname(__file__))))
......
...@@ -155,10 +155,10 @@ class DistributedDataLoader:
self.expected_idx = data_size // self.batch_size if self.drop_last is True else int(math.ceil(data_size/self.batch_size))
if dist.get_world_size() > 1:
-num_epochs = torch.tensor([self.expected_idx],dtype = torch.long,device=self.device)
-print(num_epochs)
-dist.all_reduce(num_epochs, op=op)
-self.expected_idx = int(num_epochs.item())
+num_batchs = torch.tensor([self.expected_idx],dtype = torch.long,device=self.device)
+print("num_batchs:", num_batchs)
+dist.all_reduce(num_batchs, op=op)
+self.expected_idx = int(num_batchs.item())
def _next_data(self):
if self.current_pos >= self.dataset.len:
......
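For context, the hunk above synchronizes the per-rank batch count so every rank runs the same number of iterations. A standalone sketch of the same pattern (hypothetical helper name; it assumes the process group is already initialized and uses MAX as the reduction, whereas the loader takes the reduction op as a parameter):

    import math
    import torch
    import torch.distributed as dist

    def synchronized_num_batches(local_data_size: int, batch_size: int, drop_last: bool) -> int:
        # Number of batches produced by this rank's shard of the data.
        n = local_data_size // batch_size if drop_last else math.ceil(local_data_size / batch_size)
        if dist.is_initialized() and dist.get_world_size() > 1:
            t = torch.tensor([n], dtype=torch.long)
            # Agree on a common count across ranks so no rank exits the training loop early.
            dist.all_reduce(t, op=dist.ReduceOp.MAX)
            n = int(t.item())
        return n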
import os.path as osp
import torch
class GraphData():
def __init__(self, path):
assert path is not None and osp.exists(path), 'path does not exist'
id,edge_index,data,partptr =torch.load(path)
# index of the current partition
self.partition_id = id
# total number of partitions
self.partitions = partptr.numel() - 1
# full-graph structure information
self.num_nodes = partptr[self.partitions]
self.num_edges = edge_index[0].numel()
self.edge_index = edge_index
# data of this partition (feature vectors and subgraph structure), stored as a PyG Data object
self.data = data
# partition mapping (node id range of each partition)
self.partptr = partptr
self.eid = [i for i in range(self.num_edges)]
def __init__(self, id, edge_index, data, partptr, timestamp=None):
# index of the current partition
self.partition_id = id
# total number of partitions
self.partitions = partptr.numel() - 1
# full-graph structure information
self.num_nodes = partptr[self.partitions]
if edge_index is not None:
self.num_edges = edge_index[0].numel()
self.edge_index = edge_index
self.edge_ts = timestamp
# data of this partition (feature vectors and subgraph structure), stored as a PyG Data object
self.data = data
# partition mapping (node id range of each partition)
self.partptr = partptr
# edge id
self.eid = torch.tensor([i for i in range(0, self.num_edges)])
def select_attr(self,index):
return torch.index_select(self.data.x,0,index)
# return the partition corresponding to a global node id
def get_part_num(self):
return self.data.x.size()[0]
def select_attr(self,index):
return torch.index_select(self.data.x,0,index)
def select_y(self,index):
return torch.index_select(self.data.y,0,index)
# return the partition corresponding to a global node id
def get_localId_by_partitionId(self,id,index):
#print(index)
if(id == -1 or id == 0):
return index
else:
return torch.add(index,-self.partptr[id])
def get_globalId_by_partitionId(self,id,index):
if(id == -1 or id == 0):
return index
else:
return torch.add(index,self.partptr[id])
def get_node_num(self):
return self.num_nodes
def localId_to_globalId(self,id,partitionId:int = -1):
'''
Map a node id local to partition partitionId to its global id
'''
if partitionId == -1:
partitionId = self.partition_id
assert id >=self.partptr[self.partition_id] and id < self.partptr[self.partition_id+1]
ids_before = 0
if self.partition_id>0:
ids_before = self.partptr[self.partition_id-1]
return id+ids_before
def get_partitionId_by_globalId(self,id):
'''
Get the partition index corresponding to a global id
'''
partitionId = -1
assert id>=0 and id<self.num_nodes, 'id out of range'
for i in range(self.partitions):
if id>=self.partptr[i] and id<self.partptr[i+1]:
partitionId = i
break
assert partitionId>=0, 'no partition found for this id'
return partitionId
def get_nodes_by_partitionId(self,id):
'''
Return the number of nodes in the partition given by partitionId
'''
assert id>=0 and id<self.partitions, 'invalid partitionId'
return (int)(self.partptr[id+1]-self.partptr[id])
def __repr__(self):
return (f'{self.__class__.__name__}(\n'
f' partition_id={self.partition_id}\n'
f' data={self.data},\n'
f' global_info('
f'num_nodes={self.num_nodes},'
f' num_edges={self.num_edges},'
f' num_parts={self.partitions},'
f' edge_index=[2,{self.edge_index[0].numel()}])\n'
f')')
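A minimal usage sketch of the class above (illustrative values only; note that the second `__init__` overrides the first, so only the `(id, edge_index, data, partptr, timestamp)` constructor is in effect, and the example assumes a PyG `Data` object for the per-partition features):

    import torch
    from torch_geometric.data import Data  # any object with .x/.y tensors would do

    # GraphData is the class defined above.
    # Two partitions over 6 nodes: nodes 0-2 in partition 0, nodes 3-5 in partition 1.
    partptr = torch.tensor([0, 3, 6])
    edge_index = torch.tensor([[0, 1, 3], [1, 2, 4]])
    part0 = Data(x=torch.randn(3, 8), y=torch.zeros(3, dtype=torch.long))

    g = GraphData(0, edge_index, part0, partptr, timestamp=torch.tensor([1.0, 2.0, 3.0]))
    print(g.get_partitionId_by_globalId(4))                     # -> 1
    print(g.get_localId_by_partitionId(1, torch.tensor([4])))   # -> tensor([1])
    print(g)                                                    # repr with partition and global counts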
...@@ -8,7 +8,7 @@ import torch.multiprocessing as mp
from typing import Optional, Tuple
from .base import BaseSampler, NegativeSampling, SampleOutput, SampleType
-# from sample_cores import ParallelSampler, get_neighbors, heads_unique
+from starrygl.lib.libstarrygl_sampler import ParallelSampler, get_neighbors, heads_unique
from torch.distributed.rpc import rpc_async
......
...@@ -62,7 +62,7 @@ def test():
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
-g_data, df = load_gdelt_dataset()
+g_data, df = load_reddit_dataset()
print(g_data)
# for worker in [1,2,3,4,5,6,7,8,9,10,20,30]:
# import random
......
...@@ -70,7 +70,7 @@ def test():
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
-g_data = load_ogb_dataset()
+g_data = load_reddit_dataset()
print(g_data)
from .neighbor_sampler import NeighborSampler, get_neighbors
......
...@@ -43,7 +43,7 @@ parser.add_argument('--rank', default=0, type=int, metavar='W',
parser.add_argument('--patience', type=int, default=5, help='Patience for early stopping')
parser.add_argument('--world_size', default=1, type=int, metavar='W',
help='world size (number of processes)')
-parser.add_argument('--dataname', default=1, type=str, metavar='W',
+parser.add_argument('--dataname', default="WIKI", type=str, metavar='W',
help='name of dataset')
parser.add_argument('--model', default='TGN', type=str, metavar='W',
help='name of model')
...@@ -52,14 +52,15 @@ from sklearn.metrics import average_precision_score, roc_auc_score
import torch
import time
import random
import dgl
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score
from torch.nn.parallel import DistributedDataParallel as DDP
-#os.environ['CUDA_VISIBLE_DEVICES'] = str(args.rank)
-#os.environ["RANK"] = str(args.rank)
-#os.environ["WORLD_SIZE"] = str(args.world_size)
-#os.environ["LOCAL_RANK"] = str(0)
+os.environ['CUDA_VISIBLE_DEVICES'] = str(args.rank)
+os.environ["RANK"] = str(args.rank)
+os.environ["WORLD_SIZE"] = str(args.world_size)
+os.environ["LOCAL_RANK"] = str(0)
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
os.environ["MASTER_ADDR"] = '127.0.0.1'
os.environ["MASTER_PORT"] = '9437'
...@@ -152,7 +153,7 @@ def main():
else:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param)
device = torch.device('cpu')
-model = DDP(model,find_unused_parameters=True)
+model = DDP(model,find_unused_parameters=False)
train_stream = torch.cuda.Stream()
send_stream = torch.cuda.Stream()
scatter_stream = torch.cuda.Stream()
...@@ -223,8 +224,11 @@ def main():
auc_mrr = torch.empty([loader.expected_idx*world_size],dtype = torch.float,device = 'cuda')
dist.all_gather_into_tensor(apc,torch.tensor(aps,device ='cuda',dtype=torch.float))
dist.all_gather_into_tensor(auc_mrr,torch.tensor(aucs_mrrs,device ='cuda',dtype=torch.float))
-ap = float(torch.tensor(apc).mean())
-auc_mrr = float(torch.tensor(auc_mrr).mean())
+# ap = float(torch.tensor(apc).mean())
+# auc_mrr = float(torch.tensor(auc_mrr).mean())
+ap = float(apc.clone().mean())
+auc_mrr = float(auc_mrr.clone().mean())
return ap, auc_mrr
creterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=train_param['lr'])
......
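For reference, the hunk above fixes the distributed metric averaging: the already-gathered tensors are averaged directly instead of being wrapped in torch.tensor() again. A standalone sketch of the pattern (hypothetical helper name; assumes an initialized process group with a CUDA-capable backend):

    import torch
    import torch.distributed as dist

    def gather_mean(local_scores, expected_idx, world_size, device="cuda"):
        # One slot per batch per rank; every rank must pass the same expected_idx.
        gathered = torch.empty([expected_idx * world_size], dtype=torch.float, device=device)
        dist.all_gather_into_tensor(gathered, torch.tensor(local_scores, device=device, dtype=torch.float))
        return float(gathered.mean())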