इस नोटबुक में, हम प्रदर्शित करते हैं कि Google Colab में पूरी तरह से इन-मेमोरी “सेंसर अलर्ट” पाइपलाइन का निर्माण कैसे करें, जो फास्टस्ट्रीम, एक उच्च-प्रदर्शन, पायथन-नेटिव स्ट्रीम प्रोसेसिंग फ्रेमवर्क और रैबिटमीक्यू के साथ इसके एकीकरण का उपयोग करके। FastStream.Rabbit के Rabbitbroker और TestRabbitbroker का लाभ उठाकर, हम बाहरी बुनियादी ढांचे की आवश्यकता के बिना एक संदेश ब्रोकर का अनुकरण करते हैं। हम चार अलग -अलग चरणों को ऑर्केस्ट्रेट करते हैं: अंतर्ग्रहण और सत्यापन, सामान्यीकरण, निगरानी और चेतावनी पीढ़ी, और संग्रह, प्रत्येक को डेटा की गुणवत्ता और प्रकार की सुरक्षा सुनिश्चित करने के लिए pydantic मॉडल (rawsensordata, सामान्यीकृतडाटा, AlertData) के रूप में परिभाषित किया गया है। हुड के तहत, पायथन के Asyncio शक्तियां एसिंक्रोनस मैसेज फ्लो, जबकि NEST_ASYNCIO COLAB में नेस्टेड इवेंट लूप्स को सक्षम बनाता है। हम अंतिम परिणाम निरीक्षण के लिए ट्रेस करने योग्य पाइपलाइन निष्पादन और पंडों के लिए मानक लॉगिंग मॉड्यूल को भी नियुक्त करते हैं, जिससे डेटाफ्रेम में संग्रहीत अलर्ट की कल्पना करना आसान हो जाता है।
!pip install -q faststream(rabbit) nest_asyncio
हम अपने RabbitMQ एकीकरण के साथ FastStream स्थापित करते हैं, कोर स्ट्रीम-प्रोसेसिंग फ्रेमवर्क और ब्रोकर कनेक्टर्स, साथ ही NEST_ASYNCIO पैकेज प्रदान करते हैं, जो ColAb जैसे वातावरण में नेस्टेड Asyncio इवेंट लूप्स को सक्षम बनाता है। यह सब -क्यू ध्वज के साथ आउटपुट को न्यूनतम रखते हुए प्राप्त किया जाता है।
import nest_asyncio, asyncio, logging
nest_asyncio.apply()
हम NEST_ASyncio, Asyncio, और लॉगिंग मॉड्यूल का आयात करते हैं, फिर पायथन के इवेंट लूप को पैच करने के लिए nest_asyncio.apply () लागू करते हैं ताकि आप त्रुटियों के बिना Colab या Jupyter नोटबुक जैसे वातावरण के अंदर नेस्टेड एसिंक्रोनस कार्यों को चला सकें। लॉगिंग आयात आपको विस्तृत रनटाइम लॉग के साथ अपनी पाइपलाइन को साधन करने के लिए पढ़ता है।
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")
हम एक टाइमस्टैम्प और गंभीरता के साथ उपसर्ग जानकारी ‘स्तर (और ऊपर) संदेशों को उत्सर्जित करने के लिए लॉगिंग में पायथन के निर्मित – को कॉन्फ़िगर करते हैं, फिर अपनी स्ट्रीमिंग पाइपलाइन के भीतर संरचित लॉग को उत्सर्जित करने के लिए” सेंसर_पिपलाइन “नामक एक समर्पित लॉगर बनाएं।
from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List
हम फास्टस्ट्रीम के कोर फास्टस्ट्रीम क्लास में अपने रैबिटमक कनेक्टर्स (रियल ब्रोकर्स के लिए रैबिटब्रोकर और testrabbbitbroker के लिए and मेमोरी टेस्टिंग), पाइडेंटिक के बेसेमॉडल, फील्ड, और वैराय रूप से डिक्लेरिटिव डेटा सत्यापन के लिए सत्यापनकर्ता, पांडा के लिए पांडा, और पायथन की सूची के लिए लाते हैं।
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
हम AMQP URL का उपयोग करके एक (स्थानीय) RabbitMQ सर्वर पर इंगित एक रैबिटब्रोकर को तत्काल करते हैं, फिर उस ब्रोकर से बंधे एक फास्टस्ट्रीम एप्लिकेशन बनाते हैं, जो आपके पाइपलाइन चरणों के लिए मैसेजिंग बैकबोन की स्थापना करते हैं।
class RawSensorData(BaseModel):
sensor_id: str = Field(..., examples=("sensor_1"))
reading_celsius: float = Field(..., ge=-50, le=150, examples=(23.5))
@validator("sensor_id")
def must_start_with_sensor(cls, v):
if not v.startswith("sensor_"):
raise ValueError("sensor_id must start with 'sensor_'")
return v
class NormalizedData(BaseModel):
sensor_id: str
reading_kelvin: float
class AlertData(BaseModel):
sensor_id: str
reading_kelvin: float
alert: bool
ये pydantic मॉडल प्रत्येक चरण के लिए स्कीमा को परिभाषित करते हैं: Rawsensordata इनपुट वैधता (जैसे, रीडिंग रेंज और एक सेंसर_ उपसर्ग) को लागू करता है, सामान्यीकृतडाटा सेल्सियस को केल्विन में परिवर्तित करता है, और अलर्टडाटा अंतिम अलर्ट पेलोड (एक बूलियन ध्वज सहित) को एक प्रकार के-सेफ़ डेटा प्रवाह को सुनिश्चित करता है।
archive: List(AlertData) = ()
@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
logger.info(f"Ingested raw data: {raw.json()}")
return raw.dict()
@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
norm = NormalizedData(
sensor_id=data("sensor_id"),
reading_kelvin=data("reading_celsius") + 273.15
)
logger.info(f"Normalized to Kelvin: {norm.json()}")
return norm.dict()
ALERT_THRESHOLD_K = 323.15
@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
alert_flag = data("reading_kelvin") > ALERT_THRESHOLD_K
alert = AlertData(
sensor_id=data("sensor_id"),
reading_kelvin=data("reading_kelvin"),
alert=alert_flag
)
logger.info(f"Monitor result: {alert.json()}")
return alert.dict()
@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
rec = AlertData(**payload)
archive.append(rec)
logger.info(f"Archived: {rec.json()}")
एक इन-मेमोरी आर्काइव सूची सभी अंतिम अलर्ट एकत्र करती है, जबकि चार अतुल्यकालिक कार्य, @ब्रोकर .subscriber/ @broker.publisher के माध्यम से वायर्ड, पाइपलाइन चरणों का निर्माण करते हैं। ये फ़ंक्शन कच्चे सेंसर इनपुट को निगलना और मान्य करते हैं, सेल्सियस को केल्विन में परिवर्तित करते हैं, एक अलर्ट थ्रेशोल्ड के खिलाफ जांच करते हैं, और अंत में प्रत्येक अलर्टडेटा रिकॉर्ड को संग्रहीत करते हैं, पूर्ण ट्रेसबिलिटी के लिए हर कदम पर लॉग उत्सर्जित करते हैं।
async def main():
readings = (
{"sensor_id": "sensor_1", "reading_celsius": 45.2},
{"sensor_id": "sensor_2", "reading_celsius": 75.1},
{"sensor_id": "sensor_3", "reading_celsius": 50.0},
)
async with TestRabbitBroker(broker) as tb:
for r in readings:
await tb.publish(r, "sensor_input")
await asyncio.sleep(0.1)
df = pd.DataFrame((a.dict() for a in archive))
print("\nFinal Archived Alerts:")
display(df)
asyncio.run(main())
अंत में, मुख्य कोरूटीन इन-मेमोरी टेस्ट्राबबिटब्रोकर में नमूना सेंसर रीडिंग का एक सेट प्रकाशित करता है, प्रत्येक पाइपलाइन चरण को चलाने की अनुमति देने के लिए संक्षेप में रुकता है, और फिर आर्काइव से परिणामी अलर्टडेटा रिकॉर्ड को एक पांडा डेटाफ्रेम में आसान प्रदर्शन और अंत-से-एंड अलर्ट प्रवाह के सत्यापन और सत्यापन के लिए टकराता है। अंत में, Asyncio.run (मुख्य ()) Colab में पूरे Async डेमो को बंद कर देता है।
अंत में, यह ट्यूटोरियल दर्शाता है कि TestRabbitbroker के माध्यम से RabbitMQ Abstractions और इन-मेमोरी परीक्षण के साथ संयुक्त रूप से कैसे फास्टस्ट्रीम, बाहरी दलालों को तैनात करने के ओवरहेड के बिना वास्तविक समय डेटा पाइपलाइनों के विकास में तेजी ला सकता है। Pydantic हैंडलिंग स्कीमा सत्यापन, Asyncio का प्रबंधन समवर्ती, और पंडों के साथ त्वरित डेटा विश्लेषण को सक्षम करने के साथ, यह पैटर्न सेंसर निगरानी, ETL कार्यों या घटना – संचालित वर्कफ़्लोज़ के लिए एक मजबूत नींव प्रदान करता है। आप एक लाइव ब्रोकर URL (Rabbitmq, Kafka, Nats, या Redis) में स्वैप करके yoummory मेमोरी डेमो में उत्पादन करने के लिए मूल रूप से संक्रमण कर सकते हैं और किसी भी पायथन वातावरण में स्केलेबल, बनाए रखने योग्य धारा प्रसंस्करण को अनलॉक करने के लिए Uvicorn या आपके पसंदीदा ASGI सर्वर के तहत फास्टस्ट्रीम रन चला रहे हैं।
यह रहा कोलैब नोटबुक। इसके अलावा, हमें फॉलो करना न भूलें ट्विटर और हमारे साथ जुड़ें तार -चैनल और लिंक्डइन जीआरओयूपी। हमारे साथ जुड़ने के लिए मत भूलना 90K+ एमएल सबरेडिट।
🔥 ।

आईआईटी मद्रास में मार्कटेकपोस्ट में एक परामर्श इंटर्न और दोहरे डिग्री के छात्र सना हसन, वास्तविक दुनिया की चुनौतियों का समाधान करने के लिए प्रौद्योगिकी और एआई को लागू करने के बारे में भावुक हैं। व्यावहारिक समस्याओं को हल करने में गहरी रुचि के साथ, वह एआई और वास्तविक जीवन के समाधानों के चौराहे के लिए एक नया दृष्टिकोण लाता है।
