-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
276 lines (225 loc) · 10.1 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
"""
Utilities for conformer generation modules
"""
from rdkit.Chem import AllChem
from rdkit.ML.Cluster import Butina
from collections import defaultdict
from contextlib import contextmanager
import os
import pickle
import shutil
from typing import Optional, Union
import numpy as np
from rdmc.utils import PERIODIC_TABLE as PT
from rdmc.external.logparser import GaussianLog
_software_available = {}
@contextmanager
def register_software(software):
"""
Register the availability of external software.
"""
if _software_available.get(software) is False:
return
try:
yield
except ImportError:
_software_available[software] = False
else:
_software_available[software] = True
def mol_to_sdf(mol: 'RDKitMol',
path: str,
conf_ids: Optional[list] = None,
save_conf_attrs: bool = True,
extra_attrs: dict = {},
):
"""
Write a molecule and its conformers to an SDF file.
Args:
mol ('RDKitMol'): An RDKitMol object.
path (str): Path to the output SDF file.
conf_ids (list, optional): Conformer IDs to write. Defaults to None.
attrs (list, optional): Attributes to write to the SDF file. Defaults to {}.
E.g., {'Energy': ['0.0', '1.0']}.
"""
conf_ids = conf_ids or list(range(mol.GetNumConformers()))
writer = AllChem.SDWriter(path)
try:
for c_id in conf_ids:
for key, value in extra_attrs.items():
mol.SetProp(key, str(value[c_id]))
if save_conf_attrs:
for prop in mol.GetConformer(c_id).GetPropNames():
mol.SetProp(prop,
str(mol.GetConformer(c_id).GetProp(prop)))
writer.write(mol._mol, confId=c_id)
except Exception:
raise
finally:
writer.close()
def get_binary(software: str) -> str:
"""
Get the path to the binary of a software.
Args:
software (str): Name of the software.
Returns:
str: Path to the binary if available, otherwise an empty string.
"""
path = shutil.which(software) or ''
_software_available[software] = (path != '')
return path
def mol_to_dict(mol,
copy: bool = True,
iter: int = None,
conf_copy_attrs: list = None):
"""
Convert a molecule to a dictionary that stores its conformers object, atom coordinates,
and iteration numbers for a certain calculation (optional).
Args:
mol ('RDKitMol'): An RDKitMol object.
copy (bool, optional): Use a copy of the molecule to process data. Defaults to True.
iter (int, optional): Number of iterations. Defaults to None.
conf_copy_attrs (list, optional): Conformer-level attributes to copy to the dictionary.
Returns:
list: mol data as a list of dict; each dict corresponds to a conformer.
"""
mol_data = []
if copy:
mol = mol.Copy(copy_attrs=conf_copy_attrs)
if conf_copy_attrs is None:
conf_copy_attrs = []
for c_id in range(mol.GetNumConformers()):
conf = mol.GetConformer(id=c_id)
positions = conf.GetPositions()
mol_data.append({"positions": positions,
"conf": conf})
if iter is not None:
mol_data[c_id].update({"iter": iter})
for attr in conf_copy_attrs:
mol_data[c_id].update({attr: getattr(mol, attr)[c_id]})
return mol_data
def dict_to_mol(mol_data,
conf_copy_attrs: list = None):
"""
Convert a dictionary that stores its conformers object, atom coordinates,
and conformer-level attributes to an RDKitMol. The method assumes that the
first conformer's owning mol contains the conformer-level attributes, which
are extracted through the Copy function (this should be the case if the
dictionary was generated with the mol_to_dict function).
Args:
mol_data (list) List containing dictionaries of data entries for each conformer.
conf_copy_attrs (list, optional): Conformer-level attributes to copy to the mol.
Returns:
mol ('RDKitMol'): An RDKitMol object.
"""
if conf_copy_attrs is None:
conf_copy_attrs = []
mol = mol_data[0]["conf"].GetOwningMol().Copy(quickCopy=True, copy_attrs=conf_copy_attrs)
[mol.AddConformer(c["conf"].ToConformer(), assignId=True) for c in mol_data]
return mol
def cluster_confs(mol, cutoff=1.0):
rmsmat = AllChem.GetConformerRMSMatrix(mol.ToRWMol(), prealigned=False)
num = mol.GetNumConformers()
clusters = Butina.ClusterData(rmsmat, num, cutoff, isDistData=True, reordering=True)
confs_to_keep = [c[0] for c in clusters]
updated_mol = mol.Copy(quickCopy=True)
[updated_mol.AddConformer(c.ToConformer(), assignId=True) for c in mol.GetConformers(confs_to_keep)]
return updated_mol
def get_conf_failure_mode(rxn_dir, pruner=True):
"""
Parse a reaction directory for a TS generation run and extract failure modes (which conformer failed the
full workflow and for what reason)
Args:
rxn_dir (str) Path to the reaction directory.
pruner (bool: Optional) Whether or not pruner was used during workflow
Returns:
failure_dict ('dict'): Dictionary of conformer ids mapped to the corresponding failure mode.
"""
failure_modes = {
0: "opt",
1: "prune",
2: "freq",
3: "irc",
4: "workflow",
5: "none",
}
opt_check_file = os.path.join(rxn_dir, "opt_check_ids.pkl")
freq_check_file = os.path.join(rxn_dir, "freq_check_ids.pkl")
prune_check_file = os.path.join(rxn_dir, "prune_check_ids.pkl")
irc_check_file = os.path.join(rxn_dir, "irc_check_ids.pkl")
workflow_check_file = os.path.join(rxn_dir, "workflow_check_ids.pkl")
opt_check_ids = pickle.load(open(opt_check_file, "rb"))
prune_check_ids = pickle.load(open(prune_check_file, "rb")) if pruner else {i: True for i in range(len(opt_check_ids))}
freq_check_ids = pickle.load(open(freq_check_file, "rb"))
irc_check_ids = pickle.load(open(irc_check_file, "rb"))
workflow_check_ids = pickle.load(open(workflow_check_file, "rb"))
all_checks = defaultdict(list)
for d in [opt_check_ids, prune_check_ids, freq_check_ids, irc_check_ids, workflow_check_ids]:
for k, v in d.items():
all_checks[k].append(v)
all_checks = np.concatenate([np.array([*all_checks.values()]), np.array([[False]] * len(all_checks))], axis=1)
modes = np.argmax(~all_checks, axis=1)
failure_dict = {i: failure_modes[m] for i, m in enumerate(modes)}
return failure_dict
def get_frames_from_freq(log,
amplitude: float = 1.0,
num_frames: int = 10,
weights: Union[bool, np.array] = False):
"""
Args:
log (GaussianLog): A gaussian log object with vibrational freq calculated.
amplitude (float): The amplitude of the motion. If a single value is provided then the guess
will be unique (if available). 0.25 will be the default. Otherwise, a list
can be provided, and all possible results will be returned.
num_frames (int): The number of frames in each direction (forward and reverse). Defaults to 10.
weights (bool or np.array): If ``True``, use the sqrt(atom mass) as a scaling factor to the displacement.
If ``False``, use the identity weights. If a N x 1 ``np.array` is provided, then
The concern is that light atoms (e.g., H) tend to have larger motions
than heavier atoms.
Returns:
np.array: The atomic numbers as an 1D array
np.array: The 3D geometries at each frame as a 3D array (number of frames x 2 + 1, number of atoms, 3)
"""
assert log.num_neg_freqs == 1
equ_xyz = log.converged_geometries[-1]
disp = log.cclib_results.vibdisps[0]
amp_factors = np.linspace(-amplitude, amplitude, 2 * num_frames + 1)
# Generate weights
if isinstance(weights, bool) and weights:
atom_masses = np.array([PT.GetAtomicWeight(int(num)) for num in log.cclib_results.atomnos]).reshape(-1, 1)
weights = np.sqrt(atom_masses)
elif isinstance(weights, bool) and not weights:
weights = np.ones((equ_xyz.shape[0], 1))
xyzs = equ_xyz - np.einsum('i,jk->ijk', amp_factors, weights * disp)
return log.cclib_results.atomnos, xyzs
def convert_log_to_mol(log_path: str,
amplitude: float = 1.0,
num_frames: int = 10,
weights: Union[bool, np.array] = False):
"""
Args:
log_path (str): The path to the log file.
amplitude (float): The amplitude of the motion. If a single value is provided then the guess
will be unique (if available). 0.25 will be the default. Otherwise, a list
can be provided, and all possible results will be returned.
num_frames (int): The number of frames in each direction (forward and reverse). Defaults to 10.
weights (bool or np.array): If ``True``, use the sqrt(atom mass) as a scaling factor to the displacement.
If ``False``, use the identity weights. If a N x 1 ``np.array` is provided, then
The concern is that light atoms (e.g., H) tend to have larger motions
than heavier atoms.
"""
glog = GaussianLog(log_path)
try:
assert glog.success
assert glog.is_ts
assert glog.num_neg_freqs == 1
except AssertionError:
return None
# Get TS mol object and construct geometries as numpy arrays for all frames
mol = glog.get_mol(converged=True, embed_conformers=False, sanitize=False)
_, xyzs = get_frames_from_freq(glog, amplitude=amplitude, num_frames=num_frames, weights=weights)
# Embed geometries to the mol object for output
mol.EmbedMultipleNullConfs(xyzs.shape[0])
[mol.SetPositions(xyzs[i, :, :], id=i) for i in range(xyzs.shape[0])]
return mol