Commit b305c21a by zhlj

fix bugs and add APAN

parents 29970325 82337762
bound.png

16.4 KB

...@@ -37,6 +37,9 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) ...@@ -37,6 +37,9 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
.def("src_index", [](const TemporalGraphBlock &tgb) { return vecToTensor<EdgeIDType>(tgb.src_index); }) .def("src_index", [](const TemporalGraphBlock &tgb) { return vecToTensor<EdgeIDType>(tgb.src_index); })
.def("sample_nodes", [](const TemporalGraphBlock &tgb) { return vecToTensor<NodeIDType>(tgb.sample_nodes); }) .def("sample_nodes", [](const TemporalGraphBlock &tgb) { return vecToTensor<NodeIDType>(tgb.sample_nodes); })
.def("sample_nodes_ts", [](const TemporalGraphBlock &tgb) { return vecToTensor<TimeStampType>(tgb.sample_nodes_ts); }) .def("sample_nodes_ts", [](const TemporalGraphBlock &tgb) { return vecToTensor<TimeStampType>(tgb.sample_nodes_ts); })
.def("sample_weight",[](const TemporalGraphBlock &tgb){
return vecToTensor<float>(tgb.sample_weight);
})
.def_readonly("sample_time", &TemporalGraphBlock::sample_time, py::return_value_policy::reference) .def_readonly("sample_time", &TemporalGraphBlock::sample_time, py::return_value_policy::reference)
.def_readonly("tot_time", &TemporalGraphBlock::tot_time, py::return_value_policy::reference) .def_readonly("tot_time", &TemporalGraphBlock::tot_time, py::return_value_policy::reference)
.def_readonly("sample_edge_num", &TemporalGraphBlock::sample_edge_num, py::return_value_policy::reference); .def_readonly("sample_edge_num", &TemporalGraphBlock::sample_edge_num, py::return_value_policy::reference);
......
...@@ -11,6 +11,7 @@ class TemporalGraphBlock ...@@ -11,6 +11,7 @@ class TemporalGraphBlock
vector<int64_t> src_index; vector<int64_t> src_index;
vector<NodeIDType> sample_nodes; vector<NodeIDType> sample_nodes;
vector<TimeStampType> sample_nodes_ts; vector<TimeStampType> sample_nodes_ts;
vector<float> sample_weight;
vector<WeightType> e_weights; vector<WeightType> e_weights;
double sample_time = 0; double sample_time = 0;
double tot_time = 0; double tot_time = 0;
......
...@@ -308,9 +308,14 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer( ...@@ -308,9 +308,14 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer(
double p0 = (double)rand_r(&loc_seeds[tid]) / (RAND_MAX + 1.0); double p0 = (double)rand_r(&loc_seeds[tid]) / (RAND_MAX + 1.0);
double ep = boundery_probility*pr[cal_cnt-1]/sum_p*sum_1; double ep = boundery_probility*pr[cal_cnt-1]/sum_p*sum_1;
if(p0 > ep)continue; if(p0 > ep)continue;
tgb_i[tid].sample_weight.emplace_back((float)ep);
} }
else continue; else continue;
//cout<<"in"<<endl; //cout<<"in"<<endl;
}
else{
tgb_i[tid].sample_weight.emplace_back((float)1.0);
} }
tgb_i[tid].src_index.emplace_back(i); tgb_i[tid].src_index.emplace_back(i);
tgb_i[tid].sample_nodes.emplace_back(tnb.neighbors[node][cid]); tgb_i[tid].sample_nodes.emplace_back(tnb.neighbors[node][cid]);
...@@ -358,6 +363,8 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer( ...@@ -358,6 +363,8 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer(
each_begin[i]=size; each_begin[i]=size;
size += s; size += s;
} }
if(policy == "boundery_recent_decay")
ret[cur_layer].sample_weight.resize(size);
ret[cur_layer].eid.resize(size); ret[cur_layer].eid.resize(size);
ret[cur_layer].src_index.resize(size); ret[cur_layer].src_index.resize(size);
ret[cur_layer].delta_ts.resize(size); ret[cur_layer].delta_ts.resize(size);
...@@ -366,6 +373,8 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer( ...@@ -366,6 +373,8 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer(
#pragma omp parallel for schedule(static, 1) #pragma omp parallel for schedule(static, 1)
for(int i = 0; i<threads; i++){ for(int i = 0; i<threads; i++){
if(policy == "boundery_recent_decay")
copy(tgb_i[i].sample_weight.begin(), tgb_i[i].sample_weight.end(), ret[cur_layer].sample_weight.begin()+each_begin[i]);
copy(tgb_i[i].eid.begin(), tgb_i[i].eid.end(), ret[cur_layer].eid.begin()+each_begin[i]); copy(tgb_i[i].eid.begin(), tgb_i[i].eid.end(), ret[cur_layer].eid.begin()+each_begin[i]);
copy(tgb_i[i].src_index.begin(), tgb_i[i].src_index.end(), ret[cur_layer].src_index.begin()+each_begin[i]); copy(tgb_i[i].src_index.begin(), tgb_i[i].src_index.end(), ret[cur_layer].src_index.begin()+each_begin[i]);
copy(tgb_i[i].delta_ts.begin(), tgb_i[i].delta_ts.end(), ret[cur_layer].delta_ts.begin()+each_begin[i]); copy(tgb_i[i].delta_ts.begin(), tgb_i[i].delta_ts.end(), ret[cur_layer].delta_ts.begin()+each_begin[i]);
......
bash test_all.sh 13357 > 13357.out
wait
bash test_all.sh 12347 > 12347.out
wait
bash test_all.sh 63377 > 63377.out
wait
bash test_all.sh 53473 > 53473.out
wait
bash test_all.sh 54763 > 54763.out
wait
\ No newline at end of file
import matplotlib.pyplot as plt
import numpy as np
import torch
# 读取文件内容
ssim_values = [0, 0.1, 0.2, 0.3, 0.4, 2] # 假设这是你的 ssim 参数值
probability_values = [1,0.1,0.05,0.01,0]
data_values = ['WIKI','LASTFM','WikiTalk','DGraphFin'] # 存储从文件中读取的数据
seed = ['13357','12347','63377','53473',' 54763']
partition = 'ours_shared'
# 从文件中读取数据,假设数据存储在文件 data.txt 中
#all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out
partitions=4
topk=0.01
mem='all_update'#'historical'
model='TGN'
for sd in seed :
for data in data_values:
ap_list = []
comm_list = []
for p in probability_values:
if data == 'WIKI' or data =='LASTFM':
model = 'TGN'
else:
model = 'TGN_large'
if p == 1:
file = 'all_{}/{}/{}/{}-{}-{}-{}-recent.out'.format(sd,data,model,partitions,partition,topk,mem)
else:
file = 'all_{}/{}/{}/{}-{}-{}-{}-boundery_recent_decay-{}.out'.format(sd,data,model,partitions,partition,topk,mem,p)
prefix = "val ap:"
max_val_ap = 0
test_ap = 0
with open(file, 'r') as file:
for line in file:
if line.find(prefix)!=-1:
pos = line.find(prefix)+len(prefix)
posr = line.find(' ',pos)
#print(line[pos:posr])
val_ap = float(line[pos:posr])
pos = line.find("test ap ")+len("test ap ")
posr = line.find(' ',pos)
#print(line[pos:posr])
_test_ap = float(line[pos:posr])
if(val_ap>max_val_ap):
max_val_ap = val_ap
test_ap = _test_ap
ap_list.append(test_ap)
print('data {} seed {} ap: {}'.format(data,sd,ap_list))
# prefix = 'best test AP:'
# cnt = 0
# sum = 0
# with open(file, 'r') as file:
# for line in file:
# if line.startswith(prefix):
# ap = float(line.lstrip(prefix).split(' ')[0])
# pos = line.find('remote node number tensor')
# if(pos!=-1):
# posr = line.find(']',pos+2+len('remote node number tensor'),)
# #print(line,line[pos+2+len('remote node number tensor'):posr])
# comm = int(line[pos+2+len('remote node number tensor'):posr])
# #print()
# sum = sum+comm
# cnt = cnt+1
# #print(comm)
# ap_list.append(ap)
# comm_list.append(sum/cnt*4)
# # 绘制柱状图
# print('{} TestAP={}\n'.format(data,ap_list))
# bar_width = 0.4
# #shared comm tensor
# # 设置柱状图的位置
# bars = range(len(probability_values))
# # 绘制柱状图
# plt.bar([b for b in bars], ap_list, width=bar_width)
# # 绘制柱状图
# plt.ylim([0.9,1])
# plt.xticks([b for b in bars], probability_values)
# plt.xlabel('probability')
# plt.ylabel('Test AP')
# plt.title('{}({} partitions)'.format(data,partitions))
# plt.savefig('boundary_AP_{}_{}_{}.png'.format(data,partitions,model))
# plt.clf()
# print(comm_list)
# plt.bar([b for b in bars], comm_list, width=bar_width)
# # 绘制柱状图
# plt.xticks([b for b in bars], probability_values)
# plt.xlabel('probability')
# plt.ylabel('Communication volume')
# plt.title('{}({} partitions)'.format(data,partitions))
# plt.savefig('boundary_comm_{}_{}_{}.png'.format(data,partitions,model))
# plt.clf()
# if partition == 'ours_shared':
# partition0 = 'ours'
# else:
# partition0=partition
# for p in probability_values:
# file = '{}/{}/test_{}_{}_{}_0_boundery_recent_uniform_{}_all_update_2.pt'.format(data,model,partition0,topk,partitions,float(p))
# val_ap = torch.tensor(torch.load(file))[:,0]
# epoch = torch.arange(val_ap.shape[0])
# #绘制曲线图
# plt.plot(epoch,val_ap, label='probability={}'.format(p))
# plt.xlabel('Epoch')
# plt.ylabel('Val AP')
# plt.title('{}({} partitions)'.format(data,partitions))
# # plt.grid(True)
# plt.legend()
# plt.savefig('{}_{}_{}_boundary_Convergence_rate.png'.format(data,partitions,model))
# plt.clf()
#!/bin/bash
# 定义数组变量
seed=$1
addr="192.168.1.107"
partition_params=("ours" )
#"metis" "ldg" "random")
#("ours" "metis" "ldg" "random")
partitions="4"
node_per="4"
nnodes="1"
node_rank="0"
probability_params=("0.1" "0" "0.05" "0.01")
sample_type_params=("boundery_recent_decay" "recent")
#sample_type_params=("recent" "boundery_recent_decay") #"boundery_recent_uniform")
#memory_type=("all_update" "p2p" "all_reduce" "historical" "local")
memory_type=( "all_update")
#memory_type=("local" "all_update" "historical" "all_reduce")
shared_memory_ssim=("0.3" "0.7")
#data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk")
data_param=("WIKI" "LASTFM" "WikiTalk" "DGraphFin")
#data_param=("WIKI" "REDDIT" "LASTFM" "DGraphFin" "WikiTalk" "StackOverflow")
#data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk" "StackOverflow")
#data_param=("REDDIT" "WikiTalk")
# 创建输出目录
# 遍历数组并执行命令
#seed=(( RANDOM % 1000000 + 1 ))
mkdir -p all_"$seed"
for data in "${data_param[@]}"; do
model="TGN_large"
if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then
model="TGN"
fi
#model="APAN"
mkdir all_"$seed"/"$data"
mkdir all_"$seed"/"$data"/"$model"
mkdir all_"$seed"/"$data"/"$model"/comm
#torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition ours --memory_type local --sample_type recent --topk 0 --seed "$seed" > all_"$seed"/"$data"/"$model"/1.out &
wait
for partition in "${partition_params[@]}"; do
for sample in "${sample_type_params[@]}"; do
if [ "$sample" = "recent" ]; then
for mem in "${memory_type[@]}"; do
if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out &
wait
fi
done
elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait
fi
else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample".out &
wait
if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait
fi
fi
done
else
for pro in "${probability_params[@]}"; do
for mem in "${memory_type[@]}"; do
if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample"-"$pro".out &
wait
fi
done
elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours"]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out&
wait
fi
else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out &
wait
if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out &
wait
fi
fi
done
done
fi
done
done
done
# for data in "${data_param[@]}"; do
# model="JODILE"
# if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then
# model="JODIE"
# fi
# #model="APAN"
# mkdir all_"$seed"/"$data"
# mkdir all_"$seed"/"$data"/"$model"
# mkdir all_"$seed"/"$data"/"$model"/comm
# #torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition ours --memory_type local --sample_type recent --topk 0 --seed "$seed" > all_"$seed"/"$data"/"$model"/1.out &
# wait
# for partition in "${partition_params[@]}"; do
# for sample in "${sample_type_params[@]}"; do
# if [ "$sample" = "recent" ]; then
# for mem in "${memory_type[@]}"; do
# if [ "$mem" = "historical" ]; then
# for ssim in "${shared_memory_ssim[@]}"; do
# if [ "$partition" = "ours" ]; then
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out &
# wait
# fi
# done
# elif [ "$mem" = "all_reduce" ]; then
# if [ "$partition" = "ours" ]; then
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
# wait
# fi
# else
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample".out &
# wait
# if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
# wait
# fi
# fi
# done
# else
# for pro in "${probability_params[@]}"; do
# for mem in "${memory_type[@]}"; do
# if [ "$mem" = "historical" ]; then
# continue
# # for ssim in "${shared_memory_ssim[@]}"; do
# # if [ "$partition" = "ours" ]; then
# # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" > all_"$seed"/"$data"/"$partitions"-ours_shared-0.01"$mem"-"$ssim"-"$sample"-"$pro".out &
# # wait
# # fi
# # done
# elif [ "$mem" = "all_reduce" ]; then
# if [ "$partition" = "ours"]; then
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out&
# wait
# fi
# else
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out &
# wait
# if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out &
# wait
# fi
# fi
# done
# done
# fi
# done
# done
# done
import argparse
import os
import profile
import sys
import psutil
from os.path import abspath, join, dirname
current_path = os.path.dirname(os.path.abspath(__file__))
parent_path = os.path.abspath(os.path.join(current_path, os.pardir))
sys.path.append(parent_path)
from starrygl.sample.part_utils.transformer_from_speed import load_from_shared_node_partition, load_from_speed
from starrygl.sample.count_static import time_count
from starrygl.sample.sample_core.LocalNegSampling import LocalNegativeSampling
from starrygl.distributed.context import DistributedContext
from starrygl.distributed.utils import DistIndex
from starrygl.module.modules import GeneralModel
from pathlib import Path
from pathlib import Path
from starrygl.module.utils import parse_config
from starrygl.sample.cache.fetch_cache import FetchFeatureCache
from starrygl.sample.graph_core import DataSet, DistributedGraphStore, TemporalNeighborSampleGraph
from starrygl.module.utils import parse_config, EarlyStopMonitor
from starrygl.sample.graph_core import DataSet, DistributedGraphStore, TemporalNeighborSampleGraph
from starrygl.sample.memory.shared_mailbox import SharedMailBox
from starrygl.sample.sample_core.base import NegativeSampling
from starrygl.sample.sample_core.neighbor_sampler import NeighborSampler
from starrygl.sample.part_utils.partition_tgnn import partition_load
import torch
import time
import torch
import torch.nn.functional as F
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
from starrygl.sample.count_static import time_count as tt
import os
from starrygl.sample.data_loader import DistributedDataLoader
from starrygl.sample.batch_data import SAMPLE_TYPE
from starrygl.sample.stream_manager import getPipelineManger
from torch.profiler import profile, record_function, ProfilerActivity
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--rank', default=0, type=int, metavar='W',
help='name of dataset')
parser.add_argument('--local_rank', default=0, type=int, metavar='W',
help='name of dataset')
parser.add_argument('--patience', type=int, default=20, help='Patience for early stopping')
parser.add_argument('--world_size', default=1, type=int, metavar='W',
help='number of negative samples')
parser.add_argument('--dataname', default="WIKI", type=str, metavar='W',
help='name of dataset')
parser.add_argument('--model', default='TGN', type=str, metavar='W',
help='name of model')
parser.add_argument('--part_test', default='part', type=str, metavar='W',
help='name of model')
parser.add_argument('--partition', default='part', type=str, metavar='W',
help='name of model')
parser.add_argument('--topk', default='0', type=str, metavar='W',
help='name of model')
parser.add_argument('--probability', default=1, type=float, metavar='W',
help='name of model')
parser.add_argument('--sample_type', default='recent', type=str, metavar='W',
help='name of model')
parser.add_argument('--local_neg_sample', default=False, type=bool, metavar='W',
help='name of model')
parser.add_argument('--shared_memory_ssim', default=2, type=float, metavar='W',
help='name of model')
parser.add_argument('--neg_samples', default=1, type=int, metavar='W',
help='name of model')
parser.add_argument('--eval_neg_samples', default=1, type=int, metavar='W',
help='name of model')
parser.add_argument('--memory_type', default='all_update', type=str, metavar='W',
help='name of model')
parser.add_argument('--seed', default=6773, type=int, metavar='W',
help='name of model')
#boundery_recent_uniform boundery_recent_decay
args = parser.parse_args()
if args.memory_type == 'all_local' or args.topk != '0':
train_cross_probability = 0
else:
train_cross_probability = 1
if args.memory_type == 'all_local':
args.sample_type = 'boundery_recent_uniform'
args.probability = 0
from sklearn.metrics import average_precision_score, roc_auc_score
import torch
import time
import random
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score
from torch.nn.parallel import DistributedDataParallel as DDP
def get_network_interfaces_with_prefix(prefixs):
interfaces = psutil.net_if_addrs()
matching_interfaces = [iface for iface in interfaces if iface.startswith(prefixs[0]) or iface.startswith(prefixs[1])]
return matching_interfaces
# Example usage
prefix = ("ens4f1np1","ens6f0np0")
matching_interfaces = get_network_interfaces_with_prefix(prefix)
print(f"Network interfaces with prefix '{prefix}': {matching_interfaces}")
#os.environ['CUDA_VISIBLE_DEVICES'] = '2'#str(args.rank)
if not 'WORLD_SIZE' in os.environ:
os.environ["RANK"] = str(args.rank)
os.environ["WORLD_SIZE"] = str(args.world_size)
os.environ["LOCAL_RANK"] = str(args.local_rank)
if not 'MASTER_ADDR' in os.environ:
os.environ["MASTER_ADDR"] = '192.168.2.107'
if not 'MASTER_PORT' in os.environ:
os.environ["MASTER_PORT"] = '9337'
os.environ["NCCL_IB_DISABLE"]='1'
os.environ['NCCL_SOCKET_IFNAME']=matching_interfaces[0]
print('rank {}'.format(int(os.environ["LOCAL_RANK"])))
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
local_rank = int(os.environ["LOCAL_RANK"])
def seed_everything(seed=42):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
total_next_batch = 0
total_forward = 0
total_count_score = 0
total_backward = 0
total_prepare_mail = 0
total_update_mail = 0
total_update_memory =0
total_remote_update = 0
def count_empty():
global total_next_batch
global total_forward
global total_count_score
global total_backward
global total_prepare_mail
global total_update_mail
global total_update_memory
global total_remote_update
total_next_batch = 0
total_forward = 0
total_count_score = 0
total_backward = 0
total_prepare_mail = 0
total_update_mail = 0
total_update_memory =0
total_remote_update = 0
def add(t1,t2,t3,t4,t5,t6,t7,t8):
global total_next_batch
global total_forward
global total_count_score
global total_backward
global total_prepare_mail
global total_update_mail
global total_update_memory
global total_remote_update
total_next_batch += t1
total_forward += t2
total_count_score += t4
total_backward += t3
total_prepare_mail += t5
total_update_mail += t6
total_update_memory +=t7
total_remote_update += t8
def query():
global total_next_batch
global total_forward
global total_count_score
global total_backward
global total_prepare_mail
global total_update_mail
global total_update_memory
global total_remote_update
global total_next_batch
return {
"total_next_batch":total_next_batch,
"total_forward" :total_forward ,
"total_count_score" :total_count_score ,
"total_backward" :total_backward ,
"total_prepare_mail" :total_prepare_mail ,
"total_update_mail" :total_update_mail ,
"total_update_memory":total_update_memory,
"total_remote_update":total_remote_update,}
def main():
#torch.autograd.set_detect_anomaly(True)
print('LOCAL RANK {}, RANK{}'.format(os.environ["LOCAL_RANK"],os.environ["RANK"]))
use_cuda = True
sample_param, memory_param, gnn_param, train_param = parse_config('../config/{}.yml'.format(args.model))
memory_param['mode'] = args.memory_type
ctx = DistributedContext.init(backend="nccl", use_gpu=True,memory_group_num=1,cache_use_rpc=True)
torch.set_num_threads(10)
device_id = torch.cuda.current_device()
if ((args.dataname =='GDELT') & (dist.get_world_size() <=4 )):
graph,full_sampler_graph,train_mask,val_mask,test_mask,full_train_mask,cache_route = load_from_speed(args.dataname,seed=123457,top=args.topk,sampler_graph_add_rev=True, feature_device=torch.device('cpu'),partition=args.partition)#torch.device('cpu'))
else:
graph,full_sampler_graph,train_mask,val_mask,test_mask,full_train_mask,cache_route = load_from_speed(args.dataname,seed=123457,top=args.topk,sampler_graph_add_rev=True, feature_device=torch.device('cuda:{}'.format(ctx.local_rank)),partition=args.partition)#torch.device('cpu'))
if(args.dataname=='GDELT'):
train_param['epoch'] = 10
#torch.autograd.set_detect_anomaly(True)
# 确保 CUDA 可用
if torch.cuda.is_available():
print("Total GPU memory: ", torch.cuda.get_device_properties(0).total_memory/1024**3)
print("Current GPU memory allocated: ", torch.cuda.memory_allocated(0)/1024**3)
print("Current GPU memory reserved: ", torch.cuda.memory_reserved(0)/1024**3)
print("Max GPU memory allocated during this session: ", torch.cuda.max_memory_allocated(0))
print("Max GPU memory reserved during this session: ", torch.cuda.max_memory_reserved(0))
else:
print("CUDA is not available.")
full_dst = full_sampler_graph['edge_index'][1,torch.arange(0,full_sampler_graph['edge_index'].shape[1],2)]
sample_graph = TemporalNeighborSampleGraph(full_sampler_graph,mode = 'full',dist_eid_mapper=graph.eids_mapper)
eval_sample_graph = TemporalNeighborSampleGraph(full_sampler_graph,mode = 'full',dist_eid_mapper=graph.eids_mapper)
Path("../saved_models/").mkdir(parents=True, exist_ok=True)
Path("../saved_checkpoints/").mkdir(parents=True, exist_ok=True)
get_checkpoint_path = lambda \
epoch: f'../saved_checkpoints/{args.model}-{args.dataname}-{epoch}.pth'
gnn_param['dyrep'] = True if args.model == 'DyRep' else False
use_src_emb = gnn_param['use_src_emb'] if 'use_src_emb' in gnn_param else False
use_dst_emb = gnn_param['use_dst_emb'] if 'use_dst_emb' in gnn_param else False
fanout = []
num_layers = sample_param['layer'] if 'layer' in sample_param else 1
fanout = sample_param['neighbor'] if 'neighbor' in sample_param else [10]
policy = sample_param['strategy'] if 'strategy' in sample_param else 'recent'
policy_train = args.sample_type#'boundery_recent_decay'
if memory_param['type'] != 'none':
mailbox = SharedMailBox(graph.ids.shape[0], memory_param, dim_edge_feat = graph.efeat.shape[1] if graph.efeat is not None else 0,
shared_nodes_index=graph.shared_nids_list[ctx.memory_group_rank],device = torch.device('cuda:{}'.format(local_rank)),cache_route = cache_route,shared_ssim=args.shared_memory_ssim)
else:
mailbox = None
sampler = NeighborSampler(num_nodes=graph.num_nodes, num_layers=num_layers, fanout=fanout,graph_data=sample_graph, workers=10,policy = policy_train, graph_name = "train",local_part=dist.get_rank(),edge_part=DistIndex(graph.eids_mapper).part,node_part=DistIndex(graph.nids_mapper).part,probability=args.probability)
eval_sampler = NeighborSampler(num_nodes=graph.num_nodes, num_layers=num_layers, fanout=fanout,graph_data=eval_sample_graph, workers=10,policy = policy_train, graph_name = "eval",local_part=dist.get_rank(),edge_part=DistIndex(graph.eids_mapper).part,node_part=DistIndex(graph.nids_mapper).part,probability=args.probability)
train_data = torch.masked_select(graph.edge_index,train_mask.to(graph.edge_index.device)).reshape(2,-1)
train_ts = torch.masked_select(graph.ts,train_mask.to(graph.edge_index.device))
print('part {}\n'.format(DistIndex(graph.nids_mapper[train_data]).part))
test_range = torch.arange(0,full_sampler_graph['eids'].shape[0],2)
eval_train_data = torch.masked_select(full_sampler_graph['edge_index'][:,test_range],full_train_mask.to(graph.edge_index.device)).reshape(2,-1)
eval_train_ts = torch.masked_select(full_sampler_graph['ts'][test_range],full_train_mask.to(graph.edge_index.device))
test_data = torch.masked_select(full_sampler_graph['edge_index'][:,test_range],test_mask.to(graph.edge_index.device)).reshape(2,-1)
test_ts = torch.masked_select(full_sampler_graph['ts'][test_range],test_mask.to(graph.edge_index.device))
val_data = torch.masked_select(full_sampler_graph['edge_index'][:,test_range],val_mask.to(graph.edge_index.device)).reshape(2,-1)
val_ts = torch.masked_select(full_sampler_graph['ts'][test_range],val_mask.to(graph.edge_index.device))
train_data = DataSet(edges = train_data,ts =train_ts,eids = torch.nonzero(train_mask).reshape(-1))
eval_train_data = DataSet(edges = eval_train_data,ts = eval_train_ts,eids = full_train_mask.nonzero().reshape(-1))
test_data = DataSet(edges = test_data,ts =test_ts,eids = test_mask.nonzero().reshape(-1))
val_data = DataSet(edges = val_data,ts = val_ts,eids = val_mask.nonzero().reshape(-1))
print('ts {} {} {} {}'.format(train_data.ts,eval_train_data.ts,test_data.ts,val_data.ts))
neg_samples = args.eval_neg_samples
mask = DistIndex(graph.nids_mapper[graph.edge_index[1,:]].to('cpu')).part == dist.get_rank()
if args.local_neg_sample:
print('dst len {} origin len {}'.format(graph.edge_index[1,mask].unique().shape[0],full_dst.unique().shape[0]))
train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = graph.edge_index[1,mask].unique())
else:
#train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique())
train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique(),local_mask=(DistIndex(graph.nids_mapper[full_dst.unique()].to('cpu')).part == dist.get_rank()),prob=args.probability)
print(train_neg_sampler.dst_node_list)
neg_sampler = LocalNegativeSampling('triplet',amount= neg_samples,dst_node_list = full_dst.unique(),seed=args.seed)
trainloader = DistributedDataLoader(graph,eval_train_data,sampler = sampler,
sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES,
neg_sampler=train_neg_sampler,
batch_size = int(train_param['batch_size'])*dist.get_world_size(),
shuffle=False,
drop_last=True,
chunk_size = None,
mode='train',
queue_size = 200,
mailbox = mailbox,
is_pipeline=True,
use_local_feature = False,
device = torch.device('cuda:{}'.format(local_rank)),
probability=args.probability,
reversed = (gnn_param['arch'] == 'identity')
)
eval_trainloader = DistributedDataLoader(graph,eval_train_data,sampler = eval_sampler,
sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES,
neg_sampler=neg_sampler,
batch_size = train_param['batch_size'],
shuffle=False,
drop_last=False,
chunk_size = None,
mode='eval_train',
queue_size = 100,
mailbox = mailbox,
device = torch.device('cuda:{}'.format(local_rank)),
reversed = (gnn_param['arch']=='identity')
)
testloader = DistributedDataLoader(graph,test_data,sampler = eval_sampler,
sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES,
neg_sampler=neg_sampler,
batch_size = train_param['batch_size']*dist.get_world_size(),
shuffle=False,
drop_last=False,
chunk_size = None,
mode='test',
queue_size = 100,
mailbox = mailbox,
device = torch.device('cuda:{}'.format(local_rank)),
reversed = (gnn_param['arch']=='identity')
)
valloader = DistributedDataLoader(graph,val_data,sampler = eval_sampler,
sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES,
neg_sampler=neg_sampler,
batch_size = train_param['batch_size']*dist.get_world_size(),
shuffle=False,
drop_last=False,
chunk_size = None,
train=False,
mode='val',
queue_size = 100,
mailbox = mailbox,
device = torch.device('cuda:{}'.format(local_rank)),
reversed = (gnn_param['arch']=='identity')
)
print('init dataloader')
gnn_dim_node = 0 if graph.nfeat is None else graph.nfeat.shape[1]
gnn_dim_edge = 0 if graph.efeat is None else graph.efeat.shape[1]
print('dim_node {} dim_edge {}\n'.format(gnn_dim_node,gnn_dim_edge))
avg_time = 0
if use_cuda:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param,graph.ids.shape[0],mailbox).cuda()
device = torch.device('cuda')
else:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param,graph.ids.shape[0],mailbox)
device = torch.device('cpu')
model = DDP(model,find_unused_parameters=True)
def count_parameters(model):
return sum(p.numel()*p.element_size()/1024/1024 for p in model.parameters() if p.requires_grad)
print(f'The model has {count_parameters(model):,} trainable parameters')
train_stream = torch.cuda.Stream()
def eval(mode='val'):
model.eval()
aps = list()
aucs_mrrs = list()
if mode == 'val':
loader = valloader
elif mode == 'test':
loader = testloader
elif mode == 'train':
loader = eval_trainloader
err_cnt = 0
err_cross_part = 0
true_cnt = 0
true_cross_cnt = 0
with torch.no_grad():
total_loss = 0
signal = torch.tensor([0],dtype = int,device = device)
for roots,mfgs,metadata in loader:
"""
if ctx.memory_group == 0:
pred_pos, pred_neg = model(mfgs,metadata,neg_samples=neg_samples)
#print('check {}\n'.format(model.module.memory_updater.last_updated_nid))
y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
aucs_mrrs.append(roc_auc_score(y_true, y_pred))
"""
if mailbox is not None:
if(graph.efeat.device.type != 'cpu'):
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')]).to('cuda')
#edge_feats = graph.get_dist_efeat(graph.eids_mapper[roots.eids.to('cpu')].to('cuda'),is_sorted = False) #graph.efeat[roots.eids.to('cpu')].to('cuda')
else:
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')])
src = metadata['src_pos_index']
dst = metadata['dst_pos_index']
ts = roots.ts
update_mail = True
param = (update_mail,src,dst,ts,edge_feats,loader.async_feature)
else:
param = None
pred_pos, pred_neg = model(mfgs,metadata,neg_samples=args.neg_samples,async_param = param)
y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
aucs_mrrs.append(roc_auc_score(y_true, y_pred))
mailbox.update_shared()
mailbox.update_p2p()
"""
if mailbox is not None:
src = metadata['src_pos_index']
dst = metadata['dst_pos_index']
ts = roots.ts
if graph.efeat is None:
edge_feats = None
elif(graph.efeat.device.type != 'cpu'):
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')]).to('cuda')
#edge_feats = graph.get_dist_efeat(graph.eids_mapper[roots.eids.to('cpu')].to('cuda'),is_sorted = False)#graph.efeat[roots.eids.to('cpu')].to('cuda')
else:
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')])
#edge_feats = graph.get_dist_efeat(graph.eids_mapper[roots.eids.to('cpu')],is_sorted=False)#graph.efeat[roots.eids]
#print(mfgs[0][0].srcdata['ID'])
dist_index_mapper = mfgs[0][0].srcdata['ID']
root_index = torch.cat((src,dst))
#print('{} {} {}'.format((~(dist_index_mapper==model.module.memory_updater.last_updated_nid)).nonzero(),model.module.memory_updater.last_updated_nid,dist_index_mapper))
last_updated_nid = model.module.memory_updater.last_updated_nid[root_index]
last_updated_memory = model.module.memory_updater.last_updated_memory[root_index]
last_updated_ts=model.module.memory_updater.last_updated_ts[root_index]
#print('root shape {} unique {} {}\n'.format(root_index.shape,dist_index_mapper[root_index].unique().shape,last_updated_nid.unique().shape))
index, memory, memory_ts = mailbox.get_update_memory(last_updated_nid,
last_updated_memory,
last_updated_ts,
model.module.embedding)
#print('index {} {}\n'.format(index.shape,dist_index_mapper[torch.cat((src,dst))].unique().shape))
index, mail, mail_ts = mailbox.get_update_mail(dist_index_mapper,
src,dst,ts,edge_feats,
model.module.memory_updater.last_updated_memory,
model.module.embedding,use_src_emb,use_dst_emb,
)
if memory_param['historical_fix'] == True:
mailbox.set_memory_all_reduce(index,memory,memory_ts,mail,mail_ts,reduce_Op = 'max', async_op = False,filter=model.module.memory_updater.filter,set_remote=True,mode='historical')
else:
mailbox.set_memory_all_reduce(index,memory,memory_ts,mail,mail_ts,reduce_Op = 'max', async_op = False,filter=None,set_remote=True,mode='all_reduce',submit=False)
mailbox.sychronize_shared()
"""
ap = torch.empty([1])
auc_mrr = torch.empty([1])
if(ctx.memory_group==0):
world_size = dist.get_world_size()
ap[0] = torch.tensor(aps).mean()
auc_mrr[0] = torch.tensor(aucs_mrrs).mean()#float(aucs_mrrs.clone().mean())
print('mode: {} {} {}'.format(mode,ap,auc_mrr))
dist.all_reduce(ap,group = ctx.gloo_group)
ap/=ctx.memory_group_size
dist.all_reduce(auc_mrr,group=ctx.gloo_group)
auc_mrr/=ctx.memory_group_size
dist.broadcast(ap,0,group=ctx.gloo_group)
dist.broadcast(auc_mrr,0,group=ctx.gloo_group)
return ap.item(), auc_mrr.item()
def normalize(x):
if not (x.max().item() == 0):
x = x - x.min()
x = x / x.max()
x = 2*x - 1
return x
def inner_prod(x1,x2):
cos = torch.nn.CosineSimilarity(dim=0)
return cos(normalize(x1),normalize(x2)).sum()/x1.size(dim=0)
creterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=train_param['lr'],weight_decay=1e-4)
early_stopper = EarlyStopMonitor(max_round=args.patience)
MODEL_SAVE_PATH = f'../saved_models/{args.model}-{args.dataname}-{dist.get_world_size()}.pth'
total_test_time = 0
epoch_cnt = 0
test_ap_list = []
val_list = []
loss_list = []
for e in range(train_param['epoch']):
model.module.memory_updater.empty_cache()
tt._zero()
torch.cuda.synchronize()
epoch_start_time = time.time()
epoch_cnt = epoch_cnt + 1
train_aps = list()
print('Epoch {:d}:'.format(e))
time_prep = 0
total_loss = 0
model.train()
if mailbox is not None:
mailbox.reset()
model.module.memory_updater.last_updated_nid = None
model.module.memory_updater.last_updated_memory = None
model.module.memory_updater.last_updated_ts = None
sum_local_comm = 0
sum_remote_comm = 0
sum_local_edge_comm = 0
sum_remote_edge_comm = 0
local_access = []
remote_access = []
local_comm = []
remote_comm = []
local_edge_access = []
remote_edge_access = []
local_edge_comm = []
remote_edge_comm = []
b_cnt = 0
for roots,mfgs,metadata in trainloader:
#print('rank is {} batch max ts is {} batch min ts is {}'.format(dist.get_rank(),roots.ts.min(),roots.ts.max()))
b_cnt = b_cnt + 1
#local_access.append(trainloader.local_node)
#remote_access.append(trainloader.remote_node)
#local_edge_access.append(trainloader.local_edge)
#remote_edge_access.append(trainloader.remote_edge)
#local_comm.append((DistIndex(mfgs[0][0].srcdata['ID']).part == dist.get_rank()).sum().item())
#remote_comm.append((DistIndex(mfgs[0][0].srcdata['ID']).part != dist.get_rank()).sum().item())
#if 'ID' in mfgs[0][0].edata:
# local_edge_comm.append((DistIndex(mfgs[0][0].edata['ID']).part == dist.get_rank()).sum().item())
# remote_edge_comm.append((DistIndex(mfgs[0][0].edata['ID']).part != dist.get_rank()).sum().item())
# sum_local_edge_comm +=local_edge_comm[b_cnt-1]
# sum_remote_edge_comm +=remote_edge_comm[b_cnt-1]
#sum_local_comm +=local_comm[b_cnt-1]
#sum_remote_comm +=remote_comm[b_cnt-1]
if mailbox is not None:
if(graph.efeat.device.type != 'cpu'):
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')]).to('cuda')
#edge_feats = graph.get_dist_efeat(graph.eids_mapper[roots.eids.to('cpu')].to('cuda'),is_sorted = False)#graph.efeat[roots.eids.to('cpu')].to('cuda')
else:
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')])
src = metadata['src_pos_index']
dst = metadata['dst_pos_index']
ts = roots.ts
update_mail = True
param = (update_mail,src,dst,ts,edge_feats,trainloader.async_feature)
else:
param = None
model.train()
optimizer.zero_grad()
pred_pos, pred_neg = model(mfgs,metadata,neg_samples=args.neg_samples,async_param = param)
loss = creterion(pred_pos, torch.ones_like(pred_pos))
loss += creterion(pred_neg, torch.zeros_like(pred_neg))
total_loss += float(loss.item())
#mailbox.handle_last_async()
#trainloader.async_feature()
#torch.cuda.synchronize()
loss.backward()
optimizer.step()
#torch.cuda.synchronize()
## train aps
#y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
#y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
#train_aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
#torch.cuda.synchronize()
mailbox.update_shared()
mailbox.update_p2p()
#torch.cuda.empty_cache()
"""
if mailbox is not None:
#src = metadata['src_pos_index']
#dst = metadata['dst_pos_index']
#ts = roots.ts
#if graph.efeat is None:
# edge_feats = None
#elif(graph.efeat.device.type != 'cpu'):
# edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')]).to('cuda')
#edge_feats = graph.get_dist_efeat(graph.eids_mapper[roots.eids.to('cpu')].to('cuda'),is_sorted = False)#graph.efeat[roots.eids.to('cpu')].to('cuda')
#else:
# edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')])
#edge_feats = graph.get_dist_efeat(graph.eids_mapper[roots.eids.to('cpu')],is_sorted=False)#graph.efeat[roots.eids]
#print(mfgs[0][0].srcdata['ID'])
dist_index_mapper = mfgs[0][0].srcdata['ID']
root_index = torch.cat((src,dst))
#print('{} {} {}'.format((~(dist_index_mapper==model.module.memory_updater.last_updated_nid)).nonzero(),model.module.memory_updater.last_updated_nid,dist_index_mapper))
last_updated_nid = model.module.memory_updater.last_updated_nid[root_index]
last_updated_memory = model.module.memory_updater.last_updated_memory[root_index]
last_updated_ts=model.module.memory_updater.last_updated_ts[root_index]
#print('root shape {} unique {} {}\n'.format(root_index.shape,dist_index_mapper[root_index].unique().shape,last_updated_nid.unique().shape))
index, memory, memory_ts = mailbox.get_update_memory(last_updated_nid,
last_updated_memory,
last_updated_ts,
model.module.embedding)
#print('index {} {}\n'.format(index.shape,dist_index_mapper[torch.cat((src,dst))].unique().shape))
index, mail, mail_ts = mailbox.get_update_mail(dist_index_mapper,
src,dst,ts,edge_feats,
model.module.memory_updater.last_updated_memory,
model.module.embedding,use_src_emb,use_dst_emb,
)
t7 = time.time()
if memory_param['historical'] == True:
mailbox.set_memory_all_reduce(index,memory,memory_ts,mail,mail_ts,reduce_Op = 'max', async_op = False,filter=model.module.memory_updater.filter,set_remote=True,mode='historical')
else:
mailbox.set_memory_all_reduce(index,memory,memory_ts,mail,mail_ts,reduce_Op = 'max', async_op = False,filter=None,set_remote=True,mode='all_reduce')
"""
torch.cuda.synchronize()
time_prep = time.time() - epoch_start_time
avg_time += time.time() - epoch_start_time
train_ap = float(torch.tensor(train_aps).mean())
print('\ttrain time:{:.2f}s\n'.format(time_prep))
print(trainloader.local_node)
local_node=torch.tensor([trainloader.local_node])
remote_node=torch.tensor([trainloader.remote_node])
local_edge=torch.tensor([trainloader.local_edge])
remote_edge=torch.tensor([trainloader.remote_edge])
tot_comm_count=torch.tensor([mailbox.tot_comm_count])
tot_shared_count=torch.tensor([mailbox.tot_shared_count])
torch.distributed.all_reduce(local_node,group=ctx.gloo_group)
torch.distributed.all_reduce(remote_node,group=ctx.gloo_group)
torch.distributed.all_reduce(local_edge,group=ctx.gloo_group)
torch.distributed.all_reduce(remote_edge,group=ctx.gloo_group)
torch.distributed.all_reduce(tot_comm_count,group=ctx.gloo_group)
torch.distributed.all_reduce(tot_shared_count,group=ctx.gloo_group)
print('local node number {} remote node number {} local edge {} remote edge{}\n'.format(local_node,remote_node,local_edge,remote_edge))
print(' comm local node number {} remote node number {} local edge {} remote edge{}\n'.format(sum_local_comm,sum_remote_comm,sum_local_edge_comm,sum_remote_edge_comm))
print('memory comm {} shared comm {}\n'.format(tot_comm_count,tot_shared_count))
#if(e==0):
# torch.save((local_access,remote_access,local_edge_access,remote_edge_access,local_comm,remote_comm,local_edge_comm,remote_edge_comm),'all_args.seed/{}/{}/comm/comm_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
ap = 0
auc = 0
tt.ssim_remote=0
tt.ssim_local=0
tt.weight_count_local=0
tt.weight_count_remote=0
tt.ssim_cnt=0
ap, auc = eval('val')
torch.cuda.synchronize()
t_test = time.time()
test_ap,test_auc = eval('test')
torch.cuda.synchronize()
t_test = time.time() - t_test
total_test_time += t_test
test_ap_list.append((test_ap,test_auc))
early_stopper.early_stop_check(ap)
early_stop = False
trainloader.local_node = 0
trainloader.remote_node = 0
trainloader.local_edge = 0
trainloader.remote_edge = 0
mailbox.tot_comm_count = 0
mailbox.tot_shared_count = 0
value,counts = torch.unique(graph.edge_index.reshape(-1),return_counts = True)
node_degree = torch.zeros(graph.num_nodes,dtype=torch.long)
value = value.to('cpu')
counts = counts.to('cpu')
node_degree[value] = counts
if dist.get_world_size()==1:
mailbox.mon.draw(node_degree,args.dataname,args.model,e)
mailbox.mon.set_zero()
#mailbox.mon.draw(node_degree,args.dataname,e)
#mailbox.mon.set_zero()
loss_list.append(total_loss)
val_list.append(ap)
if early_stop:
dist.barrier()
print("Early stopping at epoch {:d}\n".format(e))
print(f"Loading the best model at epoch {early_stopper.best_epoch}\n")
best_model_path = get_checkpoint_path(early_stopper.best_epoch)
model.module.load_state_dict(torch.load(best_model_path))
break
else:
print('\ttrain loss:{:.4f} train ap:{:4f} val ap:{:4f} val auc:{:4f} test ap {:4f} test auc{:4f}\n'.format(total_loss,train_ap, ap, auc,test_ap,test_auc))
print('\ttotal time:{:.2f}s prep time:{:.2f}s\n test time {:.2f}'.format(time.time()-epoch_start_time, time_prep,t_test))
torch.save(model.module.state_dict(), get_checkpoint_path(e))
if args.model == 'TGN':
pass
# print('weight {} {}\n'.format(tt.weight_count_local,tt.weight_count_remote))
# print('ssim {} {}\n'.format(tt.ssim_local/tt.ssim_cnt,tt.ssim_remote/tt.ssim_cnt))
torch.save(val_list,'all_args.seed/{}/{}/val_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
torch.save(loss_list,'all_args.seed/{}/{}/loss_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
torch.save(test_ap_list,'all_args.seed/{}/{}/test_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
print(avg_time)
if not early_stop:
dist.barrier()
print(f"Loading the best model at epoch {early_stopper.best_epoch}")
best_model_path = get_checkpoint_path(early_stopper.best_epoch)
model.module.load_state_dict(torch.load(best_model_path))
print('best test AP:{:4f} test auc{:4f}'.format(*test_ap_list[early_stopper.best_epoch]))
val_list = torch.tensor(val_list)
loss_list = torch.tensor(loss_list)
print('test_dataset {} avg_time {} test time {}\n'.format(test_data.edges.shape[1],avg_time/epoch_cnt,total_test_time/epoch_cnt))
torch.save(model.module.state_dict(), MODEL_SAVE_PATH)
ctx.shutdown()
if __name__ == "__main__":
main()
import matplotlib.pyplot as plt
import numpy as np
import torch
# 读取文件内容
ssim_values = [0, 0.1, 0.2, 0.3, 0.4, 2] # 假设这是你的 ssim 参数值
probability_values = [1,0.5,0.1,0.05,0.01,0]
data_values = ['WIKI_3','LASTFM_3','WikiTalk','StackOverflow'] # 存储从文件中读取的数据
partition = 'ours'
# 从文件中读取数据,假设数据存储在文件 data.txt 中
#all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out
partitions=4
topk=0
mem='all_update'#'historical'
model='TGN'
for data in data_values:
ap_list = []
comm_list = []
for p in probability_values:
file = '{}/{}/{}-{}-{}-{}-boundery_recent_uniform-{}.out'.format(data,model,partitions,partition,topk,mem,p)
prefix = 'best test AP:'
cnt = 0
sum = 0
with open(file, 'r') as file:
for line in file:
if line.startswith(prefix):
ap = float(line.lstrip(prefix).split(' ')[0])
pos = line.find('remote node number tensor')
if(pos!=-1):
posr = line.find(']',pos+2+len('remote node number tensor'),)
#print(line,line[pos+2+len('remote node number tensor'):posr])
comm = int(line[pos+2+len('remote node number tensor'):posr])
#print()
sum = sum+comm
cnt = cnt+1
#print(comm)
ap_list.append(ap)
comm_list.append(sum/cnt*4)
# 绘制柱状图
print('{} TestAP={}\n'.format(data,ap_list))
bar_width = 0.4
#shared comm tensor
# 设置柱状图的位置
bars = range(len(probability_values))
# 绘制柱状图
plt.bar([b for b in bars], ap_list, width=bar_width)
# 绘制柱状图
plt.ylim([0.9,1])
plt.xticks([b for b in bars], probability_values)
plt.xlabel('probability')
plt.ylabel('Test AP')
plt.title('{}({} partitions)'.format(data,partitions))
plt.savefig('boundary_AP_{}_{}_{}.png'.format(data,partitions,model))
plt.clf()
print(comm_list)
plt.bar([b for b in bars], comm_list, width=bar_width)
# 绘制柱状图
plt.xticks([b for b in bars], probability_values)
plt.xlabel('probability')
plt.ylabel('Communication volume')
plt.title('{}({} partitions)'.format(data,partitions))
plt.savefig('boundary_comm_{}_{}_{}.png'.format(data,partitions,model))
plt.clf()
if partition == 'ours_shared':
partition0 = 'ours'
else:
partition0=partition
for p in probability_values:
file = '{}/{}/test_{}_{}_{}_0_boundery_recent_uniform_{}_all_update_2.pt'.format(data,model,partition0,topk,partitions,float(p))
val_ap = torch.tensor(torch.load(file))[:,0]
epoch = torch.arange(val_ap.shape[0])
#绘制曲线图
plt.plot(epoch,val_ap, label='probability={}'.format(p))
plt.xlabel('Epoch')
plt.ylabel('Val AP')
plt.title('{}({} partitions)'.format(data,partitions))
# plt.grid(True)
plt.legend()
plt.savefig('{}_{}_{}_boundary_Convergence_rate.png'.format(data,partitions,model))
plt.clf()
import matplotlib.pyplot as plt
import numpy as np
import torch
# 读取文件内容
ssim_values = [-1,0.3,0.5,0.7,2] # 假设这是你的 ssim 参数值
data_values = ['WIKI','LASTFM','WikiTalk','REDDIT','LASTFM','DGraphFin'] # 存储从文件中读取的数据
partition = 'ours_shared'
# 从文件中读取数据,假设数据存储在文件 data.txt 中
#all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out
partitions=4
model = 'TGN'
topk=0.01
mem='historical'
for data in data_values:
ap_list = []
comm_list = []
for ssim in ssim_values:
if ssim == 2:
file = '{}/{}/{}-{}-{}-local-recent.out'.format(data,model,partitions,partition,topk)
elif ssim == -1:
file = '{}/{}/{}-{}-{}-all_update-recent.out'.format(data,model,partitions,partition,topk)
else:
file = '{}/{}/{}-{}-{}-{}-{}-recent.out'.format(data,model,partitions,partition,topk,mem,ssim)
prefix = 'best test AP:'
with open(file, 'r') as file:
for line in file:
if line.startswith(prefix):
ap = float(line.lstrip(prefix).split(' ')[0])
pos = line.find('shared comm tensor')
if(pos!=-1):
comm = int(line[pos+2+len('shared comm tensor'):len(line)-3])
print(ap)
ap_list.append(ap)
comm_list.append(comm)
print('{} TestAP={}\n'.format(data,ap_list))
# 绘制柱状图
bar_width = 0.4
#shared comm tensor
print('{} TestAP={}\n'.format(data,ap_list))
# 设置柱状图的位置
bars = range(len(ssim_values))
# 绘制柱状图
plt.bar([b for b in bars], ap_list, width=bar_width)
# 绘制柱状图
plt.xticks([b for b in bars], ssim_values)
plt.xlabel('SSIM threshold Values')
plt.ylabel('Test AP')
#if(data=='WIKI'):
# plt.ylim([0.97,1])
plt.title('{}({} partitions)'.format(data,partitions))
plt.savefig('ssim_{}_{}_{}.png'.format(data,partitions,model))
plt.clf()
plt.bar([b for b in bars], comm_list, width=bar_width)
# 绘制柱状图
plt.xticks([b for b in bars], ssim_values)
plt.xlabel('SSIM threshold Values')
plt.ylabel('Communication volume')
plt.title('{}({} partitions)'.format(data,partitions))
plt.savefig('ssim_comm_{}_{}_{}.png'.format(data,partitions,model))
plt.clf()
if partition == 'ours_shared':
partition0 = 'ours'
else:
partition0=partition
for ssim in ssim_values:
if ssim == 2:
file = '{}/{}/test_{}_{}_{}_0_recent_0.1_local_2.pt'.format(data,model,partition0,topk,partitions,)
elif ssim == -1:
file = '{}/{}/test_{}_{}_{}_0_recent_0.1_all_update_2.pt'.format(data,model,partition0,topk,partitions,)
else:
file = '{}/{}/test_{}_{}_{}_0_recent_0.1_{}_{}.pt'.format(data,model,partition0,topk,partitions,mem,float(ssim))
val_ap = torch.tensor(torch.load(file))[:,0]
print(val_ap)
epoch = torch.arange(val_ap.shape[0])
#绘制曲线图
#print(val_ap)
if ssim == -1:
plt.plot(epoch,val_ap, label='all-update')
elif ssim == 2:
plt.plot(epoch,val_ap, label='local')
else:
plt.plot(epoch,val_ap, label='ssim = {}'.format(ssim))
if(data=='WIKI'):
plt.ylim([0.85,0.90])
plt.xlabel('Epoch')
plt.ylabel('Val AP')
plt.title('{}({} partitions)'.format(data,partitions))
# plt.grid(True)
plt.legend()
plt.savefig('{}_{}_{}_ssim_Convergence_rate.png'.format(data,partitions,model))
plt.clf()
#!/bin/bash #!/bin/bash
#跑了4卡的TaoBao #跑了4卡的TaoBao
# 定义数组变量 # 定义数组变量
addr="192.168.1.105" seed=$1
addr="192.168.1.107"
partition_params=("ours" ) partition_params=("ours" )
#"metis" "ldg" "random") #"metis" "ldg" "random")
#("ours" "metis" "ldg" "random") #("ours" "metis" "ldg" "random")
...@@ -9,32 +10,39 @@ partitions="8" ...@@ -9,32 +10,39 @@ partitions="8"
node_per="4" node_per="4"
nnodes="2" nnodes="2"
node_rank="0" node_rank="0"
probability_params=("0.1" "0" "0.05" "0.01") probability_params=("0.1" "0.01" "0.05")
sample_type_params=("boundery_recent_decay" "recent") sample_type_params=("boundery_recent_decay")
#sample_type_params=("recent" "boundery_recent_decay") #"boundery_recent_uniform") #sample_type_params=("recent" "boundery_recent_decay") #"boundery_recent_uniform")
#memory_type=("all_update" "p2p" "all_reduce" "historical" "local") #memory_type=("all_update" "p2p" "all_reduce" "historical" "local")
memory_type=( "historical" "local" "all_update") memory_type=( "all_update")
#memory_type=("local" "all_update" "historical" "all_reduce") #memory_type=("local" "all_update" "historical" "all_reduce")
shared_memory_ssim=("0.3" "0.7") shared_memory_ssim=("0.3" "0.7")
#data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk") #data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk")
<<<<<<< HEAD
data_param=("GDELT") data_param=("GDELT")
=======
data_param=("LASTFM")
>>>>>>> 8233776274204f6cf2f8a2eb37022d426d6197d8
#data_param=("WIKI" "REDDIT" "LASTFM" "DGraphFin" "WikiTalk" "StackOverflow") #data_param=("WIKI" "REDDIT" "LASTFM" "DGraphFin" "WikiTalk" "StackOverflow")
#data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk" "StackOverflow") #data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk" "StackOverflow")
#data_param=("REDDIT" "WikiTalk") #data_param=("REDDIT" "WikiTalk")
# 创建输出目录 # 创建输出目录
mkdir -p all
# 遍历数组并执行命令 # 遍历数组并执行命令
#seed=(( RANDOM % 1000000 + 1 ))
mkdir -p all_"$seed"
for data in "${data_param[@]}"; do for data in "${data_param[@]}"; do
model="TGN_large" model="TGN_large"
if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then
model="TGN" model="TGN"
fi fi
#model="APAN" #model="APAN"
mkdir all/"$data" mkdir all_"$seed"/"$data"
mkdir all/"$data"/"$model" mkdir all_"$seed"/"$data"/"$model"
mkdir all/"$data"/"$model"/comm mkdir all_"$seed"/"$data"/"$model"/comm
#torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition ours --memory_type local --sample_type recent --topk 0 > all/"$data"/"$model"/1.out & #torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition ours --memory_type local --sample_type recent --topk 0 --seed "$seed" > all_"$seed"/"$data"/"$model"/1.out &
wait wait
for partition in "${partition_params[@]}"; do for partition in "${partition_params[@]}"; do
for sample in "${sample_type_params[@]}"; do for sample in "${sample_type_params[@]}"; do
...@@ -43,20 +51,20 @@ for data in "${data_param[@]}"; do ...@@ -43,20 +51,20 @@ for data in "${data_param[@]}"; do
if [ "$mem" = "historical" ]; then if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out & torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out &
wait wait
fi fi
done done
elif [ "$mem" = "all_reduce" ]; then elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours" ]; then if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out & torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait wait
fi fi
else else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample".out & torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample".out &
wait wait
if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out & torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait wait
fi fi
fi fi
...@@ -67,20 +75,20 @@ for data in "${data_param[@]}"; do ...@@ -67,20 +75,20 @@ for data in "${data_param[@]}"; do
if [ "$mem" = "historical" ]; then if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample"-"$pro".out & torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample"-"$pro".out &
wait wait
fi fi
done done
elif [ "$mem" = "all_reduce" ]; then elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours"]; then if [ "$partition" = "ours"]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out& torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out&
wait wait
fi fi
else else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out & #torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out &
wait wait
if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out & torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.1 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --seed "$seed" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out &
wait wait
fi fi
fi fi
...@@ -93,69 +101,69 @@ done ...@@ -93,69 +101,69 @@ done
for data in "${data_param[@]}"; do # for data in "${data_param[@]}"; do
model="JODILE" # model="JODILE"
if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then # if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then
model="JODIE" # model="JODIE"
fi # fi
#model="APAN" # #model="APAN"
mkdir all/"$data" # mkdir all_"$seed"/"$data"
mkdir all/"$data"/"$model" # mkdir all_"$seed"/"$data"/"$model"
mkdir all/"$data"/"$model"/comm # mkdir all_"$seed"/"$data"/"$model"/comm
#torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition ours --memory_type local --sample_type recent --topk 0 > all/"$data"/"$model"/1.out & # #torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition ours --memory_type local --sample_type recent --topk 0 --seed "$seed" > all_"$seed"/"$data"/"$model"/1.out &
wait # wait
for partition in "${partition_params[@]}"; do # for partition in "${partition_params[@]}"; do
for sample in "${sample_type_params[@]}"; do # for sample in "${sample_type_params[@]}"; do
if [ "$sample" = "recent" ]; then # if [ "$sample" = "recent" ]; then
for mem in "${memory_type[@]}"; do # for mem in "${memory_type[@]}"; do
if [ "$mem" = "historical" ]; then # if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do # for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then # if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out & # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out &
wait # wait
fi # fi
done # done
elif [ "$mem" = "all_reduce" ]; then # elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours" ]; then # if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out & # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait # wait
fi # fi
else # else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample".out & # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample".out &
wait # wait
if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then # if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out & # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait # wait
fi # fi
fi # fi
done # done
else # else
for pro in "${probability_params[@]}"; do # for pro in "${probability_params[@]}"; do
for mem in "${memory_type[@]}"; do # for mem in "${memory_type[@]}"; do
if [ "$mem" = "historical" ]; then # if [ "$mem" = "historical" ]; then
continue # continue
# for ssim in "${shared_memory_ssim[@]}"; do # # for ssim in "${shared_memory_ssim[@]}"; do
# if [ "$partition" = "ours" ]; then # # if [ "$partition" = "ours" ]; then
# torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$partitions"-ours_shared-0.01"$mem"-"$ssim"-"$sample"-"$pro".out & # # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" > all_"$seed"/"$data"/"$partitions"-ours_shared-0.01"$mem"-"$ssim"-"$sample"-"$pro".out &
# wait # # wait
# fi # # fi
# done # # done
elif [ "$mem" = "all_reduce" ]; then # elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours"]; then # if [ "$partition" = "ours"]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out& # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out&
wait # wait
fi # fi
else # else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out & # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out &
wait # wait
if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then # if [ "$partition" = "ours" ] && [ "$mem" != "all_local" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out & # torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all_"$seed"/"$data"/"$model"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out &
wait # wait
fi # fi
fi # fi
done # done
done # done
fi # fi
done # done
done # done
done # done
...@@ -77,6 +77,8 @@ parser.add_argument('--eval_neg_samples', default=1, type=int, metavar='W', ...@@ -77,6 +77,8 @@ parser.add_argument('--eval_neg_samples', default=1, type=int, metavar='W',
help='name of model') help='name of model')
parser.add_argument('--memory_type', default='all_update', type=str, metavar='W', parser.add_argument('--memory_type', default='all_update', type=str, metavar='W',
help='name of model') help='name of model')
parser.add_argument('--seed', default=6773, type=int, metavar='W',
help='name of model')
#boundery_recent_uniform boundery_recent_decay #boundery_recent_uniform boundery_recent_decay
args = parser.parse_args() args = parser.parse_args()
if args.memory_type == 'all_local' or args.topk != '0': if args.memory_type == 'all_local' or args.topk != '0':
...@@ -124,7 +126,7 @@ def seed_everything(seed=42): ...@@ -124,7 +126,7 @@ def seed_everything(seed=42):
torch.cuda.manual_seed(seed) torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False torch.backends.cudnn.benchmark = False
seed_everything(args.seed)
total_next_batch = 0 total_next_batch = 0
total_forward = 0 total_forward = 0
total_count_score = 0 total_count_score = 0
...@@ -186,7 +188,6 @@ def query(): ...@@ -186,7 +188,6 @@ def query():
"total_update_mail" :total_update_mail , "total_update_mail" :total_update_mail ,
"total_update_memory":total_update_memory, "total_update_memory":total_update_memory,
"total_remote_update":total_remote_update,} "total_remote_update":total_remote_update,}
seed_everything(34)
def main(): def main():
#torch.autograd.set_detect_anomaly(True) #torch.autograd.set_detect_anomaly(True)
print('LOCAL RANK {}, RANK{}'.format(os.environ["LOCAL_RANK"],os.environ["RANK"])) print('LOCAL RANK {}, RANK{}'.format(os.environ["LOCAL_RANK"],os.environ["RANK"]))
...@@ -266,11 +267,15 @@ def main(): ...@@ -266,11 +267,15 @@ def main():
if args.local_neg_sample: if args.local_neg_sample:
print('dst len {} origin len {}'.format(graph.edge_index[1,mask].unique().shape[0],full_dst.unique().shape[0])) print('dst len {} origin len {}'.format(graph.edge_index[1,mask].unique().shape[0],full_dst.unique().shape[0]))
train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = graph.edge_index[1,mask].unique()) train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = graph.edge_index[1,mask].unique())
else: else:
#train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique()) #train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique())
train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique(),local_mask=(DistIndex(graph.nids_mapper[full_dst.unique()].to('cpu')).part == dist.get_rank()),prob=args.probability) train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique(),local_mask=(DistIndex(graph.nids_mapper[full_dst.unique()].to('cpu')).part == dist.get_rank()),prob=args.probability)
remote_ratio = train_neg_sampler.local_dst.shape[0] / train_neg_sampler.dst_node_list.shape[0]
train_ratio_pos = (1 - args.probability) + args.probability * remote_ratio
train_ratio_neg = args.probability * (1-remote_ratio)
print(train_neg_sampler.dst_node_list) print(train_neg_sampler.dst_node_list)
neg_sampler = LocalNegativeSampling('triplet',amount= neg_samples,dst_node_list = full_dst.unique(),seed=6773) neg_sampler = LocalNegativeSampling('triplet',amount= neg_samples,dst_node_list = full_dst.unique(),seed=args.seed)
trainloader = DistributedDataLoader(graph,eval_train_data,sampler = sampler, trainloader = DistributedDataLoader(graph,eval_train_data,sampler = sampler,
sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES, sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES,
...@@ -337,10 +342,10 @@ def main(): ...@@ -337,10 +342,10 @@ def main():
print('dim_node {} dim_edge {}\n'.format(gnn_dim_node,gnn_dim_edge)) print('dim_node {} dim_edge {}\n'.format(gnn_dim_node,gnn_dim_edge))
avg_time = 0 avg_time = 0
if use_cuda: if use_cuda:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param,graph.ids.shape[0],mailbox).cuda() model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param,graph.ids.shape[0],mailbox,train_ratio=(train_ratio_pos,train_ratio_neg)).cuda()
device = torch.device('cuda') device = torch.device('cuda')
else: else:
model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param,graph.ids.shape[0],mailbox) model = GeneralModel(gnn_dim_node, gnn_dim_edge, sample_param, memory_param, gnn_param, train_param,graph.ids.shape[0],mailbox,train_ratio=(train_ratio_pos,train_ratio_neg))
device = torch.device('cpu') device = torch.device('cpu')
model = DDP(model,find_unused_parameters=True) model = DDP(model,find_unused_parameters=True)
def count_parameters(model): def count_parameters(model):
...@@ -530,9 +535,12 @@ def main(): ...@@ -530,9 +535,12 @@ def main():
model.train() model.train()
optimizer.zero_grad() optimizer.zero_grad()
ones = torch.ones(metadata['dst_neg_index'].shape[0],device = model.device,dtype=torch.float)
weight = torch.where(DistIndex(mfgs[0][0].srcdata['ID'][metadata['dst_neg_index']]).part == torch.distributed.get_rank(),ones/train_ratio_pos,ones/train_ratio_neg).reshape(-1,1)
pred_pos, pred_neg = model(mfgs,metadata,neg_samples=args.neg_samples,async_param = param) pred_pos, pred_neg = model(mfgs,metadata,neg_samples=args.neg_samples,async_param = param)
loss = creterion(pred_pos, torch.ones_like(pred_pos)) loss = creterion(pred_pos, torch.ones_like(pred_pos))
loss += creterion(pred_neg, torch.zeros_like(pred_neg)) neg_creterion = torch.nn.BCEWithLogitsLoss(weight)
loss += neg_creterion(pred_neg, torch.zeros_like(pred_neg))
total_loss += float(loss.item()) total_loss += float(loss.item())
#mailbox.handle_last_async() #mailbox.handle_last_async()
#trainloader.async_feature() #trainloader.async_feature()
...@@ -610,7 +618,7 @@ def main(): ...@@ -610,7 +618,7 @@ def main():
print(' comm local node number {} remote node number {} local edge {} remote edge{}\n'.format(sum_local_comm,sum_remote_comm,sum_local_edge_comm,sum_remote_edge_comm)) print(' comm local node number {} remote node number {} local edge {} remote edge{}\n'.format(sum_local_comm,sum_remote_comm,sum_local_edge_comm,sum_remote_edge_comm))
print('memory comm {} shared comm {}\n'.format(tot_comm_count,tot_shared_count)) print('memory comm {} shared comm {}\n'.format(tot_comm_count,tot_shared_count))
#if(e==0): #if(e==0):
# torch.save((local_access,remote_access,local_edge_access,remote_edge_access,local_comm,remote_comm,local_edge_comm,remote_edge_comm),'all/{}/{}/comm/comm_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim)) # torch.save((local_access,remote_access,local_edge_access,remote_edge_access,local_comm,remote_comm,local_edge_comm,remote_edge_comm),'all_args.seed/{}/{}/comm/comm_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
ap = 0 ap = 0
auc = 0 auc = 0
tt.ssim_remote=0 tt.ssim_remote=0
...@@ -662,9 +670,9 @@ def main(): ...@@ -662,9 +670,9 @@ def main():
pass pass
# print('weight {} {}\n'.format(tt.weight_count_local,tt.weight_count_remote)) # print('weight {} {}\n'.format(tt.weight_count_local,tt.weight_count_remote))
# print('ssim {} {}\n'.format(tt.ssim_local/tt.ssim_cnt,tt.ssim_remote/tt.ssim_cnt)) # print('ssim {} {}\n'.format(tt.ssim_local/tt.ssim_cnt,tt.ssim_remote/tt.ssim_cnt))
torch.save(val_list,'all/{}/{}/val_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim)) torch.save(val_list,'all_{}/{}/{}/val_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.seed,args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
torch.save(loss_list,'all/{}/{}/loss_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim)) torch.save(loss_list,'all_{}/{}/{}/loss_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.seed,args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
torch.save(test_ap_list,'all/{}/{}/test_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim)) torch.save(test_ap_list,'all_{}/{}/{}/test_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.seed,args.dataname,args.model,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
print(avg_time) print(avg_time)
if not early_stop: if not early_stop:
......
import matplotlib.pyplot as plt
import numpy as np
# 数据
p_values = ['recent', 'p=0.1', 'p=0.05', 'p=0.01', 'p=0']
wiki_values = [0.979832, 0.980298, 0.975079, 0.97349, 0.96381]
lastfm_values = [0.820161, 0.852725, 0.848085, 0.817381, 0.796689]
wikitalk_values = [0.969647, 0.974473, 0.973996, 0.968961, 0.964867]
gdelt_values = [0.987338, 0.987454, 0.987038, 0.98812, 0.98726]
# 柱状图的宽度
barWidth = 0.15
# 柱状图的位置
r1 = np.arange(len(wiki_values))
r2 = [x + barWidth for x in r1]
r3 = [x + barWidth for x in r2]
r4 = [x + barWidth for x in r3]
# 创建图形
plt.figure(figsize=(12,8))
plt.bar(r1, wiki_values, color='b', width=barWidth, edgecolor='grey', label='WIKI')
plt.bar(r2, lastfm_values, color='r', width=barWidth, edgecolor='grey', label='LASTFM')
plt.bar(r3, wikitalk_values, color='g', width=barWidth, edgecolor='grey', label='WikiTalk')
plt.bar(r4, gdelt_values, color='y', width=barWidth, edgecolor='grey', label='GDELT')
# 添加标签
plt.xlabel('p values', fontweight='bold', fontsize=15)
plt.ylabel('SSIM', fontweight='bold', fontsize=15)
plt.xticks([r + barWidth for r in range(len(wiki_values))], p_values)
plt.savefig('bound.png')
plt.legend()
plt.show()
...@@ -295,11 +295,20 @@ class TransfomerAttentionLayer(torch.nn.Module): ...@@ -295,11 +295,20 @@ class TransfomerAttentionLayer(torch.nn.Module):
#V_remote = V.clone() #V_remote = V.clone()
#V_local[DistIndex(b.srcdata['ID']).part[b.edges()[0]]!=torch.distributed.get_rank()] = 0 #V_local[DistIndex(b.srcdata['ID']).part[b.edges()[0]]!=torch.distributed.get_rank()] = 0
#V_remote[DistIndex(b.srcdata['ID']).part[b.edges()[0]]==torch.distributed.get_rank()] = 0 #V_remote[DistIndex(b.srcdata['ID']).part[b.edges()[0]]==torch.distributed.get_rank()] = 0
b.edata['v'] = V
#b.edata['v0'] = V_local #b.edata['v0'] = V_local
#b.edata['v1'] = V_remote #b.edata['v1'] = V_remote
#b.update_all(dgl.function.copy_e('v0', 'm0'), dgl.function.sum('m0', 'h0')) #b.update_all(dgl.function.copy_e('v0', 'm0'), dgl.function.sum('m0', 'h0'))
#b.update_all(dgl.function.copy_e('v1', 'm1'), dgl.function.sum('m1', 'h1')) #b.update_all(dgl.function.copy_e('v1', 'm1'), dgl.function.sum('m1', 'h1'))
#if 'weight' in b.edata and self.training is True:
# with torch.no_grad():
# weight = b.edata['weight'].reshape(-1,1)#(b.edata['weight']/torch.sum(b.edata['weight']).item()).reshape(-1,1)
#weight =
#print(weight.max())
# b.edata['v'] = V*weight
#else:
# weight = b.edata['weight'].reshape(-1,1)
b.edata['v'] = V
#print(torch.sum(torch.sum(((V-V*weight)**2))))
b.update_all(dgl.function.copy_e('v', 'm'), dgl.function.sum('m', 'h')) b.update_all(dgl.function.copy_e('v', 'm'), dgl.function.sum('m', 'h'))
#tt.ssim_local+=torch.sum(torch.cosine_similarity(b.dstdata['h'],b.dstdata['h0'])) #tt.ssim_local+=torch.sum(torch.cosine_similarity(b.dstdata['h'],b.dstdata['h0']))
#tt.ssim_remote+=torch.sum(torch.cosine_similarity(b.dstdata['h'],b.dstdata['h1'])) #tt.ssim_remote+=torch.sum(torch.cosine_similarity(b.dstdata['h'],b.dstdata['h1']))
......
...@@ -52,20 +52,36 @@ class all_to_all_embedding(torch.autograd.Function): ...@@ -52,20 +52,36 @@ class all_to_all_embedding(torch.autograd.Function):
grad[dst_pos_index] = grad_pos_dst grad[dst_pos_index] = grad_pos_dst
grad[dst_neg_index] = grad_neg_dst grad[dst_neg_index] = grad_neg_dst
return grad,None,None return grad,None,None
class NegFixLayer(torch.autograd.Function):
def __init__(self):
super(NegFixLayer, self).__init__()
def forward(ctx, input, weight):
ctx.save_for_backward(weight)
return input
def backward(ctx, grad_output):
# Define your backward pass
# ...
weight, = ctx.saved_tensors
#print(weight)
return grad_output/weight,None
class GeneralModel(torch.nn.Module): class GeneralModel(torch.nn.Module):
def __init__(self, dim_node, dim_edge, sample_param, memory_param, gnn_param, train_param, num_nodes = None,mailbox = None,combined=False): def __init__(self, dim_node, dim_edge, sample_param, memory_param, gnn_param, train_param, num_nodes = None,mailbox = None,combined=False,train_ratio = None):
super(GeneralModel, self).__init__() super(GeneralModel, self).__init__()
self.dim_node = dim_node self.dim_node = dim_node
self.dim_node_input = dim_node self.dim_node_input = dim_node
self.dim_edge = dim_edge self.dim_edge = dim_edge
self.sample_param = sample_param self.sample_param = sample_param
self.memory_param = memory_param self.memory_param = memory_param
self.train_pos_ratio,self.train_neg_ratio = train_ratio
if not 'dim_out' in gnn_param: if not 'dim_out' in gnn_param:
gnn_param['dim_out'] = memory_param['dim_out'] gnn_param['dim_out'] = memory_param['dim_out']
self.gnn_param = gnn_param self.gnn_param = gnn_param
self.train_param = train_param self.train_param = train_param
self.neg_fix_layer = NegFixLayer()
if memory_param['type'] == 'node': if memory_param['type'] == 'node':
if memory_param['memory_update'] == 'gru': if memory_param['memory_update'] == 'gru':
#if memory_param['async'] == False: #if memory_param['async'] == False:
...@@ -138,12 +154,24 @@ class GeneralModel(torch.nn.Module): ...@@ -138,12 +154,24 @@ class GeneralModel(torch.nn.Module):
h_pos_src = out[metadata['src_pos_index']] h_pos_src = out[metadata['src_pos_index']]
h_pos_dst = out[metadata['dst_pos_index']] h_pos_dst = out[metadata['dst_pos_index']]
h_neg_dst = out[metadata['dst_neg_index']] h_neg_dst = out[metadata['dst_neg_index']]
#end.record() #end.record()
#end.synchronize() #end.synchronize()
#elapsed_time_ms = start.elapsed_time(end) #elapsed_time_ms = start.elapsed_time(end)
#print('time {}\n'.format(elapsed_time_ms)) #print('time {}\n'.format(elapsed_time_ms))
#print('pos src {} \n pos dst {} \n neg dst{} \n'.format(h_pos_src, h_pos_dst,h_neg_dst)) #print('pos src {} \n pos dst {} \n neg dst{} \n'.format(h_pos_src, h_pos_dst,h_neg_dst))
#print('pre predict {}'.format(mfgs[0][0].srcdata['ID'])) #print('pre predict {}'.format(mfgs[0][0].srcdata['ID']))
#if self.training is True:
# with torch.no_grad():
# ones = torch.ones(h_neg_dst.shape[0],device = h_neg_dst.device,dtype=torch.float)
# weight = torch.where(DistIndex(mfgs[0][0].srcdata['ID'][metadata['dst_neg_index']]).part == torch.distributed.get_rank(),ones/self.train_pos_ratio,ones/self.train_neg_ratio).reshape(-1,1)
#weight = torch.clip(weigh)
#weight = weight/weight.max().item()
#print(weight)
#weight =
#h_neg_dst*weight
# pred = self.edge_predictor(h_pos_src, h_pos_dst, None , self.neg_fix_layer.apply(h_neg_dst,weight), neg_samples=neg_samples, mode = mode)
#else:
pred = self.edge_predictor(h_pos_src, h_pos_dst, None , h_neg_dst, neg_samples=neg_samples, mode = mode) pred = self.edge_predictor(h_pos_src, h_pos_dst, None , h_neg_dst, neg_samples=neg_samples, mode = mode)
t_embedding = tt.elapsed_event(t1) t_embedding = tt.elapsed_event(t1)
tt.time_embedding+=t_embedding tt.time_embedding+=t_embedding
......
...@@ -290,7 +290,9 @@ def to_block(graph,data, sample_out,device = torch.device('cuda'),unique = True) ...@@ -290,7 +290,9 @@ def to_block(graph,data, sample_out,device = torch.device('cuda'),unique = True)
if sample_out[r].delta_ts().shape[0] > 0: if sample_out[r].delta_ts().shape[0] > 0:
b.edata['dt'] = sample_out[r].delta_ts().to(device) b.edata['dt'] = sample_out[r].delta_ts().to(device)
b.srcdata['ts'] = block_node_list[1,b.srcnodes()].to(torch.float) b.srcdata['ts'] = block_node_list[1,b.srcnodes()].to(torch.float)
weight = sample_out[r].sample_weight()
if(weight.shape[0] > 0):
b.edata['weight'] = 1/torch.clamp(sample_out[r].sample_weight(),0.0001).to(b.device)
b.edata['__ID'] = e_idx b.edata['__ID'] = e_idx
col = row col = row
col_len += eid_len[r] col_len += eid_len[r]
......
...@@ -18,9 +18,9 @@ class MemoryMoniter: ...@@ -18,9 +18,9 @@ class MemoryMoniter:
#self.memory_ssim.append(self.ssim(pre_memory,now_memory,method = 'cos')) #self.memory_ssim.append(self.ssim(pre_memory,now_memory,method = 'cos'))
#self.nid_list.append(nid) #self.nid_list.append(nid)
def draw(self,degree,data,model,e): def draw(self,degree,data,model,e):
torch.save(self.nid_list,'all/{}/{}/memorynid_{}.pt'.format(data,model,e)) torch.save(self.nid_list,'all_args.seed/{}/{}/memorynid_{}.pt'.format(data,model,e))
torch.save(self.memorychange,'all/{}/{}/memoryF_{}.pt'.format(data,model,e)) torch.save(self.memorychange,'all_args.seed/{}/{}/memoryF_{}.pt'.format(data,model,e))
torch.save(self.memory_ssim,'all/{}/{}/memcos_{}.pt'.format(data,model,e)) torch.save(self.memory_ssim,'all_args.seed/{}/{}/memcos_{}.pt'.format(data,model,e))
# path = './memory/{}/'.format(data) # path = './memory/{}/'.format(data)
# if not os.path.exists(path): # if not os.path.exists(path):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment