"""Flask API exposing gene and cell-type data read from per-stage ``.h5ad`` files.

Endpoints:
    /api/genes            list of gene names for a stage
    /api/gene_expression  per-cell x/y/z coordinates plus expression of one gene
    /api/cell             per-cell x/y/z coordinates plus cell-type label
    /api/gene_dist        expression values of one gene grouped by cell type
"""

import os

import anndata as ad
from flask import Flask, jsonify, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# Directory that holds all h5ad files (one "<stage>.h5ad" per stage).
DATA_DIR = os.path.join(os.path.dirname(__file__), "Data")

# Cache of already-loaded AnnData objects, keyed by normalised stage name,
# so each file is read from disk at most once.
loaded_data = {}

# Candidate ``obs`` column names holding cell-type labels, in priority order.
CELL_TYPE_COLUMNS = (
    "cell_type",
    "celltype",
    "cluster",
    "annotation",
    "cell_types",
    "clusters",
)


def _normalize_stage(stage):
    """Return ``stage`` as a path relative to DATA_DIR, or None if unusable.

    ``stage`` comes straight from the query string, so it is normalised
    lexically and rejected when it is empty or would resolve outside
    DATA_DIR (e.g. ``../secret`` or an absolute path). Normalising also
    makes different spellings of the same stage share one cache entry.
    """
    if not stage:
        return None
    root = os.path.abspath(DATA_DIR)
    target = os.path.abspath(os.path.join(root, stage))
    try:
        inside = os.path.commonpath([root, target]) == root
    except ValueError:
        # Raised e.g. when the two paths are on different Windows drives.
        return None
    if not inside or target == root:
        return None
    return os.path.relpath(target, root)


def load_adata(stage):
    """Return the AnnData object for ``stage``, loading and caching it on first use.

    Args:
        stage: Stage name; the data is read from ``DATA_DIR/<stage>.h5ad``.

    Returns:
        The cached or freshly loaded AnnData object, or None when the stage
        name is missing/unsafe or the file does not exist.
    """
    key = _normalize_stage(stage)
    if key is None:
        return None
    if key not in loaded_data:
        filepath = os.path.join(DATA_DIR, f"{key}.h5ad")
        if not os.path.exists(filepath):
            return None
        loaded_data[key] = ad.read_h5ad(filepath)
    return loaded_data[key]


def _find_cell_type_column(adata):
    """Return the first CELL_TYPE_COLUMNS entry present in ``adata.obs``, or None."""
    columns = adata.obs.columns
    return next((name for name in CELL_TYPE_COLUMNS if name in columns), None)


def _get_xyz_coordinates(adata):
    """Look up per-cell 3D coordinates.

    Prefers ``adata.obsm["spatial"]`` and falls back to the ``x``/``y``/``z``
    columns of ``adata.obs``.

    Returns:
        ``(coords, None)`` on success, or ``(None, message)`` when no
        coordinates exist or they are not an n-by-3 array.
    """
    if "spatial" in adata.obsm:
        coords = adata.obsm["spatial"]
    elif all(axis in adata.obs for axis in ("x", "y", "z")):
        coords = adata.obs[["x", "y", "z"]].values
    else:
        return None, "No spatial coordinates found"

    # The routes unpack every row as (x, y, z); reject other layouts up
    # front instead of failing with an unhandled ValueError mid-response.
    shape = getattr(coords, "shape", None)
    if shape is None or len(shape) != 2 or shape[1] != 3:
        return None, "Spatial coordinates must be three-dimensional"
    return coords, None


def _gene_expression_vector(adata, gene):
    """Return the expression values of ``gene`` as a flat dense array."""
    expr = adata[:, gene].X
    # Sparse matrices expose ``toarray``; densify before flattening.
    if hasattr(expr, "toarray"):
        expr = expr.toarray()
    return expr.flatten()


@app.route("/api/genes")
def list_genes():
    """Return the gene names of the stage given by the ``stage`` query parameter."""
    stage = request.args.get("stage")
    adata = load_adata(stage)
    # Compare against None explicitly: AnnData truthiness follows its
    # length, so an empty dataset would otherwise be reported as missing.
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404
    return jsonify(list(adata.var_names))


@app.route("/api/gene_expression")
def get_gene_expression():
    """Return per-cell coordinates and expression values for one gene.

    Query parameters: ``gene`` and ``stage``. Responds 404 when the stage or
    gene is unknown and 500 when usable 3D coordinates are unavailable.
    """
    gene = request.args.get("gene")
    stage = request.args.get("stage")
    adata = load_adata(stage)
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404
    if gene not in adata.var_names:
        return jsonify({"error": f"Gene '{gene}' not found in {stage}"}), 404

    # Read the expression values.
    expr = _gene_expression_vector(adata, gene)

    # Read the 3D coordinates.
    coords, coord_error = _get_xyz_coordinates(adata)
    if coord_error is not None:
        return jsonify({"error": coord_error}), 500

    result = [
        {"x": float(x), "y": float(y), "z": float(z), "value": float(v)}
        for (x, y, z), v in zip(coords, expr)
    ]
    return jsonify({"gene": gene, "expression": result})


@app.route("/api/cell")
def get_cell_types():
    """Return per-cell coordinates and cell-type labels for one stage.

    Query parameter: ``stage``. Responds 404 when the stage or a cell-type
    column is missing and 500 when usable 3D coordinates are unavailable.
    """
    stage = request.args.get("stage")
    adata = load_adata(stage)
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404

    # Locate the cell-type column, trying common column names in priority order.
    cell_type_col = _find_cell_type_column(adata)
    if cell_type_col is None:
        return jsonify({"error": "No cell type information found"}), 404

    # Read the cell types.
    cell_types = adata.obs[cell_type_col].values

    # Read the 3D coordinates.
    coords, coord_error = _get_xyz_coordinates(adata)
    if coord_error is not None:
        return jsonify({"error": coord_error}), 500

    result = [
        {"x": float(x), "y": float(y), "z": float(z), "value": str(ct)}
        for (x, y, z), ct in zip(coords, cell_types)
    ]
    return jsonify({"stage": stage, "cells": result})


@app.route("/api/gene_dist")
def get_gene_distribution():
    """Return the expression values of one gene grouped by cell type.

    Query parameters: ``gene`` and ``stage``. Responds 404 when the stage,
    the gene, or a cell-type column is missing.
    """
    gene = request.args.get("gene")
    stage = request.args.get("stage")
    adata = load_adata(stage)
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404
    if gene not in adata.var_names:
        return jsonify({"error": f"Gene '{gene}' not found in {stage}"}), 404

    # Locate the cell-type column, trying common column names in priority order.
    cell_type_col = _find_cell_type_column(adata)
    if cell_type_col is None:
        return jsonify({"error": "No cell type information found"}), 404

    # Read the gene expression values.
    expr = _gene_expression_vector(adata, gene)

    # Read the cell types.
    cell_types = adata.obs[cell_type_col].values

    # Group the expression values by cell type.
    distribution = {}
    for cell_type, expression in zip(cell_types, expr):
        distribution.setdefault(str(cell_type), []).append(float(expression))

    return jsonify({
        "gene": gene,
        "stage": stage,
        "distribution": distribution,
    })


if __name__ == "__main__":
    # NOTE(review): debug=True enables the interactive Werkzeug debugger,
    # which allows arbitrary code execution; use only for local development.
    app.run(debug=True)