LongestTranscript.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import argparse
  2. import pandas as pd
  3. def parse_attributes(attributes_str):
  4. """ 解析 GTF 文件中的属性字段 """
  5. attributes = {}
  6. for attr in attributes_str.split(';'):
  7. attr = attr.strip()
  8. if attr:
  9. parts = attr.split(' ', 1)
  10. if len(parts) == 2:
  11. key, value = parts
  12. attributes[key] = value.strip('"')
  13. return attributes
  14. def read_gtf(gtf_file):
  15. """ 从 GTF 文件读取数据 """
  16. data = []
  17. with open(gtf_file, 'r') as file:
  18. for line in file:
  19. if line.startswith('#'):
  20. continue
  21. fields = line.strip().split('\t')
  22. attributes = parse_attributes(fields[8])
  23. data.append({
  24. 'chrom': fields[0],
  25. 'source': fields[1],
  26. 'feature': fields[2],
  27. 'start': int(fields[3]),
  28. 'end': int(fields[4]),
  29. 'score': fields[5],
  30. 'strand': fields[6],
  31. 'frame': fields[7],
  32. 'attributes': fields[8],
  33. 'transcript_id': attributes.get('transcript_id', ''),
  34. 'gene_id': attributes.get('gene_id', ''),
  35. 'line': line.strip() # 保存整行
  36. })
  37. return pd.DataFrame(data)
  38. def longest_transcripts(df):
  39. """ 找到每个基因的最长转录本 """
  40. return df[df['feature'] == 'transcript'].sort_values(['gene_id', 'end'], ascending=[True, False]).drop_duplicates('gene_id')
  41. def to_bed6(df):
  42. """ 将数据转换为 BED6 格式 """
  43. df['score'] = "."
  44. df['name'] = df['gene_id']
  45. return df[['chrom', 'start', 'end', 'name', 'score', 'strand']]
  46. def write_filtered_gtf(df, longest_transcripts, gtf_out_file):
  47. """ 将最长转录本的相关行和对应基因的行写入新的 GTF 文件 """
  48. longest_transcript_ids = set(longest_transcripts['transcript_id'])
  49. with open(gtf_out_file, 'w') as f:
  50. for _, row in df.iterrows():
  51. if row['feature'] == 'gene' or row['transcript_id'] in longest_transcript_ids:
  52. f.write(row['line'] + '\n')
  53. def main():
  54. parser = argparse.ArgumentParser(description="从 GTF 文件提取最长转录本并保存为 BED6 和 GTF 格式")
  55. parser.add_argument("gtf_file", help="输入的 GTF 文件路径")
  56. parser.add_argument("bed_file", help="输出的 BED6 格式文件路径")
  57. parser.add_argument("gtf_out_file", help="输出的 GTF 格式文件路径")
  58. args = parser.parse_args()
  59. df = read_gtf(args.gtf_file)
  60. longest_df = longest_transcripts(df)
  61. bed6_df = to_bed6(longest_df)
  62. bed6_df.to_csv(args.bed_file, sep='\t', index=False, header=False)
  63. write_filtered_gtf(df, longest_df, args.gtf_out_file)
  64. if __name__ == "__main__":
  65. main()