Commit 24a069d6 by zlj

fix boundary

parent 342421ab
sampling:
- layer: 1
neighbor:
- 20
- 10
strategy: 'recent'
prop_time: False
history: 1
......@@ -28,9 +28,9 @@ gnn:
dim_out: 100
train:
- epoch: 100
batch_size: 600
batch_size: 1000
# reorder: 16
lr: 0.0005
lr: 0.0004
dropout: 0.2
att_dropout: 0.2
all_on_gpu: True
......@@ -289,23 +289,25 @@ void ParallelSampler :: neighbor_sample_from_nodes_with_before_layer(
TimeStampType delta = end_index-1>=0?(rtts - tnb.timestamp[node][end_index-1])*fanout:0;
for(int cid = end_index-1;cid>=0;cid--){
cal_cnt++;
if(cal_cnt>2*fanout)break;
if(cal_cnt>fanout)break;
if(part[tnb.eid[node][cid]] != local_part|| node_part[tnb.neighbors[node][cid]]!= local_part){
double ep = exp((double)(tnb.timestamp[node][cid]-rtts)/(delta));
sum_p+=ep;pr[cal_cnt-1]=ep;
sum_1++;
}
}
if(sum_p<1e-6)sum_p=1;
cal_cnt = 0;
for(int cid = end_index-1;cid>=0;cid--){
cal_cnt++;
if(cal_cnt > 2*fanout)break;
if(cal_cnt > fanout)break;
int eid = tnb.eid[node][cid];
if(part[tnb.eid[node][cid]] != local_part|| node_part[tnb.neighbors[node][cid]]!= local_part){
double p0 = (double)rand_r(&loc_seeds[tid]) / (RAND_MAX + 1.0);
double ep = boundery_probility*pr[cal_cnt-1]/sum_p*sum_1;
if(p0 > ep)continue;
//cout<<"in"<<endl;
}
tgb_i[tid].src_index.emplace_back(i);
tgb_i[tid].sample_nodes.emplace_back(tnb.neighbors[node][cid]);
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
import matplotlib.pyplot as plt
import numpy as np
import torch
# 读取文件内容
# Plot test AP, communication volume and validation-AP convergence for the
# boundary-sampling probability sweep.
#
# For every dataset in `data_values` this script reads one training log per
# probability in `probability_values`, extracts the reported best test AP and
# the per-line "remote node number" counts, and writes three PNG figures.

# SSIM thresholds; not used by this script (bar positions are derived from
# `probability_values`), kept so the module-level names stay unchanged.
ssim_values = [0, 0.1, 0.2, 0.3, 0.4, 2]
probability_values = [1, 0.5, 0.1, 0.05, 0.01, 0]
data_values = ['WIKI']  # datasets whose logs are read
partition = 'ours_shared'
# Logs are read from files named like:
# all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out
partitions = 4
topk = 0.01
mem = 'all_update'  # 'historical'

for data in data_values:
    ap_list = []
    comm_list = []
    for p in probability_values:
        log_path = '{}/{}-{}-{}-{}-boundery_recent_decay-{}.out'.format(data, partitions, partition, topk, mem, p)
        prefix = 'best test AP:'
        comm_marker = 'remote node number tensor'
        # Reset per log so a log without the expected lines cannot silently
        # reuse the value parsed from the previous probability.
        ap = float('nan')
        comm_total = 0
        comm_count = 0
        with open(log_path, 'r') as log_file:
            for line in log_file:
                if line.startswith(prefix):
                    # Slice the prefix off: str.lstrip() takes a character
                    # set, not a prefix, and only worked here by accident.
                    ap = float(line[len(prefix):].split()[0])
                pos = line.find(comm_marker)
                if pos != -1:
                    # Skip the two characters after the marker and read up
                    # to the closing ']'.
                    start = pos + 2 + len(comm_marker)
                    end = line.find(']', start)
                    comm_total += int(line[start:end])
                    comm_count += 1
        if comm_count == 0:
            print('warning: no "{}" lines found in {}'.format(comm_marker, log_path))
        ap_list.append(ap)
        # Average over all matching lines (the original accumulated a total
        # but divided only the last value by the count).
        # NOTE(review): the factor 4 presumably equals `partitions` - confirm.
        comm_list.append(comm_total / comm_count * 4 if comm_count else float('nan'))

    bar_width = 0.4
    # One bar per probability value.
    bars = list(range(len(probability_values)))

    # Bar chart: best test AP per probability.
    plt.bar(bars, ap_list, width=bar_width)
    plt.ylim([0.9, 1])
    plt.xticks(bars, probability_values)
    plt.xlabel('probability')
    plt.ylabel('Test AP')
    plt.title('{}({} partitions)'.format(data, partitions))
    plt.savefig('boundary_AP_{}.png'.format(data))
    plt.clf()

    # Bar chart: communication volume per probability.
    plt.bar(bars, comm_list, width=bar_width)
    plt.xticks(bars, probability_values)
    plt.xlabel('probability')
    plt.ylabel('Communication volume')
    plt.title('{}({} partitions)'.format(data, partitions))
    plt.savefig('boundary_comm_{}.png'.format(data))
    plt.clf()

    # The saved validation curves use 'ours' in their file names where the
    # logs use 'ours_shared'.
    partition0 = 'ours' if partition == 'ours_shared' else partition

    # Line chart: validation AP per epoch, one curve per probability.
    for p in probability_values:
        val_path = '{}/val_{}_{}_{}_0_boundery_recent_decay_{}_all_update_2.pt'.format(data, partition0, topk, partitions, float(p))
        val_ap = torch.tensor(torch.load(val_path))
        epoch = torch.arange(val_ap.shape[0])
        plt.plot(epoch, val_ap, label='probability={}'.format(p))
    plt.xlabel('Epoch')
    plt.ylabel('Val AP')
    plt.title('{}({} partitions)'.format(data, partitions))
    # plt.grid(True)
    plt.legend()
    plt.savefig('{}_boundary_Convergence_rate.png'.format(data))
    plt.clf()
import matplotlib.pyplot as plt
import numpy as np
import torch
# 读取文件内容
# Plot test AP, shared-memory communication volume and validation-AP
# convergence for the SSIM-threshold sweep.
#
# For every dataset in `data_values` this script reads one training log per
# entry of `ssim_values` and writes three PNG figures.

# SSIM thresholds to compare; the value 2 selects the "local" log and curve
# files instead of the `mem`/threshold ones.
ssim_values = [0, 0.1, 0.2, 0.3, 0.4, 2]
data_values = ['WikiTalk']  # datasets whose logs are read
partition = 'ours_shared'
# Logs are read from files named like:
# all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out
partitions = 8
topk = 0.01
mem = 'historical'

for data in data_values:
    ap_list = []
    comm_list = []
    for ssim in ssim_values:
        if ssim == 2:
            log_path = '{}/{}-{}-{}-local-recent.out'.format(data, partitions, partition, topk)
        else:
            log_path = '{}/{}-{}-{}-{}-{}-recent.out'.format(data, partitions, partition, topk, mem, ssim)
        prefix = 'best test AP:'
        comm_marker = 'shared comm tensor'
        # Reset per log so a log without the expected lines cannot silently
        # reuse the values parsed from the previous threshold.
        ap = float('nan')
        comm = float('nan')
        with open(log_path, 'r') as log_file:
            for line in log_file:
                if line.startswith(prefix):
                    # Slice the prefix off: str.lstrip() takes a character
                    # set, not a prefix, and only worked here by accident.
                    ap = float(line[len(prefix):].split()[0])
                pos = line.find(comm_marker)
                if pos != -1:
                    # Skip the two characters after the marker and drop the
                    # last three characters of the line; the last matching
                    # line in the log wins.
                    comm = int(line[pos + 2 + len(comm_marker):len(line) - 3])
        ap_list.append(ap)
        comm_list.append(comm)

    bar_width = 0.4
    # One bar per SSIM threshold.
    bars = list(range(len(ssim_values)))

    # Bar chart: best test AP per threshold.
    plt.bar(bars, ap_list, width=bar_width)
    plt.ylim([0.9, 1])
    plt.xticks(bars, ssim_values)
    plt.xlabel('SSIM threshold Values')
    plt.ylabel('Test AP')
    plt.title('{}({} partitions)'.format(data, partitions))
    plt.savefig('ssim_{}.png'.format(data))
    plt.clf()

    # Bar chart: communication volume per threshold.
    plt.bar(bars, comm_list, width=bar_width)
    plt.xticks(bars, ssim_values)
    plt.xlabel('SSIM threshold Values')
    plt.ylabel('Communication volume')
    plt.title('{}({} partitions)'.format(data, partitions))
    plt.savefig('comm_{}.png'.format(data))
    plt.clf()

    # The saved validation curves use 'ours' in their file names where the
    # logs use 'ours_shared'.
    partition0 = 'ours' if partition == 'ours_shared' else partition

    # Line chart: validation AP per epoch, one curve per threshold.
    for ssim in ssim_values:
        if ssim == 2:
            val_path = '{}/val_{}_{}_{}_0_recent_0.1_local_2.pt'.format(data, partition0, topk, partitions)
        else:
            val_path = '{}/val_{}_{}_{}_0_recent_0.1_{}_{}.pt'.format(data, partition0, topk, partitions, mem, float(ssim))
        val_ap = torch.tensor(torch.load(val_path))
        epoch = torch.arange(val_ap.shape[0])
        plt.plot(epoch, val_ap, label='ssim={}'.format(ssim))
    plt.xlabel('Epoch')
    plt.ylabel('Val AP')
    plt.title('{}({} partitions)'.format(data, partitions))
    # plt.grid(True)
    plt.legend()
    plt.savefig('{}_ssim_Convergence_rate.png'.format(data))
    plt.clf()
import matplotlib.pyplot as plt
import numpy as np
import torch
# 读取文件内容
# Plot test AP and validation-AP convergence for the SSIM-threshold sweep.
#
# For every dataset in `data_values` this script reads one training log per
# entry of `ssim_values` and writes two PNG figures.

# SSIM thresholds to compare; the value 2 selects the "local" log and curve
# files instead of the `mem`/threshold ones.
ssim_values = [0, 0.1, 0.2, 0.3, 0.4, 2]
data_values = ['WIKI', 'REDDIT']  # datasets whose logs are read
partition = 'ours_shared'
# Logs are read from files named like:
# all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out
partitions = 4
topk = 0.01
mem = 'historical'

for data in data_values:
    ap_list = []
    for ssim in ssim_values:
        if ssim == 2:
            log_path = '{}/{}-{}-{}-local-recent.out'.format(data, partitions, partition, topk)
        else:
            log_path = '{}/{}-{}-{}-{}-{}-recent.out'.format(data, partitions, partition, topk, mem, ssim)
        prefix = 'best test AP:'
        # Reset per log so a log without a "best test AP:" line cannot
        # silently reuse the value parsed from the previous threshold.
        ap = float('nan')
        with open(log_path, 'r') as log_file:
            for line in log_file:
                if line.startswith(prefix):
                    # Slice the prefix off: str.lstrip() takes a character
                    # set, not a prefix, and only worked here by accident.
                    ap = float(line[len(prefix):].split()[0])
        ap_list.append(ap)

    bar_width = 0.4
    # One bar per SSIM threshold.
    bars = list(range(len(ssim_values)))

    # Bar chart: best test AP per threshold.
    plt.bar(bars, ap_list, width=bar_width)
    plt.ylim([0.8, 1])
    plt.xticks(bars, ssim_values)
    plt.xlabel('SSIM threshold Values')
    plt.ylabel('Test AP')
    plt.title('{}({} partitions)'.format(data, partitions))
    plt.savefig('ssim_{}.png'.format(data))
    plt.clf()

    # The saved validation curves use 'ours' in their file names where the
    # logs use 'ours_shared'.
    partition0 = 'ours' if partition == 'ours_shared' else partition

    # Line chart: validation AP per epoch, one curve per threshold.
    for ssim in ssim_values:
        if ssim == 2:
            val_path = '{}/val_{}_{}_{}_0_recent_0.1_local_2.pt'.format(data, partition0, topk, partitions)
        else:
            val_path = '{}/val_{}_{}_{}_0_recent_0.1_{}_{}.pt'.format(data, partition0, topk, partitions, mem, float(ssim))
        val_ap = torch.tensor(torch.load(val_path))
        epoch = torch.arange(val_ap.shape[0])
        plt.plot(epoch, val_ap, label='ssim={}'.format(ssim))
    plt.xlabel('Epoch')
    plt.ylabel('Val AP')
    plt.title('{}({} partitions)'.format(data, partitions))
    # plt.grid(True)
    plt.legend()
    plt.ylim([0.98, 0.99])
    plt.savefig('{}_ssim_Convergence_rate.png'.format(data))
    plt.clf()
......@@ -2,26 +2,35 @@
# 定义数组变量
addr="192.168.1.107"
partition_params=("ours" "metis" "ldg" "random")
partition_params=("ours")
#"metis" "ldg" "random")
#("ours" "metis" "ldg" "random")
partitions="16"
nnodes="4"
partitions="4"
node_per="4"
nnodes="1"
node_rank="0"
probability_params=("1" "0.5" "0.1" "0.05" "0.01" "0")
#sample_type_params=("recent") #"boundery_recent_decay" "boundery_recent_uniform")
sample_type_params=("recent" "boundery_recent_decay" "boundery_recent_uniform")
#sample_type_params=("recent")
#memory_type=("all_update" "p2p" "all_reduce" "historical" "local")
#memory_type=("all_update" "local" "historical")
memory_type=("local" "all_update" "historical" "all_reduce")
memory_type=("all_update")
#memory_type=("local" "all_update" "historical" "all_reduce")
shared_memory_ssim=("0" "0.1" "0.2" "0.3" "0.4" )
#data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk")
data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk" "StackOverflow")
data_param=("DGraphFin" "WikiTalk")
#data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk" "StackOverflow")
#data_param=("REDDIT" "WikiTalk")
# 创建输出目录
mkdir -p all
# 遍历数组并执行命令
for data in "${data_param[@]}"; do
model="TGN_large"
if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then
model="TGN"
fi
mkdir all/"$data"
mkdir all/"$data"/comm
#torchrun --nnodes "$nnodes" --node_rank 0 --nproc-per-node 1 --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition ours --memory_type local --sample_type recent --topk 0 > all/"$data"/1.out &
......@@ -33,20 +42,20 @@ for data in "${data_param[@]}"; do
if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$ssim"-"$sample".out &
wait
fi
done
elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait
fi
else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$partitions"-"$partition"-0-"$mem"-"$sample".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$partitions"-"$partition"-0-"$mem"-"$sample".out &
wait
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample".out &
wait
fi
fi
......@@ -57,20 +66,20 @@ for data in "${data_param[@]}"; do
if [ "$mem" = "historical" ]; then
for ssim in "${shared_memory_ssim[@]}"; do
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$partitions"-ours_shared-0.01"$mem"-"$ssim"-"$sample"-"$pro".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" --shared_memory_ssim "$ssim" > all/"$data"/"$partitions"-ours_shared-0.01"$mem"-"$ssim"-"$sample"-"$pro".out &
wait
fi
done
elif [ "$mem" = "all_reduce" ]; then
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out&
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out&
wait
fi
else
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$partitions"-"$partition"-0-"$mem"-"$sample"-"$pro".out &
wait
if [ "$partition" = "ours" ]; then
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$partitions" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode TGN_large --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out &
torchrun --nnodes "$nnodes" --node_rank "$node_rank" --nproc-per-node "$node_per" --master-addr "$addr" --master-port 9445 train_boundery.py --dataname "$data" --mode "$model" --partition "$partition" --topk 0.01 --sample_type "$sample" --probability "$pro" --memory_type "$mem" > all/"$data"/"$partitions"-ours_shared-0.01-"$mem"-"$sample"-"$pro".out &
wait
fi
fi
......
LOCAL RANK 0, RANK0
in
local rank is 0 world_size is 1 memory group is 0 memory rank is 0 memory group size is 1
[0]
use cuda on 0
638486
get_neighbors consume: 4.12395s
Epoch 0:
......@@ -50,7 +50,7 @@ parser.add_argument('--rank', default=0, type=int, metavar='W',
help='name of dataset')
parser.add_argument('--local_rank', default=0, type=int, metavar='W',
help='name of dataset')
parser.add_argument('--patience', type=int, default=5, help='Patience for early stopping')
parser.add_argument('--patience', type=int, default=20, help='Patience for early stopping')
parser.add_argument('--world_size', default=1, type=int, metavar='W',
help='number of negative samples')
parser.add_argument('--dataname', default="WIKI", type=str, metavar='W',
......@@ -73,6 +73,8 @@ parser.add_argument('--shared_memory_ssim', default=2, type=float, metavar='W',
help='name of model')
parser.add_argument('--neg_samples', default=1, type=int, metavar='W',
help='name of model')
parser.add_argument('--eval_neg_samples', default=1, type=int, metavar='W',
help='name of model')
parser.add_argument('--memory_type', default='all_update', type=str, metavar='W',
help='name of model')
#boundery_recent_uniform boundery_recent_decay
......@@ -104,6 +106,7 @@ if not 'MASTER_PORT' in os.environ:
os.environ["MASTER_PORT"] = '9337'
os.environ["NCCL_IB_DISABLE"]='1'
os.environ['NCCL_SOCKET_IFNAME']=matching_interfaces[0]
print('rank {}'.format(int(os.environ["LOCAL_RANK"])))
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
local_rank = int(os.environ["LOCAL_RANK"])
def seed_everything(seed=42):
......@@ -219,7 +222,7 @@ def main():
else:
mailbox = None
sampler = NeighborSampler(num_nodes=graph.num_nodes, num_layers=num_layers, fanout=fanout,graph_data=sample_graph, workers=1,policy = policy_train, graph_name = "train",local_part=dist.get_rank(),edge_part=DistIndex(graph.eids_mapper).part,node_part=DistIndex(graph.nids_mapper).part,probability=args.probability)
sampler = NeighborSampler(num_nodes=graph.num_nodes, num_layers=num_layers, fanout=fanout,graph_data=sample_graph, workers=10,policy = policy_train, graph_name = "train",local_part=dist.get_rank(),edge_part=DistIndex(graph.eids_mapper).part,node_part=DistIndex(graph.nids_mapper).part,probability=args.probability)
eval_sampler = NeighborSampler(num_nodes=graph.num_nodes, num_layers=num_layers, fanout=fanout,graph_data=eval_sample_graph, workers=10,policy = 'recent', graph_name = "eval",local_part=dist.get_rank(),edge_part=DistIndex(graph.eids_mapper).part,node_part=DistIndex(graph.nids_mapper).part,probability=args.probability)
train_data = torch.masked_select(graph.edge_index,train_mask.to(graph.edge_index.device)).reshape(2,-1)
......@@ -244,16 +247,16 @@ def main():
val_data = DataSet(edges = val_data,ts = val_ts,eids = val_mask.nonzero().reshape(-1))
print('ts {} {} {} {}'.format(train_data.ts,eval_train_data.ts,test_data.ts,val_data.ts))
neg_samples = args.neg_samples
neg_samples = args.eval_neg_samples
mask = DistIndex(graph.nids_mapper[graph.edge_index[1,:]].to('cpu')).part == dist.get_rank()
if args.local_neg_sample:
print('dst len {} origin len {}'.format(graph.edge_index[1,mask].unique().shape[0],full_dst.unique().shape[0]))
train_neg_sampler = LocalNegativeSampling('triplet',amount = neg_samples,dst_node_list = graph.edge_index[1,mask].unique())
train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = graph.edge_index[1,mask].unique())
else:
train_neg_sampler = LocalNegativeSampling('triplet',amount = neg_samples,dst_node_list = full_dst.unique())
train_neg_sampler = LocalNegativeSampling('triplet',amount = args.neg_samples,dst_node_list = full_dst.unique())
print(train_neg_sampler.dst_node_list)
neg_sampler = LocalNegativeSampling('triplet',amount= neg_samples,dst_node_list = full_dst.unique())
neg_sampler = LocalNegativeSampling('triplet',amount= neg_samples,dst_node_list = full_dst.unique(),seed=12357)
trainloader = DistributedDataLoader(graph,eval_train_data,sampler = sampler,
sampler_fn = SAMPLE_TYPE.SAMPLE_FROM_TEMPORAL_EDGES,
......@@ -354,6 +357,7 @@ def main():
y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
aucs_mrrs.append(roc_auc_score(y_true, y_pred))
if mailbox is not None:
src = metadata['src_pos_index']
dst = metadata['dst_pos_index']
......@@ -425,8 +429,6 @@ def main():
for e in range(train_param['epoch']):
model.module.memory_updater.empty_cache()
tt._zero()
count_empty()
time_count.set_zero()
torch.cuda.synchronize()
epoch_start_time = time.time()
epoch_cnt = epoch_cnt + 1
......@@ -440,8 +442,6 @@ def main():
model.module.memory_updater.last_updated_nid = None
model.module.memory_updater.last_updated_memory = None
model.module.memory_updater.last_updated_ts = None
t0 = time.time()
t_s = tt.start()
sum_local_comm = 0
sum_remote_comm = 0
sum_local_edge_comm = 0
......@@ -470,10 +470,6 @@ def main():
sum_remote_comm +=remote_comm[b_cnt-1]
sum_local_edge_comm +=local_edge_comm[b_cnt-1]
sum_remote_edge_comm +=remote_edge_comm[b_cnt-1]
tt.pre_input += tt.elapsed(t_s)
t_prep_s = time.time()
t1 = time.time()
t_s = tt.start()
if mailbox is not None:
if(graph.efeat.device.type != 'cpu'):
edge_feats = graph.get_local_efeat(graph.eids_mapper[roots.eids.to('cpu')]).to('cuda')
......@@ -490,9 +486,7 @@ def main():
model.train()
optimizer.zero_grad()
pred_pos, pred_neg = model(mfgs,metadata,neg_samples=neg_samples,async_param = param)
tt.time_forward += tt.elapsed(t_s)
t_s = tt.start()
pred_pos, pred_neg = model(mfgs,metadata,neg_samples=args.neg_samples,async_param = param)
if memory_param['historical_fix'] == True:
loss = creterion(pred_pos, torch.ones_like(pred_pos)) + 0.1*inner_prod(model.module.memory_updater.update_memory,model.module.memory_updater.prev_memory)
else:
......@@ -502,12 +496,9 @@ def main():
#mailbox.handle_last_async()
#trainloader.async_feature()
#torch.cuda.synchronize()
t2 = time.time()
loss.backward()
optimizer.step()
tt.time_backward += tt.elapsed(t_s)
#torch.cuda.synchronize()
t3 = time.time()
## train aps
#y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
#y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
......@@ -515,8 +506,6 @@ def main():
#torch.cuda.synchronize()
mailbox.update_shared()
mailbox.update_p2p()
t4 = time.time()
t_s = tt.start()
"""
if mailbox is not None:
#src = metadata['src_pos_index']
......@@ -579,7 +568,6 @@ def main():
print('memory comm {} shared comm {}\n'.format(tot_comm_count,tot_shared_count))
if(e==0):
torch.save((local_access,remote_access,local_edge_access,remote_edge_access,local_comm,remote_comm,local_edge_comm,remote_edge_comm),'all/{}/comm/comm_{}_{}_{}_{}_{}_{}_{}_{}.pt'.format(args.dataname,args.partition,args.topk,dist.get_world_size(),dist.get_rank(),args.sample_type,args.probability,args.memory_type,args.shared_memory_ssim))
tt.print()
ap = 0
auc = 0
ap, auc = eval('val')
......@@ -625,17 +613,6 @@ def main():
best_model_path = get_checkpoint_path(early_stopper.best_epoch)
model.module.load_state_dict(torch.load(best_model_path))
print('best test AP:{:4f} test auc{:4f}'.format(*test_ap_list[early_stopper.best_epoch]))
if mailbox is not None:
mailbox.reset()
model.module.memory_updater.last_updated_nid = None
ap,auc = eval('train')
val_ap,val_auc = eval('val')
ap, auc = eval('test')
eval_neg_samples = 1
if eval_neg_samples > 1:
print('\tval AP:{:4f} val MRR:{:4f} test AP:{:4f} test MRR:{:4f}\n'.format(val_ap,val_auc,ap, auc))
else:
print('\tval ap:{:4f} val auc:{:4f} test AP:{:4f} test AUC:{:4f}\n'.format(val_ap,val_auc,ap, auc))
val_list = torch.tensor(val_list)
loss_list = torch.tensor(loss_list)
print('test_dataset {} avg_time {} \n'.format(test_data.edges.shape[1],avg_time/epoch_cnt))
......
......@@ -209,7 +209,6 @@ class TransfomerAttentionLayer(torch.nn.Module):
self.layer_norm = torch.nn.LayerNorm(dim_out)
def forward(self, b):
t_s = tt.start()
assert(self.dim_time + self.dim_node_feat + self.dim_edge_feat > 0)
self.device = b.device
if b.num_edges() == 0:
......@@ -217,8 +216,6 @@ class TransfomerAttentionLayer(torch.nn.Module):
if self.dim_time > 0:
time_feat = self.time_enc(b.edata['dt'])
zero_time_feat = self.time_enc(torch.zeros(b.num_dst_nodes(), dtype=torch.float32, device=self.device))
tt.time_nbrs += tt.elapsed(t_s)
t_s = tt.start()
if self.combined:
Q = torch.zeros((b.num_edges(), self.dim_out), device=self.device)
K = torch.zeros((b.num_edges(), self.dim_out), device=self.device)
......@@ -301,7 +298,6 @@ class TransfomerAttentionLayer(torch.nn.Module):
rst = b.dstdata['h']
rst = self.w_out(rst)
rst = torch.nn.functional.relu(self.dropout(rst))
tt.time_attention+= tt.elapsed(t_s)
return self.layer_norm(rst)
class IdentityNormLayer(torch.nn.Module):
......
......@@ -10,7 +10,7 @@ def parse_config(f):
return sample_param, memory_param, gnn_param, train_param
class EarlyStopMonitor(object):
def __init__(self, max_round=3, higher_better=True, tolerance=1e-10):
def __init__(self, max_round=10, higher_better=True, tolerance=1e-10):
self.max_round = max_round
self.num_round = 0
......
......@@ -286,6 +286,7 @@ def to_block(graph,data, sample_out,device = torch.device('cuda'),unique = True)
idx = block_node_list[0,b.srcnodes()].to(torch.long)
e_idx = eid_inv[col_len:col_len+elen]
b.srcdata['__ID'] = idx
if sample_out[r].delta_ts().shape[0] > 0:
b.edata['dt'] = sample_out[r].delta_ts().to(device)
b.srcdata['ts'] = block_node_list[1,b.srcnodes()].to(torch.float)
......
......@@ -2,139 +2,53 @@ import os
import time
import torch
class time_count:
total_sample_time = 0
total_next_batch_time = 0
total_sample_core_time = 0
total_fetch_prepare_time = 0
total_comm_time = 0
total_build_time = 0
total_prepare_input_time = 0
total_build_block_time = 0
forward_embedding = 0
forward_all_to_all = 0
backward_all_to_all = 0
memory_historical = 0
memory_update = 0
memory_get = 0
memory_enc = 0
memory_historical_count = 0
time_forward = 0
time_backward = 0
time_sample = 0
pre_batch = 0
pre_input = 0
pos_update = 0
mem_update = 0
time_zero = 0
time_nbrs = 0
time_attention = 0
@staticmethod
def add_memory_count(t1,t2,t3,t4):
time_count.memory_update += t1
time_count.memory_get += t2
time_count.memory_enc += t3
time_count.memory_historical_count += t4
@staticmethod
def add_train_forward_embedding(t1):
time_count.forward_embedding += t1
@staticmethod
def add_train_foward_all_to_all(t1):
time_count.forward_all_to_all += t1
@staticmethod
def add_backward_all_to_all(t1):
time_count.backward_all_to_all += t1
@staticmethod
def add_next(t1,t2):
time_count.total_sample_time += t2
time_count.total_next_batch_time +=t1
@staticmethod
def add_batch(t1,t2,t3,t4) :
time_count.total_fetch_prepare_time +=t1
time_count.total_comm_time+=t2
time_count.total_build_time+=t3
time_count.total_prepare_input_time+=t4
@staticmethod
def add_build_block(t1,t2) :
time_count.total_sample_core_time += t1
time_count.total_build_block_time+=t2
@staticmethod
def set_zero():
time_count.total_sample_time =0
time_count.total_next_batch_time=0
time_count.total_sample_core_time =0
time_count.total_fetch_prepare_time=0
time_count.total_comm_time =0
time_count.total_build_time =0
time_count.total_prepare_input_time =0
time_count.total_build_block_time=0
time_count.forward_embedding = 0
time_count.forward_all_to_all = 0
time_count.backward_all_to_all = 0
time_count.memory_update = 0
time_count.memory_get = 0
time_count.memory_enc = 0
time_count.memory_historical_count = 0
@staticmethod
def query():
return {
"total_sample_time":time_count.total_sample_time,
"total_next_batch_time":time_count.total_next_batch_time,
"total_sample_core_time":time_count.total_sample_core_time,
"total_fetch_prepare_time":time_count.total_fetch_prepare_time,
"total_comm_time":time_count.total_comm_time,
"total_build_time":time_count.total_build_time,
"total_prepare_input_time":time_count.total_prepare_input_time,
"total_build_block_time":time_count.total_build_block_time,
"forward_embedding":time_count.forward_embedding,
"forward_all_to_all":time_count.forward_all_to_all,
"backward_all_to_all":time_count.backward_all_to_all,
"memory_update":time_count.memory_update ,
"memory_get":time_count.memory_get ,
"memory_enc":time_count.memory_enc ,
"memory_historical_count":time_count.memory_historical_count ,
}
time_memory_updater = 0
time_embedding = 0
time_local_update = 0
time_memory_sync = 0
time_sample_and_build = 0
time_memory_fetch = 0
@staticmethod
def _zero():
time_count.time_forward = 0
time_count.time_backward = 0
time_count.time_sample = 0
time_count.pre_batch = 0
time_count.pre_input = 0
time_count.pos_update = 0
time_count.mem_update = 0
time_count.time_zero = 0
time_count.time_nbrs = 0
time_count.time_attention = 0
@staticmethod
def start():
time_count.time_memory_updater = 0
time_count.time_embedding = 0
time_count.time_local_update = 0
time_count.time_memory_sync = 0
time_count.time_sample_and_build = 0
time_count.time_memory_fetch = 0
@staticmethod
def start_gpu():
# Uncomment for better breakdown timings
#torch.cuda.synchronize()
return time.perf_counter()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
return start_event,end_event
@staticmethod
def elapsed(start):
# Uncomment for better breakdown timings
#torch.cuda.synchronize()
return time.perf_counter() - start
def start():
return time.perf_counter(),0
@staticmethod
def elapsed_event(start_event,end_event):
if start_event.isinstance(torch.cuda.Event):
end_event.record()
end_event.synchronize()
return start_event.elapsed_time(end_event)
else:
torch.cuda.synchronize()
return time.perf_counter() - start_event
@staticmethod
def print():
print(
'time_forward = {} time_backward = {} time_sample = {} pre_batch = {} pre_input = {} pos_update = {} mem_update = {} time_zero = {} time_nbrs = {} time_attention = {}'.format(
time_count.time_forward,
time_count.time_backward,
time_count.time_sample,
time_count.pre_batch,
time_count.pre_input,
time_count.pos_update,
time_count.mem_update,
time_count.time_zero,
time_count.time_nbrs,
time_count.time_attention,
)
)
\ No newline at end of file
print('time_count.time_forward={} time_count.time_backward={} time_count.time_memory_updater={} time_count.time_embedding={} time_count.time_local_update={} time_count.time_memory_sync={} time_count.time_sample_and_build={} time_count.time_memory_fetch={}\n'.format(
time_count.time_backward,
time_count.time_memory_updater,
time_count.time_embedding,
time_count.time_local_update,
time_count.time_memory_sync,
time_count.time_sample_and_build,
time_count.time_memory_fetch ))
\ No newline at end of file
......@@ -291,10 +291,10 @@ def load_from_speed(data,seed,top,sampler_graph_add_rev,device=torch.device('cud
reorder = '../../SPEED/partition/divided_nodes_seed_t2/{}/reorder.txt'.format(data)
edge_i = '../../SPEED/partition/divided_nodes_seed_t2/{}/{}/{}_{}parts_top{}/edge_output{}.txt'.format(data,seed,data,ctx.memory_group_size,top,ctx.memory_group_rank)
elif partition == 'metis':
fnode_i = '../../SPEED/partition/divided_nodes_metis/{}/{}/{}_{}parts_top{}/output{}.txt'.format(data,seed,data,ctx.memory_group_size,top,ctx.memory_group_rank)
fnode_share = '../../SPEED/partition/divided_nodes_metis/{}/{}/{}_{}parts_top{}/outputshared.txt'.format(data,seed,data,ctx.memory_group_size,top)
reorder = '../../SPEED/partition/divided_nodes_metis/{}/reorder.txt'.format(data)
edge_i = '../../SPEED/partition/divided_nodes_metis/{}/{}/{}_{}parts_top{}/edge_output{}.txt'.format(data,seed,data,ctx.memory_group_size,top,ctx.memory_group_rank)
fnode_i = '../../SPEED/partition/divided_nodes_metis_test/{}/{}/{}_{}parts_top{}/output{}.txt'.format(data,seed,data,ctx.memory_group_size,top,ctx.memory_group_rank)
fnode_share = '../../SPEED/partition/divided_nodes_metis_test/{}/{}/{}_{}parts_top{}/outputshared.txt'.format(data,seed,data,ctx.memory_group_size,top)
reorder = '../../SPEED/partition/divided_nodes_metis_test/{}/reorder.txt'.format(data)
edge_i = '../../SPEED/partition/divided_nodes_metis_test/{}/{}/{}_{}parts_top{}/edge_output{}.txt'.format(data,seed,data,ctx.memory_group_size,top,ctx.memory_group_rank)
elif partition == 'ldg':
fnode_i = '../../SPEED/partition/divided_nodes_ldg/{}/{}/{}_{}parts_top{}/output{}.txt'.format(data,seed,data,ctx.memory_group_size,top,ctx.memory_group_rank)
fnode_share = '../../SPEED/partition/divided_nodes_ldg/{}/{}/{}_{}parts_top{}/outputshared.txt'.format(data,seed,data,ctx.memory_group_size,top)
......
......@@ -19,12 +19,15 @@ class LocalNegativeSampling(NegativeSampling):
amount: Union[int, float] = 1,
unique: bool = False,
src_node_list: torch.Tensor = None,
dst_node_list: torch.Tensor = None
dst_node_list: torch.Tensor = None,
seed = False
):
super(LocalNegativeSampling,self).__init__(mode,amount,unique=unique)
self.src_node_list = src_node_list.to('cpu') if src_node_list is not None else None
self.dst_node_list = dst_node_list.to('cpu') if dst_node_list is not None else None
self.rdm = torch.Generator()
if seed is True:
random.seed(seed)
seed = random.randint(0,100000)
print('seed is',seed)
ctx = DistributedContext.get_default_context()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment