In diesem Tutorial erstellen wir ein Agentic Knowledge and Infrastructure Technique-System unter Verwendung des leichtgewichtigen Qwen2.5-0.5B-Instruct-Modells für eine effiziente Ausführung. Wir beginnen mit der Erstellung eines flexiblen LLM-Agenten-Frameworks und entwickeln dann spezialisierte Agenten, die verschiedene Ebenen der Datenverwaltung verwalten, von der Aufnahme und Qualitätsanalyse bis hin zur Infrastrukturoptimierung. Wir integrieren diese Agenten in einen Orchestrator, der ihre Interaktionen koordiniert und so eine reibungslose Zusammenarbeit mehrerer Agenten in der gesamten Datenpipeline gewährleistet. Anhand praktischer Beispiele wie E-Commerce und IoT-Pipelines untersuchen wir, wie autonome Entscheidungsfindung komplexe Datenvorgänge rationalisieren kann. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.

!pip set up -q transformers torch speed up datasets huggingface_hub
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import Listing, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd


class LightweightLLMAgent:
   def __init__(self, function: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
       self.function = function
       self.model_name = model_name
       self.gadget = "cuda" if torch.cuda.is_available() else "cpu"
       print(f"Loading {model_name} for {function} agent on {self.gadget}...")
       self.tokenizer = AutoTokenizer.from_pretrained(model_name)
       self.mannequin = AutoModelForCausalLM.from_pretrained(
           model_name,
           torch_dtype=torch.float16 if self.gadget == "cuda" else torch.float32,
           device_map="auto"
       )
       self.conversation_history = ()


   def generate_response(self, immediate: str, max_tokens: int = 150) -> str:
       messages = (
           {"function": "system", "content material": f"You're a {self.function} agent in a knowledge infrastructure system."},
           {"function": "consumer", "content material": immediate}
       )
       textual content = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
       model_inputs = self.tokenizer((textual content), return_tensors="pt").to(self.gadget)
       with torch.no_grad():
           generated_ids = self.mannequin.generate(
               model_inputs.input_ids,
               max_new_tokens=max_tokens,
               temperature=0.7,
               do_sample=True,
               top_p=0.95
           )
       generated_ids = (output_ids(len(input_ids):) for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids))
       response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)(0)
       self.conversation_history.append({"immediate": immediate, "response": response})
       return response

Wir beginnen mit der Einrichtung der leichtgewichtigen LLM-Agenten-Infrastruktur mithilfe des Qwen2.5-0.5B-Instruct-Modells. Wir laden das Modell und den Tokenizer und definieren eine Basisagentenklasse, die kontextbezogene Konversationen abwickeln und intelligente Antworten generieren kann. Dies bildet die zentrale Grundlage für die effiziente Arbeit unserer spezialisierten Agenten innerhalb von Colab. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.

class DataIngestionAgent(LightweightLLMAgent):
   def __init__(self):
       tremendous().__init__(function="Knowledge Ingestion Specialist")
   def analyze_data_source(self, source_info: Dict) -> Dict:
       immediate = f"""Analyze this knowledge supply and supply ingestion technique:
Supply Sort: {source_info.get('kind', 'unknown')}
Quantity: {source_info.get('quantity', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Present a quick technique specializing in: 1) Ingestion methodology, 2) Key issues."""
       technique = self.generate_response(immediate, max_tokens=100)
       return {"supply": source_info, "technique": technique, "timestamp": datetime.now().isoformat()}


class DataQualityAgent(LightweightLLMAgent):
   def __init__(self):
       tremendous().__init__(function="Knowledge High quality Analyst")
   def assess_data_quality(self, data_sample: Dict) -> Dict:
       immediate = f"""Assess knowledge high quality for this pattern:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Points Discovered: {data_sample.get('points', 0)}
Present transient high quality evaluation and high 2 suggestions."""
       evaluation = self.generate_response(immediate, max_tokens=100)
       return {"evaluation": evaluation, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}
   def _calculate_severity(self, data_sample: Dict) -> str:
       completeness = data_sample.get('completeness', 100)
       consistency = data_sample.get('consistency', 100)
       avg_score = (completeness + consistency) / 2
       if avg_score >= 90: return "LOW"
       elif avg_score >= 70: return "MEDIUM"
       else: return "HIGH"

