Source code for genomedata

    #!/usr/bin/env python
"""
Genomedata is a module to store and access large-scale functional
genomics data in a format which is both space-efficient and allows
efficient random-access.

Under the surface, genomedata is implemented as a collection of HDF5 files,
but genomedata provides a transparent interface to interact with your
underlying data without having to worry about the mess of repeatedly parsing
large data files or having to keep them in memory for random access.

Copyright 2009, 2010, 2011, 2012, 2013 Michael M. Hoffman <mmh1@washington.edu>

"""

from __future__ import division, with_statement

__version__ = "$Revision: 15987 $"

import sys

import tables
from functools import partial
from numpy import (add, amin, amax, append, array, empty, float32, inf,
                   nan, ndarray, square, uint8)
from os import extsep
from path import path
from tables import Float32Atom, NoSuchNodeError, openFile, UInt8Atom
from warnings import warn

# Newest archive format version this module understands; Genome.__init__
# refuses archives whose stored format version is greater than this.
FORMAT_VERSION = 1
# Fallback dtype reported for sequence data when a supercontig has no
# "seq" node to take the dtype from.
SEQ_DTYPE = uint8
SEQ_ATOM = UInt8Atom()

# Fallback dtype reported for continuous data when a supercontig has no
# "continuous" node to take the dtype from.
CONTINUOUS_DTYPE = float32
# Atom used when creating new continuous arrays; its default value is NaN.
CONTINUOUS_ATOM = Float32Atom(dflt=nan)
CONTINUOUS_CHUNK_SHAPE = (10000, 1)

# Filename extension of Genomedata files, and the same with the
# extension separator prepended (used to build/match chromosome filenames).
EXT = "genomedata"
SUFFIX = extsep + EXT

# If there are fewer than this many chromosomes, the default behavior
# is to implement the Genomedata archive as a directory. If there are
# more than this many, it will be a single file by default.
FILE_MODE_CHROMS = 100

# Work out the package name. __package__ only exists as a module global
# from Python 2.6 on; on older interpreters fall back to the module name,
# substituting "genomedata" when the module is being run as a script.
try:
    PKG = __package__  # Python 2.6
except NameError:
    PKG = "genomedata" if __name__ == "__main__" else __name__


class _InactiveDict(dict):
    """A dict stand-in whose item assignment is silently ignored.

    Only ``d[key] = value`` is intercepted; everything else is plain
    ``dict`` behavior.
    """
    def __setitem__(self, key, value):
        # Deliberately discard the assignment.
        pass


def _open_file(*args, **kwargs):
    """Call ``openFile`` with the given arguments.

    Unless the caller supplies its own ``BUFFER_TIMES`` keyword, it is
    set to infinity, which eliminates a spurious PerformanceWarning.

    :returns: whatever ``openFile`` returns
    """
    kwargs.setdefault("BUFFER_TIMES", inf)
    return openFile(*args, **kwargs)


