""" 

scikit-learn is an excellent library for training linear models and provides a 

large number of useful tools. 

 

This module provides simplified interfaces for vaiours linear model regression 

methods. These methods are set up in a way that work out of the box for typical 

problems in cluster expansion and force constant potential construction. This 

includes slight adjustments scitkit-learn default values. 

 

If you would like more flexibility or extended functionality or ability to 

fine-tune parameters that are not included in this interface, it is of course 

possible to use scikit-learn directly. 

More information about the sklearn linear models can be found at 

http://scikit-learn.org/stable/modules/linear_model.html 

""" 

 

import numpy as np

from collections import OrderedDict
from sklearn.linear_model import (Lasso,
                                  LassoCV,
                                  Ridge,
                                  RidgeCV,
                                  ElasticNet,
                                  ElasticNetCV,
                                  BayesianRidge,
                                  ARDRegression)
from sklearn.model_selection import ShuffleSplit
from sklearn.feature_selection import RFE, RFECV
from sklearn.preprocessing import StandardScaler
from typing import Any, Dict, List, Union
from ..io.logging import logger
from .split_bregman import fit_split_bregman


logger = logger.getChild('fit_methods')

 

 

def fit(X: np.ndarray,
        y: np.ndarray,
        fit_method: str,
        standardize: bool = True,
        check_condition: bool = True,
        **kwargs) -> Dict[str, Any]:
    """
    Wrapper function for all available fit methods. The function
    returns parameters and other pertinent information in the form of
    a dictionary.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    fit_method
        method to be used for training; possible choices are
        "least-squares", "lasso", "ridge", "elasticnet", "bayesian-ridge",
        "ardr", "rfe", "split-bregman"
    standardize : bool
        if True the fit matrix is standardized before fitting
    check_condition : bool
        if True the condition number will be checked
        (this can be slightly more time consuming for larger
        matrices)
    """

    if fit_method not in available_fit_methods:
        msg = ['Fit method not available']
        msg += ['Please choose one of the following:']
        for key in available_fit_methods:
            msg += [' * ' + key]
        raise ValueError('\n'.join(msg))

    if check_condition:
        cond = np.linalg.cond(X)
        if cond > 1e10:
            logger.warning('Condition number is large, {}'.format(cond))

    if standardize:
        # standardize fit matrix, column wise std -> 1.0
        ss = StandardScaler(copy=False, with_mean=False, with_std=True)
        ss.fit_transform(X)  # change in place

        # standardize target values, std(y) -> 1.0
        y_scale = 1.0 / np.std(y)
        y_rescaled = y * y_scale

        # train
        results = fit_methods[fit_method](X, y_rescaled, **kwargs)

        # inverse standardization
        parameters = results['parameters'] / y_scale
        ss.inverse_transform(X)  # change in place
        ss.transform(parameters.reshape(1, -1)).reshape(-1,)  # change in place (copy=False)
        results['parameters'] = parameters
    else:
        results = fit_methods[fit_method](X, y, **kwargs)
    return results
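
# Illustrative usage of ``fit`` (not part of the original module): a hedged
# sketch assuming a small synthetic fit matrix and target vector. It shows
# that the returned dictionary always carries a 'parameters' entry of length
# X.shape[1], regardless of the chosen fit method.
#
#   >>> X = np.random.normal(size=(50, 10))
#   >>> y = X @ np.arange(10) + 0.01 * np.random.normal(size=50)
#   >>> res = fit(X, y, fit_method='least-squares')
#   >>> res['parameters'].shape
#   (10,)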

 

 

def _fit_least_squares(X: np.ndarray, y: np.ndarray) -> Dict[str, Any]:
    """
    Returns the least-squares solution `a` to the linear problem
    `Xa=y` in the form of a dictionary with a key named `parameters`.

    This function is a wrapper to the `linalg.lstsq` function in NumPy.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    """
    results = dict()
    results['parameters'] = np.linalg.lstsq(X, y, rcond=-1)[0]
    return results

 

 

def _fit_lasso(X: np.ndarray, y: np.ndarray,
               alpha: float = None, fit_intercept: bool = False,
               **kwargs) -> Dict[str, Any]:
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by
    using the LASSO method as implemented in scikit-learn in the form
    of a dictionary with a key named `parameters`.

    LASSO optimizes the following problem::

        (1 / (2 * n_samples)) * ||y - Xw||^2_2 + alpha * ||w||_1

    If `alpha` is `None` this function will call `_fit_lassoCV`, which
    attempts to find the optimal alpha via sklearn's `LassoCV` class.

    Parameters
    ----------
    X
        fit matrix
    y
        target array
    alpha
        alpha value
    fit_intercept
        center data or not, forwarded to sklearn
    """
    if alpha is None:
        return _fit_lassoCV(X, y, fit_intercept=fit_intercept, **kwargs)
    else:
        lasso = Lasso(alpha=alpha, fit_intercept=fit_intercept, **kwargs)
        lasso.fit(X, y)
        results = dict()
        results['parameters'] = lasso.coef_
        return results
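
# Hedged example (not part of the original module), reusing the synthetic
# ``X`` and ``y`` from the sketch after ``fit``: with an explicit ``alpha``
# the plain ``Lasso`` estimator is used, whereas ``alpha=None`` dispatches to
# ``_fit_lassoCV``, whose result additionally contains 'alpha_optimal'.
#
#   >>> res_fixed = fit(X, y, fit_method='lasso', alpha=1e-4)
#   >>> sorted(res_fixed.keys())
#   ['parameters']
#   >>> res_cv = fit(X, y, fit_method='lasso')  # alpha chosen by LassoCV
#   >>> 'alpha_optimal' in res_cv
#   True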

 

 

def _fit_lassoCV(X: np.ndarray,
                 y: np.ndarray,
                 alphas: List[float] = None,
                 fit_intercept: bool = False,
                 cv: int = 10,
                 n_jobs: int = -1,
                 **kwargs) -> Dict[str, Any]:
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by
    using the LassoCV method as implemented in scikit-learn in the
    form of a dictionary with a key named `parameters`.

    The dictionary will also contain the keys `alpha_optimal` (alpha
    value that yields the lowest validation RMSE), `alpha_path` (all
    tested alpha values), and `mse_path` (MSE for validation set for
    each alpha).

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    alphas
        list of alpha values to be evaluated during regularization path
    fit_intercept
        center data or not, forwarded to sklearn
    cv
        how many folds to carry out in cross-validation
    n_jobs
        number of cores to use during the cross validation.
        None means 1 unless in a joblib.parallel_backend context.
        -1 means using all processors.
        See sklearn's glossary for more details.
    """
    if alphas is None:
        alphas = np.logspace(-8, -0.3, 100)

    lassoCV = LassoCV(alphas=alphas, fit_intercept=fit_intercept, cv=cv,
                      n_jobs=n_jobs, **kwargs)
    lassoCV.fit(X, y)
    results = dict()
    results['parameters'] = lassoCV.coef_
    results['alpha_optimal'] = lassoCV.alpha_
    return results

 

 

def _fit_ridge(X, y, alpha=None, fit_intercept=False, **kwargs):
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by using
    ridge regression as implemented in scikit-learn in the form of a
    dictionary with a key named `parameters`.

    If `alpha` is None the optimal alpha is selected via sklearn's `RidgeCV`
    class and returned under the key `alpha_optimal`.
    """
    results = dict()
    if alpha is None:
        if 'alphas' not in kwargs:
            kwargs['alphas'] = np.logspace(-6, 3, 100)
        ridge = RidgeCV(fit_intercept=fit_intercept, **kwargs)
        ridge.fit(X, y)
        results['alpha_optimal'] = ridge.alpha_
    else:
        ridge = Ridge(alpha=alpha, fit_intercept=fit_intercept, **kwargs)
        ridge.fit(X, y)
    results['parameters'] = ridge.coef_
    return results
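
# Hedged example (not part of the original module), reusing the synthetic
# ``X`` and ``y`` from the sketch after ``fit``: keyword arguments are
# forwarded to RidgeCV, so a custom alpha grid can be supplied via ``alphas``
# when ``alpha`` itself is left as None.
#
#   >>> res = fit(X, y, fit_method='ridge', alphas=np.logspace(-3, 3, 50))
#   >>> sorted(res.keys())
#   ['alpha_optimal', 'parameters']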

 

 

def _fit_bayesian_ridge(X: np.ndarray, y: np.ndarray,
                        fit_intercept: bool = False,
                        **kwargs) -> Dict[str, Any]:
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by using
    Bayesian ridge regression as implemented in scikit-learn in the
    form of a dictionary with a key named `parameters`.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    fit_intercept
        center data or not, forwarded to sklearn
    """
    brr = BayesianRidge(fit_intercept=fit_intercept, **kwargs)
    brr.fit(X, y)
    results = dict()
    results['parameters'] = brr.coef_
    return results

 

 

def _fit_elasticnet(X: np.ndarray, y: np.ndarray,
                    alpha: float = None, fit_intercept: bool = False,
                    **kwargs) -> Dict[str, Any]:
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by using
    the ElasticNet method as implemented in scikit-learn in the
    form of a dictionary with a key named `parameters`.

    If `alpha` is `None` this function will call `_fit_elasticnetCV`, which
    attempts to find the optimal alpha via sklearn's `ElasticNetCV` class.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    alpha
        alpha value
    fit_intercept
        center data or not, forwarded to sklearn
    """
    if alpha is None:
        return _fit_elasticnetCV(X, y, fit_intercept=fit_intercept, **kwargs)
    else:
        elasticnet = ElasticNet(alpha=alpha, fit_intercept=fit_intercept,
                                **kwargs)
        elasticnet.fit(X, y)
        results = dict()
        results['parameters'] = elasticnet.coef_
        return results

 

 

def _fit_elasticnetCV(X: np.ndarray,
                      y: np.ndarray,
                      alphas: List[float] = None,
                      l1_ratio: Union[float, List[float]] = None,
                      fit_intercept: bool = False,
                      cv: int = 10,
                      n_jobs: int = -1,
                      **kwargs) -> Dict[str, Any]:
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by using
    the ElasticNetCV method as implemented in scikit-learn in the
    form of a dictionary with a key named `parameters`.

    The dictionary returned by this function will also contain the
    fields `alpha_optimal` (alpha value that yields the lowest
    validation RMSE), `alpha_path` (all tested alpha values),
    `l1_ratio_optimal` (l1_ratio value that yields the lowest
    validation RMSE), `l1_ratio_path` (all tested `l1_ratio` values),
    and `mse_path` (MSE for the validation set for each alpha and
    `l1_ratio`).

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    alphas
        list of alpha values to be evaluated during regularization path
    l1_ratio
        l1_ratio values to be evaluated during regularization path
    fit_intercept
        center data or not, forwarded to sklearn
    cv
        how many folds to carry out in cross-validation
    n_jobs
        number of cores to use during the cross validation.
        None means 1 unless in a joblib.parallel_backend context.
        -1 means using all processors.
        See sklearn's glossary for more details.
    """
    if alphas is None:
        alphas = np.logspace(-8, -0.3, 100)
    if l1_ratio is None:
        l1_ratio = [1.0, 0.995, 0.99, 0.98, 0.97, 0.95, 0.925, 0.9, 0.85,
                    0.8, 0.75, 0.65, 0.5, 0.4, 0.25, 0.1]

    elasticnetCV = ElasticNetCV(alphas=alphas, l1_ratio=l1_ratio, cv=cv,
                                fit_intercept=fit_intercept, n_jobs=n_jobs,
                                **kwargs)
    elasticnetCV.fit(X, y)
    results = dict()
    results['parameters'] = elasticnetCV.coef_
    results['alpha_optimal'] = elasticnetCV.alpha_
    results['l1_ratio_optimal'] = elasticnetCV.l1_ratio_
    return results
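
# Hedged example (not part of the original module), reusing the synthetic
# ``X`` and ``y`` from the sketch after ``fit``: for the elastic net the CV
# variant reports both the optimal alpha and the optimal l1_ratio picked from
# the default grids defined above.
#
#   >>> res = fit(X, y, fit_method='elasticnet')
#   >>> {'alpha_optimal', 'l1_ratio_optimal'} <= set(res.keys())
#   True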

 

 

def _fit_ardr(X: np.ndarray,
              y: np.ndarray,
              threshold_lambda: float = None,
              line_scan: bool = False,
              fit_intercept: bool = False,
              **kwargs) -> Dict[str, Any]:
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by
    using the automatic relevance determination regression (ARDR)
    method as implemented in scikit-learn in the form of a dictionary
    with a key named `parameters`.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    threshold_lambda
        threshold lambda parameter forwarded to sklearn
    line_scan
        whether or not to perform a line-scan in order to find the optimal
        threshold-lambda
    fit_intercept
        center data or not, forwarded to sklearn
    """
    if threshold_lambda is not None and line_scan:
        raise ValueError('Specify threshold_lambda or set line_scan=True, not both')

    if threshold_lambda is None:
        threshold_lambda = 1e4

    if line_scan:
        return _fit_ardr_line_scan(X, y, fit_intercept=fit_intercept, **kwargs)
    else:
        ardr = ARDRegression(threshold_lambda=threshold_lambda,
                             fit_intercept=fit_intercept, **kwargs)
        ardr.fit(X, y)
        results = dict()
        results['parameters'] = ardr.coef_
        return results
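
# Hedged example (not part of the original module), reusing the synthetic
# ``X`` and ``y`` from the sketch after ``fit``: ``threshold_lambda`` and
# ``line_scan`` are mutually exclusive; either fix the threshold explicitly
# or let the line-scan below pick it by cross-validation.
#
#   >>> res = fit(X, y, fit_method='ardr', threshold_lambda=1e5)
#   >>> res = fit(X, y, fit_method='ardr', line_scan=True)
#   >>> 'threshold_lambda_optimal' in res
#   True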

 

 

def _fit_ardr_line_scan(X: np.ndarray,
                        y: np.ndarray,
                        cv_splits: int = 1,
                        threshold_lambdas: List[float] = None,
                        **kwargs) -> Dict[str, Any]:
    """ ARDR with line-scan for the optimal threshold-lambda.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    threshold_lambdas
        list of threshold-lambda values to be evaluated. The optimal
        lambda-value found will be used in the final fit.
    cv_splits
        how many CV splits to carry out when evaluating each lambda value.
    """
    from .cross_validation import CrossValidationEstimator

    # default lambda values to scan
    if threshold_lambdas is None:
        threshold_lambdas = np.logspace(3, 6, 15)

    # run line-scan of lambda values
    cv_data = []
    for lam in threshold_lambdas:
        cve = CrossValidationEstimator((X, y), fit_method='ardr',
                                       validation_method='shuffle-split',
                                       threshold_lambda=lam, test_size=0.1,
                                       train_size=0.9, n_splits=cv_splits,
                                       **kwargs)
        cve.validate()
        cv_data.append([lam, cve.rmse_validation])

    # select best lambda
    cv_data = np.array(cv_data)
    optimal_ind = cv_data[:, 1].argmin()
    lambda_optimal = cv_data[optimal_ind, 0]

    # final fit with optimal lambda
    results = _fit_ardr(X, y, threshold_lambda=lambda_optimal, **kwargs)
    results['threshold_lambda_optimal'] = lambda_optimal
    return results

 

 

class _Estimator:
    """ Estimator class making it possible to use all fit methods
    as estimators in RFE """

    def __init__(self, fit_method, **kwargs):
        if fit_method == 'rfe':
            raise ValueError('recursive infinitum')
        self.fit_method = fit_method
        self.kwargs = kwargs
        self.coef_ = None

    def fit(self, X, y):
        fit_func = fit_methods[self.fit_method]
        results = fit_func(X, y, **self.kwargs)
        self.coef_ = results['parameters']

    def get_params(self, deep=True):
        params = {k: v for k, v in self.kwargs.items()}
        params['fit_method'] = self.fit_method
        return params

    def predict(self, A):
        return np.dot(A, self.coef_)
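
# Hedged sketch (not part of the original module), reusing the synthetic
# ``X`` and ``y`` from the sketch after ``fit``: ``_Estimator`` exposes just
# the ``fit``/``predict``/``get_params`` interface and a ``coef_`` attribute,
# which is all that sklearn's RFE/RFECV need in order to rank features by
# coefficient magnitude.
#
#   >>> est = _Estimator('least-squares')
#   >>> est.fit(X, y)
#   >>> est.predict(X).shape == y.shape
#   True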

 

 

def fit_rfe(X: np.ndarray,
            y: np.ndarray,
            n_features: int = None,
            step: Union[int, float] = 0.04,
            estimator: str = 'least-squares',
            final_estimator: str = None,
            estimator_kwargs: dict = {},
            final_estimator_kwargs: dict = {},
            cv_splits: int = 5,
            n_jobs: int = -1,
            **rfe_kwargs):
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by
    recursive feature elimination (RFE).

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    n_features
        number of features to select; if None sklearn.feature_selection.RFECV
        will be used to determine the optimal number of features
    step
        if given as an integer, corresponds to the number of parameters to
        eliminate in each iteration; if given as a float, corresponds to
        the fraction of parameters to remove in each iteration
    estimator
        fit method used during the RFE algorithm
    final_estimator
        fit method to be used in the final fit;
        if None it defaults to whichever estimator is being used
    cv_splits
        number of CV splits to carry out when finding the optimal n_features
    n_jobs
        number of cores to use during the cross validation.
        -1 means using all processors.
    """
    # handle kwargs
    if final_estimator is None:
        final_estimator = estimator
    if len(final_estimator_kwargs) == 0:
        final_estimator_kwargs = estimator_kwargs

    estimator_obj = _Estimator(estimator, **estimator_kwargs)
    if n_features is None:
        if 'scoring' not in rfe_kwargs:
            rfe_kwargs['scoring'] = 'neg_mean_squared_error'
        cv = ShuffleSplit(train_size=0.9, test_size=0.1, n_splits=cv_splits)
        rfe = RFECV(estimator_obj, step=step, cv=cv, n_jobs=n_jobs,
                    **rfe_kwargs)
    else:
        rfe = RFE(estimator_obj, n_features_to_select=n_features, step=step,
                  **rfe_kwargs)

    # carry out RFE
    rfe.fit(X, y)
    features = rfe.support_
    ranking = rfe.ranking_

    # carry out final fit
    n_params = X.shape[1]
    results = fit(X[:, features], y, fit_method=final_estimator,
                  **final_estimator_kwargs)
    params = np.zeros(n_params)
    params[features] = results['parameters']
    results['parameters'] = params

    # finish up
    results['features'] = features
    results['ranking'] = ranking
    return results
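
# Hedged example (not part of the original module), reusing the synthetic
# ``X`` and ``y`` from the sketch after ``fit``: with ``n_features`` given,
# RFE keeps that many columns; the returned 'features' mask and 'ranking'
# array refer to the columns of the full fit matrix, and eliminated columns
# get zero-valued parameters.
#
#   >>> res = fit(X, y, fit_method='rfe', n_features=5)
#   >>> int(res['features'].sum())
#   5
#   >>> res['parameters'].shape
#   (10,)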

 

 

fit_methods = OrderedDict([
    ('least-squares', _fit_least_squares),
    ('lasso', _fit_lasso),
    ('ridge', _fit_ridge),
    ('bayesian-ridge', _fit_bayesian_ridge),
    ('elasticnet', _fit_elasticnet),
    ('split-bregman', fit_split_bregman),
    ('ardr', _fit_ardr),
    ('rfe', fit_rfe),
])
available_fit_methods = sorted(fit_methods.keys())
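

# The block below is an illustrative addition, not part of the original
# module: a minimal, self-contained smoke test that loops over a few of the
# registered fit methods on synthetic data. It assumes the module can be
# imported (i.e. the relative imports above resolve) and is only executed
# when the file is run directly.
if __name__ == '__main__':
    rng = np.random.RandomState(42)
    X_demo = rng.normal(size=(100, 20))
    w_true = rng.normal(size=20)
    y_demo = X_demo @ w_true + 0.01 * rng.normal(size=100)

    for method in ['least-squares', 'ridge', 'lasso']:
        res = fit(X_demo, y_demo, fit_method=method)
        rmse = np.sqrt(np.mean((X_demo @ res['parameters'] - y_demo) ** 2))
        print('{:15s}  rmse = {:.4e}'.format(method, rmse))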