Commit e074b837 by zlj

fix time count in code

parent b9ca4758
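The counters touched by this commit (time_count / tt, with fields such as time_forward, time_backward, time_memory_sync, time_sample_and_build and time_memory_fetch) are driven by start_gpu() and elapsed_event(). Their implementation is not part of this diff; the following is only a minimal sketch, assuming they wrap CUDA events and return elapsed milliseconds, and assuming time_count and tt refer to the same module-level helper.

# Hypothetical sketch of the timing helper assumed by this commit; the real
# module in the repository may differ.
import torch

class TimeCount:
    def __init__(self):
        self.time_forward = 0.0
        self.time_backward = 0.0
        self.time_memory_sync = 0.0
        self.time_memory_fetch = 0.0
        self.time_sample_and_build = 0.0

    def start_gpu(self):
        # Record a CUDA event on the current stream and hand it back as the
        # start marker of a timed region.
        start = torch.cuda.Event(enable_timing=True)
        start.record()
        return start

    def elapsed_event(self, start):
        # Record an end event, wait for it to complete, and return the
        # elapsed time in milliseconds.
        end = torch.cuda.Event(enable_timing=True)
        end.record()
        end.synchronize()
        return start.elapsed_time(end)

time_count = TimeCount()  # assumed module-level singleton (aliased as tt elsewhere)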
@@ -27,7 +27,7 @@ gnn:
   dim_time: 100
   dim_out: 100
 train:
-- epoch: 50
+- epoch: 1
   batch_size: 3000
   # reorder: 16
   lr: 0.0004
...
@@ -202,7 +202,7 @@ def main():
     else:
         graph,full_sampler_graph,train_mask,val_mask,test_mask,full_train_mask,cache_route = load_from_speed(args.dataname,seed=123457,top=args.topk,sampler_graph_add_rev=True, feature_device=torch.device('cuda:{}'.format(ctx.local_rank)),partition=args.partition)#torch.device('cpu'))
     if(args.dataname=='GDELT'):
-        train_param['epoch'] = 1
+        train_param['epoch'] = 2
     #torch.autograd.set_detect_anomaly(True)
     # make sure CUDA is available
     if torch.cuda.is_available():
@@ -295,7 +295,7 @@ def main():
         mode='train',
         queue_size = 200,
         mailbox = mailbox,
-        is_pipeline=False,
+        is_pipeline=True,
         use_local_feature = False,
         device = torch.device('cuda:{}'.format(local_rank)),
         probability=args.probability,
@@ -554,7 +554,8 @@ def main():
             optimizer.zero_grad()
             ones = torch.ones(metadata['dst_neg_index'].shape[0],device = model.device,dtype=torch.float)
             pred_pos, pred_neg = model(mfgs,metadata,neg_samples=args.neg_samples,async_param = param)
+            time_count.time_backward+=time_count.elapsed_event(t1)
+            t2=time_count.start_gpu()
             #print(time_count.elapsed_event(t2))
             loss = creterion(pred_pos, torch.ones_like(pred_pos))
             if args.local_neg_sample is False:
@@ -569,16 +570,18 @@ def main():
             #torch.cuda.synchronize()
             loss.backward()
             optimizer.step()
-            time_count.time_forward+=time_count.elapsed_event(t1)
+            time_count.time_forward+=time_count.elapsed_event(t2)
             #torch.cuda.synchronize()
             ## train aps
             #y_pred = torch.cat([pred_pos, pred_neg], dim=0).sigmoid().cpu()
             #y_true = torch.cat([torch.ones(pred_pos.size(0)), torch.zeros(pred_neg.size(0))], dim=0)
             #train_aps.append(average_precision_score(y_true, y_pred.detach().numpy()))
             #torch.cuda.synchronize()
+            t3 = time_count.start_gpu()
             mailbox.update_shared()
             mailbox.update_p2p_mem()
             mailbox.update_p2p_mail()
+            time_count.time_memory_sync+=time_count.elapsed_event(t3)
             #start = time_count.start_gpu()
             #torch.cuda.empty_cache()
...
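With the counters accumulated above, a per-epoch summary could be printed as in the hedged sketch below; the attribute names come from this diff, while the report() helper and its output format are assumptions.

# Hypothetical per-epoch report of the accumulated counters; values are in
# milliseconds because torch.cuda.Event.elapsed_time reports ms.
def report(tc):
    print('forward {:.1f} ms | backward {:.1f} ms | memory sync {:.1f} ms | '
          'sample+build {:.1f} ms | memory fetch {:.1f} ms'.format(
              tc.time_forward, tc.time_backward, tc.time_memory_sync,
              tc.time_sample_and_build, tc.time_memory_fetch))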
@@ -152,6 +152,8 @@ class DistributedDataLoader:
         self.local_root = 0
         self.probability = probability
         print('pro {}\n'.format(self.probability))
+        self.time_count = []
     def __iter__(self):
         if self.chunk_size is None:
@@ -255,6 +257,7 @@ class DistributedDataLoader:
             return
         while(len(self.result_queue)==0):
             pass
+        t0 = tt.start_gpu()
         batch_data,dist_nid,dist_eid = self.result_queue[0].result()
         b = batch_data[1][0][0]
         self.remote_node += (DistIndex(dist_nid).part != dist.get_rank()).sum().item()
@@ -268,6 +271,8 @@ class DistributedDataLoader:
         #end = torch.cuda.Event(enable_timing=True)
         #start.record()
         stream.synchronize()
+        tt.time_sample_and_build += tt.elapsed_event(t0)
+        t1 = tt.start_gpu()
         #end.record()
         #end.synchronize()
         #print(start.elapsed_time(end))
@@ -287,8 +292,11 @@ class DistributedDataLoader:
             edge_feat = None
         t3 = time.time()
         self.result_queue.append((batch_data,dist_nid,dist_eid,edge_feat,node_feat))
+        tt.time_memory_fetch += tt.elapsed_event(t1)
+        t1 = tt.start_gpu()
         self.submit()
+        tt.time_sample_and_build+=tt.elapsed_event(t1)
     @torch.no_grad()
     def __next__(self):
         ctx = DistributedContext.get_default_context()
@@ -333,6 +341,7 @@ class DistributedDataLoader:
             raise StopIteration
         else:
             if self.recv_idxs == 0:
+                t0 = tt.start_gpu()
                 data = self._next_data()
                 batch_data,dist_nid,dist_eid = graph_sample(
                     self.graph,
@@ -346,17 +355,21 @@ class DistributedDataLoader:
                 )
                 edge_feat = get_edge_feature_by_dist(self.graph,dist_eid,is_local,out_device=self.device)
                 node_feat,mem = get_node_feature_by_dist(self.graph,self.mailbox,dist_nid, is_local,out_device=self.device)
+                t_sample = tt.elapsed_event(t0)
+                tt.time_sample_and_build+=t_sample
+                t1 = tt.start_gpu()
                 prepare_input(node_feat,edge_feat,mem,batch_data[1],dist_nid,dist_eid)
                 if(self.mailbox is not None and self.mailbox.historical_cache is not None):
                     batch_data[1][0][0].srcdata['his_mem'] = batch_data[1][0][0].srcdata['mem'].clone()
                     batch_data[1][0][0].srcdata['his_ts'] = batch_data[1][0][0].srcdata['mail_ts'].clone()
+                tt.time_memory_fetch+=tt.elapsed_event(t1)
                 #if(self.mailbox is not None and self.mailbox.historical_cache is not None):
                 #    id = batch_data[1][0][0].srcdata['ID']
                 #    mask = DistIndex(id).is_shared
                 #batch_data[1][0][0].srcdata['mem'][mask] = self.mailbox.historical_cache.local_historical_data[DistIndex(id).loc[mask]]
                 self.recv_idxs += 1
             else:
+                t0 = time_count.start_gpu()
                 if(self.recv_idxs < self.expected_idx):
                     assert len(self.result_queue) > 0
                     #print(len(self.result_queue[0]))
@@ -383,6 +396,7 @@ class DistributedDataLoader:
                     node_feat0 = node_feat0[0]
                     node_feat = None
                     mem = self.mailbox.unpack(node_feat0,mailbox = True)
+                    time_count.time_memory_fetch+=time_count.elapsed_event(t0)
                     #print(node_feat.shape,edge_feat.shape,mem[0].shape)
                     #node_feat[1].wait()
                     #node_feat = node_feat[0]
@@ -417,7 +431,9 @@ class DistributedDataLoader:
         global executor
         if(len(self.result_queue)==0):
             #if(self.recv_idxs+1<=self.expected_idx):
+            t0 = tt.start_gpu()
             self.submit()
+            time_count.time_sample_and_build = tt.elapsed_event(t0)
             """
             graph_sample(
                 graph=self.graph,
...
@@ -514,8 +514,8 @@ class SharedMailBox():
         self.update_shared()
         self.update_p2p_mail()
         self.update_p2p_mem()
-        self.handle_last_async()
-        self.sychronize_shared()
+        #self.handle_last_async()
+        #self.sychronize_shared()
         #self.historical_cache.add_shared_to_queue(handle0,handle1,shared_id_list,shared_list)
         """
         shared_memory = self.node_memory.accessor.data[self.shared_nodes_index]
...