class Genome(object):
    """The root level of the genomedata object hierarchy.

    If you use this as a context manager, it will keep track of any open
    Chromosomes and close them (and the Genome object) for you later when
    the context is left::

      with Genome("/path/to/genomedata") as genome:
        chromosome = genome["chr1"]
        [...]

    If not used as a context manager, you are responsible for closing
    the Genomedata archive once you are done:

    >>> genome = Genome("/path/to/genomedata")
    >>> chromosome = genome["chr1"]
    [...]
    >>> genome.close()

    """
    def __init__(self, filename, *args, **kwargs):
        r"""Create a Genome object from a genomedata archive.

        :param filename: the root of the Genomedata object
                         hierarchy. This can either be a .genomedata
                         file that contains the entire genome or a
                         directory containing multiple chromosome files.
        :type filename: string
        :param \*args: args passed on to openFile if single file or to
                       Chromosome if directory
        :param \*\*kwargs: keyword args passed on to openFile if single file
                           or to Chromosome if directory
        :raises IOError: if `filename` does not exist
        :raises ValueError: if `filename` is neither a file nor a directory
        :raises NotImplementedError: if the archive's format version is
                                     newer than this software supports

        Example:

        >>> genome = Genome("./genomedata.ctcf.pol2b/")
        >>> genome
        Genome("./genomedata.ctcf.pol2b/")
            [...]
        >>> genome.close()
        >>> genome = Genome("./cat_chipseq.genomedata", mode="r")
            [...]
        >>> genome.close()

        """
        # so that Genome.__del__() won't throw an exception if there
        # is an error during __init__()
        self._isopen = False

        self.filename = filename
        self.args = args
        self.kwargs = kwargs

        # Process path for internal use, following symbolic links
        # until we get to the eventual file or directory
        filepath = path(filename).expand()
        while filepath.islink():
            filepath = filepath.readlinkabs()

        if not filepath.exists():
            raise IOError("Could not find Genomedata archive: %s" % filepath)

        if filepath.isfile():
            # Open the Genomedata file
            isfile = True
            self.h5file = _open_file(filepath, *args, **kwargs)
            self._file_attrs = self.h5file.root._v_attrs
        elif filepath.isdir():
            # Genomedata directory
            isfile = False
        else:
            raise ValueError("Genomedata archive must be file or directory: %s"
                             % filepath)

        self._path = filepath
        self._isfile = isfile
        # Keep track of open chromosomes
        self.open_chromosomes = {}
        # a kind of refcounting for context managers
        self._context_count = 0
        self._isopen = True

        format_version = self.format_version
        if format_version is not None and format_version > FORMAT_VERSION:
            raise NotImplementedError("This archive has format version %s,"
                                      " but the installed Genomedata software"
                                      " only supports format version %d"
                                      % (format_version, FORMAT_VERSION))

    def __iter__(self):
        """Return next chromosome, in sorted order, with memoization.

        Example::

          for chromosome in genome:
            print chromosome.name
            for supercontig, continuous in chromosome.itercontinuous():
              [...]

        """
        assert self.isopen

        if self._isfile:
            # Chromosomes are groups: iterate over child groups of root
            for group in self.h5file.iterNodes("/", classname="Group"):
                yield self[group._v_name]
        else:
            # Chromosomes are files;
            # sorted so that the order is always the same
            for filepath in sorted(self._path.files("*" + SUFFIX)):
                # pass through __getitem__() to allow memoization
                yield self[filepath.namebase]

    def __getitem__(self, name):
        """Return a reference to a chromosome of the given name.

        :param name: name of the chromosome (e.g. "chr1" if
                     chr1.genomedata is a file in the Genomedata archive
                     or chr1 is a top-level group in the single-file
                     Genomedata archive)
        :type name: string
        :returns: :class:`Chromosome`
        :raises KeyError: if the chromosome cannot be opened

        Example:

        >>> genome["chrX"]
        <Chromosome 'chrX', file='/path/to/genomedata/chrX.genomedata'>
        >>> genome["chrZ"]
        KeyError: 'Could not find chromosome: chrZ'

        """
        assert self.isopen

        try:
            # memoization
            return self.open_chromosomes[name]
        except KeyError:
            pass

        try:
            if self._isfile:
                res = Chromosome(self.h5file, where="/" + name)
            else:
                res = Chromosome._fromfilename(
                    self._path.joinpath(name + SUFFIX),
                    *self.args, **self.kwargs)
        except (IOError, NoSuchNodeError):
            raise KeyError("Could not find chromosome: %s" % name)

        self.open_chromosomes[name] = res
        return res

    def __contains__(self, name):
        """Return if there is a chromosome of the given name

        Implemented by trying to open the chromosome, so a successful
        check leaves it memoized in `open_chromosomes`.

        :param name: name of the chromosome (e.g. "chr1" if
                     chr1.genomedata is a file in the Genomedata archive
                     or chr1 is a top-level group in the single-file
                     Genomedata archive)
        :type name: string
        :returns: boolean

        Example:

        >>> "chrX" in Genome
        True
        >>> "chrZ" in Genome
        False

        """
        try:
            self[name]
        except KeyError:
            return False  # Couldn't find chromosome
        else:
            return True  # No errors opening chromosome

    def __enter__(self):
        assert self.isopen

        self._context_count += 1
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # XXX: this and __enter__ have potential race conditions, if
        # _context_count is changed simultaneously by different
        # threads. should be synchronized
        self._context_count -= 1

        # only the outermost context closes the archive
        if self._context_count == 0:
            self.close()

    def __del__(self):
        # getattr: __del__ can run on an instance whose __init__ never
        # got as far as setting _isopen (or never ran at all)
        if getattr(self, "_isopen", False):
            self.close()

    def close(self):
        """Close this Genomedata archive and any open chromosomes

        If the Genomedata archive is a directory, this closes all open
        chromosomes. If it is a single file, this closes that file.
        This should only be used if Genome is not a context manager
        (see :class:`Genome`). The behavior is undefined if this is
        called while Genome is being used as a context manager.

        """
        assert self.isopen

        # Whether a single file or a directory, close all the chromosomes
        # so they know they shouldn't be read. Do this before closing
        # Genome.h5file in case the chromosomes need access to it in closing.
        for chromosome in self.open_chromosomes.values():
            # Only close those not closed manually by the user
            if chromosome.isopen:
                chromosome.close()

        if self._isfile:
            self.h5file.close()

        self.open_chromosomes = {}
        self._isopen = False

    def __repr__(self):
        items = ["'%s'" % self.filename]
        if self.args:
            items.append("*%r" % self.args)
        if self.kwargs:
            items.append("**%r" % self.kwargs)
        return "Genome(%s)" % ", ".join(items)

    def __str__(self):
        return repr(self)

    def _accum_extrema(self, name, accumulator):
        """Collect attribute `name` from every chromosome and reduce the
        resulting list with `accumulator`."""
        self.tracknames_continuous  # for assertion check

        extrema = [getattr(chromosome, name) for chromosome in self]
        return accumulator(extrema)

    def erase_data(self, trackname):
        """Erase all data for the given track across all chromosomes

        The Genome object must have been created with
        :param mode:="r+". Behavior is undefined if this is not the case.

        Currently sets the dirty bit, which can only be erased with
        genomedata-close-data

        """
        assert self.isopen

        for chromosome in self:
            chromosome._erase_data(trackname)

    def add_track_continuous(self, trackname):
        """Add a new track

        The Genome object must have been created with
        :param mode:="r+". Behavior is undefined if this is not the case.

        Currently sets the dirty bit, which can only be erased with
        genomedata-close-data

        :raises NotImplementedError: if the archive format version is < 1
        :raises ValueError: if a single-file archive already has a track
                            of this name

        """
        assert self.isopen

        if self.format_version < 1:
            raise NotImplementedError(
                "Adding tracks is only supported for archives created"
                " with Genomedata version 1.2.0 or later.")

        if self._isfile:
            # Update tracknames attribute on file
            attrs = self._file_attrs
            if "tracknames" in attrs:
                tracknames = attrs.tracknames
                if trackname in tracknames:
                    raise ValueError("%s already has a track of name: %s"
                                     % (self.filename, trackname))
            else:
                tracknames = array([])

            attrs.tracknames = append(tracknames, trackname)

        # Let the chromosomes handle the rest
        for chromosome in self:
            chromosome._add_track_continuous(trackname)

    @property
    def isopen(self):
        """Return a boolean indicating if the Genome is still open"""
        return self._isopen

    @property
    def tracknames_continuous(self):
        """Return a list of the names of all data tracks stored.

        For a directory archive with no chromosomes this is None.
        """
        assert self.isopen

        if self._isfile:
            # Tracknames are stored at the root of each file, so we can
            # access them directly in this case
            return self._file_attrs.tracknames.tolist()

        # check that all chromosomes have the same tracknames_continuous
        res = None
        for chromosome in self:
            if res is None:
                res = chromosome.tracknames_continuous
            else:
                assert res == chromosome.tracknames_continuous

        return res

    def index_continuous(self, trackname):
        """Return the column index of the trackname in the continuous data.

        :param trackname: name of data track
        :type trackname: string
        :returns: integer
        :raises KeyError: if there is no track of that name

        This is used for efficient indexing into continuous data:

        >>> col_index = genome.index_continuous("sample_track")
        >>> data = genome["chr3"][100:150, col_index]

        although for typical use, the track can be indexed directly:

        >>> data = genome["chr3"][100:150, "sample_track"]

        """
        try:
            return self.tracknames_continuous.index(trackname)
        except ValueError:
            raise KeyError("Could not find continuous track: %s" % trackname)

    @property
    def num_tracks_continuous(self):
        """Returns the number of continuous data tracks.

        This is 0 when no tracknames have been stored yet.
        """
        try:
            tracknames = self.tracknames_continuous
        except AttributeError:
            # no tracknames attribute stored
            return 0

        if tracknames is None:
            # directory archive without any chromosomes
            return 0

        return len(tracknames)

    @property
    def format_version(self):
        """Genomedata format version

        None means there are no chromosomes in it already.
        """
        assert self.isopen

        if self._isfile:
            try:
                return self._file_attrs.genomedata_format_version
            except AttributeError:
                pass  # fall back to asking the chromosomes

        # else: self is a directory
        chromosomes = iter(self)
        for first_chromosome in chromosomes:
            break
        else:
            return None

        res = first_chromosome._format_version
        # the remaining chromosomes must all agree with the first
        assert all(res == chromosome._format_version
                   for chromosome in chromosomes)

        return res

    # XXX: should memoize these with an off-the-shelf decorator
    @property
    def mins(self):
        """Return the minimum value for each track.

        :returns: numpy.array

        """
        return self._accum_extrema("mins", partial(amin, axis=0))

    @property
    def maxs(self):
        """Return a vector of the maximum value for each track.

        :returns: numpy.array

        """
        return self._accum_extrema("maxs", partial(amax, axis=0))

    @property
    def sums(self):
        """Return a vector of the sum of the values for each track.

        :returns: numpy.array

        """
        return self._accum_extrema("sums", add.reduce)

    @property
    def sums_squares(self):
        """Return a vector of the sum of squared values for each track's data.

        :returns: numpy.array

        """
        return self._accum_extrema("sums_squares", add.reduce)

    @property
    def num_datapoints(self):
        """Return the number of datapoints in each track.

        :returns: a numpy.array vector with an entry for each track.

        """
        return self._accum_extrema("num_datapoints", add.reduce)

    @property
    def means(self):
        """Return a vector of the mean value of each track.

        :returns: numpy.array

        """
        return self.sums / self.num_datapoints

    @property
    def vars(self):
        """Return a vector of the variance in the data for each track.

        :returns: numpy.array

        """
        # this is an unstable way of calculating the variance,
        # but it should be good enough
        # Numerical Recipes in C, Eqn 14.1.7
        # XXX: best would be to switch to the pairwise parallel method
        # (see Wikipedia)
        return (self.sums_squares / self.num_datapoints) - \
            square(self.means)
