Introducing InstaNovo-P, a de novo sequencing model for phosphoproteomics¶
Announcing InstaNovo-P¶
We are happy to share our newest model in the InstaNovo family, InstaNovo-P. [link to bioArxiv preprint]
InstaNovo-P is a fine-tuned version of our base model, InstaNovo v1.0.0, specifically tailored towards application in phosphoproteomics mass spectrometry experiments. InstaNovo-P was further trained on approx. 2.8 million PSMs from 75 thousand phosphorylated peptides. InstaNovo-P was extended to recognize the residues phospho-tyrosine, -serine and -threonine, achieving high accuracy in detecting phosphorylated peptides while retaining its performance in unmodified peptides.
import os
import sys
if 'google.colab' in sys.modules or 'KAGGLE_KERNEL_RUN_TYPE' in os.environ:
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'
# Install torchvision when running on Google Colab to prevent errors
!uv pip install --system "instanovo[cu124]>=1.1.2" pyopenms-viz torchvision tf-nightly
else:
!uv pip install "instanovo[cu124]>=1.1.2" pyopenms-viz
We can use instanovo version to check the version of InstaNovo (the transformer-based model).
!instanovo version
InstaNovo-P is a finetune of InstaNovo, so we import the transformer-based InstaNovo model.
from instanovo.transformer.model import InstaNovo
Set the device to GPU if available (recommended), otherwise use CPU.
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
device
InstaNovo supports automatic model downloads. You can see the IDs of the pretrained models that are available.
InstaNovo.get_pretrained()
We are now ready to download the InstaNovo-P checkpoint.
model, config = InstaNovo.from_pretrained('instanovo-phospho-v1.0.0')
model = model.to(device).eval()
Loading the InstaNovo-P dataset¶
Download the test fold of the InstaNovo-P dataset dataset from HuggingFace.
Normally, we would use the InstaNovo SpectrumDataFrame class to download a dataset from Hugging Face directly like this:
from instanovo.utils import SpectrumDataFrame
sdf = SpectrumDataFrame.from_huggingface(
"InstaDeepAI/InstaNovo-P",
is_annotated=True,
shuffle=False,
split="test",
)
But this downloads the whole dataset and the train partition alone is more then 6 GB, so this would take a while. So we use load_dataset to only download the test partition.
from datasets import load_dataset
data_files = {"test": "dataset-phospho-test-0000-0001.parquet"}
dataset = load_dataset("InstaDeepAI/InstaNovo-P", data_files=data_files, split="test[:10%]")
dataset
import pandas as pd
df = pd.DataFrame(dataset)
df
from instanovo.utils import SpectrumDataFrame
sdf = SpectrumDataFrame.from_pandas(df)
import pandas as pd
pd.options.plotting.backend = "ms_matplotlib"
row = sdf[0]
row_df = pd.DataFrame({"mz": row["mz_array"], "intensity": row["intensity_array"]})
row_df.plot(
kind="spectrum",
x="mz",
y="intensity",
annotate_mz=True,
bin_method="none",
annotate_top_n_peaks=5,
aggregate_duplicates=True,
title=f"Mass spectrum of {row['sequence']}",
);
from instanovo.transformer.dataset import SpectrumDataset, collate_batch
ds = SpectrumDataset(
sdf,
model.residue_set,
config.get("n_peaks", 200),
return_str=True,
annotated=True,
)
from torch.utils.data import DataLoader
# When using SpectrumDataFrame, workers and shuffle is handled internally.
dl = DataLoader(ds, batch_size=64, shuffle=False, num_workers=0, collate_fn=collate_batch)
batch = next(iter(dl))
spectra, precursors, spectra_mask, peptides, _ = batch
spectra = spectra.to(device)
precursors = precursors.to(device)
Decoding¶
We have three options for decoding:
- Greedy Search
- Beam Search
- Knapsack Beam Search
For the best results and highest peptide recall, use Knapsack Beam Search. For fastest results (over 10x speedup), use Greedy Search.
We generally use a beam size of 5 for Beam Search and Knapsack Beam Search, a higher beam size should increase recall at the cost of performance and vice versa.
Note: in our findings, greedy search has similar performance as knapsack beam search at 5% FDR. I.e. if you plan to filter at 5% FDR anyway, use greedy search for optimal performance.
Greedy Search and Beam Search¶
Greedy search is used when num_beams=1, and beam search is used when num_beams>1
from instanovo.inference import BeamSearchDecoder, GreedyDecoder
num_beams = 1 # Change this, defaults are 1 or 5
if num_beams > 1:
decoder = BeamSearchDecoder(model=model)
else:
decoder = GreedyDecoder(model=model)
Knapsack Beam Search¶
Setup knapsack beam search decoder. This may take a few minutes.
from pathlib import Path
from instanovo.constants import MASS_SCALE
from instanovo.inference.knapsack import Knapsack
from instanovo.inference.knapsack_beam_search import KnapsackBeamSearchDecoder
num_beams = 5
def _setup_knapsack(model: InstaNovo) -> Knapsack:
# Cannot allow negative masses in knapsack graph
if "(-17.03)" in model.residue_set.residue_masses:
model.residue_set.residue_masses["(-17.03)"] = 1e3
if "[UNIMOD:385]" in model.residue_set.residue_masses:
model.residue_set.residue_masses["[UNIMOD:385]"] = 1e3
residue_masses = dict(model.residue_set.residue_masses.copy())
if any(x < 0 for x in residue_masses.values()):
raise NotImplementedError(
"Negative mass found in residues, this will break the knapsack graph. "
"Either disable knapsack or use strictly positive masses"
)
for special_residue in list(model.residue_set.residue_to_index.keys())[:3]:
residue_masses[special_residue] = 0
residue_indices = model.residue_set.residue_to_index
return Knapsack.construct_knapsack(
residue_masses=residue_masses,
residue_indices=residue_indices,
max_mass=4000.00,
mass_scale=MASS_SCALE,
)
knapsack_path = Path("./checkpoints/knapsack/phospho")
if not knapsack_path.exists():
print("Knapsack path missing or not specified, generating...")
knapsack = _setup_knapsack(model)
decoder = KnapsackBeamSearchDecoder(model, knapsack)
print(f"Saving knapsack to {knapsack_path}")
knapsack_path.parent.mkdir(parents=True, exist_ok=True)
knapsack.save(knapsack_path)
else:
print("Knapsack path found. Loading...")
decoder = KnapsackBeamSearchDecoder.from_file(model=model, path=knapsack_path)
Inference time 🚀¶
Evaluating a single batch...
from instanovo.inference import ScoredSequence
with torch.no_grad():
p = decoder.decode(
spectra=spectra,
precursors=precursors,
beam_size=num_beams,
max_length=config["max_length"],
)
preds = [x.sequence if isinstance(x, ScoredSequence) else [] for x in p]
probs = [x.sequence_log_probability if isinstance(x, ScoredSequence) else -float("inf") for x in p]
Confidence probabilities¶
The model returns per-residue confidences in the form of token log-probabilities. We can visualize these or use them as part of a workflow.
from typing import Optional
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from instanovo.inference.beam_search import ScoredSequence
def plot_residue_confidence(prediction: ScoredSequence, peptide: Optional[str] = None) -> None:
if not prediction:
return
ticks = list(range(len(prediction.sequence)))
token_probabilities = np.exp(prediction.token_log_probabilities[:len(ticks)])
sequence_confidence = np.exp(prediction.sequence_log_probability)
_, ax = plt.subplots()
bars = sns.barplot(x=ticks, y=token_probabilities, errorbar=None, ax=ax)
# Increase Y-axis limit to create space for text labels
ax.set_ylim(0, max(token_probabilities) * 1.2)
# Add numbers above bars with a slanted angle
for bar, prob in zip(bars.patches, token_probabilities):
height = bar.get_height()
ax.text(
bar.get_x() + bar.get_width() / 2,
float(height) + 0.02,
f"{float(prob):.4f}",
ha="center",
va="bottom",
fontsize=9,
color="black",
rotation=45,
)
# Check if any residue contains a PTM (e.g., "S(+79.97)")
has_ptm = any("[" in res and "]" in res for res in prediction.sequence)
# Set labels
x_label = f" Prediction: {''.join(prediction.sequence)}"
if peptide is not None:
x_label += f"\nGround truth: {peptide}"
ax.set_xlabel(x_label)
ax.set_ylabel("Confidence Probability")
# Set title with sequence confidence
ax.set_title(
f"Residue Confidence per Position\nSequence Probability: {sequence_confidence:.4f}"
)
# Set X-ticks
ax.set_xticks(ticks)
ax.set_xticklabels(
prediction.sequence,
rotation=45 if has_ptm else 0,
ha="right" if has_ptm else "center",
)
plt.show()
For a spectrum that is sequenced correctly, the sequence probability and per-residue probabilities are uniformly high:
plot_residue_confidence(p[1], peptides[1])
For another spectrum which is sequenced incorrectly, the sequence probability is low and the per-residue probabilities of the incorrectly sequenced residues (up to isomerism) are lower than of those correctly sequenced:
plot_residue_confidence(p[0], peptides[0])
Evaluation¶
from instanovo.utils.metrics import Metrics
metrics = Metrics(model.residue_set, config["isotope_error_range"])
aa_precision, aa_recall, peptide_recall, peptide_precision = metrics.compute_precision_recall(
peptides, preds
)
peptide_recall
Evaluating on the test fold of the InstaNovo-P dataset:
from tqdm.notebook import tqdm
preds = []
targs = []
probs = []
for _, batch in tqdm(enumerate(dl), total=len(dl)):
spectra, precursors, _, peptides, _ = batch
spectra = spectra.to(device)
precursors = precursors.to(device)
with torch.no_grad():
p = decoder.decode(
spectra=spectra,
precursors=precursors,
beam_size=config["n_beams"],
max_length=config["max_length"],
)
preds += [x.sequence if isinstance(x, ScoredSequence) else [] for x in p]
probs += [
x.sequence_log_probability if isinstance(x, ScoredSequence) else -float("inf") for x in p
]
targs += list(peptides)
Evaluation metrics¶
Model performance without filtering:
aa_precision, aa_recall, peptide_recall, peptide_precision = metrics.compute_precision_recall(
targs, preds
)
aa_error_rate = metrics.compute_aa_er(targs, preds)
auc = metrics.calc_auc(targs, preds, np.exp(pd.Series(probs)))
print(f"amino acid error rate: {aa_error_rate:.5f}")
print(f"amino acid precision: {aa_precision:.5f}")
print(f"amino acid recall: {aa_recall:.5f}")
print(f"peptide precision: {peptide_precision:.5f}")
print(f"peptide recall: {peptide_recall:.5f}")
print(f"area under the PR curve: {auc:.5f}")
We can find a threshold to ensure a desired FDR:¶
Model performance at 5% FDR:
fdr = 5 / 100 # Desired FDR
_, threshold = metrics.find_recall_at_fdr(targs, preds, np.exp(probs), fdr=fdr)
aa_precision_fdr, aa_recall_fdr, peptide_recall_fdr, peptide_precision_fdr = (
metrics.compute_precision_recall(targs, preds, np.exp(probs), threshold=threshold)
)
print(f"Performance at {fdr*100:.1f}% FDR:\n")
print(f"amino acid precision: {aa_precision_fdr:.5f}")
print(f"amino acid recall: {aa_recall_fdr:.5f}")
print(f"peptide precision: {peptide_precision_fdr:.5f}")
print(f"peptide recall: {peptide_recall_fdr:.5f}")
print(f"area under the PR curve: {auc:.5f}")
print(f"confidence threshold: {threshold:.5f} <-- Use this as a confidence cutoff")
Note: to reproduce the results of the paper, the entire InstaNovo-P test set should be evaluated.
Saving the predictions...¶
pred_df = pd.DataFrame(
{
"targets": targs,
"tokenized_predictions": preds,
"predictions": ["".join(x) for x in preds],
"log_probabilities": probs,
}
)
pred_df.head()
pred_df.to_csv("predictions_instanovo_phospho.csv", index=False)