Wir entwerfen die Datenaufnahme- und Datenqualitätsagenten so, dass sie sich auf die strukturierte Analyse von Datenpipelines konzentrieren. Wir lassen den Aufnahmeagenten den besten Ansatz für den Datenfluss bestimmen, während der Qualitätsagent die Vollständigkeit, Konsistenz und Probleme der Daten bewertet, um umsetzbare Erkenntnisse zu liefern. Gemeinsam bilden sie die ersten beiden Schichten des autonomen Datenmanagements. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.

class InfrastructureOptimizationAgent(LightweightLLMAgent):
   def __init__(self):
       tremendous().__init__(function="Infrastructure Optimization Specialist")
   def optimize_resources(self, metrics: Dict) -> Dict:
       immediate = f"""Analyze infrastructure metrics and counsel optimizations:
CPU Utilization: {metrics.get('cpu_usage', 0)}%
Reminiscence Utilization: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Question Latency: {metrics.get('query_latency', 0)}ms
Present 2 optimization suggestions."""
       suggestions = self.generate_response(immediate, max_tokens=100)
       return {"current_metrics": metrics, "suggestions": suggestions, "precedence": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}
   def _calculate_priority(self, metrics: Dict) -> str:
       cpu = metrics.get('cpu_usage', 0)
       reminiscence = metrics.get('memory_usage', 0)
       if cpu > 85 or reminiscence > 85: return "CRITICAL"
       elif cpu > 70 or reminiscence > 70: return "HIGH"
       else: return "NORMAL"

Wir entwickeln den Infrastructure Optimization Agent, um wichtige Kennzahlen wie CPU-, Arbeitsspeicher- und Speicherauslastung kontinuierlich zu analysieren. Daraus generieren wir intelligente Optimierungsvorschläge, die uns helfen, eine hohe Leistung und Ressourceneffizienz aufrechtzuerhalten. Dieser Agent stellt sicher, dass unsere Infrastruktur während der Datenoperationen reaktionsfähig und skalierbar bleibt. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.

class AgenticDataOrchestrator:
   def __init__(self):
       print("n" + "="*70)
       print("Initializing Agentic Knowledge Infrastructure System")
       print("="*70 + "n")
       self.ingestion_agent = DataIngestionAgent()
       self.quality_agent = DataQualityAgent()
       self.optimization_agent = InfrastructureOptimizationAgent()
       self.execution_log = ()
   def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
       outcomes = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "phases": ()}
       print("n(Stage 1) Knowledge Ingestion Evaluation")
       ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("supply", {}))
       print(f"Technique: {ingestion_result('technique')(:150)}...")
       outcomes("phases").append({"stage": "ingestion", "consequence": ingestion_result})
       print("n(Stage 2) Knowledge High quality Evaluation")
       quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
       print(f"Evaluation: {quality_result('evaluation')(:150)}...")
       print(f"Severity: {quality_result('severity')}")
       outcomes("phases").append({"stage": "high quality", "consequence": quality_result})
       print("n(Stage 3) Infrastructure Optimization")
       optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
       print(f"Suggestions: {optimization_result('suggestions')(:150)}...")
       print(f"Precedence: {optimization_result('precedence')}")
       outcomes("phases").append({"stage": "optimization", "consequence": optimization_result})
       outcomes("end_time") = datetime.now().isoformat()
       outcomes("standing") = "accomplished"
       self.execution_log.append(outcomes)
       return outcomes
   def generate_summary_report(self) -> pd.DataFrame:
       if not self.execution_log: return pd.DataFrame()
       summary_data = ()
       for log in self.execution_log:
           summary_data.append({"Pipeline ID": log("pipeline_id"), "Begin Time": log("start_time"), "Standing": log("standing"), "Levels Accomplished": len(log("phases"))})
       return pd.DataFrame(summary_data)

Wir haben einen Agentic Knowledge Orchestrator entwickelt, um alle spezialisierten Agenten im Rahmen eines einheitlichen Workflows zu koordinieren. Wir verwenden es, um die Finish-to-Finish-Pipeline-Ausführung zu verwalten und nacheinander Aufnahme, Qualitätsprüfungen und Optimierung auszulösen. Dadurch bringen wir Struktur, Zusammenarbeit und Automatisierung in das gesamte Multiagentensystem. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.

