#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
mwtab.mwtab
~~~~~~~~~~~
This module provides the :class:`~mwtab.mwtab.MWTabFile` class
that stores the data from a single ``mwTab`` formatted file in the
form of an :py:class:`~collections.OrderedDict`. Data can be accessed
directly from the :class:`~mwtab.mwtab.MWTabFile` instance using
bracket accessors.
The data is divided into a series of "sections" which each contain a
number of "key-value"-like pairs. Also, the file contains a specially
formatted ``SUBJECT_SAMPLE_FACTOR`` block and blocks of data between
``*_START`` and ``*_END``.
"""
from __future__ import print_function, division, unicode_literals
from collections import OrderedDict
import io
import sys
import json
from .tokenizer import tokenizer
[docs]class MWTabFile(OrderedDict):
"""MWTabFile class that stores data from a single ``mwTab`` formatted file in
the form of :py:class:`collections.OrderedDict`.
"""
prefixes = {
"METABOLOMICS WORKBENCH": "",
"PROJECT": "PR:",
"STUDY": "ST:",
"SUBJECT": "SU:",
"SUBJECT_SAMPLE_FACTORS": "",
"COLLECTION": "CO:",
"TREATMENT": "TR:",
"SAMPLEPREP": "SP:",
"CHROMATOGRAPHY": "CH:",
"ANALYSIS": "AN:",
"MS": "MS:",
"NMR": "NM:",
"MS_METABOLITE_DATA": "",
"NMR_BINNED_DATA": "",
"METABOLITES": ""
}
def __init__(self, source, *args, **kwds):
"""File initializer.
:param str source: Source a `MWTabFile` instance was created from.
"""
super(MWTabFile, self).__init__(*args, **kwds)
self.source = source
self.study_id = ""
self.analysis_id = ""
self.header = ""
[docs] def read(self, filehandle):
"""Read data into a :class:`~mwtab.mwtab.MWTabFile` instance.
:param filehandle: file-like object.
:type filehandle: :py:class:`io.TextIOWrapper`, :py:class:`gzip.GzipFile`,
:py:class:`bz2.BZ2File`, :py:class:`zipfile.ZipFile`
:return: None
:rtype: :py:obj:`None`
"""
input_str = filehandle.read()
if not input_str:
raise ValueError("Blank input string retrieved from source.")
mwtab_str = self._is_mwtab(input_str)
json_str = self._is_json(input_str)
if not input_str:
pass
elif json_str:
self.update(json_str)
elif mwtab_str:
self._build_mwtabfile(mwtab_str)
else:
raise TypeError("Unknown file format")
try:
self.study_id = self["METABOLOMICS WORKBENCH"].get("STUDY_ID")
self.analysis_id = self["METABOLOMICS WORKBENCH"].get("ANALYSIS_ID")
# self.header = self["METABOLOMICS WORKBENCH"].get("HEADER")
self.header = " ".join(
["#METABOLOMICS WORKBENCH"]
+ [item[0] + ":" + item[1] for item in self["METABOLOMICS WORKBENCH"].items() if item[0] not in ["VERSION", "CREATED_ON"]]
)
except KeyError as e:
raise KeyError("File missing header information \"METABOLOMICS WORKBENCH\"", e)
filehandle.close()
[docs] def write(self, filehandle, file_format):
"""Write :class:`~mwtab.mwtab.MWTabFile` data into file.
:param filehandle: file-like object.
:type filehandle: :py:class:`io.TextIOWrapper`
:param str file_format: Format to use to write data: `mwtab` or `json`.
:return: None
:rtype: :py:obj:`None`
"""
try:
if file_format == "json":
json_str = self._to_json()
filehandle.write(json_str)
elif file_format == "mwtab":
mwtab_str = self._to_mwtab()
filehandle.write(mwtab_str)
else:
raise TypeError("Unknown file format.")
except IOError:
raise IOError('"filehandle" parameter must be writable.')
filehandle.close()
[docs] def writestr(self, file_format):
"""Write :class:`~mwtab.mwtab.MWTabFile` data into string.
:param str file_format: Format to use to write data: `mwtab` or `json`.
:return: String representing the :class:`~mwtab.mwtab.MWTabFile` instance.
:rtype: :py:class:`str`
"""
if file_format == "json":
json_str = self._to_json()
return json_str
elif file_format == "mwtab":
mwtab_str = self._to_mwtab()
return mwtab_str
else:
raise TypeError("Unknown file format.")
def _build_mwtabfile(self, mwtab_str):
"""Build :class:`~mwtab.mwtab.MWTabFile` instance.
:param mwtab_str: String in `mwtab` format.
:type mwtab_str: :py:class:`str` or :py:class:`bytes`
:return: instance of :class:`~mwtab.mwtab.MWTabFile`.
:rtype: :class:`~mwtab.mwtab.MWTabFile`
"""
mwtab_file = self
lexer = tokenizer(mwtab_str)
token = next(lexer)
while token.key != "!#ENDFILE":
if token.key.startswith("#"):
name = token.key[1:]
section = self._build_block(lexer)
if section:
if name == "METABOLITES":
data_section = next((n for n in mwtab_file.keys() if n in ("MS_METABOLITE_DATA", "NMR_METABOLITE_DATA", "NMR_BINNED_DATA")), None)
if data_section:
for key in section.keys():
mwtab_file[data_section][key] = section[key]
elif name == "NMR":
mwtab_file["NM"] = section
else:
mwtab_file[name] = section
token = next(lexer)
return mwtab_file
def _build_block(self, lexer):
"""Build individual text block of :class:`~mwtab.mwtab.MWTabFile` instance.
:param lexer: instance of the mwtab tokenizer.
:type lexer: :func:`~mwtab.tokenizer.tokenizer`
:return: Section dictionary.
:rtype: :py:class:`collections.OrderedDict`
"""
section = OrderedDict()
token = next(lexer)
alias = {
"subject_type": "Subject ID",
"local_sample_id": "Sample ID",
"factors": "Factors",
"additional_sample_data": "Additional sample data",
}
while not token.key.startswith("#ENDSECTION"):
# TODO: Move to separate method (no longer works the same way as the other possibilities in _build_block())
if token.key.startswith("SUBJECT_SAMPLE_FACTORS"):
if type(section) != list:
section = list()
# sample_dict = OrderedDict({alias[token._fields[x]]: token[x] for x in range(1, len(token._fields))})
# if not sample_dict.get("Additional sample data"):
# del sample_dict["Additional sample data"]
section.append(token.value)
elif token.key.endswith("_START"):
data = []
token = next(lexer)
header = list(token.value)
while not token.key.endswith("_END"):
if token.key in ("Samples", "Factors", "metabolite_name", "Bin range(ppm)"):
pass
else:
data.append(OrderedDict(zip(["Metabolite"] + header[1:], token.value)))
token = next(lexer)
if token.key.startswith("METABOLITES"):
section["Metabolites"] = data
elif token.key.startswith("EXTENDED_"):
section["Extended"] = data
else:
section["Data"] = data
elif token.key.endswith("_RESULTS_FILE"):
if len(token) > 2:
key, value, extra = token
section[key] = OrderedDict([(key, value)])
for pair in extra:
section[key].update((pair,))
else:
key, value = token
section[key] = value
else:
key, value, = token
if key in section:
section[key] += " {}".format(value)
else:
section[key] = value
# load token(s) (from parsing of next line in file)
token = next(lexer)
return section
[docs] def print_file(self, f=sys.stdout, file_format="mwtab"):
"""Print :class:`~mwtab.mwtab.MWTabFile` into a file or stdout.
:param f: writable file-like stream.
:type f: :py:class:`io.StringIO`
:param str file_format: Format to use: `mwtab` or `json`.
:return: None
:rtype: :py:obj:`None`
"""
if file_format == "mwtab":
for key in self:
if key == "SUBJECT_SAMPLE_FACTORS":
print("#SUBJECT_SAMPLE_FACTORS: \tSUBJECT(optional)[tab]SAMPLE[tab]FACTORS(NAME:VALUE pairs separated by |)[tab]Additional sample data", file=f)
self.print_subject_sample_factors(key, f=f, file_format=file_format)
else:
if key == "METABOLOMICS WORKBENCH":
print(self.header, file=f)
elif key == "NM":
print("#NMR", file=f)
else:
print("#{}".format(key), file=f)
self.print_block(key, f=f, file_format=file_format)
print("#END", file=f)
elif file_format == "json":
print(self._to_json(), file=f)
[docs] def print_subject_sample_factors(self, section_key, f=sys.stdout, file_format="mwtab"):
"""Print `mwtab` `SUBJECT_SAMPLE_FACTORS` section into a file or stdout.
:param str section_key: Section name.
:param f: writable file-like stream.
:type f: :py:class:`io.StringIO`
:param str file_format: Format to use: `mwtab` or `json`.
:return: None
:rtype: :py:obj:`None`
"""
if file_format == "mwtab":
for item in self[section_key]:
formatted_items = []
for k in item.keys():
if k in ["Subject ID", "Sample ID"]:
formatted_items.append(str(item[k]))
elif k == "Factors":
factors = []
for k2 in item[k]:
factors.append("{}:{}".format(k2, item[k][k2]))
formatted_items.append(" | ".join(factors))
elif k == "Additional sample data":
additional_sample_data = []
for k2 in item[k]:
additional_sample_data.append("{}={}".format(k2, item[k][k2]))
formatted_items.append(";".join(additional_sample_data))
line = "{}{}\t{}".format(section_key, 33 * " ", "\t".join(formatted_items))
# for file missing "Additional sample data" items
if len(formatted_items) < 4:
line += "\t"
else:
print(line, file=f)
[docs] def print_block(self, section_key, f=sys.stdout, file_format="mwtab"):
"""Print `mwtab` section into a file or stdout.
:param str section_key: Section name.
:param f: writable file-like stream.
:type f: :py:class:`io.StringIO`
:param str file_format: Format to use: `mwtab` or `json`.
:return: None
:rtype: :py:obj:`None`
"""
if file_format == "mwtab":
for key, value in self[section_key].items():
if section_key == "METABOLOMICS WORKBENCH" and key not in ("VERSION", "CREATED_ON"):
continue
if key in ("VERSION", "CREATED_ON"):
cw = 20 - len(key)
elif key == "Units":
cw = 33 - len(section_key+":UNITS")
else:
cw = 30 - len(key)
if key.endswith("_RESULTS_FILE"):
if isinstance(value, dict):
print("{}{} \t{}\t{}:{}".format(self.prefixes.get(section_key, ""),
*[i for pair in value.items() for i in pair]), file=f)
else:
print("{}{}{}\t{}".format(self.prefixes.get(section_key, ""), key, cw * " ", value), file=f)
# prints #MS_METABOLITE_DATA, #NMR_METABOLITE_DATA, or #NMR_BINNED_DATA sections
elif key == "Units":
print("{}:UNITS{}\t{}".format(section_key, cw * " ", value), file=f)
elif key == "Data":
print("{}_START".format(section_key), file=f)
if "METABOLITE" in section_key:
# prints "Samples" line at head of data section
print("\t".join(["Samples"] + [k for k in self[section_key][key][0].keys()][1:]), file=f)
# prints "Factors" line at head of data section
factors_list = ["Factors"]
factors_dict = {i["Sample ID"]: i["Factors"] for i in self["SUBJECT_SAMPLE_FACTORS"]}
for k in [k for k in self[section_key][key][0].keys()][1:]:
factors = [fk + ":" + factors_dict[k][fk] for fk in factors_dict[k].keys()]
factors_list.append(" | ".join(factors))
print("\t".join(factors_list), file=f)
for i in self[section_key][key]:
print("\t".join([i[k] for k in i.keys()]), file=f)
else: # NMR_BINNED_DATA
print("\t".join(["Bin range(ppm)"] + [k for k in self[section_key][key][0].keys()][1:]), file=f)
for i in self[section_key][key]:
print("\t".join([i[k] for k in i.keys()]), file=f)
print("{}_END".format(section_key), file=f)
# prints #METABOLITES section
elif key in ("Metabolites", "Extended"):
if key == "Metabolites":
print("#METABOLITES", file=f)
print("METABOLITES_START", file=f)
else:
print("EXTENDED_{}_START".format(section_key), file=f)
print("\t".join(["metabolite_name"] + [k for k in self[section_key][key][0].keys()][1:]), file=f)
for i in self[section_key][key]:
print("\t".join(i[k] for k in i.keys()), file=f)
if key == "Metabolites":
print("METABOLITES_END", file=f)
else:
print("EXTENDED_{}_END".format(section_key), file=f)
else:
if len(str(value)) > 80:
words = str(value).split(" ")
length = 0
line = list()
for word in words:
if length + len(word) + len(line) - 1 <= 80:
line.append(word)
length += len(word)
else:
print("{}{}{}\t{}".format(self.prefixes.get(section_key, ""), key, cw * " ", " ".join(line)), file=f)
line = [word]
length = len(word)
print("{}{}{}\t{}".format(self.prefixes.get(section_key, ""), key, cw * " ", " ".join(line)),
file=f)
else:
print("{}{}{}\t{}".format(self.prefixes.get(section_key, ""), key, cw * " ", value), file=f)
elif file_format == "json":
print(json.dumps(self[section_key], sort_keys=False, indent=4), file=f)
def _to_json(self):
"""Save :class:`~mwtab.mwtab.MWTabFile` into JSON string.
:return: JSON string.
:rtype: :py:class:`str`
"""
return json.dumps(self, sort_keys=False, indent=4)
def _to_mwtab(self):
"""Save :class:`~mwtab.mwtab.MWTabFile` in `mwtab` formatted string.
:return: NMR-STAR string.
:rtype: :py:class:`str`
"""
mwtab_str = io.StringIO()
self.print_file(mwtab_str)
return mwtab_str.getvalue()
@staticmethod
def _is_mwtab(string):
"""Test if input string is in `mwtab` format.
:param string: Input string.
:type string: :py:class:`str` or :py:class:`bytes`
:return: Input string if in mwTab format or False otherwise.
:rtype: :py:class:`str` or :py:obj:`False`
"""
if isinstance(string, str):
lines = string.replace("\r", "\n").split("\n")
elif isinstance(string, bytes):
lines = string.decode("utf-8").replace("\r", "\n").split("\n")
else:
raise TypeError("Expecting <class 'str'> or <class 'bytes'>, but {} was passed".format(type(string)))
lines = [line for line in lines if line]
header = lines[0]
if header.startswith("#METABOLOMICS WORKBENCH"):
return "\n".join(lines)
return False
@staticmethod
def _is_json(string):
"""Test if input string is in JSON format.
:param string: Input string.
:type string: :py:class:`str` or :py:class:`bytes`
:return: Input string if in JSON format or False otherwise.
:rtype: :py:class:`str` or :py:obj:`False`
"""
try:
if isinstance(string, bytes):
json_str = json.loads(string.decode("utf-8"), object_pairs_hook=OrderedDict)
elif isinstance(string, str):
json_str = json.loads(string, object_pairs_hook=OrderedDict)
else:
raise TypeError("Expecting <class 'str'> or <class 'bytes'>, but {} was passed".format(type(string)))
return json_str
except ValueError:
return False