"""Flask API exposing gene-expression and cell-type data read from h5ad files.

Each dataset ("stage") is stored as ``Data/<stage>.h5ad`` next to this module
and is loaded lazily, then cached in memory for the lifetime of the process.
"""

import os
from collections import defaultdict

import anndata as ad
from flask import Flask, request, jsonify
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# Directory that holds all h5ad files.
DATA_DIR = os.path.join(os.path.dirname(__file__), "Data")

# Cache of already-loaded AnnData objects (keyed by stage) to avoid reloading.
loaded_data = {}

# Candidate ``obs`` column names holding the cell type, in priority order.
_CELL_TYPE_COLUMNS = (
    "cell_type",
    "celltype",
    "cluster",
    "annotation",
    "cell_types",
    "clusters",
)

# Stages included, in order, in the temporal analysis endpoint.
_TEMPORAL_STAGES = ("CS7", "CS8", "CS9")


def load_adata(stage):
    """Return the AnnData object for *stage*, or ``None`` if it is unavailable.

    ``None`` is returned when *stage* is missing/empty, when it would resolve
    to a path outside ``DATA_DIR``, or when ``<stage>.h5ad`` does not exist.
    Successfully loaded objects are cached in ``loaded_data``.
    """
    if not stage:
        return None
    if stage in loaded_data:
        return loaded_data[stage]

    # ``stage`` comes straight from the query string: refuse anything that
    # escapes DATA_DIR (e.g. "../../secret") before touching the filesystem.
    data_root = os.path.abspath(DATA_DIR)
    filepath = os.path.abspath(os.path.join(data_root, f"{stage}.h5ad"))
    try:
        inside_data_dir = os.path.commonpath([data_root, filepath]) == data_root
    except ValueError:
        # Raised e.g. when the two paths are on different drives.
        inside_data_dir = False
    if not inside_data_dir or not os.path.exists(filepath):
        return None

    loaded_data[stage] = ad.read_h5ad(filepath)
    return loaded_data[stage]


def _stage_not_found(stage):
    """Build the 404 response used when a stage cannot be loaded."""
    return jsonify({"error": f"Stage '{stage}' not found"}), 404


def _find_cell_type_column(adata):
    """Return the first known cell-type column present in ``adata.obs``, else ``None``."""
    obs_columns = adata.obs.columns
    return next((col for col in _CELL_TYPE_COLUMNS if col in obs_columns), None)


def _expression_values(adata, gene):
    """Return the expression values of *gene* for every cell as a flat array."""
    expr = adata[:, gene].X
    # Sparse matrices expose ``toarray``; densify them before flattening.
    if hasattr(expr, "toarray"):
        expr = expr.toarray()
    return expr.flatten()


def _spatial_coords(adata):
    """Return per-cell coordinates, or ``None`` if none are stored.

    Looks at ``adata.obsm["spatial"]`` first, then at the ``x``/``y``/``z``
    columns of ``adata.obs``. Callers unpack each row as ``(x, y, z)``, so
    every row must hold exactly three values.
    """
    if "spatial" in adata.obsm:
        return adata.obsm["spatial"]
    if all(axis in adata.obs for axis in ("x", "y", "z")):
        return adata.obs[["x", "y", "z"]].values
    return None


@app.route("/api/genes")
def list_genes():
    """List the gene names available in the requested stage."""
    stage = request.args.get("stage")
    adata = load_adata(stage)
    # Compare against None explicitly: AnnData defines __len__, so an empty
    # but valid dataset would otherwise be treated as "not found".
    if adata is None:
        return _stage_not_found(stage)

    return jsonify(list(adata.var_names))


@app.route("/api/gene_expression")
def get_gene_expression():
    """Return the expression value of one gene at every cell position."""
    gene = request.args.get("gene")
    stage = request.args.get("stage")

    adata = load_adata(stage)
    if adata is None:
        return _stage_not_found(stage)
    if gene not in adata.var_names:
        return jsonify({"error": f"Gene '{gene}' not found in {stage}"}), 404

    # Read the expression values.
    expr = _expression_values(adata, gene)

    # Read the 3D coordinates.
    coords = _spatial_coords(adata)
    if coords is None:
        return jsonify({"error": "No spatial coordinates found"}), 500

    result = [
        {"x": float(x), "y": float(y), "z": float(z), "value": float(v)}
        for (x, y, z), v in zip(coords, expr)
    ]
    return jsonify({"gene": gene, "expression": result})