class Chromosome(object):
    """The Genomedata object corresponding to data for a given chromosome.

    Usually created by keying into a Genome object with the name of a
    chromosome, as in:

    >>> with Genome("/path/to/genomedata") as genome:
    ...     chromosome = genome["chrX"]
    ...     chromosome
    ...
    <Chromosome 'chrX', file='/path/to/genomedata/chrX.genomedata'>

    """
    class ChromosomeDirtyError(Exception):
        """Raised when a chromosome flagged dirty is opened read-only."""
        pass

    default_where = "/"
    default_mode = "r"

    def __init__(self, h5file, where=default_where, name=None):
        """
        :param h5file: tables.File object for the h5file which contains
                       the chromosome to be opened. If the Genomedata
                       archive is a single file, :param where: should
                       specify the path to the chromosome group within
                       the file.
        :param where: path or Node to the root of the chromosome within
                      the Genomedata file.
        :param name: name of the Chromosome. If None, the name will try
                     to be parsed from :param where:.
        :type file: tables.File
        :type where: string or tables.Node
        :type name: string or None
        :raises NotImplementedError: if `h5file` is not a tables.File
        :raises ChromosomeDirtyError: if the file was opened in a mode
                                      other than "w", "r+" or "a" and the
                                      chromosome's dirty attribute is set
        """
        if not isinstance(h5file, tables.File):
            raise NotImplementedError("Chromosome file of unsupported"
                                      " type: %r" % h5file)

        if name is None:
            # last path component of where
            name = where.rpartition("/")[2]

        # Now, open the group that is the root of the chromosome
        h5group = h5file.getNode(where, classname="Group")

        # XXX: even though each chromosome has its own dirty bit, and
        # the metadata only needs to be recalculated on those where it is
        # set, right now we can't guarantee that the user hasn't changed
        # the values directly, so we have to set the dirty bit on every
        # opened chromosome group. This can be improved by tracking what
        # the user changes.
        attrs = h5group._v_attrs
        if h5file.mode in ("w", "r+", "a"):
            # Make sure there is a genomedata_format_version
            file_attrs = h5file.root._v_attrs
            if "genomedata_format_version" not in file_attrs:
                # Set as first version (before it was standard)
                file_attrs.genomedata_format_version = 0

            attrs.dirty = True
        elif attrs.dirty:
            raise self.ChromosomeDirtyError("""
Chromosome has been modified (or loaded with a mode of "w", "r+", or "a")
since being closed with genomedata-close-data.""")

        self.filename = h5file.filename
        self._name = name
        self.h5file = h5file
        self.h5group = h5group
        # the chromosome owns the whole file only when rooted at "/"
        self._isfile = (where == self.default_where)
        self._seq = _ChromosomeSeqSlice(self)
        self._supercontigs = _Supercontigs(self)
        self._isopen = True

    @classmethod
    def _fromfilename(cls, filename, mode=default_mode, *args, **kwargs):
        r"""Open a chromosome (.genomedata) file and wrap it.

        :param filename: name of the chromosome (.genomedata) file to access
        :param mode: mode of interaction with the chromosome file,
                     with ``r``: read, ``w``: write, ``a``: append,
                     ``r+``: append but force file to exist (see documentation
                     for tables.openFile().)
        :type mode: string
        :param \*args: args passed on to openFile
        :param \*\*kwargs: keyword args passed on to openFile
        :raises IOError: if the file cannot be opened

        """
        filepath = path(filename).expand()

        try:
            # mode is passed positionally: combining mode=mode with a
            # non-empty *args would bind mode twice and raise TypeError
            h5file = _open_file(filepath, mode, *args, **kwargs)
        except IOError:
            raise IOError("Could not find file: %r" % filename)

        # chromosome name is the filename without the Genomedata suffix
        name = filepath.name.rpartition(SUFFIX)[0]

        return cls(h5file, name=name)

    def __iter__(self):
        """Return next supercontig in chromosome.

        .. versionadded:: 1.2
           Supercontigs are ordered by start index

        Seldom used in favor of the more direct:
        :meth:`Chromosome.itercontinuous`

        Example:

        >>> for supercontig in chromosome:
        ...     supercontig  # calls repr()
        ...
        <Supercontig 'supercontig_0', [0:66115833]>
        <Supercontig 'supercontig_1', [66375833:90587544]>
        <Supercontig 'supercontig_2', [94987544:199501827]>

        """
        assert self.isopen

        supercontigs = [Supercontig(group) for group in self.h5group]
        # sort on start only, so that equal starts never fall back to
        # comparing Supercontig objects themselves
        supercontigs.sort(key=lambda supercontig: supercontig.start)

        for supercontig in supercontigs:
            yield supercontig

    def __getitem__(self, key):
        """Return the continuous data corresponding to this bp slice

        :param key: base_key must index or slice bases
                    track_key specify data tracks with index, slice, string,
                    list of strings, list of indexes, or array of indexes

        :type key: <base_key>[, <track_key>]
        :returns: numpy.array
        :raises TypeError: if the track key is of an unrecognized type

        If slice is taken over or outside a supercontig boundary,
        missing data is filled in with NaN's automatically and a
        warning is printed.

        Typical use:

        >>> chromosome = genome["chr4"]
        >>> chromosome[0:5]  # Get all data for the first five bases of chr4
        >>> chromosome[0, 0:2]  # Get data for first two tracks at chr4:0
        >>> chromosome[100, "ctcf"]  # Get "ctcf" track value at chr4:100

        """
        assert self.isopen

        # XXX: Allow variable/negative steps, negative starts/stops, etc.
        # XXX: The problem with missing start or end indices is that
        # 1) it is unclear if 0 or self.start should be used for the
        #    start, and (preference for 0)
        # 2) the full length of the chromosome is not known, so self.end
        #    would need to be used, which might not be what is wanted. -OJB

        # XXX: I think this should no longer be a problem now that
        # self.end is the full length of the chromsome? need to check
        # this -MMH

        # Sanitize the input
        if isinstance(key, tuple):
            base_key, track_key = key
        else:
            base_key = key
            track_key = slice(None)  # All tracks

        # just like NumPy, direct indexing results in output shape
        # change (at end of method)
        base_direct_index = isinstance(base_key, int)
        track_direct_index = isinstance(track_key, (basestring, int))

        # convert base_key
        base_key = slice(*_key_to_tuple(base_key))

        # First convert track_key toward slice: a list/array of tracks is
        # read as the covering slice and subset afterwards
        if isinstance(track_key, (list, ndarray)):
            track_indexes = array([self._index_continuous(item)
                                   for item in track_key])
            track_min = track_indexes.min()
            track_key = slice(track_min, track_indexes.max() + 1, 1)
            track_subset_indexes = track_indexes - track_min
        else:
            track_subset_indexes = slice(None)  # everything

        if isinstance(track_key, basestring):
            track_key = self.index_continuous(track_key)

        if isinstance(track_key, int):
            track_key = slice(track_key, track_key + 1, 1)

        if isinstance(track_key, slice):
            # Fix indices to number of tracks
            track_key = slice(*track_key.indices(self.num_tracks_continuous))
        else:
            raise TypeError("Unrecognized track indexing method: %s" %
                            track_key)

        nrows = base_key.stop - base_key.start
        ncols = len(xrange(track_key.start, track_key.stop, track_key.step))
        dtype = self._continuous_dtype

        # Handle degenerate case
        if nrows < 1 or ncols < 1:
            # Return empty array (matches numpy behavior)
            return array([], dtype=dtype)

        # At this point, base_key and track_key are guaranteed to be slices
        # with both start and end >= 0

        # Lookup appropriate data
        supercontigs = self.supercontigs[base_key]
        if len(supercontigs) == 0:
            warn("slice of chromosome data does not overlap any supercontig"
                 " (filling with 'NaN')")
        elif len(supercontigs) > 1:
            warn("slice of chromosome data spans more than one supercontig"
                 " (filling gaps with 'NaN')")

        data = empty((nrows, ncols), dtype=dtype)
        data.fill(nan)

        for supercontig in supercontigs:
            assert (base_key.start < supercontig.end and
                    base_key.stop > supercontig.start)
            # overlap of request and supercontig, in chromosome coordinates
            chr_start = max(base_key.start, supercontig.start)
            chr_end = min(base_key.stop, supercontig.end)
            # the same overlap relative to the output array ...
            data_slice = slice(chr_start - base_key.start,
                               chr_end - base_key.start)
            # ... and relative to the supercontig
            supercontig_slice = slice(supercontig.project(chr_start),
                                      supercontig.project(chr_end))

            # track_key must be a slice
            try:
                data[data_slice, :] = supercontig.continuous[supercontig_slice,
                                                             track_key]
            except NoSuchNodeError:
                # Allow the supercontig to not have a continuous dataset
                pass

        # get a subset of tracks
        data = data[:, track_subset_indexes]

        # Make output shape appropriate for indexing method (like numpy)
        if track_direct_index:
            data = data[:, 0]

        if base_direct_index:
            data = data[0]

        return data

    def __str__(self):
        return str(self.name)

    def __repr__(self):
        return "<Chromosome '%s', file='%s'>" % (self.name, self.filename)

    def itercontinuous(self):
        """Return a generator over all supercontig, continuous pairs.

        .. versionadded:: 1.2
           Supercontigs are ordered by increasing supercontig.start.

        This is the best way to efficiently iterate over the data since
        all specified data is in supercontigs::

            for supercontig, continuous in chromosome.itercontinuous():
                print supercontig, supercontig.start, supercontig.end
                [...]

        Supercontigs without a continuous dataset are skipped.

        """
        assert self.isopen

        for supercontig in self:
            try:
                yield supercontig, supercontig.continuous
            except NoSuchNodeError:
                continue

    def _index_continuous(self, track_key):
        """Convert track_key to index only when it is a basestring.

        Otherwise return track_key unchanged.
        """
        if isinstance(track_key, basestring):
            return self.index_continuous(track_key)

        return track_key

    def index_continuous(self, trackname):
        """Return the column index of the trackname in the continuous data.

        :param trackname: name of data track
        :type trackname: string
        :returns: integer
        :raises KeyError: if there is no track of that name

        This is used for efficient indexing into continuous data:

        >>> chromosome = genome["chr3"]
        >>> col_index = chromosome.index_continuous("sample_track")
        >>> data = chromosome[100:150, col_index]

        although for typical use, the track can be indexed directly:

        >>> data = chromosome[100:150, "sample_track"]

        """
        assert self.isopen

        try:
            return self.tracknames_continuous.index(trackname)
        except ValueError:
            raise KeyError("Could not find continuous track: %s" % trackname)

    def close(self):
        """Close the current chromosome file.

        This only needs to be called when Genomedata files are manually
        opened as Chromosomes. Otherwise, :meth:`Genome.close`
        should be called to close any open chromosomes or Genomedata files.
        The behavior is undefined if this is called on a Chromosome accessed
        through a Genome object. Using Genomedata as a context manager makes
        life easy by memoizing chromosome access and guaranteeing the
        proper cleanup. See :class:`Genome`.

        """
        assert self.isopen

        if self.attrs.dirty:
            warn("Closing Chromosome with modified data. Metadata needs to"
                 " be recalculated by calling genomedata-close-data on the"
                 " Genomedata archive before re-accessing it")

        # only close the HDF5 file when this chromosome is the whole file
        if self._isfile:
            self.h5file.close()

        self._isopen = False

    def _erase_data(self, trackname):
        """Erase all data for the given track

        The Genome object or this Chromosome must have been created with
        :param mode:="r+". Behavior is undefined if this is not the case.

        Currently sets the dirty bit, which can only be erased with
        genomedata-close-data

        """
        assert self.isopen

        col_index = self.index_continuous(trackname)
        self.attrs.dirty = True

        for supercontig, continuous in self.itercontinuous():
            continuous[:, col_index] = nan

    def _add_track_continuous(self, trackname):
        """Add a new track

        The Genome object must have been created with
        :param mode:="r+". Behavior is undefined if this is not the case.

        Currently sets the dirty bit, which can only be erased with
        genomedata-close-data

        :raises ValueError: if this chromosome is a whole file which
                            already has a track of this name

        """
        assert self.isopen

        if self._isfile:
            # Update tracknames attribute with new trackname
            file_attrs = self._file_attrs
            if "tracknames" in file_attrs:
                tracknames = file_attrs.tracknames
                if trackname in tracknames:
                    raise ValueError("%s already has a track of name: %s"
                                     % (self.filename, trackname))
            else:
                tracknames = array([])

            file_attrs.tracknames = append(tracknames, trackname)
        # else: hope the Genome object updated its own tracknames

        self.attrs.dirty = True  # dirty specific to chromosome

        # Extend supercontigs by a column (or create them)
        for supercontig in self:
            supercontig_length = supercontig.end - supercontig.start
            try:
                continuous = supercontig.continuous
            except NoSuchNodeError:
                # Define an array extendible along its second dimension
                # (the one given length 0)
                supercontig_shape = (supercontig_length, 0)
                self.h5file.createEArray(supercontig.h5group, "continuous",
                                         CONTINUOUS_ATOM, supercontig_shape,
                                         chunkshape=CONTINUOUS_CHUNK_SHAPE)
                continuous = supercontig.continuous

            # Add column to supercontig continuous array
            # "truncate" also extends with default values
            continuous.truncate(continuous.nrows + 1)

    @property
    def isopen(self):
        """Return a boolean indicating if the Chromosome is still open"""
        return self._isopen

    @property
    def _continuous_dtype(self):
        # dtype of the first supercontig that has continuous data,
        # else the module default
        for supercontig, continuous in self.itercontinuous():
            return supercontig._continuous_dtype

        return CONTINUOUS_DTYPE

    @property
    def _seq_dtype(self):
        # dtype of the first supercontig, else the module default
        for supercontig in self:
            return supercontig._seq_dtype

        return SEQ_DTYPE

    @property
    def name(self):
        """Return the name of this chromosome (same as __str__())."""
        return self._name

    @property
    def attrs(self):
        """Return the attributes for this Chromosome.

        This may also include Genome-wide attributes if the archive
        is implemented as a directory.

        """
        assert self.isopen
        return self.h5group._v_attrs

    @property
    def _file_attrs(self):
        assert self.isopen
        return self.h5file.root._v_attrs

    @property
    def tracknames_continuous(self):
        """Return a list of the data track names in this Chromosome."""
        assert self.isopen
        return self._file_attrs.tracknames.tolist()

    @property
    def num_tracks_continuous(self):
        """Return the number of tracks in this chromosome"""
        try:
            return len(self.tracknames_continuous)
        except AttributeError:
            # no tracknames attribute stored
            return 0

    @property
    def mins(self):
        """See :attr:`Genome.mins`"""
        return self.attrs.mins

    @property
    def maxs(self):
        """See :attr:`Genome.maxs`"""
        return self.attrs.maxs

    @property
    def sums(self):
        """See :attr:`Genome.sums`"""
        return self.attrs.sums

    @property
    def sums_squares(self):
        """See :attr:`Genome.sums_squares`"""
        return self.attrs.sums_squares

    @property
    def num_datapoints(self):
        """See :attr:`Genome.num_datapoints`"""
        return self.attrs.num_datapoints

    @property
    def _format_version(self):
        """See :attr:`Genome.format_version`"""
        try:
            return self._file_attrs.genomedata_format_version
        except AttributeError:
            try:
                return self.attrs.genomedata_format_version
            except AttributeError:
                # original version did not have
                # genomedata_format_version attribute
                return 0

    @property
    def start(self):
        """Return the index of the first base in this chromosome.

        For :attr:`Genome.format_version` > 0, this will always be 0.
        For == 0, this will be the start of the first supercontig.

        """
        if self._format_version == 0:
            return min(supercontig.start for supercontig in self)
        else:
            return self.attrs.start

    @property
    def end(self):
        """Return the index past the last base in this chromosome.

        For :attr:`Genome.format_version` > 0, this will be
        the number of bases of sequence in the chromosome. For == 0,
        this will be the end of the last supercontig.

        This is the end in half-open coordinates, making slicing simple:

        >>> chromosome.seq[chromosome.start:chromosome.end]

        """
        if self._format_version == 0:
            return max(supercontig.end for supercontig in self)
        else:
            return self.attrs.end

    @property
    def seq(self):
        r"""Return the genomic sequence of this chromosome.

        If the index or slice spans a non-supercontig range, N's are
        inserted in place of the missing data and a warning is issued.

        Example:

        >>> chromosome = genome["chr1"]
        >>> for supercontig in chromosome:
        ...     print repr(supercontig)
        ...
        <Supercontig 'supercontig_0', [0:121186957]>
        <Supercontig 'supercontig_1', [141476957:143422081]>
        <Supercontig 'supercontig_2', [143522081:247249719]>
        >>> chromosome.seq[0:10].tostring()  # Inside supercontig
        'taaccctaac'
        >>> chromosome.seq[121186950:121186970].tostring()  # boundary
        'agAATTCNNNNNNNNNNNNN'
        >>> chromosome.seq[121186957:121186960].tostring()  # not in supercontig
        UserWarning: slice of chromosome sequence does not overlap any
        supercontig (filling with 'N')
        'NNN'

        The entire sequence for a chromosome can be retrieved with:

        >>> chromosome.seq[chromosome.start:chromosome.end]

        """
        return self._seq

    @property
    def supercontigs(self):
        """Return the supercontigs that overlap a range.

        :returns: list of :class:`Supercontig`

        Indexable with a slice or simple index:

        >>> chromosome.supercontigs[100]
        [<Supercontig 'supercontig_0', [0:66115833]>]
        >>> chromosome.supercontigs[1:100000000]
        [<Supercontig 'supercontig_0', [0:66115833]>,
         <Supercontig 'supercontig_1', [66375833:90587544]>,
         <Supercontig 'supercontig_2', [94987544:199501827]>]
        >>> chromosome.supercontigs[66115833:66375833]  # Between two supercontigs
        []

        """
        return self._supercontigs
class Supercontig(object):
    """A container for a segment of data in one chromosome.

    Implemented via a HDF5 Group

    """
    def __init__(self, h5group):
        """
        :param h5group: group containing the supercontig data
        :type h5group: HDF5 Group
        """
        self.h5group = h5group

    def __repr__(self):
        fields = (self.name, self.start, self.end)
        return "<Supercontig '%s', [%d:%d]>" % fields

    def __str__(self):
        return str(self.name)

    def project(self, pos, bound=False):
        """Project chromsomal coordinates to supercontig coordinates.

        :param pos: chromosome coordinate
        :param bound: bound result to valid supercontig coordinates
        :type pos: integer
        :type bound: boolean
        :returns: integer

        """
        if bound:
            # clamp into [start, end] before shifting
            pos = min(max(pos, self.start), self.end)

        offset = pos - self.start
        return int(offset)

    @property
    def _seq_dtype(self):
        # dtype of the stored sequence, or the module default when this
        # supercontig has no seq node
        try:
            return self.seq.atom.dtype
        except NoSuchNodeError:
            return SEQ_DTYPE

    @property
    def _continuous_dtype(self):
        # dtype of the stored continuous data, or the module default
        # when this supercontig has no continuous node
        try:
            return self.continuous.atom.dtype
        except NoSuchNodeError:
            return CONTINUOUS_DTYPE

    @property
    def continuous(self):
        """Return the underlying continuous data in this supercontig.

        To read the whole dataset into memory as a `numpy.array`, use
        continuous.read()

        :returns: `tables.EArray`

        """
        return self.h5group.continuous

    @property
    def attrs(self):
        """Return the attributes of this supercontig."""
        return self.h5group._v_attrs

    @property
    def name(self):
        """Return the name of this supercontig."""
        return self.h5group._v_name

    @property
    def seq(self):
        """See :attr:`Chromosome.seq`."""
        return self.h5group.seq

    @property
    def start(self):
        """Return the index of the first base in this supercontig.

        The first base is index 0.

        """
        return int(self.attrs.start)

    @property
    def end(self):
        """Return the index past the last base in this supercontig.

        This is the end in half-open coordinates, making slicing simpler:

        >>> supercontig.seq[supercontig.start:supercontig.end]

        """
        return int(self.attrs.end)
class _ChromosomeSeqSlice(object):
    """Indexable view of a chromosome's sequence (see Chromosome.seq)."""
    def __init__(self, chromosome):
        assert isinstance(chromosome, Chromosome)
        self._chromosome = chromosome

    def __getitem__(self, key):
        """Get the underlying sequence that corresponds to this index (range).

        Insert "N"s if the index range spans no/multiple supercontigs.

        :param key: integer index, or slice with explicit start and stop
        :returns: numpy array of sequence codes for a slice; a single
                  element for an integer key
        """
        supercontigs = self._chromosome.supercontigs[key]
        if len(supercontigs) == 0:
            warn("slice of chromosome sequence does not overlap any"
                 " supercontig (filling with 'N')")
        elif len(supercontigs) > 1:
            warn("slice of chromosome sequence spans more than one supercontig"
                 " (filling gaps with 'N')")

        # If index was specific, don't return an array
        key_int = isinstance(key, int)

        start, end = _key_to_tuple(key)
        length = end - start
        dtype = self._chromosome._seq_dtype

        if length <= 0:
            # Handle degenerate case quickly
            return array([], dtype=dtype)

        seq = empty((length,), dtype=dtype)
        seq.fill(ord("N"))  # Assumes dtype is numeric type

        for supercontig in supercontigs:
            # overlap of request and supercontig, in chromosome coordinates
            chr_start = max(start, supercontig.start)
            chr_end = min(end, supercontig.end)
            # the same overlap relative to the output array ...
            dest_start = chr_start - start
            dest_end = chr_end - start
            # ... and relative to the supercontig
            sc_start = supercontig.project(chr_start)
            sc_end = supercontig.project(chr_end)
            seq[dest_start:dest_end] = supercontig.seq[sc_start:sc_end]

        if key_int:
            seq = seq[0]

        return seq


