
在使用snakemake管理工作流时,尤其是在slurm等高性能计算集群上运行时,用户可能会遇到一个常见问题:当规则内部执行python脚本或包含print()语句时,其输出不会像执行普通shell命令(如star)那样实时显示在slurm的输出文件中,而是在脚本完成或失败后才一次性输出。
这种现象的根本原因在于Python的标准输出(stdout)默认是带缓冲的。这意味着Python程序在执行过程中产生的输出并不会立即发送到操作系统,而是先存储在一个内部缓冲区中,直到缓冲区满、程序结束、遇到换行符(在某些情况下)或者被明确刷新时,才会被写入到实际的输出流(如文件或终端)。在Slurm环境中,当Snakemake将Python脚本的输出重定向到Slurm的作业输出文件时,这种缓冲机制会导致输出延迟。
解决方案:强制刷新标准输出
要解决这个问题,最直接的方法是在Python代码中显式地强制刷新标准输出缓冲区。这可以通过导入sys模块并调用sys.stdout.flush()来实现。例如,在你的print()语句之后立即调用flush():
import sys
print("========RUNNING JOB SPLADDER=========")
sys.stdout.flush() # 立即刷新输出
print("
")
sys.stdout.flush() # 立即刷新输出
print(input.genomes)
sys.stdout.flush() # 立即刷新输出
# ... 其他代码通过这种方式,每次print()调用后,其内容都会被立即写入到Slurm的输出文件中,从而实现实时输出。
除了实时输出问题,原始的Snakemake规则设计也存在一些可以优化的地方,这些优化将使工作流更具鲁棒性、可扩展性和可维护性。
Snakemake的核心思想是基于通配符(wildcards)来泛化规则,使其能够处理单个样本或一个最小单元的数据,而不是在单个规则内部循环处理所有样本。原始规则在一个spladder规则中遍历所有基因组,这与Snakemake的设计哲学相悖。
问题:
最佳实践: 将规则设计为处理单个通配符(例如{genome})对应的输出。Snakemake会根据顶层all规则或下游规则的需求,自动为每个通配符实例生成并调度独立的作业。
Snakemake要求规则必须产生其output声明中列出的所有文件。如果一个规则在特定条件下未能产生所有声明的输出文件,Snakemake会认为该规则执行失败,并可能删除已生成的部分输出。
问题: 原始规则中,如果某个genome_id没有对应的rsa_ids,那么spladder build命令将不会被执行,从而导致该基因组对应的输出文件(merge_graphs_mutex_exons_C3.pickle)不会被创建。这会引发Snakemake错误,并可能导致其他基因组的输出也被删除。
最佳实践: 在工作流的顶层(通常是all规则或数据预处理阶段),预先筛选出所有有效的数据组合,确保每个Snakemake规则实例都有能力且必须产生其所有声明的输出。
Snakemake允许使用Python函数来动态地生成规则的input和params,这对于根据通配符值查找相关数据非常有用。
input函数: 可以定义一个函数,接收wildcards作为参数,返回规则所需的输入文件字典。这使得输入文件的查找逻辑与规则本身分离,提高了可读性和模块化。
params指令: 用于传递额外的参数给shell指令。它可以是一个字典,也可以是一个lambda函数,根据wildcards或input动态生成参数值。这使得shell命令保持简洁,将复杂的逻辑移到Python代码中。
expand函数是Snakemake提供的一个强大工具,用于根据多个通配符组合生成文件路径列表。它比手动编写列表推导式更简洁、更安全,并且能更好地与Snakemake的通配符机制集成。
示例:expand("data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle", genome=valid_genome_ids)
尽可能使用shell指令来执行外部命令。它比run指令更简洁,并且Snakemake能够更好地管理其执行环境和错误捕获。当命令变得复杂时,可以通过多行字符串和参数格式化来保持可读性。
基于上述最佳实践,以下是重构后的Snakemake工作流示例:
import re
from pathlib import Path
import pandas as pd # 假设accessions是一个pandas DataFrame
# 示例数据(请根据实际情况替换或加载)
# accessions = pd.DataFrame({
# 'genome_id': ['genomeA', 'genomeB', 'genomeA', 'genomeC'],
# 'rsa_id_col': ['rsa1', 'rsa2', 'rsa3', 'rsa4']
# }, index=['rsa1', 'rsa2', 'rsa3', 'rsa4'])
# 假设accessions DataFrame已经加载
# 模拟一个accessions DataFrame,实际使用时应从文件加载
accessions = pd.DataFrame({
'genome_id': ['genome1', 'genome2', 'genome1', 'genome3'],
'some_other_col': ['val1', 'val2', 'val3', 'val4']
}, index=['rsa_id_A', 'rsa_id_B', 'rsa_id_C', 'rsa_id_D'])
rule all:
"""
顶层规则,定义最终需要生成的所有输出文件。
在这里预先筛选出所有有效的基因组ID,确保每个请求的输出都有对应的输入。
"""
input:
expand(
"data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle",
genome=[
genome_id
for genome_id in accessions['genome_id'].unique()
if len(accessions[accessions['genome_id'] == genome_id]) > 0
]
)
def spladder_input(wildcards):
"""
根据通配符 {genome} 动态查找并返回spladder规则所需的输入文件。
"""
# 过滤出当前基因组ID对应的所有rsa_ids
filtered_accessions = accessions[accessions['genome_id'] == wildcards.genome]
rsa_ids = filtered_accessions.index.values # 获取索引作为rsa_id
return {
'genome_annotation': f"../ressources/genomes/{wildcards.genome}/genomic.gtf",
'bams': expand("data/alignments/{rsa}/{rsa}_Aligned.sortedByCoord.out.bam", rsa=rsa_ids),
}
rule spladder:
"""
Spladder处理规则,针对单个基因组 {genome} 进行操作。
"""
input:
unpack(spladder_input) # 使用unpack函数将spladder_input返回的字典解包为规则的输入
output:
"data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle"
threads: 20 # 根据集群资源和程序需求调整线程数
resources:
mem_mb=1024 * 20, # 20GB内存
runtime=60 * 8 # 8小时运行时长
params:
# 使用lambda函数动态生成bams参数字符串和输出目录
bams_str=lambda wildcards, input: ','.join(input.bams),
outdir=lambda wildcards, output: Path(output).parent
shell:
"""
mkdir -p {params.outdir} && \
spladder build \
--set-mm-tag nM \
--bams {params.bams_str} \
--annotation {input.genome_annotation} \
--outdir {params.outdir} \
--parallel {threads}
"""通过遵循这些最佳实践,不仅可以解决Slurm环境下Python输出的实时性问题,还能显著提升Snakemake工作流的性能、健壮性和可维护性。
以上就是Snakemake在Slurm环境下实时输出与规则优化:深度教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号