@app.route("/api/cell")
def get_cell_types():
    """Return the cell type of every cell together with its position."""
    stage = request.args.get("stage")

    adata = load_adata(stage)
    if adata is None:
        return _stage_not_found(stage)

    # Locate the cell-type column, trying common names in priority order.
    cell_type_col = _find_cell_type_column(adata)
    if cell_type_col is None:
        return jsonify({"error": "No cell type information found"}), 404

    # Read the cell types.
    cell_types = adata.obs[cell_type_col].values

    # Read the 3D coordinates.
    coords = _spatial_coords(adata)
    if coords is None:
        return jsonify({"error": "No spatial coordinates found"}), 500

    result = [
        {"x": float(x), "y": float(y), "z": float(z), "value": str(ct)}
        for (x, y, z), ct in zip(coords, cell_types)
    ]
    return jsonify({"stage": stage, "cells": result})


@app.route("/api/gene_dist")
def get_gene_distribution():
    """Return the expression values of one gene grouped by cell type."""
    gene = request.args.get("gene")
    stage = request.args.get("stage")

    adata = load_adata(stage)
    if adata is None:
        return _stage_not_found(stage)
    if gene not in adata.var_names:
        return jsonify({"error": f"Gene '{gene}' not found in {stage}"}), 404

    # Locate the cell-type column, trying common names in priority order.
    cell_type_col = _find_cell_type_column(adata)
    if cell_type_col is None:
        return jsonify({"error": "No cell type information found"}), 404

    # Read the gene expression values and the cell types.
    expr = _expression_values(adata, gene)
    cell_types = adata.obs[cell_type_col].values

    # Group the expression values by cell type.
    distribution = defaultdict(list)
    for cell_type, expression in zip(cell_types, expr):
        distribution[str(cell_type)].append(float(expression))

    return jsonify({
        "gene": gene,
        "stage": stage,
        "distribution": distribution,
    })


@app.route("/api/gene_temporal_analysis")
def get_gene_temporal_analysis():
    """Return per-cell-type statistics of one gene across the fixed stages.

    For every stage that contains the gene, each cell type is reported with
    its average expression, its cell count and its share of all cells (in
    percent). Stages lacking the gene are skipped.
    """
    gene = request.args.get("gene")
    if not gene:
        return jsonify({"error": "Gene parameter is required"}), 400

    result_data = []
    for stage in _TEMPORAL_STAGES:
        adata = load_adata(stage)
        if adata is None:
            return _stage_not_found(stage)

        if gene not in adata.var_names:
            # Skip stages that do not contain this gene.
            continue

        # Locate the cell-type column.
        cell_type_col = _find_cell_type_column(adata)
        if cell_type_col is None:
            return jsonify({"error": f"No cell type information found in stage {stage}"}), 404

        # Read the gene expression values and the cell types.
        expr = _expression_values(adata, gene)
        cell_types = adata.obs[cell_type_col].values

        # Collect the expression values of each cell type.
        expressions_by_type = defaultdict(list)
        for cell_type, expression in zip(cell_types, expr):
            expressions_by_type[str(cell_type)].append(float(expression))

        # Compute average expression, proportion and count per cell type.
        total_cells = len(cell_types)
        result_data.append({
            'stage': stage,
            'cell_types': {
                cell_type: {
                    'avg_expression': sum(values) / len(values),
                    'proportion': len(values) / total_cells * 100,  # as a percentage
                    'count': len(values),
                }
                for cell_type, values in expressions_by_type.items()
            },
        })

    if not result_data:
        return jsonify({"error": f"Gene '{gene}' not found in any stage"}), 404

    return jsonify({
        "gene": gene,
        "stages_data": result_data,
    })


if __name__ == "__main__":
    # NOTE(review): debug=True is meant for local development only; confirm
    # this entry point is not used for production deployments.
    app.run(debug=True)