#!/usr/bin/env python3 import pysam import argparse import os import glob def read_list_from_file(filename): '''Read a list of chromosome names from a file.''' with open(filename, 'r') as file: return [line.strip() for line in file] def clean_filename(filename): '''Remove various BAM-related extensions from a filename.''' for ext in [".dup.bam", ".sort.bam", ".sorted.bam", ".bam"]: if filename.endswith(ext): return filename[:-len(ext)] return filename def bam_stats(bam_file, sample_name, quality_threshold=None, blackchr1_chromosomes_file=None, blackchr2_file=None, filter_output=None): '''Calculate statistics from a BAM file and optionally filter.''' samfile = pysam.AlignmentFile(bam_file, "rb") blackchr1_chromosomes = read_list_from_file(blackchr1_chromosomes_file) if blackchr1_chromosomes_file else [] blackchr2_chromosomes = read_list_from_file(blackchr2_file) if blackchr2_file else [] total_reads = 0 mapped_reads = 0 multi_mapped_reads = 0 duplicates = 0 low_quality = 0 blackchr1_mapped_reads = 0 blackchr2_mapped_reads = 0 passed_filter_reads = 0 output_file = None if filter_output: output_file = pysam.AlignmentFile(filter_output, "wb", header=samfile.header) for read in samfile: total_reads += 1 is_filter_passed = True if read.is_duplicate: duplicates += 1 is_filter_passed = False if quality_threshold and read.mapping_quality < quality_threshold: low_quality += 1 is_filter_passed = False if not read.is_unmapped: mapped_reads += 1 if read.has_tag("XS"): multi_mapped_reads += 1 is_filter_passed = False if read.reference_name in blackchr1_chromosomes: blackchr1_mapped_reads += 1 is_filter_passed = False if read.reference_name in blackchr2_chromosomes: blackchr2_mapped_reads += 1 is_filter_passed = False if is_filter_passed: passed_filter_reads += 1 if output_file: output_file.write(read) samfile.close() if output_file: output_file.close() return { "sample_name": sample_name, "total_reads": total_reads, "mapped_reads": mapped_reads, "multi_mapped_reads": multi_mapped_reads, "duplicates": duplicates, "low_quality": low_quality, "blackchr1_mapped_reads": blackchr1_mapped_reads, "blackchr2_mapped_reads": blackchr2_mapped_reads, "passed_filter_reads": passed_filter_reads } def summary_statistics(files): '''Summarize statistics from multiple BAM statistics files, including percentages.''' summary_data = {} for file in files: sample_name = os.path.basename(file).split('.')[0] with open(file, 'r') as f: for line in f: if line.startswith("Statistics"): continue parts = line.strip().split("\t") category = parts[0] value = parts[1] if category not in summary_data: summary_data[category] = [] summary_data[category].append(value) headers = ["Category"] + [os.path.basename(file).split('.')[0] for file in files] print("\t".join(headers)) for category, values in summary_data.items(): print(f"{category}\t" + "\t".join(values)) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Stat BAM file.") parser.add_argument("-a", "--action", choices=['qc', 'summary'], required=True, help="Action to perform: 'qc' for quality control, 'summary' for summarizing multiple samples.") parser.add_argument("files", nargs='+', help="Path to the BAM file or statistics files.") parser.add_argument("--sample_name", help="Sample name for the output statistics. If not provided, it's inferred from the BAM filename.") parser.add_argument("-q", "--quality", type=int, help="Quality threshold for low-quality mappings.") parser.add_argument("--blackchr1", help="Path to the blackchr1 chromosomes file.") parser.add_argument("--blackchr2", help="Path to the blackchr2 chromosomes file.") parser.add_argument("-f", "--filter", help="Path to output filtered BAM file.") args = parser.parse_args() if args.action == 'qc': for bam_file in args.files: sample_name = args.sample_name if args.sample_name else clean_filename(os.path.basename(bam_file)) result = bam_stats(bam_file, sample_name, args.quality, args.blackchr1, args.blackchr2, args.filter) headers = ["Statistics", result["sample_name"]] print("\t".join(headers)) categories = [ "total_reads", "mapped_reads", "multi_mapped_reads", "duplicates", "low_quality", "blackchr1_mapped_reads", "blackchr2_mapped_reads", "passed_filter_reads" ] for category in categories: count = result[category] percentage = (count / result["total_reads"]) * 100 if result["total_reads"] else 0 print(f"{category.replace('_', ' ').capitalize()}\t{count} ({percentage:.2f}%)") elif args.action == 'summary': summary_statistics(args.files)