digital-embryo/embryo-backend/app.py
2025-07-27 19:18:31 +08:00

221 lines
6.9 KiB
Python

from flask import Flask, request, jsonify
from flask_cors import CORS
import anndata as ad
import os
# Flask application object; the route decorators below register on it.
app = Flask(__name__)
# Wrap the app with flask_cors (no extra options are passed here).
CORS(app)
# Directory that holds all of the .h5ad files (a "Data" folder next to this file)
DATA_DIR = os.path.join(os.path.dirname(__file__), "Data")
# Cache of already-loaded AnnData objects, keyed by stage name, to avoid re-reading files
loaded_data = {}
def load_adata(stage):
    """Return the AnnData object for *stage*, reading it from disk on first use.

    The dataset is looked up as ``<DATA_DIR>/<stage>.h5ad`` and memoized in the
    module-level ``loaded_data`` dict so each file is read at most once.

    Args:
        stage: Stage name taken from the request (e.g. ``"CS7"``).

    Returns:
        The cached or freshly loaded AnnData object, or ``None`` when *stage*
        is missing, is not a plain file stem, or no matching file exists.
    """
    # `stage` comes straight from the query string. Accept only a plain file
    # stem so that values such as "../x" cannot point read_h5ad at files
    # outside DATA_DIR; a missing parameter is rejected the same way.
    if not isinstance(stage, str) or not stage:
        return None
    if (
        "/" in stage
        or "\\" in stage
        or "\0" in stage
        or os.path.basename(stage) != stage
    ):
        return None

    if stage in loaded_data:
        return loaded_data[stage]

    filepath = os.path.join(DATA_DIR, f"{stage}.h5ad")
    if not os.path.exists(filepath):
        return None

    loaded_data[stage] = ad.read_h5ad(filepath)
    return loaded_data[stage]
@app.route("/api/genes")
def list_genes():
    """Return the gene names of one stage as a JSON array.

    Query parameters:
        stage: Name of the dataset, resolved through ``load_adata``.

    Returns:
        A JSON list built from ``adata.var_names``, or a 404 error object
        when ``load_adata`` yields ``None`` for the requested stage.
    """
    stage = request.args.get("stage")
    adata = load_adata(stage)
    # Compare with None explicitly: truth-testing an AnnData goes through its
    # length, so a loaded dataset with zero observations would otherwise be
    # reported as "not found".
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404
    return jsonify(list(adata.var_names))
@app.route("/api/gene_expression")
def get_gene_expression():
    """Return per-cell coordinates and expression values for one gene.

    Query parameters:
        gene: Gene name; must be present in ``adata.var_names``.
        stage: Name of the dataset, resolved through ``load_adata``.

    Returns:
        ``{"gene": ..., "expression": [{"x", "y", "z", "value"}, ...]}`` on
        success; 400 when ``gene`` is missing, 404 when the stage or gene is
        unknown, 500 when the dataset carries no usable coordinates.
    """
    gene = request.args.get("gene")
    stage = request.args.get("stage")
    # Same contract as /api/gene_temporal_analysis: a missing gene is a bad
    # request rather than "Gene 'None' not found".
    if not gene:
        return jsonify({"error": "Gene parameter is required"}), 400

    adata = load_adata(stage)
    # Explicit None check: an AnnData with zero observations is falsy.
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404
    if gene not in adata.var_names:
        return jsonify({"error": f"Gene '{gene}' not found in {stage}"}), 404

    # Read the expression values; sparse matrices expose toarray(), dense
    # arrays are flattened directly.
    expr = adata[:, gene].X
    expr = expr.toarray().flatten() if hasattr(expr, "toarray") else expr.flatten()

    # Read the coordinates, preferring obsm["spatial"] over x/y/z obs columns.
    if "spatial" in adata.obsm:
        coords = adata.obsm["spatial"]
    elif all(k in adata.obs for k in ("x", "y", "z")):
        coords = adata.obs[["x", "y", "z"]].values
    else:
        return jsonify({"error": "No spatial coordinates found"}), 500

    # NOTE(review): the unpacking below needs exactly three values per row;
    # confirm obsm["spatial"] is always three-dimensional for these datasets.
    result = [
        {"x": float(x), "y": float(y), "z": float(z), "value": float(v)}
        for (x, y, z), v in zip(coords, expr)
    ]
    return jsonify({"gene": gene, "expression": result})
@app.route("/api/cell")
def get_cell_types():
    """Return per-cell coordinates together with each cell's type label.

    Query parameters:
        stage: Name of the dataset, resolved through ``load_adata``.

    Returns:
        ``{"stage": ..., "cells": [{"x", "y", "z", "value"}, ...]}`` where
        ``value`` is the cell type as a string; 404 when the stage or a cell
        type column is missing, 500 when no coordinates are available.
    """
    stage = request.args.get("stage")
    adata = load_adata(stage)
    # Explicit None check: an AnnData with zero observations is falsy.
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404

    # Find the cell type column, trying common column names in priority order.
    cell_type_columns = ['cell_type', 'celltype', 'cluster', 'annotation', 'cell_types', 'clusters']
    cell_type_col = next(
        (col for col in cell_type_columns if col in adata.obs.columns),
        None,
    )
    if cell_type_col is None:
        return jsonify({"error": "No cell type information found"}), 404

    # Read the cell type labels.
    cell_types = adata.obs[cell_type_col].values

    # Read the coordinates, preferring obsm["spatial"] over x/y/z obs columns.
    if "spatial" in adata.obsm:
        coords = adata.obsm["spatial"]
    elif all(k in adata.obs for k in ("x", "y", "z")):
        coords = adata.obs[["x", "y", "z"]].values
    else:
        return jsonify({"error": "No spatial coordinates found"}), 500

    # NOTE(review): the unpacking below needs exactly three values per row;
    # confirm obsm["spatial"] is always three-dimensional for these datasets.
    result = [
        {"x": float(x), "y": float(y), "z": float(z), "value": str(ct)}
        for (x, y, z), ct in zip(coords, cell_types)
    ]
    return jsonify({"stage": stage, "cells": result})
