| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 | import coolerimport pandas as pdimport numpy as npimport argparsefrom concurrent.futures import ThreadPoolExecutor, as_completeddef calculate_valid_bins_percentage(file_path, threshold=1000, num_threads=1, chunk_size=100000, resolutions=None):    if file_path.endswith('.mcool'):        return calculate_mcool_valid_bins_percentage(file_path, threshold, num_threads, chunk_size, resolutions)    else:        return calculate_cool_valid_bins_percentage(file_path, threshold, chunk_size)def calculate_cool_valid_bins_percentage(file_path, threshold=1000, chunk_size=100000):    clr = cooler.Cooler(file_path)    bins = clr.bins()[:]    total_valid_bins = 0    total_bins = 0    for start in range(0, len(bins), chunk_size):        end = min(start + chunk_size, len(bins))        matrix = clr.matrix(balance=False)[start:end, start:end]        bin_sums = np.sum(matrix, axis=0) + np.sum(matrix, axis=1) - np.diag(matrix)        valid_bins = np.sum(bin_sums > threshold)        total_valid_bins += valid_bins        total_bins += bin_sums.size    valid_bin_percentage = (total_valid_bins / total_bins) * 100 if total_bins > 0 else 0    return {        'File': file_path,        'Valid Bin Percentage': valid_bin_percentage,        'Total Valid Bins': total_valid_bins,        'Total Bins': total_bins    }def calculate_mcool_valid_bins_percentage(file_path, threshold=1000, num_threads=1, chunk_size=100000, resolutions=None):    available_resolutions = cooler.fileops.list_coolers(file_path)    print(f"Available resolutions in {file_path}: {available_resolutions}")    if resolutions:        resolutions = [f'/resolutions/{res}' for res in resolutions.split(',') if f'/resolutions/{res}' in available_resolutions]        print(resolutions)        if not resolutions:            raise ValueError(f"No matching resolutions found in the .mcool file for specified resolutions.")    else:        resolutions = available_resolutions    results = []    with ThreadPoolExecutor(max_workers=num_threads) as executor:        future_to_resolution = {            executor.submit(process_resolution, file_path, resolution_path, threshold, chunk_size): resolution_path            for resolution_path in resolutions        }        for future in as_completed(future_to_resolution):            resolution_path = future_to_resolution[future]            try:                result = future.result()                if result is not None:                    results.append(result)            except Exception as e:                print(f"Error processing resolution {resolution_path}: {e}")    return {        'File': file_path,        'Results': results    }def process_resolution(file_path, resolution_path, threshold, chunk_size):    clr_res = cooler.Cooler(f"{file_path}::{resolution_path}")    bins = clr_res.bins()[:]    print(f"Processing resolution: {resolution_path}")    total_valid_bins = 0    total_bins = 0    for start in range(0, len(bins), chunk_size):        end = min(start + chunk_size, len(bins))        matrix = clr_res.matrix(balance=False)[start:end, start:end]        bin_sums = np.sum(matrix, axis=0) + np.sum(matrix, axis=1) - np.diag(matrix)        valid_bins = np.sum(bin_sums > threshold)        total_valid_bins += valid_bins        total_bins += bin_sums.size    valid_bin_percentage = (total_valid_bins / total_bins) * 100 if total_bins > 0 else 0    return {        'Resolution': resolution_path.split('/')[-1],        'Valid Bin Percentage': valid_bin_percentage,        'Total Valid Bins': total_valid_bins,        'Total Bins': total_bins    }def save_results_to_csv(results, output_path):    if 'Results' in results:        df = pd.DataFrame(results['Results'])        df.insert(0, 'File', results['File'])    else:        df = pd.DataFrame([results])    df.to_csv(output_path, index=False)    print(f"Results saved to {output_path}")def main():    parser = argparse.ArgumentParser(description="Calculate valid bins percentage from .cool or .mcool files.")    parser.add_argument("file_path", type=str, help="Path to the .cool or .mcool file")    parser.add_argument("--output", type=str, required=True, help="Path to save the results as CSV file")    parser.add_argument("--threads", type=int, default=1, help="Number of threads to use for processing")    parser.add_argument("--chunk_size", type=int, default=100000, help="Chunk size for processing large matrices")    parser.add_argument("--resolutions", type=str, help="Comma-separated list of resolutions to process in the .mcool file")    args = parser.parse_args()    try:        results = calculate_valid_bins_percentage(args.file_path, num_threads=args.threads, chunk_size=args.chunk_size, resolutions=args.resolutions)        save_results_to_csv(results, args.output)    except Exception as e:        print(f"Error processing file {args.file_path}: {e}")if __name__ == "__main__":    main()
 |