Commit f2ee424a by Wenjie Huang
parents 17306a86 38eb626e
install.sh merge=ours
\ No newline at end of file
*.tgz
*.my
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
......
#include<head.h>
#include <sampler.h>
#include <tppr.h>
#include <output.h>
#include <neighbors.h>
#include <temporal_utils.h>
......@@ -88,4 +89,22 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
.def("reset", &ParallelSampler::reset)
.def("get_ret", [](const ParallelSampler &ps) { return ps.ret; });
py::class_<ParallelTppRComputer>(m, "ParallelTppRComputer")
.def(py::init<TemporalNeighborBlock &, NodeIDType, EdgeIDType, int,
int, int, int, vector<float>&, vector<float>& >())
.def_readonly("ret", &ParallelTppRComputer::ret, py::return_value_policy::reference)
.def("reset_ret", &ParallelTppRComputer::reset_ret)
.def("reset_tppr", &ParallelTppRComputer::reset_tppr)
.def("reset_val_tppr", &ParallelTppRComputer::reset_val_tppr)
.def("backup_tppr", &ParallelTppRComputer::backup_tppr)
.def("restore_tppr", &ParallelTppRComputer::restore_tppr)
.def("restore_val_tppr", &ParallelTppRComputer::restore_val_tppr)
.def("get_pruned_topk", &ParallelTppRComputer::get_pruned_topk)
.def("extract_streaming_tppr", &ParallelTppRComputer::extract_streaming_tppr)
.def("streaming_topk", &ParallelTppRComputer::streaming_topk)
.def("single_streaming_topk", &ParallelTppRComputer::single_streaming_topk)
.def("streaming_topk_no_fake", &ParallelTppRComputer::streaming_topk_no_fake)
.def("compute_val_tppr", &ParallelTppRComputer::compute_val_tppr)
.def("get_ret", [](const ParallelTppRComputer &ps) { return ps.ret; });
}
\ No newline at end of file
#pragma once
#include <iostream>
#include <algorithm>
#include <torch/extension.h>
#include <omp.h>
#include <time.h>
......@@ -17,6 +18,12 @@ typedef int64_t NodeIDType;
typedef int64_t EdgeIDType;
typedef float WeightType;
typedef float TimeStampType;
typedef tuple<NodeIDType, EdgeIDType, TimeStampType> PPRKeyType;
typedef double PPRValueType;
typedef phmap::parallel_flat_hash_map<PPRKeyType, PPRValueType> PPRDictType;
typedef vector<PPRDictType> PPRListDictType;
typedef vector<vector<PPRDictType>> PPRListListDictType;
typedef vector<vector<double>> NormListType;
class TemporalNeighborBlock;
class TemporalGraphBlock;
......@@ -28,6 +35,7 @@ int nodeIdToInOut(NodeIDType nid, int pid, const vector<NodeIDType>& part_ptr);
int nodeIdToPartId(NodeIDType nid, const vector<NodeIDType>& part_ptr);
vector<th::Tensor> divide_nodes_to_part(th::Tensor nodes, const vector<NodeIDType>& part_ptr, int threads);
NodeIDType sample_multinomial(const vector<WeightType>& weights, default_random_engine& e);
vector<int64_t> sample_max(const vector<WeightType>& weights, int k);
......@@ -173,3 +181,17 @@ NodeIDType sample_multinomial(const vector<WeightType>& weights, default_random_
sample_indice = distance(cumulative_weights.begin(), it);
return sample_indice;
}
vector<int64_t> sample_max(const vector<WeightType>& weights, int k) {
vector<int64_t> indices(weights.size());
for (int i = 0; i < weights.size(); ++i) {
indices[i] = i;
}
// 使用部分排序算法(选择算法)找到前k个最大值的索引
partial_sort(indices.begin(), indices.begin() + k, indices.end(),
[&weights](int64_t a, int64_t b) { return weights[a] > weights[b]; });
// 返回前k个最大值的索引
return vector<int64_t>(indices.begin(), indices.begin() + k);
}
\ No newline at end of file
......@@ -287,10 +287,15 @@ void TemporalNeighborBlock::update_edge_weight(
for(int64_t i=0; i<edge_num; i++){
//修改节点与邻居边的权重
AT_ASSERTM(this->inverted_index[dst[i]].count(src[i])==1, "Unexist Edge Index: "+to_string(src[i])+", "+to_string(dst[i]));
int index;
if(this->with_eid) index = this->inverted_index[dst[i]][eid_ptr[i]];
else index = this->inverted_index[dst[i]][src[i]];
if(this->with_eid){
AT_ASSERTM(this->inverted_index[dst[i]].count(eid_ptr[i])==1, "Unexist Eid --> Col: "+to_string(eid_ptr[i])+"-->"+to_string(dst[i]));
index = this->inverted_index[dst[i]][eid_ptr[i]];
}
else{
AT_ASSERTM(this->inverted_index[dst[i]].count(src[i])==1, "Unexist Edge Index: "+to_string(src[i])+", "+to_string(dst[i]));
index = this->inverted_index[dst[i]][src[i]];
}
this->edge_weight[dst[i]][index] = ew[i];
}
}
......
......@@ -11,6 +11,7 @@ class TemporalGraphBlock
vector<int64_t> src_index;
vector<NodeIDType> sample_nodes;
vector<TimeStampType> sample_nodes_ts;
vector<WeightType> e_weights;
double sample_time = 0;
double tot_time = 0;
int64_t sample_edge_num = 0;
......
......@@ -105,13 +105,13 @@ void ParallelSampler :: neighbor_sample_from_nodes_static_layer(th::Tensor nodes
// uniform_int_distribution<> u(0, tnb.deg[node]-1);
// while(temp_s.size()!=fanout && temp_s.size()<tnb.neighbors_set[node].size()){
for(int i=0;i<fanout;i++){
//ѭ��ѡ��fanout���ھ�
//循环选择fanout个邻居
NodeIDType indice;
if(policy == "weighted"){//���DZ�Ȩ����Ϣ
if(policy == "weighted"){//考虑边权重信
const vector<WeightType>& ew = tnb.edge_weight[node];
indice = sample_multinomial(ew, e);
}
else if(policy == "uniform"){//���Ȳ���
else if(policy == "uniform"){//均匀采样
// indice = u(e);
indice = rand_r(&loc_seed) % (nei.size());
}
......@@ -119,7 +119,7 @@ void ParallelSampler :: neighbor_sample_from_nodes_static_layer(th::Tensor nodes
auto chosen_e_iter = edge.begin() + indice;
if(part_unique){
auto rst = temp_s.insert(*chosen_n_iter);
if(rst.second){ //���ظ�
if(rst.second){ //不重复
eid_threads[tid].emplace_back(*chosen_e_iter);
node_s_threads[tid].insert(*chosen_n_iter);
if(!tnb.neighbors_set.empty() && temp_s.size()<fanout && temp_s.size()<tnb.neighbors_set[node].size()) fanout++;
......@@ -229,7 +229,7 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer(
}
}
else{
//��ѡ�ھӱߴ����ȳ��Ļ���Ҫ���ѡ��fanout���ھ�
//可选邻居边大于扇出的话需要随机选择fanout个邻居
tgb_i[tid].src_index.insert(tgb_i[tid].src_index.end(), fanout, i);
uniform_int_distribution<> u(0, end_index-1);
//cout<<end_index<<endl;
......
#pragma once
#include <head.h>
#include <neighbors.h>
# include <output.h>
class ParallelTppRComputer
{
public:
TemporalNeighborBlock& tnb;
NodeIDType num_nodes;
EdgeIDType num_edges;
int threads;
int fanout;//k, width
int num_layers;//depth
int num_tpprs;//n_tpprs
vector<float> alpha_list;
vector<float> beta_list;
// string policy;
PPRListListDictType PPR_list;
PPRListListDictType val_PPR_list;
NormListType norm_list;
NormListType val_norm_list;
vector<vector<TemporalGraphBlock>> ret;
ParallelTppRComputer(TemporalNeighborBlock& _tnb, NodeIDType _num_nodes, EdgeIDType _num_edges, int _threads,
int _fanout, int _num_layers, int _num_tpprs, vector<float>& _alpha_list, vector<float>& _beta_list) :
tnb(_tnb), num_nodes(_num_nodes), num_edges(_num_edges), threads(_threads),
fanout(_fanout), num_layers(_num_layers), num_tpprs(_num_tpprs), alpha_list(_alpha_list), beta_list(_beta_list)
{
omp_set_num_threads(_threads);
ret.clear();
ret = vector<vector<TemporalGraphBlock>>(_num_tpprs, vector<TemporalGraphBlock>());
}
void reset_ret() {
for (int i = 0; i < num_tpprs; ++i) {
ret[i].clear(); // 清空每个内部的 vector
}
}
void reset_ret_i(int tppr_id) {
ret[tppr_id].clear(); // 清空 tppr_id 处的 vector
}
void reset_tppr(){
PPR_list = PPRListListDictType(num_tpprs, PPRListDictType(num_nodes));
norm_list = NormListType(num_tpprs, vector<double>(num_nodes, 0.0));
}
void reset_val_tppr(){
val_PPR_list = PPRListListDictType(num_tpprs, PPRListDictType(num_nodes));
val_norm_list = NormListType(num_tpprs, vector<double>(num_nodes, 0.0));
}
py::tuple backup_tppr(){
return py::make_tuple(this->PPR_list, this->norm_list);
}
void restore_tppr(PPRListListDictType& input_PPR_list, NormListType& input_norm_list){
this->PPR_list = input_PPR_list;
this->norm_list = input_norm_list;
}
void restore_val_tppr(PPRListListDictType& input_PPR_list, NormListType& input_norm_list){
this->val_PPR_list = input_PPR_list;
this->val_norm_list = input_norm_list;
}
PPRDictType compute_s1_s2(NodeIDType s1, NodeIDType s2, int tppr_id, EdgeIDType eid, TimeStampType ts);
void get_pruned_topk(th::Tensor src_nodes, th::Tensor root_ts, int tppr_id);
void extract_streaming_tppr(PPRDictType tppr_dict, TimeStampType current_ts, int index0, int position);
void streaming_topk(th::Tensor src_nodes, th::Tensor root_ts, th::Tensor eids);
void single_streaming_topk(th::Tensor src_nodes, th::Tensor root_ts, th::Tensor eids, int tppr_id);
void streaming_topk_no_fake(th::Tensor src_nodes, th::Tensor root_ts, th::Tensor eids);
void compute_val_tppr(th::Tensor src_nodes, th::Tensor dst_nodes, th::Tensor root_ts, th::Tensor eids);
};
PPRDictType ParallelTppRComputer :: compute_s1_s2(NodeIDType s1, NodeIDType s2, int tppr_id, EdgeIDType eid, TimeStampType ts){
int alpha = alpha_list[tppr_id], beta = beta_list[tppr_id];
vector<double> norm_list = this->norm_list[tppr_id];
PPRListDictType PPR_list = this->PPR_list[tppr_id];
PPRDictType t_s1_PPR= PPRDictType();
PPRDictType updated_tppr= PPRDictType();
float scala_s1, scala_s2;
/***************s1 side*******************/
if(norm_list[s1]==0){
scala_s2 = 1-alpha;
}
else{
t_s1_PPR = PPR_list[s1];
double last_norm = norm_list[s1], new_norm;
new_norm = last_norm*beta+beta;
scala_s1 = last_norm/new_norm*beta;
scala_s2 = beta/new_norm*(1-alpha);
for (const auto& pair : t_s1_PPR)
t_s1_PPR[pair.first] = pair.second*scala_s1;
}
/**************s2 side*******************/
if(norm_list[s1]==0){
t_s1_PPR[make_tuple(eid, s2, ts)] = alpha!=0 ? scala_s2*alpha : scala_s2;
}
else{
PPRDictType s2_PPR= PPR_list[s2];
for (const auto& pair : s2_PPR){
if(t_s1_PPR.count(pair.first)==1)
t_s1_PPR[pair.first] += pair.second*scala_s2;
else
t_s1_PPR[pair.first] = pair.second*scala_s2;
}
t_s1_PPR[make_tuple(eid, s2, ts)] = alpha!=0 ? scala_s2*alpha : scala_s2;
}
/*********exract the top-k items ********/
int tppr_size = t_s1_PPR.size();
if(tppr_size<=this->fanout)
updated_tppr = t_s1_PPR;
else{
std::vector<std::pair<PPRKeyType, PPRValueType>> pairs;
pairs.reserve(t_s1_PPR.size());
// 提取键值对到 pairs 向量
for (const auto& pair : t_s1_PPR) {
pairs.emplace_back(pair.first, pair.second);
}
// 使用并行部分排序来获得前 this->fanout 个元素
std::partial_sort(pairs.begin(), pairs.begin() + this->fanout, pairs.end(),
[](const auto& a, const auto& b) { return a.second > b.second; });
// 将部分排序后的键值对添加到 updated_tppr
for (size_t i = 0; i < this->fanout; ++i) {
const auto& pair = pairs[i];
updated_tppr[pair.first] = pair.second;
}
}
return updated_tppr;
}
void ParallelTppRComputer :: get_pruned_topk(th::Tensor src_nodes, th::Tensor root_ts, int tppr_id){
auto src_nodes_data = get_data_ptr<NodeIDType>(src_nodes);
auto ts_data = get_data_ptr<TimeStampType>(root_ts);
int64_t n_edges = src_nodes.size(0);
float alpha = alpha_list[tppr_id], beta = beta_list[tppr_id];
this->reset_ret_i(tppr_id);
for(int i=0;i<n_edges;i++)
{
NodeIDType target_node = src_nodes_data[i];
TimeStampType target_timestamp = ts_data[i];
PPRDictType tppr_dict;
/*******get dictionary of neighbors*********************/
vector<tuple<NodeIDType, TimeStampType, PPRValueType>> query_list;
query_list.push_back(make_tuple(target_node, target_timestamp, 1.0));
for(int depth=0;depth<this->num_layers;depth++)
{
vector<tuple<NodeIDType, TimeStampType, PPRValueType>> new_query_list;
/*******traverse the query list*********************/
for(int j=0;j<query_list.size();j++)
{
NodeIDType query_node = get<0>(query_list[j]);
NodeIDType query_ts = get<1>(query_list[j]);
NodeIDType query_weight = get<2>(query_list[j]);
int end_index = lower_bound(tnb.timestamp[query_node].begin(), tnb.timestamp[query_node].end(), query_ts)-tnb.timestamp[query_node].begin();
int n_ngh = end_index;
if(n_ngh==0) continue;
else
{
double norm = beta/(1-beta)*(1-pow(beta, n_ngh));
double weight = alpha!=0 && depth==0 ? query_weight*(1-alpha)*beta/norm*alpha : query_weight*(1-alpha)*beta/norm;
for(int z=0;z<min(this->fanout, n_ngh);z++){
EdgeIDType eid = tnb.eid[query_node][end_index-z-1];
NodeIDType node = tnb.neighbors[query_node][end_index-z-1];
// the timestamp here is a neighbor timestamp,
// so that it is indeed a temporal random walk
TimeStampType timestamp = tnb.timestamp[query_node][end_index-z-1];
PPRKeyType state = make_tuple(eid, node, timestamp);
// update dict
if(tppr_dict.count(state)==1)
tppr_dict[state] = tppr_dict[state]+weight;
else
tppr_dict[state] = weight;
// update query list
tuple<NodeIDType, TimeStampType, PPRValueType> new_query = make_tuple(node, timestamp, weight);
new_query_list.push_back(new_query);
// update weight
weight = weight*beta;
}
}
}
if(new_query_list.empty()) break;
else query_list = new_query_list;
}
/*****sort and get the top-k neighbors********/
int tppr_size = tppr_dict.size();
if(tppr_size==0) continue;
TimeStampType current_timestamp = ts_data[i];
PPRDictType updated_tppr= PPRDictType();
if(tppr_size<=this->fanout)
updated_tppr = tppr_dict;
else
{
std::vector<std::pair<PPRKeyType, PPRValueType>> pairs;
pairs.reserve(tppr_dict.size());
// 提取键值对到 pairs 向量
for (const auto& pair : tppr_dict) {
pairs.emplace_back(pair.first, pair.second);
}
// 使用并行部分排序来获得前 this->fanout 个元素
std::partial_sort(pairs.begin(), pairs.begin() + this->fanout, pairs.end(),
[](const auto& a, const auto& b) { return a.second > b.second; });
// 将部分排序后的键值对添加到 updated_tppr
for (size_t i = 0; i < this->fanout; ++i) {
const auto& pair = pairs[i];
updated_tppr[pair.first] = pair.second;
}
}
// this->PPR_list[tppr_id][target_node] = updated_tppr;
extract_streaming_tppr(updated_tppr, current_timestamp, tppr_id, i);
}
}
// category=0-src category=1-dst category=2-fake
void ParallelTppRComputer :: extract_streaming_tppr(PPRDictType tppr_dict, TimeStampType current_ts, int index0, int position){
ret[index0][position] = TemporalGraphBlock();
if(!tppr_dict.empty()){
ret[index0][position].sample_nodes.resize(this->fanout);
ret[index0][position].eid.resize(this->fanout);
ret[index0][position].sample_nodes_ts.resize(this->fanout);
ret[index0][position].e_weights.resize(this->fanout);
ret[index0][position].delta_ts.resize(this->fanout);
int j=0;
for (const auto& pair : tppr_dict){
auto tuple = pair.first;
auto weight = pair.second;
EdgeIDType eid = get<0>(tuple);
NodeIDType dst = get<1>(tuple);
TimeStampType ets = get<2>(tuple);
ret[index0][position].sample_nodes[j]=dst;
ret[index0][position].eid[j]=eid;
ret[index0][position].sample_nodes_ts[j]=ets;
ret[index0][position].e_weights[j]=weight;
ret[index0][position].delta_ts[j]=current_ts-ets;
j++;
}
}
}
void ParallelTppRComputer :: streaming_topk(th::Tensor src_nodes, th::Tensor root_ts, th::Tensor eids){
auto src_nodes_data = get_data_ptr<NodeIDType>(src_nodes);
auto ts_data = get_data_ptr<TimeStampType>(root_ts);
auto eids_data = get_data_ptr<EdgeIDType>(eids);
int n_nodes = src_nodes.size(0);
int n_edges = num_nodes/3;
this->reset_ret();
for(int index0=0;index0<num_tpprs;index0++){
int alpha = alpha_list[index0], beta = beta_list[index0];
ret[index0].resize(n_nodes);
vector<double>& norm_list = this->norm_list[index0];
PPRListDictType& PPR_list = this->PPR_list[index0];
for(int i=0; i<n_edges; i++){
NodeIDType src = src_nodes_data[i];
NodeIDType dst = src_nodes_data[i+n_edges];
NodeIDType fake = src_nodes_data[i+(n_edges<<1)];
TimeStampType ts = ts_data[i];
EdgeIDType eid = eids_data[i];
/******first extract the top-k neighbors and fill the list******/
extract_streaming_tppr(PPR_list[src], ts, index0, i);
extract_streaming_tppr(PPR_list[dst], ts, index0, i+n_edges);
extract_streaming_tppr(PPR_list[fake], ts, index0, i+(n_edges<<1));
/******then update the PPR values here**************************/
PPR_list[src] = compute_s1_s2(src, dst, index0, eid, ts);
norm_list[src] = norm_list[src]*beta+beta;
if(src!=dst){
PPR_list[dst] = compute_s1_s2(dst, src, index0, eid, ts);
norm_list[dst] = norm_list[dst]*beta+beta;
}
}
}
}
void ParallelTppRComputer :: single_streaming_topk(th::Tensor src_nodes, th::Tensor root_ts, th::Tensor eids, int tppr_id){
auto src_nodes_data = get_data_ptr<NodeIDType>(src_nodes);
auto ts_data = get_data_ptr<TimeStampType>(root_ts);
auto eids_data = get_data_ptr<EdgeIDType>(eids);
int n_nodes = src_nodes.size(0);
int n_edges = num_nodes/3;
this->reset_ret_i(tppr_id);
int alpha = alpha_list[tppr_id], beta = beta_list[tppr_id];
ret[tppr_id].resize(n_nodes);
vector<double>& norm_list = this->norm_list[tppr_id];
PPRListDictType& PPR_list = this->PPR_list[tppr_id];
for(int i=0; i<n_edges; i++){
NodeIDType src = src_nodes_data[i];
NodeIDType dst = src_nodes_data[i+n_edges];
NodeIDType fake = src_nodes_data[i+(n_edges<<1)];
TimeStampType ts = ts_data[i];
EdgeIDType eid = eids_data[i];
/******first extract the top-k neighbors and fill the list******/
extract_streaming_tppr(PPR_list[src], ts, tppr_id, i);
extract_streaming_tppr(PPR_list[dst], ts, tppr_id, i+n_edges);
extract_streaming_tppr(PPR_list[fake], ts, tppr_id, i+(n_edges<<1));
/******then update the PPR values here**************************/
PPR_list[src] = compute_s1_s2(src, dst, tppr_id, eid, ts);
norm_list[src] = norm_list[src]*beta+beta;
if(src!=dst){
PPR_list[dst] = compute_s1_s2(dst, src, tppr_id, eid, ts);
norm_list[dst] = norm_list[dst]*beta+beta;
}
}
}
void ParallelTppRComputer :: streaming_topk_no_fake(th::Tensor src_nodes, th::Tensor root_ts, th::Tensor eids){
auto src_nodes_data = get_data_ptr<NodeIDType>(src_nodes);
auto ts_data = get_data_ptr<TimeStampType>(root_ts);
auto eids_data = get_data_ptr<EdgeIDType>(eids);
int n_nodes = src_nodes.size(0);
int n_edges = num_nodes/2;
this->reset_ret();
for(int index0=0;index0<num_tpprs;index0++){
int alpha = alpha_list[index0], beta = beta_list[index0];
ret[index0].resize(n_nodes);
vector<double>& norm_list = this->norm_list[index0];
PPRListDictType& PPR_list = this->PPR_list[index0];
for(int i=0; i<n_edges; i++){
NodeIDType src = src_nodes_data[i];
NodeIDType dst = src_nodes_data[i+n_edges];
TimeStampType ts = ts_data[i];
EdgeIDType eid = eids_data[i];
/******first extract the top-k neighbors and fill the list******/
extract_streaming_tppr(PPR_list[src], ts, index0, i);
extract_streaming_tppr(PPR_list[dst], ts, index0, i+n_edges);
/******then update the PPR values here**************************/
PPR_list[src] = compute_s1_s2(src, dst, index0, eid, ts);
norm_list[src] = norm_list[src]*beta+beta;
if(src!=dst){
PPR_list[dst] = compute_s1_s2(dst, src, index0, eid, ts);
norm_list[dst] = norm_list[dst]*beta+beta;
}
}
}
}
void ParallelTppRComputer :: compute_val_tppr(th::Tensor src_nodes, th::Tensor dst_nodes, th::Tensor root_ts, th::Tensor eids){
auto src_nodes_data = get_data_ptr<NodeIDType>(src_nodes);
auto dst_nodes_data = get_data_ptr<NodeIDType>(dst_nodes);
auto ts_data = get_data_ptr<TimeStampType>(root_ts);
auto eids_data = get_data_ptr<EdgeIDType>(eids);
int n_edges = src_nodes.size(0);
for(int index0=0;index0<num_tpprs;index0++){
int alpha = alpha_list[index0], beta = beta_list[index0];
vector<double>& norm_list = this->norm_list[index0];
PPRListDictType& PPR_list = this->PPR_list[index0];
for(int i=0; i<n_edges; i++){
NodeIDType src = src_nodes_data[i];
NodeIDType dst = dst_nodes_data[i];
TimeStampType ts = ts_data[i];
EdgeIDType eid = eids_data[i];
PPR_list[src] = compute_s1_s2(src, dst, index0, eid, ts);
norm_list[src] = norm_list[src]*beta+beta;
if(src!=dst){
PPR_list[dst] = compute_s1_s2(dst, src, index0, eid, ts);
norm_list[dst] = norm_list[dst]*beta+beta;
}
}
}
this->val_norm_list.assign(this->norm_list.begin(), this->norm_list.end());
this->val_PPR_list.assign(this->PPR_list.begin(), this->PPR_list.end());
}
\ No newline at end of file
......@@ -69,7 +69,7 @@ After defining the configuration file, we can firstly read the parameters from t
Then a :code:`GeneralModel` object is created. If needed, we can adjust the model's parameters by modifying the contents of the configuration file. Here we provide 5 models for continuous-time temporal GNNs:
- :code:`TGN`: The TGN model proposed in `Temporal Graph Networks for Deep Learning on Dynamic Graphs <https://arxiv.org/abs/2006.10637>`__.
- :code:`DyRep`: The DyRep model proposed in `Representation Learning and Reasoning on Temporal Knowledge Graphs <https://arxiv.org/abs/2010.02844>`__.
- :code:`TIGER`: The TIGER model proposed in `TIGER: A Transformer-Based Framework for Temporal Knowledge Graph Completion <https://arxiv.org/abs/2101.00529>`__.
- :code:`Jodie`: The Jodie model proposed in `JODIE: Joint Optimization of Dynamics and Importance for Online Embedding <https://arxiv.org/abs/1902.10197>`__.
- :code:`TGAT`: The TGAT model proposed in `Temporal Graph Attention for Deep Temporal Modeling <https://arxiv.org/abs/2002.06902>`__.
\ No newline at end of file
- :code:`DyRep`: The DyRep model proposed in `Representation Learning and Reasoning on Temporal Knowledge Graphs <https://arxiv.org/abs/1803.04051>`__.
- :code:`TIGER`: The TIGER model proposed in `TIGER: A Transformer-Based Framework for Temporal Knowledge Graph Completion <https://arxiv.org/abs/2302.06057>`__.
- :code:`Jodie`: The Jodie model proposed in `JODIE: Joint Optimization of Dynamics and Importance for Online Embedding <https://arxiv.org/abs/1908.01207>`__.
- :code:`TGAT`: The TGAT model proposed in `Temporal Graph Attention for Deep Temporal Modeling <https://arxiv.org/abs/2002.07962>`__.
\ No newline at end of file
......@@ -11,3 +11,8 @@ cmake .. \
&& mkdir ../starrygl/lib \
&& cp lib*.so ../starrygl/lib/ \
&& patchelf --set-rpath '$ORIGIN:$ORIGIN/lib' --force-rpath ../starrygl/lib/*.so
# -DCMAKE_PREFIX_PATH="/home/zlj/.miniconda3/envs/dgnn/lib/python3.10/site-packages" \
# -DPython3_ROOT_DIR="/home/zlj/.miniconda3/envs/dgnn" \
# -DCUDA_TOOLKIT_ROOT_DIR="/home/zlj/local/cuda-12.2" \
\ No newline at end of file
import torch
import dgl
from os.path import abspath, join, dirname
import sys
sys.path.insert(0, join(abspath(dirname(__file__))))
......
......@@ -155,10 +155,10 @@ class DistributedDataLoader:
self.expected_idx = data_size // self.batch_size if self.drop_last is True else int(math.ceil(data_size/self.batch_size))
if dist.get_world_size() > 1:
num_epochs = torch.tensor([self.expected_idx],dtype = torch.long,device=self.device)
print(num_epochs)
dist.all_reduce(num_epochs, op=op)
self.expected_idx = int(num_epochs.item())
num_batchs = torch.tensor([self.expected_idx],dtype = torch.long,device=self.device)
print("num_batchs:", num_batchs)
dist.all_reduce(num_batchs, op=op)
self.expected_idx = int(num_batchs.item())
def _next_data(self):
if self.current_pos >= self.dataset.len:
......
import os.path as osp
import torch
class GraphData():
def __init__(self, path):
assert path is not None and osp.exists(path),'path 不存在'
id,edge_index,data,partptr =torch.load(path)
# 当前分区序号
self.partition_id = id
# 总分区数
self.partitions = partptr.numel() - 1
# 全图结构数据
self.num_nodes = partptr[self.partitions]
self.num_edges = edge_index[0].numel()
self.edge_index = edge_index
# 该分区下的数据(包含特征向量和子图结构)pyg Data数据结构
self.data = data
# 分区映射关系
self.partptr = partptr
self.eid = [i for i in range(self.num_edges)]
def __init__(self, id, edge_index, data, partptr, timestamp=None):
# 当前分区序号
self.partition_id = id
# 总分区数
self.partitions = partptr.numel() - 1
# 全图结构数据
self.num_nodes = partptr[self.partitions]
if edge_index is not None:
self.num_edges = edge_index[0].numel()
self.edge_index = edge_index
self.edge_ts = timestamp
# 该分区下的数据(包含特征向量和子图结构)pyg Data数据结构
self.data = data
# 分区映射关系
self.partptr = partptr
# edge id
self.eid = torch.tensor([i for i in range(0, self.num_edges)])
def select_attr(self,index):
return torch.index_select(self.data.x,0,index)
#返回全局的节点id 所对应的分区
def get_part_num(self):
return self.data.x.size()[0]
def select_attr(self,index):
return torch.index_select(self.data.x,0,index)
def select_y(self,index):
return torch.index_select(self.data.y,0,index)
#返回全局的节点id 所对应的分区
def get_localId_by_partitionId(self,id,index):
#print(index)
if(id == -1 or id == 0):
return index
else:
return torch.add(index,-self.partptr[id])
def get_globalId_by_partitionId(self,id,index):
if(id == -1 or id == 0):
return index
else:
return torch.add(index,self.partptr[id])
def get_node_num(self):
return self.num_nodes
def localId_to_globalId(self,id,partitionId:int = -1):
'''
将分区partitionId内的点id映射为全局的id
'''
if partitionId == -1:
partitionId = self.partition_id
assert id >=self.partptr[self.partition_id] and id < self.partptr[self.partition_id+1]
ids_before = 0
if self.partition_id>0:
ids_before = self.partptr[self.partition_id-1]
return id+ids_before
def get_partitionId_by_globalId(self,id):
'''
通过全局id得到对应的分区序号
'''
partitionId = -1
assert id>=0 and id<self.num_nodes,'id 超过范围'
for i in range(self.partitions):
if id>=self.partptr[i] and id<self.partptr[i+1]:
partitionId = i
break
assert partitionId>=0, 'id 不存在对应的分区'
return partitionId
def get_nodes_by_partitionId(self,id):
'''
根据partitioId 返回该分区的节点数量
'''
assert id>=0 and id<self.partitions,'partitionId 非法'
return (int)(self.partptr[id+1]-self.partptr[id])
def __repr__(self):
return (f'{self.__class__.__name__}(\n'
f' partition_id={self.partition_id}\n'
f' data={self.data},\n'
f' global_info('
f'num_nodes={self.num_nodes},'
f' num_edges={self.num_edges},'
f' num_parts={self.partitions},'
f' edge_index=[2,{self.edge_index[0].numel()}])\n'
f')')
......@@ -8,7 +8,7 @@ import torch.multiprocessing as mp
from typing import Optional, Tuple
from .base import BaseSampler, NegativeSampling, SampleOutput, SampleType
# from sample_cores import ParallelSampler, get_neighbors, heads_unique
from starrygl.lib.libstarrygl_sampler import ParallelSampler, get_neighbors, heads_unique
from torch.distributed.rpc import rpc_async
......
......@@ -62,7 +62,7 @@ def test():
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
g_data, df = load_gdelt_dataset()
g_data, df = load_reddit_dataset()
print(g_data)
# for worker in [1,2,3,4,5,6,7,8,9,10,20,30]:
# import random
......
......@@ -70,7 +70,7 @@ def test():
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
g_data = load_ogb_dataset()
g_data = load_reddit_dataset()
print(g_data)
from .neighbor_sampler import NeighborSampler, get_neighbors
......
......@@ -43,7 +43,7 @@ parser.add_argument('--rank', default=0, type=int, metavar='W',
parser.add_argument('--patience', type=int, default=5, help='Patience for early stopping')
parser.add_argument('--world_size', default=1, type=int, metavar='W',
help='number of negative samples')
parser.add_argument('--dataname', default=1, type=str, metavar='W',
parser.add_argument('--dataname', default="WIKI", type=str, metavar='W',
help='name of dataset')
parser.add_argument('--model', default='TGN', type=str, metavar='W',
help='name of model')
......@@ -52,17 +52,18 @@ from sklearn.metrics import average_precision_score, roc_auc_score
import torch
import time
import random
import dgl
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score
from torch.nn.parallel import DistributedDataParallel as DDP
#os.environ['CUDA_VISIBLE_DEVICES'] = str(args.rank)
#os.environ["RANK"] = str(args.rank)
#os.environ["WORLD_SIZE"] = str(args.world_size)
#os.environ["LOCAL_RANK"] = str(0)
os.environ['CUDA_VISIBLE_DEVICES'] = str(args.rank)
os.environ["RANK"] = str(args.rank)
os.environ["WORLD_SIZE"] = str(args.world_size)
os.environ["LOCAL_RANK"] = str(0)
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
os.environ["MASTER_ADDR"] = '10.214.211.187'
os.environ["MASTER_PORT"] = '9337'
os.environ["MASTER_ADDR"] = '10.214.211.186'
os.environ["MASTER_PORT"] = '9667'
def seed_everything(seed=42):
random.seed(seed)
np.random.seed(seed)
......@@ -80,7 +81,7 @@ def main():
ctx = DistributedContext.init(backend="nccl", use_gpu=True)
device_id = torch.cuda.current_device()
print('use cuda on',device_id)
pdata = partition_load("/mnt/data/part_data/dataset/here/{}".format(args.dataname), algo="metis_for_tgnn")
pdata = partition_load("/mnt/data/part_data/here/{}".format(args.dataname), algo="metis_for_tgnn")
graph = DistributedGraphStore(pdata = pdata)
Path("./saved_models/").mkdir(parents=True, exist_ok=True)
......@@ -149,7 +150,7 @@ def main():
#cache.init_cache_with_presample(trainloader,3)
gnn_dim_node = 0 if graph.x is None else pdata.x.shape[1]
gnn_dim_edge = 0 if graph.edge_attr is None else pdata.edge_attr.shape[1]
print(gnn_dim_node,gnn_dim_edge)
print("gnn_dim_node:", gnn_dim_node, "gnn_dim_edge:", gnn_dim_edge)
avg_time = 0
if use_cuda:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param).cuda()
......@@ -157,7 +158,7 @@ def main():
else:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param)
device = torch.device('cpu')
model = DDP(model,find_unused_parameters=True)
model = DDP(model,find_unused_parameters=False)
train_stream = torch.cuda.Stream()
send_stream = torch.cuda.Stream()
scatter_stream = torch.cuda.Stream()
......@@ -178,7 +179,7 @@ def main():
total_loss = 0
signal = torch.tensor([0],dtype = int,device = device)
with torch.cuda.stream(train_stream):
for roots,mfgs,metadata,sample_time in loader:
for roots,mfgs,metadata in loader:
pred_pos, pred_neg = model(mfgs,metadata)
total_loss += creterion(pred_pos, torch.ones_like(pred_pos))
......@@ -226,8 +227,11 @@ def main():
auc_mrr = torch.empty([loader.expected_idx*world_size],dtype = torch.float,device = 'cuda')
dist.all_gather_into_tensor(apc,torch.tensor(aps,device ='cuda',dtype=torch.float))
dist.all_gather_into_tensor(auc_mrr,torch.tensor(aucs_mrrs,device ='cuda',dtype=torch.float))
ap = float(torch.tensor(apc).mean())
auc_mrr = float(torch.tensor(auc_mrr).mean())
# ap = float(torch.tensor(apc).mean())
# auc_mrr = float(torch.tensor(auc_mrr).mean())
ap = float(apc.clone().mean())
auc_mrr = float(auc_mrr.clone().mean())
return ap, auc_mrr
creterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=train_param['lr'])
......@@ -248,8 +252,8 @@ def main():
model.module.memory_updater.last_updated_nid = None
model.module.memory_updater.last_updated_memory = None
model.module.memory_updater.last_updated_ts = None
for roots,mfgs,metadata,sample_time in trainloader:
fetch_time +=sample_time/1000
for roots,mfgs,metadata in trainloader:
# fetch_time +=sample_time/1000
t_prep_s = time.time()
with torch.cuda.stream(train_stream):
......@@ -264,9 +268,9 @@ def main():
optimizer.step()
#torch.cuda.synchronize()
t_prep_s = time.time()
y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
train_aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
# y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
# y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
# train_aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
#start_event = torch.cuda.Event(enable_timing=True)
#end_event = torch.cuda.Event(enable_timing=True)
#start_event.record()
......@@ -323,7 +327,7 @@ def main():
else:
print('\ttrain loss:{:.4f} train ap:{:4f} val ap:{:4f} val auc:{:4f}'.format(total_loss,train_ap, ap, auc))
print('\ttotal time:{:.2f}s prep time:{:.2f}s'.format(time.time()-epoch_start_time, time_prep))
print('\t fetch time:{:.2f}s write back time:{:.2f}s'.format(fetch_time,write_back_time))
# print('\t fetch time:{:.2f}s write back time:{:.2f}s'.format(fetch_time,write_back_time))
torch.save(model.state_dict(), get_checkpoint_path(e))
model.eval()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment