Source code for

# -*- coding: utf-8 -*-

"""Contains class and functions related to read definition and use"""
from operator import attrgetter

[docs]class Read: """Define a read. :param header: header of the read :param sequence: sequence of the read :type header: str :type sequence: str """ def __init__(self, header, sequence): self.header = header self.sequence = sequence self.matches = [] self.breakpoints = [] self.all_strand = [] self.all_ref = [] self.bp_blasted = [] # Breakpoints that have Blast hit # Equality between two Read def __eq__(self, other): if isinstance(self, other.__class__): return self.__dict__ == other.__dict__ return False # self representation for print def __repr__(self): return repr((self.header, self.sequence, self.matches, self.breakpoints, self.get_strand(), self.get_ref(), self.bp_blasted))
[docs] def get_strand(self): """ Compute the strand of this Read (-1=nothing / 0=normal / 1=revcomp / 2=normal AND revcomp) """ # Nothing strand = -1 # If it has matches if self.all_strand: # Get all uniq strand all_strand_uniq = set(self.all_strand) # A single strand if len(all_strand_uniq) == 1: strand = self.all_strand[0] # Two strand else: strand = 2 return strand
[docs] def get_ref(self): """ Compute the ref of this Read (0=nothing / 1=ref1 / 2=ref2 / 3=ref1 AND ref2) """ # Nothing ref = 0 # If it has matches if self.all_ref: # Get all uniq ref all_ref_uniq = set(self.all_ref) # A single ref if len(all_ref_uniq) == 1: ref = self.all_ref[0] # Two ref else: ref = 3 return ref
[docs] def output(self, sep): """ Proper output of a line of the result file :param sep: Separator to use in CSV :type sep: list(char) """ # Output the first info ret = self.header + sep[2] + str(self.get_strand()) + sep[2] + \ str(len(self.breakpoints)) + sep[2] # Output all breakpoints if len(self.breakpoints) > 0: ret += sep[1].join([x.output(sep) for x in self.breakpoints]) # No breakpoint else: ret += "None" # Tab before next column ret += sep[2] # Sort matches by position on the read matches = sorted(self.matches, key=attrgetter('beg_pos_read')) read_info = [] ref_info = [] sizes = [] inserts = [] # For each matches in position order for i in matches: read_info.append(i.output_read(sep[0])) ref_info.append(i.output_ref(sep[0])) sizes.append(i.size) inserts.append(sep[0].join(map(str, i.inserts))) # Clean output for matches read ret += sep[1].join(read_info) + sep[2] # Clean output for matches ref ret += sep[1].join(ref_info) + sep[2] # Clean output for sizes ret += sep[1].join(map(str, sizes)) + sep[2] # Clean output for insertions ret += sep[1].join(map(str, inserts)) + sep[2] # Output ids of all breakpoints that get successfully Blasted if len(self.bp_blasted) > 0: ret += sep[1].join(map(str, self.bp_blasted)) # Nothing blasted else: ret += "None" # New line ret += "\n" return ret
[docs] def add_a_match(self, match): """ Test if this match should be added or not. It must be added if it is not a subpart of an already added other match. In some case, some already added matches are subparts of the match to add. If so, they are removed. :param match: the match to add :type match: :py:class:`Match` """ # Is this match a subpart of a match already added? to_add = True for i in self.matches: if match.is_include_in(i): to_add = False break # If we need to add this match if to_add: # Matches to be removed matches_to_remove = [] # Is some matches included in the tested one? for i in self.matches: if i.is_include_in(match): # Add it to be removed matches_to_remove.append(i) # Remove them for i in matches_to_remove: # Remove it self.matches.remove(i) # Remove the corresponding ref/strand self.all_ref.remove(i.ref) self.all_strand.remove(i.strand) # Add the match self.matches.append(match) # Add ref/strand self.all_ref.append(match.ref) self.all_strand.append(match.strand)
[docs] def get_matches(self, hits, gap, k, strand, ref): """ Populate matches list from all hits, for one strand :param hits: matching position on read and ref :param gap: maximum authorized gap size for continuous hits :param k: k-mer size :param strand: the strand of this hit :param ref: the reference index of this hit :type hits: dict :type gap: int :type k: int :type strand: int :type ref: int """ # If there is some hits if hits: insert = [] beg_pos_read = sorted(hits.keys())[0] beg_pos_ref = hits[beg_pos_read] prev_pos_read = beg_pos_read prev_pos_ref = beg_pos_ref for i in sorted(hits.keys()): # Is the read or ref NOT continuous (modulo gap)? if i >= prev_pos_read+gap+k+1 or \ hits[i] >= prev_pos_ref+gap+k+1 or \ hits[i] < prev_pos_ref: # Add the previous good and complete match size = prev_pos_read - beg_pos_read + k match = Match(beg_pos_read, beg_pos_ref, strand, ref, size, insert, len(self.sequence)) # Add the match self.add_a_match(match) # Reinit insertions insert = [] # New match beg_pos_read = i beg_pos_ref = hits[i] # Not completely contiguous else: # Compute the size of insertion insert_tmp = max(i - (prev_pos_read + k), hits[i] - (prev_pos_ref + k)) if insert_tmp > 0: # Add this insert for this hit insert.append(insert_tmp) # Set both prev_pos to current_pos prev_pos_read = i prev_pos_ref = hits[i] # Add the last one size = prev_pos_read - beg_pos_read + k match = Match(beg_pos_read, beg_pos_ref, strand, ref, size, insert, len(self.sequence)) # Add the match self.add_a_match(match)
[docs] def get_breakpoints(self): """ Populate breakpoints list using all hits, for both strands """ # Sort matches by position on the read matches = sorted(self.matches, key=attrgetter('beg_pos_read')) # If there is more than one match on this read if len(matches) > 1: # breakpoints are everything between two matches for i in range(1, len(matches)): # Starting pos of bp is the ending pos of previous match # Warning, end_pos_read can be greater than beg_pos of # current match (overlapping reads). Breakpoint is then # of negative size. #start_bp = min(matches[i-1].end_pos_read, # matches[i].beg_pos_read) start_bp = matches[i-1].end_pos_read # Size of bp is position of current match minus # ending pos of previous match # Warning, it can be negative (overlapping reads) size_bp = matches[i].beg_pos_read - matches[i-1].end_pos_read # Create the breakpoint break_point = Breakpoint(start_bp, size_bp) # Add it to breakpoint list for this read if break_point not in self.breakpoints: self.breakpoints.append(break_point)
[docs]class Match: """Define a match. :param beg_pos_read: starting position in the read of this match :param beg_pos_ref: starting position in the ref of this match :param strand: strand of this match :param ref: the ref index for this match :param size: size of the match :param inserts: size of potential insertions (possible to have several insertions in ONE match) :param seq_l: size of the read (needed for rev comp computation) :type beg_pos_read: int :type beg_pos_ref: int :type strand: int :type ref: int :type size: int :type inserts: list(int) :type seq_l: int """ def __init__(self, beg_pos_read, beg_pos_ref, strand, ref, size, inserts, \ seq_l): self.ref = ref self.strand = strand # 0 -> normal / 1 -> rc # Compute ending position on the read (not that useful anymore) # Normal strand, direct if self.strand == 0: self.beg_pos_read = beg_pos_read self.end_pos_read = beg_pos_read + size # Reverse complement: use total length of the read else: self.beg_pos_read = seq_l - size - beg_pos_read self.end_pos_read = seq_l - beg_pos_read # Positions on the ref self.beg_pos_ref = beg_pos_ref self.end_pos_ref = beg_pos_ref + size # Size of the match self.size = size # Potential insertions if not inserts: # No insertion self.inserts = [0] else: self.inserts = inserts # self representation for print def __repr__(self): return repr((self.strand, self.beg_pos_read, self.end_pos_read, self.beg_pos_ref, self.end_pos_ref, self.size, self.inserts, self.ref)) # Equality between two Matches def __eq__(self, other): if isinstance(self, other.__class__): return self.__dict__ == other.__dict__ return False
[docs] def is_include_in(self, other): """ Check if this match is included in another match :param other: the match to compare with :type other: :py:class:`Match` """ ret = False if self.beg_pos_read >= other.beg_pos_read and\ self.end_pos_read <= other.end_pos_read: ret = True return ret
[docs] def output_read(self, sep): """ Correct output of read infos :param sep: Separator to use in CSV :type sep: list(char) """ # Reverse complement if self.strand: # Add ref ret = "(-)" else: ret = "(+)" # Match info ret += "({}){}{}{}".format(self.ref, self.beg_pos_read, sep[0], self.size) return ret
[docs] def output_ref(self, sep): """ Correct output of ref infos :param sep: Separator to use in CSV :type sep: list(char) """ return "({}){}{}{}".format(self.ref, self.beg_pos_ref, sep[0], self.size)
[docs]class Breakpoint: """Define a breakpoint. :param beg_pos_read: starting position in the read of this match :param size: size of the match :type beg_pos_read: int :type beg_pos_ref: int :type size: int """ def __init__(self, beg_pos_read, size): self.beg_pos_read = beg_pos_read # Size of the match self.size = size # self representation for print def __repr__(self): return repr((self.beg_pos_read, self.size)) # Equality between two Matches def __eq__(self, other): if isinstance(self, other.__class__): return self.__dict__ == other.__dict__ return False
[docs] def output(self, sep): """ Proper output of a line in the result file :param sep: Separator to use in CSV :type sep: list(char) """ return str(self.beg_pos_read) + sep[0] + str(self.size)