Skip to content
Snippets Groups Projects
Commit fe69cbde authored by Javier González-Delgado's avatar Javier González-Delgado
Browse files

Update file contact_features.ipynb

parent ddcab40a
No related branches found
No related tags found
No related merge requests found
%% Cell type:code id:3ff6fe72 tags:
``` python
# Load notebooks with required functions
import ipynb
from ipynb.fs.full.wcontact_matrix import *
from ipynb.fs.full.utils import *
# Load required libraries
import numpy as np
import os
from tqdm import tqdm
from joblib import Parallel, delayed
from functools import partial
import mdtraj as md
import itertools
import pandas as pd
import MDAnalysis
import time
import umap
import numba
import pynndescent
import sys
import warnings #Optional
warnings.filterwarnings("ignore") #Optional
```
%% Cell type:code id:d158cc6b tags:
``` python
def contact_features(ensemble_path, ensemble_name, thresholds, interactive = True, N_cores = 1, start = None, end = None, subsequence = None, sel_chain = None, umap_dim = 10):
# Initial parameters
var_dict = {'multiframe' : 'n', 'check_folder' : True, 'do_xtc' : False, 'do_pdb' : False,
'N' : 1, 'start' : start, 'end' : end, 'subsequence' : subsequence,
'ensemble_name' : ensemble_name, 'ensemble_path' : ensemble_path}
var_dict['xtc_files'] = [file for file in os.listdir(ensemble_path) if file.endswith(".xtc")]
var_dict['pdb_files'] = [file for file in os.listdir(ensemble_path) if file.endswith(".pdb") or file.endswith(".prmtop") or file.endswith(".gsd") or file.endswith(".hdf5") or file.endswith(".mol2") or file.endswith(".hoomdxml") or file.endswith(".prm7") or file.endswith(".arc") or file.endswith(".parm7") or file.endswith(".gro") or file.endswith(".pdb.gz") or file.endswith(".pdb.gz") or file.endswith(".h5") or file.endswith(".lh5") or file.endswith(".psf")]
var_dict['folders'] = [file for file in os.listdir(ensemble_path) if (os.path.isdir("/".join([ensemble_path,file])) and not file.startswith('.'))]
print("\n----------------------------------------------------------------------------------\n")
print(' \(·_·) \(·_·)')
print(' ) )z This is WARIO! ) )z')
print(" / \\ / \\ \n")
if interactive == True:
print("Before launching the computation, let me check I understood everything correctly...")
print("\n----------------------------------------------------------------------------------\n")
# File processing
print("".join(["For the ensemble named ",var_dict["ensemble_name"],', I found ',
str(len(var_dict["xtc_files"])),' .xtc file(s), ',str(len(var_dict["pdb_files"])),' .pdb file(s) and ',
str(len(var_dict["folders"])),' folder(s).']))
if len(var_dict["xtc_files"]) + len(var_dict["folders"]) + len(var_dict["pdb_files"]) == 0:
sys.exit("".join(['Folder for ', var_dict["ensemble_name"], ' ensemble is empty...']))
# .xtc file with a .pdb topology file
if len(var_dict["xtc_files"]) >= len(var_dict["pdb_files"]) and len(var_dict["pdb_files"]) == 1:
if interactive == True:
print('\nShould I interprete this input as:\n')
else:
print('\nI will interprete this input as:\n')
print("".join([str(var_dict["xtc_files"][0]),' : trajectory of ',var_dict["ensemble_name"],',']))
print("".join([str(var_dict["pdb_files"][0]),' : topology file of ',var_dict["ensemble_name"],'.']))
if len(var_dict["xtc_files"]) > 1:
print("\nMore than one .xtc file were found. Taking the first as the trajectory file.\n")
if interactive == True:
ens_input = input('...? (y/n)')
else:
ens_input = 'y'
if ens_input == 'n':
var_dict['multiframe'] = input("Should I ignore .xtc files and consider the .pdb file as a multiframe file? (y/n)")
else:
var_dict["do_xtc"] = True
var_dict["xtc_root_path"] = var_dict["ensemble_path"]
var_dict['check_folder'] = False
# multiframe .pdb files
if var_dict['multiframe'] == 'y' or (len(var_dict["pdb_files"]) >= 1 and len(var_dict["xtc_files"]) == 0):
if interactive == True:
print('\nShould I interprete this input as:\n')
else:
print('\nI will interprete this input as:\n')
print("".join([str(var_dict["pdb_files"][0]),' : trajectory of ',var_dict["ensemble_name"],'.']))
if len(var_dict["pdb_files"]) > 1:
print("\nMore than one multiframe .pdb file were found. Taking the first as the trajectory file.\n")
if interactive == True:
ens_input = input('...? (y/n)')
else:
ens_input = 'y'
if ens_input == 'y':
print('Trajectory has been given as multiframe .pdb file, which is not supported.')
print("Converting file to .xtc + topology .pdb...\n ")
if not os.path.exists("/".join([var_dict["ensemble_path"],'converted_files'])):
os.mkdir("/".join([var_dict["ensemble_path"],'converted_files']))
multiframe_pdb_to_xtc(pdb_file = "/".join([var_dict["ensemble_path"],var_dict["pdb_files"][0]]), save_path = "/".join([var_dict["ensemble_path"],'converted_files']), prot_name = var_dict["pdb_files"][0].split('.pdb')[0])
print("Done.")
var_dict["do_xtc"] = True
var_dict["xtc_root_path"] = "/".join([var_dict["ensemble_path"],'converted_files'])
var_dict["xtc_files"] = [file for file in os.listdir(var_dict["xtc_root_path"]) if file.endswith(".xtc")]
var_dict["pdb_files"] = [file for file in os.listdir(var_dict["xtc_root_path"]) if file.endswith(".pdb")]
var_dict['check_folder'] = False
# folder with .pdb files
if len(var_dict["folders"]) >= 1 and var_dict['check_folder'] == True:
if interactive:
print('\nShould I interprete this input as:\n')
else:
print('\nI will interprete this input as:\n')
print("".join([var_dict["folders"][0],' folder contains: trajectory of ',var_dict["ensemble_name"],"."]))
if len(var_dict["folders"]) > 1:
print("\nMore than one .pdb folder were found. Taking the first as the trajectory folder.\n")
if interactive:
ens_input = input('...? (y/n)')
else:
ens_input = 'y'
if ens_input == 'y':
var_dict["do_pdb"] = True
if not var_dict["do_pdb"] and not var_dict["do_xtc"]:
sys.exit("".join(['\n Sorry, I did not understood the input. Please follow the guidelines described in the function documentation to create ',ensemble_name,' folder.\n']))
print("\n----------------------------------------------------------------------------------\n")
if interactive == True:
print("Everything seems OK!\n")
print("".join(['There are ',str(os.cpu_count()),' threads (cores) available.']))
n_cores = int(input("Please specify the number of threads (cores) you would like to use (positive integer):"))
else:
if N_cores == 'max':
n_cores = int(os.cpu_count())
else:
n_cores = int(N_cores)
if not os.path.exists("/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])])):
os.mkdir("/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])]))
results_path = "/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])])
if os.path.exists("/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])])):
if len(os.listdir("/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])]))) == 0:
results_path = "/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])])
else:
sys.exit("".join(['The folder ', "/".join([os.path.abspath(ensemble_path),"_".join(['results',ensemble_name])]),' already exists and it is not empty. Please empty or delete it.']))
print("\n----------------------------------------------------------------------------------\n")
print("3..."); time.sleep(1);
print("2..."); time.sleep(1)
print("1..."); time.sleep(1)
print("Go!"); time.sleep(0.2)
print("\n----------------------------------------------------------------------------------")
# Build frames and save coordinates
print('\nComputing contact weights for ' + var_dict["ensemble_name"] + '...\n')
if var_dict["do_xtc"] == True:
wcontact_matrix(thresholds, xtc_file = "/".join([var_dict["xtc_root_path"],var_dict["xtc_files"][0]]), top_file = "/".join([var_dict["xtc_root_path"],var_dict["pdb_files"][0]]),
pdb_folder = None, num_cores = n_cores, prot_name = ensemble_name, save_to = results_path,
start = var_dict["start"], end = var_dict["end"], subsequence = var_dict["subsequence"],
select_chain = sel_chain,
name_variable = 'ipynb.fs.full.wcontact_matrix')
if var_dict["do_pdb"] == True:
wcontact_matrix(thresholds, xtc_file = None, top_file = None, pdb_folder = "/".join([var_dict["ensemble_path"],var_dict["folders"][0]]), num_cores = n_cores,
prot_name = ensemble_name, save_to = results_path,
start = var_dict["start"], end = var_dict["end"], subsequence = var_dict["subsequence"],
select_chain = sel_chain,
name_variable = 'ipynb.fs.full.wcontact_matrix')
print("\n----------------------------------------------------------------------------------\n")
print("Contact weights computed.\n")
print("Embedding data into a 2-dimensional UMAP space for visualization...\n")
wcont_data = pd.read_csv("/".join([results_path, "_".join([ensemble_name,'wcontmatrix.txt'])]), sep = ' ', header = None)
embedding_2d = umap.UMAP(random_state = 42,
n_neighbors = 15,
min_dist = 0.1).fit_transform(wcont_data)
np.save('/'.join([results_path, "_".join([ensemble_name,'embedding_2d_wcont'])]), embedding_2d)
print("".join(["Done! Embedding data into a ",str(umap_dim),"-dimensional UMAP space...\n"]))
clusterable_embedding = umap.UMAP(
n_neighbors = 30,
min_dist = 0.0,
n_components = umap_dim,
random_state = 42
).fit_transform(wcont_data)
np.save('/'.join([results_path, "_".join([ensemble_name,'clusterable_embedding_wcont'])]), clusterable_embedding)
print("\n----------------------------------------------------------------------------------\n")
print("Done! Clustering can be performed in the low-dimensional space.")
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment