Commit a2b8c19a by zlj

fix some problems

parent e0e37a11
For hot data, the communication cost of top-$k$ sharing is estimated as follows.
Let $y$ be the fraction of accesses that cross partitions; the cost is then
$N_{root}^k\cdot \eta_1\cdot m \cdot d_{mem} + (3d_{mem} + d_{e} + d_{n})\cdot \theta \cdot y \cdot N_{nei}$
Choosing $k$ yields an overall remote-access ratio that can be tentatively estimated as
$\left((|E|-|E^k|)\cdot y+|E^k|\cdot \frac{m-1}{m}\right)/|E|$
which simplifies to approximately $(1-|E^k|/|E|)\cdot y + \frac{|E^k|}{|E|}\cdot \frac{m-1}{m}$.
Writing $d_{fetch} = 3d_{mem} + d_{e} + d_{n}$, and noting that shared hot nodes are fetched locally after replication (so the $\frac{|E^k|}{|E|}\cdot \frac{m-1}{m}$ term drops out), the communication volume in the worst case $y \approx 1$ can be expressed as
$|E^k|\cdot \eta_1\cdot m \cdot d_{mem} + d_{fetch}\cdot N_{nei} \cdot \theta \cdot (1-|E^k|/|E|)$
which rearranges to
$|E^k|\cdot \left(\eta_1\cdot m \cdot d_{mem} - d_{fetch}\cdot N_{nei}\cdot \theta/|E|\right) + d_{fetch} \cdot N_{nei} \cdot \theta$
Sharing is therefore advantageous when $\eta_1 < \frac{d_{fetch}\cdot N_{nei}\cdot \theta}{|E|\cdot d_{mem}\cdot m}$;
approximating $d_{fetch}\cdot \theta \approx 3d_{mem}$, the condition $\eta_1 < \frac{3}{m}\cdot \frac{N_{nei}}{|E|}$ suffices to reduce communication overhead. Experiments confirm that when $\alpha > 0.7$, $\eta_1 < 0.1$.
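As a quick sanity check of this threshold, a minimal sketch; all graph statistics below are made-up placeholders, not measurements, and $\theta$ and $d_{fetch}$ are already folded into the $3/m$ approximation above:

```python
# Hypothetical numbers for illustration only -- not measurements.
E = 1_000_000      # |E|: total number of edges
N_nei = 600_000    # N_nei: neighbors fetched during sampling (assumed)
m = 8              # number of partitions
eta_1 = 0.05       # hot-node update ratio (assumed)

# Sharing the top-k hot nodes pays off when eta_1 < (3/m) * N_nei / |E|.
threshold = 3 / m * N_nei / E
print(f"threshold = {threshold:.4f}, sharing helps: {eta_1 < threshold}")
```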
From an accuracy standpoint, top-$k$ sharing increases the number of features that can be sampled locally.
For hot data, the number of accesses to external partitions is
$|E^k| \cdot \frac{|E|-|E^k|}{|E|}\cdot \frac{m-1}{m}$
For non-hot data, the number of accesses to external partitions is
$(|E|-|E^k|)\cdot \frac{|E|-|E^k|}{|E|}\cdot y$
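A companion sketch of this accuracy-side estimate, again with placeholder values; it evaluates the two formulas above, the first quantity being the accesses that sharing turns local:

```python
# Placeholder values: |E|, |E^k|, partition count m, cross-partition ratio y.
E, E_k, m, y = 1_000_000, 100_000, 8, 0.9

# Hot data: |E^k| * (|E|-|E^k|)/|E| * (m-1)/m external-partition accesses.
hot_external = E_k * (E - E_k) / E * (m - 1) / m
# Non-hot data: (|E|-|E^k|) * (|E|-|E^k|)/|E| * y external-partition accesses.
cold_external = (E - E_k) * (E - E_k) / E * y

print(f"hot external accesses (made local by sharing): {hot_external:,.0f}")
print(f"non-hot external accesses (remain remote):     {cold_external:,.0f}")
```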
@@ -6,9 +6,9 @@ addr="192.168.1.107"
 partition_params=("ours")
 #"metis" "ldg" "random")
 #("ours" "metis" "ldg" "random")
-partitions="4"
+partitions="8"
 node_per="4"
-nnodes="1"
+nnodes="2"
 node_rank="0"
 probability_params=("0.1")
 sample_type_params=("boundery_recent_decay")
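(Taken together with the unchanged node_per="4", these two edits scale the run from one node with 4 partitions to two nodes with 4 processes each, presumably keeping partitions = nnodes × node_per = 8.)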
@@ -19,7 +19,7 @@ memory_type=("historical")
 #memory_type=("local" "all_update" "historical" "all_reduce")
 shared_memory_ssim=("0.3")
 #data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk")
-data_param=("WikiTalk")
+data_param=("LASTFM" "WikiTalk" "StackOverflow")
 #"GDELT")
 #data_param=("WIKI" "REDDIT" "LASTFM" "DGraphFin" "WikiTalk" "StackOverflow")
 #data_param=("WIKI" "REDDIT" "LASTFM" "WikiTalk" "StackOverflow")
@@ -32,9 +32,9 @@ data_param=("WikiTalk")
 #seed=(( RANDOM % 1000000 + 1 ))
 mkdir -p all_"$seed"
 for data in "${data_param[@]}"; do
-model="JODIE_large"
+model="TGN_large"
 if [ "$data" = "WIKI" ] || [ "$data" = "REDDIT" ] || [ "$data" = "LASTFM" ]; then
-model="JODIE"
+model="TGN"
 fi
 #model="APAN"
 mkdir all_"$seed"/"$data"
@@ -137,7 +137,7 @@ class AdaParameter:
 self.alpha = max(min(self.alpha, self.max_alpha),self.min_alpha)
 print('gnn aggregate {} fetch {} memory sync {} memory update {}'.format(average_gnn_aggregate,average_fetch,average_memory_sync_time,average_memory_update_time))
 print('beta is {} alpha is {}\n'.format(self.beta,self.alpha))
-self.reset_time()
+#self.reset_time()
 #log(2-a1 ) = log(2-a2) * t1/t2 * (1 + wait_threshold)
 #2-a1 = 2-a2 ^(t1/t2 * (1 + wait_threshold))
 #a1 = 2 - 2-a2 ^(t1/t2 * (1 + wait_threshold))
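The commented-out derivation at the end of this hunk solves for the new $\alpha$ in closed form. A standalone sketch of that update follows; the function name and clamping bounds are illustrative, not the repo's API:

```python
def update_alpha(a2: float, t1: float, t2: float, wait_threshold: float,
                 min_alpha: float = 0.1, max_alpha: float = 1.9) -> float:
    """Rearranges log(2 - a1) = log(2 - a2) * (t1 / t2) * (1 + wait_threshold)
    into a1 = 2 - (2 - a2) ** (t1 / t2 * (1 + wait_threshold)), then clamps
    the result the way the surrounding code clamps self.alpha.
    Assumes a2 < 2 so that 2 - a2 stays positive."""
    a1 = 2 - (2 - a2) ** (t1 / t2 * (1 + wait_threshold))
    return max(min(a1, max_alpha), min_alpha)

# Example: prints ~0.292 for these inputs.
print(update_alpha(a2=0.5, t1=1.2, t2=1.0, wait_threshold=0.1))
```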
@@ -224,10 +224,10 @@ class DistributedDataLoader:
 next_data = self.input_dataset[torch.tensor([],device=self.device,dtype= torch.long)]
 else:
 next_data = self.input_dataset[self.batch_pos_l[self.submitted]:self.batch_pos_r[self.submitted] +1]
-if self.mode=='train' and self.probability < 1:
+if self.mode=='train' and self.ada_param.beta < 1:
 mask = ((DistIndex(self.graph.nids_mapper[next_data.edges[0,:].to('cpu')]).part == dist.get_rank())&(DistIndex(self.graph.nids_mapper[next_data.edges[1,:].to('cpu')]).part == dist.get_rank()))
-if self.probability > 0:
-mask[~mask] = (torch.rand((~mask).sum().item()) < self.probability)
+if self.ada_param.beta > 0:
+mask[~mask] = (torch.rand((~mask).sum().item()) < self.ada_param.beta)
 next_data = next_data[mask.to(next_data.device)]
 self.submitted = self.submitted + 1
 return next_data
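For context, a self-contained sketch of the masking logic this hunk rewires from `self.probability` to `self.ada_param.beta`; names and shapes are illustrative, since the real code derives partition ids via `DistIndex` and `nids_mapper`:

```python
import torch

def keep_mask(edge_parts: torch.Tensor, rank: int, beta: float) -> torch.Tensor:
    """edge_parts: (2, B) tensor of the partition id of each edge's endpoints.
    Keeps every fully local edge; a cross-partition edge survives with
    probability beta, so beta = 1 keeps everything and beta = 0 keeps
    only local edges."""
    mask = (edge_parts[0] == rank) & (edge_parts[1] == rank)
    if beta > 0:
        # Resample only the positions currently masked out (remote edges).
        mask[~mask] = torch.rand((~mask).sum().item()) < beta
    return mask

# Example: rank 0 of 4 partitions -> ~1/16 local edges plus ~beta of the rest.
parts = torch.randint(0, 4, (2, 1000))
print(keep_mask(parts, rank=0, beta=0.1).float().mean().item())
```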