@app.route("/api/gene_dist")
def get_gene_distribution():
    """Return one gene's expression values grouped by cell type.

    Query parameters:
        gene: Gene name; must be present in ``adata.var_names``.
        stage: Name of the dataset, resolved through ``load_adata``.

    Returns:
        ``{"gene": ..., "stage": ..., "distribution": {cell_type: [values]}}``;
        400 when ``gene`` is missing, 404 when the stage, the gene, or a cell
        type column cannot be found.
    """
    gene = request.args.get("gene")
    stage = request.args.get("stage")
    # Same contract as /api/gene_temporal_analysis: a missing gene is a bad
    # request rather than "Gene 'None' not found".
    if not gene:
        return jsonify({"error": "Gene parameter is required"}), 400

    adata = load_adata(stage)
    # Explicit None check: an AnnData with zero observations is falsy.
    if adata is None:
        return jsonify({"error": f"Stage '{stage}' not found"}), 404
    if gene not in adata.var_names:
        return jsonify({"error": f"Gene '{gene}' not found in {stage}"}), 404

    # Find the cell type column, trying common column names in priority order.
    cell_type_columns = ['cell_type', 'celltype', 'cluster', 'annotation', 'cell_types', 'clusters']
    cell_type_col = next(
        (col for col in cell_type_columns if col in adata.obs.columns),
        None,
    )
    if cell_type_col is None:
        return jsonify({"error": "No cell type information found"}), 404

    # Read the expression values; sparse matrices expose toarray(), dense
    # arrays are flattened directly.
    expr = adata[:, gene].X
    expr = expr.toarray().flatten() if hasattr(expr, "toarray") else expr.flatten()

    # Read the cell type labels.
    cell_types = adata.obs[cell_type_col].values

    # Group the expression values by the string form of the cell type.
    distribution = {}
    for cell_type, expression in zip(cell_types, expr):
        distribution.setdefault(str(cell_type), []).append(float(expression))

    return jsonify({
        "gene": gene,
        "stage": stage,
        "distribution": distribution
    })
@app.route("/api/gene_temporal_analysis")
def get_gene_temporal_analysis():
    """Summarize one gene per cell type across the stages CS7, CS8 and CS9.

    Query parameters:
        gene: Gene name (required).

    Returns:
        ``{"gene": ..., "stages_data": [{"stage", "cell_types"}, ...]}`` where
        each cell type maps to its average expression, its share of the
        stage's cells in percent, and its cell count. Stages that do not
        contain the gene are skipped. Responds 400 when ``gene`` is missing
        and 404 when a stage cannot be loaded, a stage has no cell type
        column, or the gene is absent from every stage.
    """
    gene = request.args.get("gene")
    if not gene:
        return jsonify({"error": "Gene parameter is required"}), 400

    stages = ["CS7", "CS8", "CS9"]
    # Candidate cell type column names in priority order; identical for every
    # stage, so defined once outside the loop.
    cell_type_columns = ['cell_type', 'celltype', 'cluster', 'annotation', 'cell_types', 'clusters']
    result_data = []

    for stage in stages:
        adata = load_adata(stage)
        # Explicit None check: an AnnData with zero observations is falsy.
        if adata is None:
            return jsonify({"error": f"Stage '{stage}' not found"}), 404

        if gene not in adata.var_names:
            # Skip a stage that does not contain this gene.
            continue

        # Find the cell type column.
        cell_type_col = next(
            (col for col in cell_type_columns if col in adata.obs.columns),
            None,
        )
        if cell_type_col is None:
            return jsonify({"error": f"No cell type information found in stage {stage}"}), 404

        # Read the expression values; sparse matrices expose toarray(), dense
        # arrays are flattened directly.
        expr = adata[:, gene].X
        expr = expr.toarray().flatten() if hasattr(expr, "toarray") else expr.flatten()

        # Read the cell type labels.
        cell_types = adata.obs[cell_type_col].values

        # Collect the expression values of every cell type; the list length
        # doubles as the cell count for that type.
        grouped = {}
        for cell_type, expression in zip(cell_types, expr):
            grouped.setdefault(str(cell_type), []).append(float(expression))

        # Average expression and share of cells (as a percentage) per type.
        total_cells = len(cell_types)
        stage_data = {
            'stage': stage,
            'cell_types': {
                name: {
                    'avg_expression': sum(values) / len(values),
                    'proportion': len(values) / total_cells * 100,
                    'count': len(values),
                }
                for name, values in grouped.items()
            },
        }
        result_data.append(stage_data)

    if not result_data:
        return jsonify({"error": f"Gene '{gene}' not found in any stage"}), 404

    return jsonify({
        "gene": gene,
        "stages_data": result_data
    })
if __name__ == "__main__":
    # Start the Flask server only when this file is executed directly.
    # NOTE(review): debug=True switches on Flask's debug mode; confirm this
    # entry point is used for local development only.
    app.run(debug=True)