Source code for pymatgen.io.lammps.output

# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.

from __future__ import division, print_function, unicode_literals, absolute_import

"""
This module implements classes for processing Lammps output files:

    1. log file: contains the thermodynamic data with the format set by the
        'thermo_style' command

    2. trajectory file(dump file): the file generated by the 'dump' command

    Restrictions:
        The first 2 fields of the ATOMS section in the trajectory(dump) file
        must be the atom id and the atom type. There can be arbitrary number
        of fields after that and they all will be treated as floats and
        updated based on the field names in the ITEM: ATOMS line.
"""

import re
import os
from io import open

import numpy as np

from monty.json import MSONable

from pymatgen.core.periodic_table import _pt_data
from pymatgen.core.structure import Structure
from pymatgen.core.lattice import Lattice
from pymatgen.analysis.diffusion_analyzer import DiffusionAnalyzer
from pymatgen.io.lammps.data import LammpsData, LammpsForceFieldData

__author__ = "Kiran Mathew"
__email__ = "kmathew@lbl.gov"
__credits__ = "Navnidhi Rajput, Michael Humbert"


# TODO write parser for one and multi thermo_styles
[docs]class LammpsLog(MSONable): """ Parser for LAMMPS log file. """ def __init__(self, log_file="log.lammps"): """ Args: log_file (string): path to the log file """ self.log_file = os.path.abspath(log_file) self.timestep = -1 self._parse_log() def _parse_log(self): """ Parse the log file for run and thermodynamic data. Sets the thermodynamic data as a structured numpy array with field names taken from the custom thermo_style command. thermo_style one and multi are not supported yet """ thermo_data = [] fixes = [] d_build = None thermo_pattern = None with open(self.log_file, 'r') as logfile: for line in logfile: # timestep, the unit depedns on the 'units' command time = re.search(r'timestep\s+([0-9]+)', line) if time and not d_build: self.timestep = float(time.group(1)) # total number md steps steps = re.search(r'run\s+([0-9]+)', line) if steps and not d_build: self.nmdsteps = int(steps.group(1)) # simulation info fix = re.search(r'fix.+', line) if fix and not d_build: fixes.append(fix.group()) # dangerous builds danger = re.search(r'Dangerous builds\s+([0-9]+)', line) if danger and not d_build: d_build = int(steps.group(1)) # logging interval thermo = re.search(r'thermo\s+([0-9]+)', line) if thermo and not d_build: self.interval = float(thermo.group(1)) # thermodynamic data, set by the thermo_style command fmt = re.search(r'thermo_style.+', line) if fmt and not d_build: thermo_type = fmt.group().split()[1] fields = fmt.group().split()[2:] no_parse = ["one", "multi"] if thermo_type in no_parse: thermo_data.append("cannot parse thermo_style") else: thermo_pattern_string = r"\s*([0-9eE\.+-]+)" + "".join( [r"\s+([0-9eE\.+-]+)" for _ in range(len(fields) - 1)]) thermo_pattern = re.compile(thermo_pattern_string) if thermo_pattern: if thermo_pattern.search(line): m = thermo_pattern.search(line) thermo_data.append(tuple([float(x) for x in m.groups()])) if thermo_data: if isinstance(thermo_data[0], str): self.thermo_data = [thermo_data] else: # numpy arrays are easier to reshape, previously we used np.array with dtypes self.thermo_data = { fields[i]: [thermo_data[j][i] for j in range(len(thermo_data))] for i in range(len(fields))} self.fixes = fixes self.dangerous_builds = d_build
[docs] def as_dict(self): d = {} for attrib in [a for a in dir(self) if not a.startswith('__') and not callable(getattr(self, a))]: d[attrib] = getattr(self, attrib) d["@module"] = self.__class__.__module__ d["@class"] = self.__class__.__name__ return d
# not really needed ?
[docs] @classmethod def from_dict(cls, d): return cls(log_file=d["log_file"])
# TODO: @wood-b parse binary dump files(*.dcd)
[docs]class LammpsDump(MSONable): """ Parse lammps dump file. """ def __init__(self, timesteps, natoms, box_bounds, atoms_data): self.timesteps = timesteps self.natoms = natoms self.box_bounds = box_bounds self.atoms_data = atoms_data
[docs] @classmethod def from_file(cls, dump_file): timesteps = [] atoms_data = [] natoms = 0 box_bounds = [] bb_flag = 0 parse_timestep, parse_natoms, parse_bb, parse_atoms = False, False, False, False with open(dump_file) as tf: for line in tf: if "ITEM: TIMESTEP" in line: parse_timestep = True continue if parse_timestep: timesteps.append(float(line)) parse_timestep = False if "ITEM: NUMBER OF ATOMS" in line: parse_natoms = True continue if parse_natoms: natoms = int(line) parse_natoms = False if "ITEM: BOX BOUNDS" in line: parse_bb = True continue if parse_bb: box_bounds.append([float(x) for x in line.split()]) bb_flag += 1 parse_bb = False if bb_flag >= 3 else True if "ITEM: ATOMS" in line: parse_atoms = True continue if parse_atoms: line_data = [float(x) for x in line.split()] atoms_data.append(line_data) parse_atoms = False if len(atoms_data) == len(timesteps)*natoms else True return cls(timesteps, natoms, box_bounds, atoms_data)
# TODO: @wood-b simplify this, use LammpsDump to parse + use mdanalysis to process. # make sure its backward compatible
[docs]class LammpsRun(MSONable): """ Parse the lammps data file, trajectory(dump) file and the log file to extract useful info about the system. Note: In order to parse trajectory or dump file, the first 2 fields must be the id and the atom type. There can be arbitrary number of fields after that and they all will be treated as floats. Args: data_file (str): path to the data file trajectory_file (str): path to the trajectory file or dump file log_file (str): path to the log file """ def __init__(self, data_file, trajectory_file, log_file="log.lammps", is_forcefield=False): self.data_file = os.path.abspath(data_file) self.trajectory_file = os.path.abspath(trajectory_file) self.log_file = os.path.abspath(log_file) self.log = LammpsLog(log_file) self.is_forcefield = is_forcefield if self.is_forcefield: self.lammps_data = LammpsForceFieldData.from_file(self.data_file) else: self.lammps_data = LammpsData.from_file(self.data_file) self._set_mol_masses_and_charges() self._parse_trajectory() def _parse_trajectory(self): """ parse the trajectory file. """ traj_timesteps = [] trajectory = [] timestep_label = "ITEM: TIMESTEP" # "ITEM: ATOMS id type ... traj_label_pattern = re.compile( r"^\s*ITEM:\s+ATOMS\s+id\s+type\s+([A-Za-z0-9[\]_\s]*)") # default: id type x y z vx vy vz mol" # updated below based on the field names in the ITEM: ATOMS line # Note: the first 2 fields must be the id and the atom type. There can # be arbitrary number of fields after that and they all will be treated # as floats. traj_pattern = re.compile( r"\s*(\d+)\s+(\d+)\s+([0-9eE.+-]+)\s+([0-9eE.+-]+)\s+" r"([0-9eE.+-]+)\s+" r"([0-9eE.+-]+)\s+" r"([0-9eE.+-]+)\s+([0-9eE.+-]+)\s+(\d+)\s*") parse_timestep = False with open(self.trajectory_file) as tf: for line in tf: if timestep_label in line: parse_timestep = True continue if parse_timestep: traj_timesteps.append(float(line)) parse_timestep = False if traj_label_pattern.search(line): fields = traj_label_pattern.search(line).group(1) fields = fields.split() # example:- id type x y z vx vy vz mol ... traj_pattern_string = r"\s*(\d+)\s+(\d+)" + "".join( [r"\s+([0-9eE\.+-]+)" for _ in range(len(fields))]) traj_pattern = re.compile(traj_pattern_string) if traj_pattern.search(line): # first 2 fields must be id and type, the rest of them # will be casted as floats m = traj_pattern.search(line) line_data = [] line_data.append(int(m.group(1)) - 1) line_data.append(int(m.group(2))) line_data.extend( [float(x) for i, x in enumerate(m.groups()) if i + 1 > 2]) trajectory.append(tuple(line_data)) traj_dtype = np.dtype([(str('Atoms_id'), np.int64), (str('atom_type'), np.int64)] + [(str(fld), np.float64) for fld in fields]) self.trajectory = np.array(trajectory, dtype=traj_dtype) self.timesteps = np.array(traj_timesteps, dtype=np.float64) for step in range(self.timesteps.size): begin = step * self.natoms end = (step + 1) * self.natoms self.trajectory[begin:end] = np.sort(self.trajectory[begin:end], order=str("Atoms_id")) def _set_mol_masses_and_charges(self): """ set the charge, mass and the atomic makeup for each molecule """ mol_config = [] # [ [atom id1, atom id2, ...], ... ] mol_masses = [] # [ [atom mass1, atom mass2, ...], ... ] # mol_charges = [] unique_atomic_masses = np.array(self.lammps_data.atomic_masses)[:, 1] atoms_data = np.array(self.lammps_data.atoms_data) mol_ids = atoms_data[:, 1].astype(np.int64) atom_ids = atoms_data[:, 0].astype(np.int64) unique_mol_ids = np.unique(mol_ids) atomic_types = atoms_data[:, 2].astype(np.int64) atomic_masses = unique_atomic_masses[atomic_types - 1] self.nmols = unique_mol_ids.size for umid in range(self.nmols): mol_config.append(atom_ids[np.where(mol_ids == umid + 1)] - 1) mol_masses.append(atomic_masses[np.where(mol_ids == umid + 1)]) self.mol_config = np.array(mol_config) self.mol_masses = np.array(mol_masses) def _weighted_average(self, mol_id, mol_vector): """ Calculate the weighted average of the array comprising of atomic vectors corresponding to the molecule with id mol_id. Args: mol_id (int): molecule id mol_vector (numpy array): array of shape, natoms_in_molecule with id mol_id x 3 Returns: 1D numpy array(3 x 1) of weighted averages in x, y, z directions """ mol_masses = self.mol_masses[mol_id] return np.array( [np.dot(mol_vector[:, dim], mol_masses) / np.sum(mol_masses) for dim in range(3)]) def _get_mol_vector(self, step, mol_id, param=("x", "y", "z")): """ Returns numpy array corresponding to atomic vectors of parameter "param" for the given time step and molecule id Args: step (int): time step mol_id (int): molecule id param (list): the atomic parameter for which the weighted average is to be computed Returns: 2D numpy array(natoms_in_molecule x 3) of atomic vectors """ begin = step * self.natoms end = (step + 1) * self.natoms mol_vector_structured = \ self.trajectory[begin:end][self.mol_config[mol_id]][param] new_shape = mol_vector_structured.shape + (-1,) mol_vector = mol_vector_structured.view(np.float64).reshape(new_shape) return mol_vector.copy() # TODO: remove this and use only get_displacements(an order of magnitude faster)
[docs] def get_structures_from_trajectory(self): """ Convert the coordinates in each time step to a structure(boxed molecule). Used to construct DiffusionAnalyzer object. Returns: list of Structure objects """ lattice = Lattice([[self.box_lengths[0], 0, 0], [0, self.box_lengths[1], 0], [0, 0, self.box_lengths[2]]]) structures = [] mass_to_symbol = dict( (round(y["Atomic mass"], 1), x) for x, y in _pt_data.items()) unique_atomic_masses = np.array(self.lammps_data.atomic_masses)[:, 1] for step in range(self.timesteps.size): begin = step * self.natoms end = (step + 1) * self.natoms mol_vector_structured = \ self.trajectory[begin:end][:][["x", "y", "z"]] new_shape = mol_vector_structured.shape + (-1,) mol_vector = mol_vector_structured.view(np.float64).reshape( new_shape) coords = mol_vector.copy() species = [mass_to_symbol[round(unique_atomic_masses[atype - 1], 1)] for atype in self.trajectory[begin:end][:]["atom_type"]] try: structure = Structure(lattice, species, coords, coords_are_cartesian=True) except ValueError as error: print("Error: '{}' at timestep {} in the trajectory".format( error, int(self.timesteps[step]))) structures.append(structure) return structures
[docs] def get_displacements(self): """ Return the initial structure and displacements for each time step. Used to interface with the DiffusionAnalyzer. Returns: Structure object, numpy array of displacements """ lattice = Lattice([[self.box_lengths[0], 0, 0], [0, self.box_lengths[1], 0], [0, 0, self.box_lengths[2]]]) mass_to_symbol = dict( (round(y["Atomic mass"], 1), x) for x, y in _pt_data.items()) unique_atomic_masses = np.array(self.lammps_data.atomic_masses)[:, 1] frac_coords = [] for step in range(self.timesteps.size): begin = step * self.natoms end = (step + 1) * self.natoms mol_vector_structured = \ self.trajectory[begin:end][:][["x", "y", "z"]] new_shape = mol_vector_structured.shape + (-1,) mol_vector = mol_vector_structured.view(np.float64).reshape( new_shape) coords = mol_vector.copy() if step == 0: species = [ mass_to_symbol[round(unique_atomic_masses[atype - 1], 1)] for atype in self.trajectory[begin:end][:]["atom_type"]] structure = Structure(lattice, species, coords, coords_are_cartesian=True) step_frac_coords = [lattice.get_fractional_coords(crd) for crd in coords] frac_coords.append(np.array(step_frac_coords)[:, None]) frac_coords = np.concatenate(frac_coords, axis=1) dp = frac_coords[:, 1:] - frac_coords[:, :-1] dp = dp - np.round(dp) f_disp = np.cumsum(dp, axis=1) disp = lattice.get_cartesian_coords(f_disp) return structure, disp
[docs] def get_diffusion_analyzer(self, specie, temperature, time_step, step_skip, **kwargs): """ Args: specie (Element/Specie): Specie to calculate diffusivity for as a String. E.g., "Li". temperature (float): Temperature of the diffusion run in Kelvin. time_step (int): Time step between measurements. step_skip (int): Sampling frequency of the displacements ( time_step is multiplied by this number to get the real time between measurements) For the other parameters please see the pymatgen.analysis.diffusion_analyzer.DiffusionAnalyzer documentation. Returns: DiffusionAnalyzer """ # structures = self.get_structures_from_trajectory() structure, disp = self.get_displacements() return DiffusionAnalyzer(structure, disp, specie, temperature, time_step, step_skip=step_skip, **kwargs)
@property def natoms(self): return self.lammps_data.natoms @property def box_lengths(self): return [l[1] - l[0] for l in self.lammps_data.box_size] @property def traj_timesteps(self): """ trajectory time steps in time units. e.g. for units = real, time units = fmsec """ return self.timesteps * self.log.timestep @property def mol_trajectory(self): """ Compute the weighted average trajectory of each molecule at each timestep Returns: 2D numpy array ((n_timesteps*mols_number) x 3) """ traj = [] for step in range(self.timesteps.size): tmp_mol = [] for mol_id in range(self.nmols): mol_coords = self._get_mol_vector(step, mol_id, param=["x", "y", "z"]) # take care of periodic boundary conditions pbc_wrap(mol_coords, self.box_lengths) tmp_mol.append(self._weighted_average(mol_id, mol_coords)) traj.append(tmp_mol) return np.array(traj) @property def mol_velocity(self): """ Compute the weighted average velcoity of each molecule at each timestep. Returns: 2D numpy array ((n_timesteps*mols_number) x 3) """ velocity = [] for step in range(self.timesteps.size): tmp_mol = [] for mol_id in range(self.nmols): mol_velocities = self._get_mol_vector(step, mol_id, param=["vx", "vy", "vz"]) tmp_mol.append(self._weighted_average(mol_id, mol_velocities)) velocity.append(tmp_mol) return np.array(velocity)
[docs] def as_dict(self): d = {} skip = ["mol_velocity", "mol_trajectory"] # not applicable in general attributes = [a for a in dir(self) if a not in skip and not a.startswith('__')] attributes = [a for a in attributes if not callable(getattr(self, a))] for attrib in attributes: obj = getattr(self, attrib) if isinstance(obj, MSONable): d[attrib] = obj.as_dict() elif isinstance(obj, np.ndarray): d[attrib] = obj.tolist() else: d[attrib] = obj d["@module"] = self.__class__.__module__ d["@class"] = self.__class__.__name__ return d
# not really needed ?
[docs] @classmethod def from_dict(cls, d): return cls(data_file=d["data_file"], trajectory_file=d["trajectory_file"], log_file=d["log_file"], is_forcefield=d["is_forcefield"])
[docs]def pbc_wrap(array, box_lengths): """ wrap the array for molecule coordinates around the periodic boundary. Args: array (numpy.ndarray): molecule coordinates, [[x1,y1,z1],[x2,y2,z2],..] box_lengths (list): [x_length, y_length, z_length] """ ref = array[0, 0] for i in range(3): array[:, i] = np.where((array[:, i] - ref) >= box_lengths[i] / 2, array[:, i] - box_lengths[i], array[:, i]) array[:, i] = np.where((array[:, i] - ref) < -box_lengths[i] / 2, array[:, i] + box_lengths[i], array[:, i])