class _Supercontigs(object):
    """Indexable lookup of a chromosome's supercontigs by position
    (see Chromosome.supercontigs)."""
    def __init__(self, chromosome):
        assert isinstance(chromosome, Chromosome)
        self._chromosome = chromosome

    def __getitem__(self, key):
        """Return list of supercontigs containing any of this index range

        :param key: integer index, or slice with explicit start and stop
        :returns: list of supercontigs, in chromosome iteration order
        """
        start, end = _key_to_tuple(key)

        # clamp the request to the extent of the chromosome
        if start < self._chromosome.start:
            start = self._chromosome.start
        if end > self._chromosome.end:
            end = self._chromosome.end

        supercontigs = []
        for supercontig in self._chromosome:
            if start < supercontig.end and end > supercontig.start:
                supercontigs.append(supercontig)
                if start >= supercontig.start and end <= supercontig.end:
                    # Key entirely within one supercontig, so we're done
                    break
                # XXX: would be nice if we could count on supercontig ordering

        return supercontigs


def _key_to_tuple(key):
    """Key to (start, stop)

    :param key: integer index, or slice with both start and stop given
                and a step of None or 1
    :returns: (start, end) tuple in half-open coordinates
    :raises NotImplementedError: for open-ended slices, any step other
                                 than None or 1, negative indices, or
                                 keys that are neither int nor slice
    :raises IndexError: if start is greater than end
    """
    if isinstance(key, int):
        start = key
        end = key + 1
    elif isinstance(key, slice):
        if key.start is None or key.stop is None:
            raise NotImplementedError("Both start and end must be specified in"
                                      " chromosomal slices")
        elif key.step is not None and key.step != 1:
            # only a step of 1 (or no step) is a contiguous retrieval
            raise NotImplementedError("Chromosome slicing does not support"
                                      " non-contiguous retrieval")
        else:
            start = key.start
            end = key.stop
    else:
        raise NotImplementedError("Unsupported index found: %s" % key)

    if start < 0 or end < 0:
        raise NotImplementedError("Negative (wrapping) indices not supported")
    elif start > end:
        raise IndexError("Start index can be at most the end index")

    return start, end


def main(args=sys.argv[1:]):
    """Command-line entry point; currently does nothing."""
    pass

if __name__ == "__main__":
    sys.exit(main())