Package semanticSBML :: Module merge_logic
[hide private]
[frames | no frames]

Source Code for Module semanticSBML.merge_logic

  1  #python libraries 
  2  import re 
  3  import copy 
  4  import xmlobject 
  5  import os 
  6  import sys 
  7  #external libraries 
  8  import libsbml 
  9  #semanticSBML libraries 
 10  import merge_datamodel 
 11  import libSBAnnotation.database_new 
 12  import libSBAnnotation.config 
 13   
 14  g_known_units = None 
 15  g_bio_qualifier_list=[] 
 16  try: 
 17          g_known_units = xmlobject.XMLFile(path=libSBAnnotation.config.Config().getpath('resources','libsbml_resources')).root.knownUnits[0].kinds.split() 
 18  except: 
 19          sys.stderr.write('error opening file (missing?) ',libSBAnnotation.config.Config().getpath('resources','libsbml_resources')) 
 20          import sys 
 21          sys.exit(1) 
 22  else: 
 23          bq = xmlobject.XMLFile(path=libSBAnnotation.config.Config().getpath('resources','libsbml_resources')).root.BioQualifier 
 24          for q in bq[0].qualifier: 
 25                  g_bio_qualifier_list.append(q.name) 
 26   
class BioRelations(object):
    '''
    singleton class for the comparison of elements
    creation of globally unique identifiers
    update of mathematical expressions that contain identifiers

    Calling BioRelations(...) never returns a BioRelations object: __new__
    builds one shared L{BioRelations.Local} instance on the first call and
    returns that same object on every later call. Constructor arguments of
    later calls are ignored.
    '''
    __instance = None

    def __new__(cls, *args, **kargs):
        if cls.__instance is None:
            cls.__instance = BioRelations.Local(*args, **kargs)
        return cls.__instance

    class Local:
        '''implementation object handed out by the L{BioRelations} singleton'''

        def __init__(self, merger=None):
            '''
            @param merger: object providing getTuple(em1, em2); used by compareLocation
            '''
            self.merger = merger
            self._ldb = libSBAnnotation.database_new.Meta_DB()
            self.element_score = self.getScoreMatrix(
                libSBAnnotation.config.Config().getpath('resources', 'element_score_matrix'))
            # every id handed out so far (see setNewId)
            self.idlist = []
            # merge_model -> {original id: id after setNewId}
            self.model2id2newid = {}
            self.elements_update_queue = set()

        def setNewId(self, element, merge_model):
            '''
            set a unique id for an element,
            if id is already unique, leave it, if not set a new one

            Colliding ids get the suffixes _M, _1, _2, ... appended to the
            original id until an unused one is found. The chosen id is
            recorded in self.idlist and in self.model2id2newid[merge_model].
            @param element: merge(d) element or libsbml element
            @type element: MergedElement | OriginElement | libsbml.(Species|...)
            @param merge_model: key under which the old->new id mapping is stored
            '''
            #TODO set unique metaid too!!!!
            try:
                if element.newidset:
                    print('double id set %s' % (element,))  #DEBUG
            except AttributeError:
                # first time this element is seen: tag it
                element.newidset = True

            original_id = element.getId()
            suffix = 'M'
            while element.getId() in self.idlist:
                element.setId('%s_%s' % (original_id, suffix))
                # after the initial 'M' the suffix turns into a counter starting at 1
                if suffix == 'M':
                    suffix = 0
                suffix += 1
            self.idlist.append(element.getId())
            self.model2id2newid.setdefault(merge_model, {})[original_id] = element.getId()

        def getMathElementNames(self, mathobj):
            '''
            get a list of strings that are identifiers of sbml elements used in the inserted math obj
            this is a recursive function

            Names of child nodes come first, the name of the node itself last.
            A node without children is only reported when isName() is true.
            @type mathobj: libSBML.math
            @return: list of identifier strings
            @rtype: [str]
            '''
            child_count = mathobj.getNumChildren()
            if child_count == 0:
                if mathobj.isName():
                    return [mathobj.getName()]
                return []

            names = []
            for index in range(child_count):
                names += self.getMathElementNames(mathobj.getChild(index))
            if mathobj.isName() or mathobj.isFunction():
                names.append(mathobj.getName())
            return names

        def updateMathElementIDs(self, mathobj, merge_model, create_copy=True):
            '''
            update all IDs in mathematical expressions with IDs of merged elements if the old element is merged

            A leaf node is renamed to the id of the merged element when its
            origin element belongs to a tuple, otherwise to the id recorded
            by setNewId for merge_model (if that differs). Names unknown to
            merge_model.id2merge_element are left untouched.
            @param mathobj: math node, or an object exposing getMath()
            @param merge_model: model providing id2merge_element and used as key of self.model2id2newid
            @param create_copy: work on mathobj.clone() instead of mathobj itself
            @return: the (possibly cloned) updated object
            '''
            if create_copy:
                mathobj = mathobj.clone()
            node = mathobj
            try:
                node = mathobj.getMath()
            except AttributeError:
                # mathobj already is the math node
                pass

            if node.getNumChildren() == 0:
                # NOTE(review): nodes WITH children are never renamed here, unlike
                # _updateMathPointers which also renames function nodes - confirm intended
                if node.isName() or node.isFunction():
                    try:
                        origin = merge_model.id2merge_element[node.getName()]
                        if origin.getTuple():  #check if tuple is available
                            node.setName(origin.getTuple().getMergedElement().getId())
                        elif self.model2id2newid[merge_model][node.getName()] != node.getName():  #check if id was updated
                            # setName has to be *called*; assigning to it only
                            # replaced the method and left the name unchanged
                            node.setName(self.model2id2newid[merge_model][node.getName()])
                    except KeyError:  #no!? internal parameter!
                        pass
            else:
                #check all children, recursive
                for index in range(node.getNumChildren()):
                    self.updateMathElementIDs(node.getChild(index), merge_model, False)

            #check for fkt defs
            try:
                updated_definitions = []
                for definition in mathobj.functionDefinitions:
                    if create_copy:
                        definition = definition.clone()
                    updated_definitions.append(
                        self.updateMathElementIDs(definition, merge_model, False))
                if create_copy:
                    mathobj.functionDefinitions = updated_definitions
            except AttributeError:  # no fkt def attached
                pass

            return mathobj

        def _combinedTupleScores(self, element):
            '''
            sum up the scores between element and every other member of its tuple
            @return: (combined location score, combined pair score)
            '''
            merge_tuple = element.getTuple()
            location_score = 0
            pair_score = 0
            for member in merge_tuple.getElements():
                if not member.mid == element.mid:
                    key = frozenset([element.mid, member.mid])
                    pair_score += merge_tuple.pair_score[key]
                    location_score += merge_tuple.pair_location_score[key]
            return (location_score, pair_score)

        def chooseBestTuple(self, el1, el2):
            '''
            reorder elements so that adding the second element to the tuple of the first gives a better result than adding the first element to the tuple of the second

            The element with the higher combined location score comes first;
            on equal location scores the combined pair score decides; on a
            complete tie the input order is kept.
            @type el1: L{OriginElement}
            @param el1: L{OriginElement} that belongs to a L{MergeTuple}
            @type el2: L{OriginElement}
            @param el2: L{OriginElement} that belongs to a L{MergeTuple}
            @rtype: (OriginElement,OriginElement)
            @return: reordered pair of the input
            '''
            # tuples compare element-wise: location score first, pair score second
            if self._combinedTupleScores(el1) >= self._combinedTupleScores(el2):
                return (el1, el2)
            return (el2, el1)

        def compareElements(self, el1, el2):
            '''
            compare the annotations of two elements and return a similarity score

            Every pair of annotations considered equal by isAnnotationEqual
            adds self.element_score[qualifier1][qualifier2] to the score.
            @param el1: element containing annotations
            @param el2: element containing annotations
            @type el1: libSBML base element or L{OriginElement}
            @type el2: libSBML base element or L{OriginElement}
            @return: accumulated score
            '''
            if isinstance(el1, merge_datamodel.OriginElement):
                el1annotations = el1.getMiriamAnnotations()
            else:
                el1annotations = ElementAnnotations(el1).getAnnotations()
            if isinstance(el2, merge_datamodel.OriginElement):
                el2annotations = el2.getMiriamAnnotations()
            else:
                el2annotations = ElementAnnotations(el2).getAnnotations()

            score = 0
            for an1 in el1annotations:  #FIXME must use pop list to no repeat
                for an2 in el2annotations:
                    if self.isAnnotationEqual(an1, an2, el1.type):
                        score += self.element_score[an1.qualifier][an2.qualifier]
            return score

        def compareLocation(self, el1, el2):
            '''
            score how well the locations of two elements match

            For reactions getLocation() is indexed with 0 and 1 and every
            combination of entries at the same index is compared; for all
            other element types the two locations are compared directly.
            @return: pair score of the tuple(s) the locations share, 0 if there is none
            '''
            def compare(em1, em2):
                '''internal function for comparing the location, this is needed due to different element types that have different number of locations'''
                if not (em1 and em2):
                    return 0
                merge_tuple = self.merger.getTuple(em1, em2)
                if not merge_tuple:
                    return 0
                try:
                    return merge_tuple.pair_score[frozenset([em1.mid, em2.mid])]
                except KeyError:
                    # a tuple holding both elements must carry their pair score;
                    # fail loudly with a readable message instead of a NameError
                    raise merge_datamodel.MergeError(
                        'should not happen: no pair score for %s and %s' % (em1.mid, em2.mid))

            if el1.type == 'reaction':  #reactants and products may be in different compartments
                locations1 = el1.getLocation()
                locations2 = el2.getLocation()
                score = 0
                for n in range(2):
                    for x in locations1[n]:
                        for y in locations2[n]:
                            score += compare(x, y)
                return score

            try:
                return compare(el1.getLocation(), el2.getLocation())
            except NotImplementedError:
                return 0

        def isAnnotationEqual(self, an1, an2, type):
            '''
            return if 2 annotations can be considered equal

            Annotations are equal when their string forms match. Otherwise,
            if both carry a name, the name of an1 is looked up in the
            database and the annotations are equal when an2 (db + id) is
            among the entries found.
            @type an1: Annotation
            @type an2: Annotation
            @param type: element type (currently unused)
            @rtype: bool
            '''
            if str(an1) == str(an2):
                return True
            if not (an1.getName() and an2.getName()):
                #FALLBACK ids not in database: only the direct comparison above applies
                return False

            res = self._ldb.searchEntries(an1.getName(), 1.)
            if not res:
                return False
            if len(res) > 1:
                # more than one hit is treated as an error in the lookup
                raise merge_datamodel.MergeError(
                    'ambiguous database result for annotation name %s' % an1.getName())
            # compare against the database AND id of an2 (was an1.db + an2.id)
            wanted = an2.db + an2.id
            for k, v in res[0].items():
                if k + v == wanted:
                    return True
            return False

        def _rebuildScoreMatrix(self, filename, failure_template):
            '''
            ask the database to rebuild the score matrix file; always raises
            @param failure_template: message with two %s slots (filename, error) used if the rebuild fails
            @raise merge_datamodel.MergeError: asking for a restart on success, describing the failure otherwise
            '''
            try:
                import libSBAnnotation.database_new
                db = libSBAnnotation.database_new.Meta_DB()
                db.buildDB('element_score_matrix')
            except Exception as error:
                raise merge_datamodel.MergeError(failure_template % (filename, error))
            raise merge_datamodel.MergeError('Please Restart The Merging Process')

        def getScoreMatrix(self, filename, qualifiers=None):
            '''
            read the triangular score matrix file and return it as a symmetric lookup

            All whitespace separated integer tokens of the file are used in
            order, everything else (row/column labels) is ignored. Row n
            supplies the scores of qualifier n against qualifiers n, n+1, ...
            @param filename: path of the score matrix file
            @param qualifiers: qualifier names in matrix order, defaults to g_bio_qualifier_list
            @return: score_matrix[q1][q2] == score_matrix[q2][q1]
            @rtype: {str:{str:int}}
            @raise merge_datamodel.MergeError: if the file is missing or holds too few numbers (a rebuild of the file is triggered first)
            '''
            try:
                with open(filename, 'r') as handle:
                    raw_cmp_scr = handle.read()
            except IOError:
                sys.stderr.write('missing ' + filename)
                # always raises
                self._rebuildScoreMatrix(filename, "missing %s\n%sTrying to get it ...")

            # anchored pattern: tokens like '12abc' are labels, not numbers
            scores = [int(token) for token in raw_cmp_scr.split()
                      if re.match(r'-?\d+$', token)]
            vs = g_bio_qualifier_list if qualifiers is None else qualifiers

            score_matrix = {}
            position = 0
            try:
                for n, v in enumerate(vs):
                    for iv in vs[n:]:
                        score = scores[position]
                        position += 1
                        score_matrix.setdefault(v, {})[iv] = score
                        score_matrix.setdefault(iv, {})[v] = score
            except IndexError:
                sys.stderr.write('Your score matrix file is not correct will update %s now!' % filename)
                # always raises
                self._rebuildScoreMatrix(filename, "Faulty score matrix %s\n%sTrying to get new one ...")
            return score_matrix

        def updateId(self, libsbml_element, oldId, newId):
            '''
            replace references to oldId by newId inside libsbml_element

            For a libsbml.Model all listed element kinds are processed
            recursively. For a single element the species references (of
            reactions), the compartment/outside pointer and the math
            expression (kinetic law or own math) are updated where present.
            '''
            if isinstance(libsbml_element, libsbml.Model):
                elements_w_id = ['Species', 'Compartments', 'Reactions', 'Parameters',
                                 'FunctionDefinitions', 'InitialAssignments', 'Rules']
                for etype in elements_w_id:
                    # iterate the lists of the model that was passed in; this
                    # object has no _merged_element attribute to take it from
                    for e in list(getattr(libsbml_element, 'getListOf' + etype)()):
                        self.updateId(e, oldId, newId)

            #reaction participants: Reactant, Modifier, Product
            if libsbml_element.getElementName() == 'reaction':
                self._updateReactionPointers(libsbml_element, oldId, newId)

            #location: compartment, outside
            try:
                self._updateLocationPointers(libsbml_element, oldId, newId)
            except AttributeError:
                # element has no location
                pass

            #math statement: kineticLaw, initialAssignment,
            try:
                try:
                    libsbml_element = libsbml_element.getKineticLaw()
                except AttributeError:
                    pass
                math_node = libsbml_element.getMath()
                self._updateMathPointers(math_node, oldId, newId)
            except AttributeError:
                # element carries no math
                pass

        def _updateMathPointers(self, math, oldId, newId):
            '''
            rename every function node and every childless name node called oldId to newId (recursive)
            '''
            renamable = math.isFunction() or (math.getNumChildren() == 0 and math.isName())
            if renamable and math.getName() == oldId:
                math.setName(newId)
            #check all children, recursive
            for index in range(math.getNumChildren()):
                self._updateMathPointers(math.getChild(index), oldId, newId)

        def _updateLocationPointers(self, libsbml_element, oldId, newId):
            '''
            update the outside pointer of the element, or its compartment
            pointer if the element has no getOutside
            @raise AttributeError: if the element offers neither accessor
            '''
            try:
                if libsbml_element.getOutside() == oldId:
                    libsbml_element.setOutside(newId)
            except AttributeError:
                if libsbml_element.getCompartment() == oldId:
                    libsbml_element.setCompartment(newId)

        def _updateReactionPointers(self, libsbml_element, oldId, newId):
            '''
            point all reactants, modifiers and products that reference species oldId to newId
            '''
            for participant in ('Reactant', 'Modifier', 'Product'):
                references = getattr(libsbml_element, 'getListOf' + participant + 's')()
                for reference in list(references):
                    if reference.getSpecies() == oldId:
                        reference.setSpecies(newId)
def resetScoreMatrix(filename, qualifiers=None):
    '''
    create an empty score matrix
    the score matrix should be filled to contain values that represent the relationship between MIRIAM annotations with different or same qualifier
    the score matrix is in human readable format

    Layout: a header line with one empty cell followed by all qualifier
    names, then one row per qualifier holding its name and the upper
    triangle of the matrix (cells left of the diagonal are blank), every
    score preset to 1. All cells are left aligned and 14 characters wide.
    @param filename: path of the file to (over)write
    @param qualifiers: qualifier names in matrix order, defaults to g_bio_qualifier_list
    '''
    if qualifiers is None:
        qualifiers = g_bio_qualifier_list
    names = list(qualifiers)

    blank_cell = '%-14s' % ' '
    score_cell = '%-14d' % 1

    lines = [''.join('%-14s' % label for label in [''] + names) + '\n']
    for row, name in enumerate(names):
        # row n skips n cells, then covers the diagonal and everything right of it
        lines.append('%-14s' % name
                     + blank_cell * row
                     + score_cell * (len(names) - row)
                     + '\n')

    # the context manager closes the file even if writing fails
    with open(filename, 'w') as handle:
        handle.write(''.join(lines))
417