def important():
   orchestrator = AgenticDataOrchestrator()
   print("n" + "="*70)
   print("EXAMPLE 1: E-commerce Knowledge Pipeline")
   print("="*70)
   ecommerce_pipeline = {
       "id": "ecommerce_pipeline_001",
       "supply": {"kind": "REST API", "quantity": "10GB/day", "frequency": "real-time"},
       "quality_metrics": {"completeness": 87, "consistency": 92, "points": 15},
       "infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
   }
   result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)
   print("nn" + "="*70)
   print("EXAMPLE 2: IoT Sensor Knowledge Pipeline")
   print("="*70)
   iot_pipeline = {
       "id": "iot_pipeline_002",
       "supply": {"kind": "Message Queue (Kafka)", "quantity": "50GB/day", "frequency": "streaming"},
       "quality_metrics": {"completeness": 95, "consistency": 88, "points": 8},
       "infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
   }
   result2 = orchestrator.process_data_pipeline(iot_pipeline)
   print("nn" + "="*70)
   print("EXECUTION SUMMARY REPORT")
   print("="*70 + "n")
   summary_df = orchestrator.generate_summary_report()
   print(summary_df.to_string(index=False))
   print("n" + "="*70)
   print("Tutorial Full!")
   print("="*70)
   print("nKey Ideas Demonstrated:")
   print("✓ Light-weight LLM agent structure")
   print("✓ Specialised brokers for various knowledge duties")
   print("✓ Multi-agent orchestration")
   print("✓ Infrastructure monitoring and optimization")
   print("✓ Autonomous decision-making in knowledge pipelines")


if __name__ == "__main__":
   important()

Wir demonstrieren unser komplettes System anhand von zwei realen Beispielen, einem E-Commerce- und einer IoT-Datenpipeline. Wir beobachten, wie jeder Agent seine Rolle autonom ausführt und gleichzeitig zu einem gemeinsamen Ziel beiträgt. Abschließend erstellen wir einen zusammenfassenden Bericht, der die Effizienz der Orchestrierung und die Leistungsfähigkeit der leichtgewichtigen Agentenintelligenz bestätigt.

Abschließend entwerfen und implementieren wir ein intelligentes Dateninfrastruktur-Framework mit mehreren Agenten, das auf einem kompakten Open-Supply-Modell basiert. Wir erleben, wie unabhängige und dennoch kooperative Agenten reale Datensysteme autonom analysieren, bewerten und optimieren können. Der gesamte Aufbau zeigt, wie leichtgewichtige LLMs Infrastrukturintelligenz effizient verarbeiten können, und verdeutlicht gleichzeitig, wie die agentische Orchestrierung herkömmliche Datenworkflows in adaptive, selbstoptimierende Systeme verwandelt, die für skalierbare Unternehmensanwendungen bereit sind.


Schauen Sie sich das an VOLLSTÄNDIGE CODES hier. Schauen Sie sich gerne bei uns um GitHub-Seite für Tutorials, Codes und Notebooks. Sie können uns auch gerne weiter folgen Twitter und vergessen Sie nicht, bei uns mitzumachen 100.000+ ML SubReddit und Abonnieren Unser E-newsletter. Warten! Bist du im Telegram? Jetzt können Sie uns auch per Telegram kontaktieren.


Asif Razzaq ist CEO von Marktechpost Media Inc.. Als visionärer Unternehmer und Ingenieur setzt sich Asif dafür ein, das Potenzial der künstlichen Intelligenz für das soziale Wohl zu nutzen. Sein jüngstes Unterfangen ist die Einführung einer Medienplattform für künstliche Intelligenz, Marktechpost, die sich durch eine ausführliche Berichterstattung über maschinelles Lernen und Deep-Studying-Nachrichten auszeichnet, die sowohl technisch fundiert als auch für ein breites Publikum leicht verständlich ist. Die Plattform verfügt über mehr als 2 Millionen monatliche Aufrufe, was ihre Beliebtheit beim Publikum verdeutlicht.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert