# -*- coding: utf-8 -*-
"""
Functions to write pLabel data.
"""
import pandas as pd
import os
import numpy as np
import re
if __name__ == '__main__':
import HelperFunctions as hf
else:
from . import HelperFunctions as hf
def _unique_mods(modlist):
"""
Go through a list containing lists of modifications, strings of
modifications or NaN and extract all occuring unique mod-strings
Args:
modlist: list of modifications form xtable
Returns:
Lisr of unique modifications
"""
alist = []
for element in modlist:
if isinstance(element, list):
for e in element:
if e not in alist:
alist.append(e)
elif isinstance(element, float):
if not np.isnan(element):
if element not in alist:
alist.append(str(element))
else:
if element not in alist:
alist.append(element)
return list(set(alist))
def _generate_plabel_pepstring(xtype, xlink1, xlink2, pepseq1, pepseq2, score, mod1, mod2, modpos1, modpos2, mods2num):
"""
pep1=3 7 2 VFLLPDKK 22.7634 KKFETK 1 4,1
1: 3-1 = cross, loop, mono
2: xlink2
3: xlink2
4: pep1
5: score?
6: pep2
7: unknown, mostly 1
8: modification (e.g. 4,1 = modification 1 at position 4)
"""
if xtype == 'inter':
typeno = 3
elif xtype == 'intra':
typeno = 3
elif xtype == 'homomultimeric':
typeno = 3
elif xtype == 'loop':
typeno = 2
elif xtype == 'mono':
typeno = 1
if typeno > 1:
pepStringElements = [typeno, _cast_if_not_nan(xlink1, int),
_cast_if_not_nan(xlink2, int), pepseq1,
'{:.4f}'.format(score), pepseq2, '1']
else:
pepStringElements = [typeno, _cast_if_not_nan(xlink1, int),
pepseq1, '1']
pepStringElements = [str(x) for x in pepStringElements]
modlabels = []
mods = []
modposs = []
if not hf.isnan(modpos2):
# increment the modpos2 position to fit pLabel numbering
# add position for: cterm1, xlink, nterm2
modpos2 = [x + (len(pepseq1) + 3) for x in modpos2]
if not hf.isnan(modpos1):
mods.extend(_make_list(mod1))
modposs.extend(_make_list(modpos1))
mods.extend(_make_list(mod2))
modposs.extend(_make_list(modpos2))
elif not hf.isnan(modpos1):
mods.extend(_make_list(mod1))
modposs.extend(_make_list(modpos1))
for mod, pos in zip(mods, modposs):
modlabels.append('{},{}'.format(int(pos), mods2num[mod]))
pepStringElements.extend(modlabels)
return ' '.join(pepStringElements)
def _parse_mgf(filenames, mgfDir):
"""
Parse all mgf files matching to a list of filenames and extract all
TITLE arguments as list
Args:
filenames: List of rawfile basenames to look for
mgfDir: Path to mgf files
Returns:
titles2mgfoffset: Dictionary mapping mgf header titles to
the resp rawfile and the position within
the mgf-file
"""
titles2mgfoffset = {}
localMGFFiles = []
# collect mgf file names in mgfDir
for file in os.listdir(hf.compatible_path(mgfDir)):
if file.endswith('.mgf'):
localMGFFiles.append(file)
# check which files referenced in xtable are present in the dir
mgfToOpen = []
mgfNotFound = []
for file in filenames:
if file + '.mgf' in localMGFFiles:
mgfToOpen.append(file + '.mgf')
# pXtract usually adds the fragmentation method after conversion
# allow Orbitrap files to be recognised
elif file + '_HCDFT' + '.mgf' in localMGFFiles:
mgfToOpen.append(file + '_HCDFT' + '.mgf')
else:
mgfNotFound.append(file + '.mgf')
# raise error if file is missing
if len(mgfNotFound) > 0:
raise Exception('The following mgf files were not found at the ' +
'specified directory: {}'.format(', '.join(mgfNotFound)))
pattern = re.compile(r'TITLE=([^\.]+\.\d+\.\d+\.\d+\.\d+.*$)')
# parse the mgf files for titles
for f in mgfToOpen:
mgfFile = os.path.join(mgfDir, f)
with open(hf.compatible_path(mgfFile)) as inf:
offset_last = 0
offset_before_last = 0
for line in inf.readlines():
if line.startswith('TITLE='):
# in case of pXtract:
# TITLE=2017_08_04_SVs_BS3_16.2419.2419.4.dta
# for MSConvert with TPP compatibility:
# TITLE=2017_08_18_SK_3.1093.1093.2 File:"2017_08_18_SK_3.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1093"
# MSConvert w/o TPP:
# TITLE=2017_08_18_SK_3.1093.1093.2
if pattern.match(line):
m = pattern.match(line)
title = m.group(1)
else:
raise(Exception('Title not found'))
titles2mgfoffset[title.upper()] = mgfFile, offset_before_last
offset_before_last = offset_last
offset_last += len(line) + 1
return titles2mgfoffset
def _make_list(strorList):
"""
take lists, floats or strings as input and return either the list or
a one-element list of the string or float
"""
if isinstance(strorList, list):
return strorList
elif isinstance(strorList, float):
if not np.isnan(strorList):
return [strorList]
else:
return [strorList]
def _cast_if_not_nan(input, typefunc):
if not hf.isnan(input):
return typefunc(input)
else:
return input
[docs]def Write(xtable, outpath, mgfDir, xlinker, mergepLabel = False):
"""
Converts xtable data structure to (multiple) input file(s)
for the pLabel cross-link annotation tool
Args:
xtable: data table structure
outpath: path to write file (w/o file extension!)
xlinker: xlinker as given to pLabel
mergepLabel (bool): Whether to generate a new MGF and single pLabel file
"""
rawfiles = xtable['rawfile'].unique().tolist()
if xlinker == '':
raise Exception('Please provide a name for the cross-linker')
titles2mgfoffset = _parse_mgf(rawfiles, mgfDir)
allTitles = list(titles2mgfoffset.keys())
if not mergepLabel:
# separate by rawfile
for rf in rawfiles:
xtablePerRawfile = xtable[xtable['rawfile'] == rf].copy()
# separate per type within rawfile
outfile = os.path.join(outpath + '_' + rf + '.pLabel')
print('Opening {} to write'.format(outfile))
with open(hf.compatible_path(outfile), 'w') as out:
out.write('[FilePath]\n')
out.write('File_Path=' + os.path.join(mgfDir, rf + '.mgf\n'))
modifications = _unique_mods(xtablePerRawfile['mod2'].tolist() +\
xtablePerRawfile['mod1'].tolist())
mods2num = {} # dict mapping mod names to indices
out.write('[Modification]\n')
for idx, mod in enumerate(modifications):
out.write('{}={}\n'.format(idx+1, mod))
mods2num[mod] = idx+1
out.write('[xlink]\n')
out.write('xlink={}\n'.format(xlinker))
out.write('[Total]\n')
out.write('total={}\n'.format(len(xtablePerRawfile.index)))
idx = 1
for _, row in xtablePerRawfile.iterrows():
out.write('[Spectrum{}]\n'.format(idx))
idx += 1
scanno = str(int(row['scanno']))
prec_ch = str(int(row['prec_ch']))
title = ''
nothingFound = True
for idx, t in enumerate(allTitles):
# add the scanno twice to the search string to avoid
# matching of substrings e.g. 2516 to 25164
if '.'.join([rf, scanno, scanno, prec_ch]).upper() in t:
title = t
# set the variable to check if any matching title
# was found for a row
nothingFound = False
# remove the title from the list to avoid setting
# the same title twice
del allTitles[idx]
# leave the loop once the title has been removed
break
if nothingFound:
raise Exception('[pLabel writer] couldnt find a matching spectrum for {}. If converting an xTable that was not generated from pLink input searched with the same mgf-file, please activate the merge-pLabel-option'.\
format('.'.join([rf, scanno, scanno, prec_ch]).upper()))
# Generate the spectrum title as used by pLabel from
# rawfile name, scanno and precursor charge
out.write('name={}\n'.format(title))
out.write('pep1={}\n'.format(_generate_plabel_pepstring(row['type'],
row['xlink1'],
row['xlink2'],
row['pepseq1'],
row['pepseq2'],
row['score'],
row['mod1'],
row['mod2'],
row['modpos1'],
row['modpos2'],
mods2num)))
elif mergepLabel:
# Write only one pLabel file
outfile = outpath + '.pLabel'
outMGF = outpath + '.mgf'
filesWithOffsetToCopy = []
# a list with new mgf spectrum titles to integrate non-pLink results
# into the pLabel viewer
new_titles_and_charges_for_copy = []
print('[pLabel] Opening {} to write'.format(outfile))
with open(hf.compatible_path(outfile), 'w') as plabel:
plabel.write('[FilePath]\n')
plabel.write('File_Path=' + outMGF + '\n')
modifications = _unique_mods(xtable['mod2'].tolist() +\
xtable['mod1'].tolist())
mods2num = {} # dict mapping mod names to indices
plabel.write('[Modification]\n')
for idx, mod in enumerate(modifications):
plabel.write('{}={}\n'.format(idx+1, mod))
mods2num[mod] = idx+1
plabel.write('[xlink]\n')
plabel.write('xlink={}\n'.format(xlinker))
plabel.write('[Total]\n')
plabel.write('total={}\n'.format(len(xtable.index)))
plabel_specno = 1
for rf in rawfiles:
xtablePerRawfile = xtable[xtable['rawfile'] == rf].copy()
for _, row in xtablePerRawfile.iterrows():
toWrite = ''
toWrite += ('[Spectrum{}]\n'.format(plabel_specno))
plabel_specno += 1
scanno = str(int(row['scanno']))
prec_ch = str(int(row['prec_ch']))
nothingFound = True
title = ''
for idx, t in enumerate(allTitles):
# add the scanno twice to the search string to avoid
# matching of substrings e.g. 2516 to 25164
# the charge is not considered here as charge assignment
# can vary between different prorgammes
if '.'.join([rf, scanno, scanno]).upper() in t:
# generate a new mgf-spectrum title unique for this
# entry (pLabel cannot take a spectrum twice)
counter = 0
while '.'.join([rf, scanno, scanno, prec_ch, str(counter)]) in new_titles_and_charges_for_copy:
counter +=1
title = '.'.join([rf, scanno, scanno, prec_ch, str(counter)])
new_titles_and_charges_for_copy.append((title, prec_ch))
# save the position of each title in the MGF file
# for MGF-file merging
filesWithOffsetToCopy.append(titles2mgfoffset[t])
# set the variable to check if any matching title
# was found for a row
nothingFound = False
# leave the loop
break
if nothingFound:
raise Exception('[pLabel writer] couldnt find a matching spectrum for {}'.\
format('.'.join([rf, scanno, scanno, prec_ch]).upper()))
# Generate the spectrum title as used by pLabel from
# rawfile name, scanno and precursor charge
toWrite += ('name={}\n'.format(title))
toWrite += ('pep1={}\n'.format(_generate_plabel_pepstring(row['type'],
row['xlink1'],
row['xlink2'],
row['pepseq1'],
row['pepseq2'],
row['score'],
row['mod1'],
row['mod2'],
row['modpos1'],
row['modpos2'],
mods2num)))
plabel.write(toWrite)
print('[pLabel] Merging MGF files')
# Generate merged MGF file containing only the matching spectra
print('Opening {} to write'.format(outMGF))
with open(hf.compatible_path(outMGF), 'w') as mgf:
# sequentially open all MGF-files to copy from
templates = set([file for file, offset in filesWithOffsetToCopy])
for template in templates:
with open(hf.compatible_path(template), 'r') as t:
print('Opening {} to read'.format(template))
offsets = []
new_titles_and_charges = []
for idx, (file, offset) in enumerate(filesWithOffsetToCopy):
if file == template:
# parts to read from that file
offsets.append(offset)
# new titles to generate for each part read
new_titles_and_charges.append(new_titles_and_charges_for_copy[idx])
for idx, o in enumerate(offsets):
# move to the part of the file where the spectrum is stored
t.seek(o, 0)
new_title_and_charge = new_titles_and_charges[idx]
while True:
# loop through the lines of the spectrum until end-signa
line = t.readline()
if line.startswith('END IONS'):
# leave loop if the current spectrum ends
mgf.write(line)
break
elif line.startswith('TITLE'):
# change the title line
mgf.write('TITLE={}\n'.format(new_title_and_charge[0]))
elif line.startswith('CHARGE'):
# change the charge line
mgf.write('CHARGE={}+\n'.format(new_title_and_charge[1]))
else:
mgf.write(line)
if __name__ == '__main__':
import sys
sys.path.append(r'C:\Users\User\Documents\03_software\python\CroCo\src')
import croco
infile = r'C:\Users\User\Documents\03_software\python\CroCo\testdata\final\output\all_merged_xTable_intra.xlsx'
xTable = croco.xTable.Read(infile)
outpath = r'C:\Users\User\Documents\03_software\python\CroCo\testdata\final\output\xTable_to_vis\pLabel'
mgfDir = r'C:\Users\User\Documents\03_software\python\CroCo\testdata\final'
Write(xTable, outpath, mgfDir, 'BS3', mergepLabel = True)