J A B B Y A I

Loading

The code below was used to create uniform graph vectors from the nodes and edges of a medical graph dictionary with 500 nodes (body parts, cellular structures, diseases, medical treatments, symptoms), a hierarchical order (parent, child), and medical relationship edges (treated_with, contains, experiences, ...).

The graph vectors, 492 elements in size, were combined with 384-dimensional miniLLM vectors for MLM and CLM training, which resulted in a loss of 0.2 and a perplexity of 1 based on only 500 PubMed samples. Both models also had a perplexity of around 9 or lower and an 85% token-match success ratio on the validation test. I am looking for AI experts to collaborate with, and I can share more of my code and output results with interested parties. The sky is the limit with the right resources.

import os

import json

import logging

from typing import List, Dict, Any

from collections import Counter

# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by the class methods and main() in this file.
logger = logging.getLogger(__name__)

class StandardizedMedicalVectorSystem:
    """Build fixed-size integer vectors for every node of a medical graph.

    The graph is read from a JSON file whose ``"body_parts"`` entry holds a
    ``"nodes"`` list (dicts with ``"id"``, ``"label"`` and ``"name"``) and an
    ``"edges"`` list (dicts with ``"source"``, ``"relationship"`` and
    ``"target"``, where ``"target"`` is either one node id or a list of ids).

    For each node the system produces:

    * an entity vector (6 ints),
    * a parent vector (6 ints),
    * 8 child vectors (6 ints each),
    * 12 related-node vectors (6 ints each) for every relationship type
      other than ``HAS_SUBPART``.

    Missing information is always represented by all-zero vectors so every
    node ends up with the same layout.
    """

    def __init__(self, embedding_dir="vector_embeddings",
                 graph_path="graph_digital_map.json"):
        """Set up the type tables, create the output directory and load the graph.

        Args:
            embedding_dir: Directory the embeddings JSON is written to; it is
                created if it does not exist.
            graph_path: Path of the graph JSON file to load. Defaults to the
                file name that was previously hard-coded.

        Raises:
            Exception: Whatever ``load_graph`` raises when the graph cannot
                be read or does not have the expected structure.
        """
        # Base code stored in slot 0 of a node vector, keyed by node label.
        self.entity_types = {
            "Body Part": 101,
            "Cellular Structure": 201,
            "Disease": 301,
            "Medical Treatment": 401,
            "Symptom": 501,
        }
        # Known relationship names. Within this class only the names (keys)
        # are read; the numeric codes are not used by any method here.
        self.relationship_types = {
            "HAS_SUBPART": 1000,
            "CONTAINS": 2000,
            "AFFECTED_BY": 3000,
            "TREATED_WITH": 4000,
            "EXPERIENCES": 5000,
            "SYMPTOM_TREATMENT": 6000,
            "DISEASE_TREATMENT": 7000,
        }
        self.embedding_dir = embedding_dir
        os.makedirs(embedding_dir, exist_ok=True)
        self.load_graph(graph_path)

    def load_graph(self, graph_path: str = "graph_digital_map.json"):
        """Load and initialize graph data.

        Populates ``graph_data`` (the parsed JSON), ``node_labels``
        (node id -> label), ``node_names`` (lower-cased name -> node id) and
        ``edges`` (the raw edge list).

        Args:
            graph_path: Path of the graph JSON file.

        Raises:
            Exception: Any error raised while opening, parsing or indexing
                the file is logged and then re-raised unchanged.
        """
        try:
            with open(graph_path, "r", encoding="utf-8") as f:
                self.graph_data = json.load(f)
            nodes = self.graph_data["body_parts"]["nodes"]
            self.node_labels = {node["id"]: node["label"] for node in nodes}
            self.node_names = {node["name"].lower(): node["id"] for node in nodes}
            self.edges = self.graph_data["body_parts"]["edges"]
        except Exception as e:
            logger.error(f"Error loading graph: {e}")
            raise

    def pad_vector(self, vector: List[int], size: int = 6) -> List[int]:
        """Return ``vector`` right-padded with zeros, or truncated, to ``size`` items."""
        if len(vector) < size:
            return vector + [0] * (size - len(vector))
        return vector[:size]

    def create_zero_vector(self, size: int = 6) -> List[int]:
        """Return a new list of ``size`` zeros."""
        return [0] * size

    def id_to_vector(self, node_id: str) -> List[int]:
        """Convert a node id into its 6-element vector.

        The first element is the entity-type code of the node's label; the
        remaining elements are the dot-separated components of ``node_id``
        after the first one, converted to ``int``. The result is padded or
        truncated to 6 elements.

        Returns:
            The node vector, or an all-zero vector when the id is unknown or
            its label has no entry in ``entity_types``.

        Raises:
            ValueError: If a component after the first dot is not an integer.
        """
        entity_label = self.node_labels.get(node_id)
        if not entity_label:
            return self.create_zero_vector()

        base_type = self.entity_types.get(entity_label)
        if not base_type:
            return self.create_zero_vector()

        # The leading component of the id is discarded; the type code takes
        # its place in slot 0.
        _, *nums = node_id.split(".")
        vector = [base_type] + [int(n) for n in nums]
        return self.pad_vector(vector)

    def get_parent_by_relationship(self, node_id: str) -> List[int]:
        """Return the vector of the first ``HAS_SUBPART`` source that targets ``node_id``.

        Edges are scanned in file order; an all-zero vector is returned when
        no such edge exists.
        """
        for edge in self.edges:
            if edge["relationship"] == "HAS_SUBPART":
                targets = edge["target"] if isinstance(edge["target"], list) else [edge["target"]]
                if node_id in targets:
                    return self.id_to_vector(edge["source"])
        return self.create_zero_vector()

    def get_children_vectors(self, node_id: str) -> List[List[int]]:
        """Return exactly 8 child vectors for ``node_id``.

        Children are the targets of ``HAS_SUBPART`` edges whose source is
        ``node_id``, in edge order. The list is padded with zero vectors when
        there are fewer than 8 children and truncated when there are more.
        """
        children_vectors = []
        for edge in self.edges:
            if edge["relationship"] == "HAS_SUBPART" and edge["source"] == node_id:
                targets = edge["target"] if isinstance(edge["target"], list) else [edge["target"]]
                for target in targets:
                    children_vectors.append(self.id_to_vector(target))

        while len(children_vectors) < 8:
            children_vectors.append(self.create_zero_vector())
        return children_vectors[:8]

    def gather_leaf_nodes(self, node_id: str) -> List[str]:
        """Recursively collect the ids of all leaf nodes below ``node_id``.

        A node without outgoing ``HAS_SUBPART`` edges is its own single leaf.
        A leaf reachable through several paths is listed once per path.

        NOTE: there is no cycle detection, so the ``HAS_SUBPART`` edges are
        assumed to form an acyclic hierarchy.
        """
        children = [
            target
            for edge in self.edges
            if edge["relationship"] == "HAS_SUBPART" and edge["source"] == node_id
            for target in (edge["target"] if isinstance(edge["target"], list) else [edge["target"]])
        ]
        if not children:
            return [node_id]

        leaves = []
        for child_id in children:
            leaves.extend(self.gather_leaf_nodes(child_id))
        return leaves

    def aggregate_relationships_by_frequency(self, node_id: str, max_entries_per_type: int = 12) -> Dict[str, List[List[int]]]:
        """Collect the most frequent related nodes of ``node_id`` per relationship type.

        Every non-``HAS_SUBPART`` edge touching one of the leaf nodes under
        ``node_id`` is counted: when the leaf is the edge source, each target
        is counted; when the leaf is a target, the source is counted.

        Args:
            node_id: Node whose leaves are inspected.
            max_entries_per_type: Number of vectors returned per relationship
                type.

        Returns:
            A dict mapping each relationship name (except ``HAS_SUBPART``) to
            exactly ``max_entries_per_type`` vectors, most frequent first and
            zero-padded at the end.

        Raises:
            KeyError: If an edge uses a relationship name that is not a key
                of ``relationship_types``.
        """
        leaf_nodes = self.gather_leaf_nodes(node_id)
        rel_vectors = {rel: [] for rel in self.relationship_types if rel != "HAS_SUBPART"}

        # Count frequencies
        rel_counters = {rel: Counter() for rel in rel_vectors}
        for leaf_id in leaf_nodes:
            for edge in self.edges:
                rel = edge["relationship"]
                if rel == "HAS_SUBPART":
                    continue
                if edge["source"] == leaf_id:
                    targets = edge["target"] if isinstance(edge["target"], list) else [edge["target"]]
                    rel_counters[rel].update(targets)
                elif isinstance(edge["target"], list) and leaf_id in edge["target"]:
                    rel_counters[rel][edge["source"]] += 1
                elif edge["target"] == leaf_id:
                    rel_counters[rel][edge["source"]] += 1

        # Select top relationships
        for rel, counter in rel_counters.items():
            top_rels = [
                self.id_to_vector(related_id)
                for related_id, _ in counter.most_common(max_entries_per_type)
            ]
            while len(top_rels) < max_entries_per_type:
                top_rels.append(self.create_zero_vector())
            rel_vectors[rel] = top_rels[:max_entries_per_type]

        # Fill missing rel types so the result always has at least 6 entries.
        # With the default relationship table this never triggers (7 types
        # minus HAS_SUBPART already gives 6).
        if len(rel_vectors) < 6:
            for i in range(len(rel_vectors) + 1, 7):
                rel_vectors[f"rel{i}"] = [self.create_zero_vector() for _ in range(max_entries_per_type)]

        return rel_vectors

    def generate_standardized_embeddings(self) -> Dict[str, Any]:
        """Build the embedding record of every node and save them as JSON.

        The records are written to ``standardized_embeddings.json`` inside
        ``embedding_dir`` and the number of saved nodes is logged.

        Returns:
            A dict mapping node id to a record with the keys ``node_id``,
            ``node_name``, ``entity_vector``, ``parent_vector``,
            ``children_vectors`` and ``relationship_vectors``.
        """
        standardized_embeddings = {}
        for node in self.graph_data["body_parts"]["nodes"]:
            node_id, node_name = node["id"], node["name"]
            standardized_embeddings[node_id] = {
                "node_id": node_id,
                "node_name": node_name,
                "entity_vector": self.id_to_vector(node_id),
                "parent_vector": self.get_parent_by_relationship(node_id),
                "children_vectors": self.get_children_vectors(node_id),
                "relationship_vectors": self.aggregate_relationships_by_frequency(node_id),
            }

        output_path = os.path.join(self.embedding_dir, "standardized_embeddings.json")
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(standardized_embeddings, f, indent=2)

        logger.info(f"Saved embeddings for {len(standardized_embeddings)} nodes in {output_path}")
        return standardized_embeddings

def main():
    """Generate the embeddings for the whole graph and log one example record.

    Builds a ``StandardizedMedicalVectorSystem`` with its default settings,
    generates (and saves) the embeddings, then logs the first record as
    indented JSON. When the graph contains no nodes a warning is logged
    instead of failing on the empty result.
    """
    system = StandardizedMedicalVectorSystem()
    embeddings = system.generate_standardized_embeddings()

    # next() with a default avoids StopIteration on an empty embeddings dict.
    example_id = next(iter(embeddings), None)
    if example_id is None:
        logger.warning("No embeddings were generated; the graph has no nodes.")
        return

    logger.info(f"Example embedding for {example_id}:")
    logger.info(json.dumps(embeddings[example_id], indent=2))

# Run the embedding generation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

submitted by /u/vagobond45
[link] [comments]

Leave a Comment