Graph-DiT/graph_dit/metrics/property_metric.py

import math, os
import pickle
import json
import os.path as op
import numpy as np
import pandas as pd
from joblib import dump, load
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import mean_absolute_error, roc_auc_score
from rdkit import Chem
from rdkit import rdBase
from rdkit.Chem import AllChem
from rdkit import DataStructs
from rdkit.Chem import rdMolDescriptors

rdBase.DisableLog('rdApp.error')
# NAS-Bench-201 operation vocabulary: maps each op name to the integer code
# used when flattening a cell into a feature vector.
op_type = {
    'nor_conv_1x1': 1,
    'nor_conv_3x3': 2,
    'avg_pool_3x3': 3,
    'skip_connect': 4,
    'output': 5,
    'none': 6,
    'input': 7,
}

# Column holding the target label in each raw molecular dataset CSV.
task_to_colname = {
    'hiv_b': 'HIV_active',
    'bace_b': 'Class',
    'bbbp_b': 'p_np',
    'O2': 'O2',
    'N2': 'N2',
    'CO2': 'CO2',
}

tasktype_name = {
    'hiv_b': 'classification',
    'bace_b': 'classification',
    'bbbp_b': 'classification',
    'O2': 'regression',
    'N2': 'regression',
    'CO2': 'regression',
    'nasbench201': 'regression',
}

class TaskModel():
    """Oracle surrogate: a random-forest model scored with ROC-AUC for
    classification tasks or MAE for regression tasks."""

    def __init__(self, model_path, task_name):
        task_type = tasktype_name[task_name]
        self.task_name = task_name
        self.task_type = task_type
        self.model_path = model_path
        self.metric_func = roc_auc_score if 'classification' in self.task_type else mean_absolute_error

        try:
            self.model = load(model_path)
            print(self.task_name, ' evaluator loaded')
        except Exception:
            print(self.task_name, ' evaluator not found, training new one...')
            if 'classification' in task_type:
                self.model = RandomForestClassifier(random_state=0)
            elif 'regression' in task_type:
                self.model = RandomForestRegressor(random_state=0)
            performance = self.train()
            dump(self.model, model_path)
            print('Oracle performance: ', performance)

    def train(self):
        """Fit the surrogate on NAS-Bench-201 cells read from a JSON dump.

        Each cell is featurized as its flattened adjacency matrix concatenated
        with the integer-encoded operation list; the target is the recorded
        (training) accuracy.
        """
        def read_adj_ops_from_json(filename):
            with open(filename, 'r') as json_file:
                data = json.load(json_file)
            adj_ops_pairs = []
            for item in data:
                adj_matrix = np.array(item['adj_matrix'])
                ops = item['ops']
                acc = item['train'][0]['accuracy']
                adj_ops_pairs.append((adj_matrix, ops, acc))
            return adj_ops_pairs

        def feature_from_adj_and_ops(adj, ops):
            return np.concatenate([adj.flatten(), ops])

        filename = '/home/stud/hanzhang/nasbenchDiT/graph_dit/nasbench-201-graph.json'
        graphs = read_adj_ops_from_json(filename)

        adjs = []
        opss = []
        accs = []
        features = []
        for graph in graphs:
            adj, ops, acc = graph
            op_code = [op_type[op] for op in ops]
            adjs.append(adj)
            opss.append(op_code)
            accs.append(acc)
            features.append(feature_from_adj_and_ops(adj, op_code))

        features = np.array(features)
        labels = np.array(accs)
        # Drop entries with missing accuracy.
        mask = ~np.isnan(labels)
        labels = labels[mask]
        features = features[mask]

        self.model.fit(features, labels)
        y_pred = self.model.predict(features)
        perf = self.metric_func(labels, y_pred)
        print(f'{self.task_name} performance: {perf}')
        return perf

    def train__(self):
        """Legacy trainer for the molecular tasks: fits on ECFP4 fingerprints
        computed from the SMILES column of the raw CSV."""
        data_path = os.path.join(os.path.dirname(self.model_path), '..', f'raw/{self.task_name}.csv.gz')
        df = pd.read_csv(data_path)
        col_name = task_to_colname[self.task_name]
        y = df[col_name].to_numpy()
        x_smiles = df['smiles'].to_numpy()

        mask = ~np.isnan(y)
        y = y[mask]
        if 'classification' in self.task_type:
            y = y.astype(int)
        x_smiles = x_smiles[mask]

        x_fps = []
        mask = []
        for i, smiles in enumerate(x_smiles):
            mol = Chem.MolFromSmiles(smiles)
            mask.append(int(mol is not None))
            fp = TaskModel.fingerprints_from_mol(mol) if mol else np.zeros((1, 2048))
            x_fps.append(fp)
        x_fps = np.concatenate(x_fps, axis=0)

        self.model.fit(x_fps, y)
        y_pred = self.model.predict(x_fps)
        perf = self.metric_func(y, y_pred)
        print(f'{self.task_name} performance: {perf}')
        return perf

    def __call(self, smiles_list):
        """Legacy SMILES-based scorer; invalid molecules receive a score of 0."""
        fps = []
        mask = []
        for i, smiles in enumerate(smiles_list):
            mol = Chem.MolFromSmiles(smiles)
            mask.append(int(mol is not None))
            fp = TaskModel.fingerprints_from_mol(mol) if mol else np.zeros((1, 2048))
            fps.append(fp)
        fps = np.concatenate(fps, axis=0)

        if 'classification' in self.task_type:
            scores = self.model.predict_proba(fps)[:, 1]
        else:
            scores = self.model.predict(fps)
        scores = scores * np.array(mask)
        return np.float32(scores)

    def __call__(self, graph_list):
        """Score a batch of NAS-Bench-201 cells.

        `graph_list` is an iterable of (op_code, adj) pairs, where `op_code` is
        the integer-encoded operation list and `adj` the adjacency matrix.
        """
        def feature_from_adj_and_ops(ops, adj):
            # Same feature layout as in train(): flattened adjacency followed by op codes.
            return np.concatenate([adj.flatten(), ops])

        features = []
        for op_code, adj in graph_list:
            features.append(feature_from_adj_and_ops(op_code, adj))
        features = np.stack(features).astype(np.float32)

        if 'classification' in self.task_type:
            scores = self.model.predict_proba(features)[:, 1]
        else:
            scores = self.model.predict(features)
        return np.float32(scores)

    @classmethod
    def fingerprints_from_mol(cls, mol):  # use ECFP4
        features_vec = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=2048)
        features = np.zeros((1,))
        DataStructs.ConvertToNumpyArray(features_vec, features)
        return features.reshape(1, -1)
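
# Usage sketch (illustrative only, not part of the module API). The model path
# and the graph dimensions below are assumptions; they must match whatever the
# surrogate was trained on in train():
#
#     oracle = TaskModel('nasbench201_oracle.joblib', 'nasbench201')
#     adj = np.zeros((8, 8), dtype=int)            # hypothetical adjacency matrix
#     op_code = [op_type['input'], op_type['nor_conv_3x3'], op_type['output']]  # hypothetical ops
#     scores = oracle([(op_code, adj)])            # predicted accuracies, one per cell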


###### SAS Score ######
_fscores = None


def readFragmentScores(name='fpscores'):
    import gzip
    global _fscores
    # generate the full path filename:
    if name == "fpscores":
        name = op.join(op.dirname(__file__), name)
    data = pickle.load(gzip.open('%s.pkl.gz' % name))
    outDict = {}
    for i in data:
        for j in range(1, len(i)):
            outDict[i[j]] = float(i[0])
    _fscores = outDict


def numBridgeheadsAndSpiro(mol, ri=None):
    nSpiro = rdMolDescriptors.CalcNumSpiroAtoms(mol)
    nBridgehead = rdMolDescriptors.CalcNumBridgeheadAtoms(mol)
    return nBridgehead, nSpiro


def calculateSAS(smiles_list):
    scores = []
    for i, smiles in enumerate(smiles_list):
        mol = Chem.MolFromSmiles(smiles)
        score = calculateScore(mol)
        scores.append(score)
    return np.float32(scores)


def calculateScore(m):
    if _fscores is None:
        readFragmentScores()

    # fragment score
    fp = rdMolDescriptors.GetMorganFingerprint(m, 2)  # <- 2 is the *radius* of the circular fingerprint
    fps = fp.GetNonzeroElements()
    score1 = 0.
    nf = 0
    for bitId, v in fps.items():
        nf += v
        sfp = bitId
        score1 += _fscores.get(sfp, -4) * v
    score1 /= nf

    # features score
    nAtoms = m.GetNumAtoms()
    nChiralCenters = len(Chem.FindMolChiralCenters(m, includeUnassigned=True))
    ri = m.GetRingInfo()
    nBridgeheads, nSpiro = numBridgeheadsAndSpiro(m, ri)
    nMacrocycles = 0
    for x in ri.AtomRings():
        if len(x) > 8:
            nMacrocycles += 1

    sizePenalty = nAtoms**1.005 - nAtoms
    stereoPenalty = math.log10(nChiralCenters + 1)
    spiroPenalty = math.log10(nSpiro + 1)
    bridgePenalty = math.log10(nBridgeheads + 1)
    macrocyclePenalty = 0.
    # ---------------------------------------
    # This differs from the paper, which defines:
    #   macrocyclePenalty = math.log10(nMacrocycles + 1)
    # This form generates better results when 2 or more macrocycles are present
    if nMacrocycles > 0:
        macrocyclePenalty = math.log10(2)

    score2 = 0. - sizePenalty - stereoPenalty - spiroPenalty - bridgePenalty - macrocyclePenalty

    # correction for the fingerprint density
    # not in the original publication, added in version 1.1
    # to make highly symmetrical molecules easier to synthetise
    score3 = 0.
    if nAtoms > len(fps):
        score3 = math.log(float(nAtoms) / len(fps)) * .5

    sascore = score1 + score2 + score3

    # need to transform "raw" value into scale between 1 and 10
    min = -4.0
    max = 2.5
    sascore = 11. - (sascore - min + 1) / (max - min) * 9.
    # smooth the 10-end
    if sascore > 8.:
        sascore = 8. + math.log(sascore + 1. - 9.)
    if sascore > 10.:
        sascore = 10.0
    elif sascore < 1.:
        sascore = 1.0

    return sascore
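

if __name__ == '__main__':
    # Minimal smoke test for the SAS scorer, assuming the fpscores.pkl.gz
    # fragment table is available next to this file (readFragmentScores loads
    # it on first use). The SMILES below are arbitrary illustrative examples.
    example_smiles = ['CCO', 'c1ccccc1', 'CC(=O)Oc1ccccc1C(=O)O']
    print('SAS scores:', calculateSAS(example_smiles))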