123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- #!/usr/bin/env python3
- import pysam
- import argparse
- import os
- import glob
- def read_list_from_file(filename):
- '''Read a list of chromosome names from a file.'''
- with open(filename, 'r') as file:
- return [line.strip() for line in file]
- def clean_filename(filename):
- '''Remove various BAM-related extensions from a filename.'''
- for ext in [".dup.bam", ".sort.bam", ".sorted.bam", ".bam"]:
- if filename.endswith(ext):
- return filename[:-len(ext)]
- return filename
- def bam_stats(bam_file, sample_name, quality_threshold=None, blackchr1_chromosomes_file=None, blackchr2_file=None, filter_output=None):
- '''Calculate statistics from a BAM file and optionally filter.'''
- samfile = pysam.AlignmentFile(bam_file, "rb")
-
- blackchr1_chromosomes = read_list_from_file(blackchr1_chromosomes_file) if blackchr1_chromosomes_file else []
- blackchr2_chromosomes = read_list_from_file(blackchr2_file) if blackchr2_file else []
- total_reads = 0
- mapped_reads = 0
- multi_mapped_reads = 0
- duplicates = 0
- low_quality = 0
- blackchr1_mapped_reads = 0
- blackchr2_mapped_reads = 0
- passed_filter_reads = 0
- output_file = None
- if filter_output:
- output_file = pysam.AlignmentFile(filter_output, "wb", header=samfile.header)
- for read in samfile:
- total_reads += 1
- is_filter_passed = True
- if read.is_duplicate:
- duplicates += 1
- is_filter_passed = False
- if quality_threshold and read.mapping_quality < quality_threshold:
- low_quality += 1
- is_filter_passed = False
- if not read.is_unmapped:
- mapped_reads += 1
- if read.has_tag("XS"):
- multi_mapped_reads += 1
- is_filter_passed = False
- if read.reference_name in blackchr1_chromosomes:
- blackchr1_mapped_reads += 1
- is_filter_passed = False
- if read.reference_name in blackchr2_chromosomes:
- blackchr2_mapped_reads += 1
- is_filter_passed = False
- if is_filter_passed:
- passed_filter_reads += 1
- if output_file:
- output_file.write(read)
- samfile.close()
- if output_file:
- output_file.close()
- return {
- "sample_name": sample_name,
- "total_reads": total_reads,
- "mapped_reads": mapped_reads,
- "multi_mapped_reads": multi_mapped_reads,
- "duplicates": duplicates,
- "low_quality": low_quality,
- "blackchr1_mapped_reads": blackchr1_mapped_reads,
- "blackchr2_mapped_reads": blackchr2_mapped_reads,
- "passed_filter_reads": passed_filter_reads
- }
- def summary_statistics(files):
- '''Summarize statistics from multiple BAM statistics files, including percentages.'''
- summary_data = {}
- for file in files:
- sample_name = os.path.basename(file).split('.')[0]
- with open(file, 'r') as f:
- for line in f:
- if line.startswith("Statistics"):
- continue
- parts = line.strip().split("\t")
- category = parts[0]
- value = parts[1]
- if category not in summary_data:
- summary_data[category] = []
- summary_data[category].append(value)
- headers = ["Category"] + [os.path.basename(file).split('.')[0] for file in files]
- print("\t".join(headers))
- for category, values in summary_data.items():
- print(f"{category}\t" + "\t".join(values))
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="Stat BAM file.")
- parser.add_argument("-a", "--action", choices=['qc', 'summary'], required=True, help="Action to perform: 'qc' for quality control, 'summary' for summarizing multiple samples.")
- parser.add_argument("files", nargs='+', help="Path to the BAM file or statistics files.")
- parser.add_argument("--sample_name", help="Sample name for the output statistics. If not provided, it's inferred from the BAM filename.")
- parser.add_argument("-q", "--quality", type=int, help="Quality threshold for low-quality mappings.")
- parser.add_argument("--blackchr1", help="Path to the blackchr1 chromosomes file.")
- parser.add_argument("--blackchr2", help="Path to the blackchr2 chromosomes file.")
- parser.add_argument("-f", "--filter", help="Path to output filtered BAM file.")
- args = parser.parse_args()
- if args.action == 'qc':
- for bam_file in args.files:
- sample_name = args.sample_name if args.sample_name else clean_filename(os.path.basename(bam_file))
- result = bam_stats(bam_file, sample_name, args.quality, args.blackchr1, args.blackchr2, args.filter)
- headers = ["Statistics", result["sample_name"]]
- print("\t".join(headers))
- categories = [
- "total_reads",
- "mapped_reads",
- "multi_mapped_reads",
- "duplicates",
- "low_quality",
- "blackchr1_mapped_reads",
- "blackchr2_mapped_reads",
- "passed_filter_reads"
- ]
- for category in categories:
- count = result[category]
- percentage = (count / result["total_reads"]) * 100 if result["total_reads"] else 0
- print(f"{category.replace('_', ' ').capitalize()}\t{count} ({percentage:.2f}%)")
- elif args.action == 'summary':
- summary_statistics(args.files)
|