sql >> Databáze >  >> RDS >> Database

Konfigurace Service Broker pro asynchronní zpracování

Ve svém posledním článku jsem mluvil o výhodách implementace asynchronního zpracování pomocí Service Broker v SQL Server oproti jiným metodám, které existují pro oddělené zpracování dlouhých úloh. V tomto článku si projdeme všechny součásti, které je třeba nakonfigurovat pro základní konfiguraci Service Broker v jedné databázi, a důležité aspekty pro správu konverzace mezi službami brokera. Abychom mohli začít, budeme muset vytvořit databázi a povolit databázi pro použití Service Broker:

CREATE DATABASE AsyncProcessingDemo;GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name =N'AsyncProcessingDemo') =0BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;ENDGO USE AsyncProcessingDemo;GO 

Konfigurace komponent brokera

Základními objekty, které je třeba v databázi vytvořit, jsou typy zpráv pro zprávy, smlouva, která definuje, jak budou zprávy zasílány mezi službami, fronta a služba iniciátor a fronta a cílová služba. Mnoho příkladů online pro Service Broker ukazuje komplexní pojmenování objektů pro typy zpráv, smlouvy a služby pro Service Broker. Neexistuje však požadavek, aby byla jména složitá, a pro kterýkoli z objektů lze použít jednoduché názvy objektů.

Pro zprávy budeme muset vytvořit typ zprávy pro požadavek, který se bude jmenovat AsyncRequest a typ zprávy pro výsledek, který se bude jmenovat AsyncResult . Oba budou používat XML, který bude ověřen jako správně vytvořený zprostředkovatelskými službami k odesílání a přijímání dat požadovaných službami.

-- Vytvořte typy zprávCREATE MESSAGE TYPE [AsyncRequest] VALIDATION =WELL_FORMED_XML;CREATE MESSAGE TYPE [AsyncResult] VALIDATION =WELL_FORMED_XML;

Ve smlouvě je uvedeno, že AsyncRequest bude odesláno iniciující službou cílové službě a že cílová služba vrátí AsyncResult zprávu zpět iniciační službě. Smlouva může také specifikovat více typů zpráv pro iniciátora a cíl, nebo že konkrétní typ zprávy může být odeslán jakoukoli službou, pokud to konkrétní zpracování vyžaduje.

-- Vytvořte smlouvu VYTVOŘTE SMLOUVU [AsyncContract] ( [AsyncRequest] ODESLAL INICIATOR, [AsyncResult] ODESLAL CÍL);

Pro každou ze služeb musí být vytvořena fronta, která zajistí ukládání zpráv přijatých službou. Cílová služba, kam bude požadavek odeslán, musí být vytvořena s uvedením AsyncContract povolit odesílání zpráv do služby. V tomto případě se služba jmenuje ProcessingService a bude vytvořen v ProcessingQueue v rámci databáze. Iniciační služba nevyžaduje zadání smlouvy, díky čemuž může přijímat zprávy pouze jako odpověď na konverzaci, která byla z ní zahájena.

-- Vytvořte frontu zpracování a službu - zadejte smlouvu, která umožní odeslání do službyCREATE QUEUE ProcessingQueue;CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Vytvořte frontu požadavků a službu CREATE QUEUE RequestQueue;CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Odeslání zprávy ke zpracování

Jak jsem vysvětlil v předchozím článku, dávám přednost implementaci uložené procedury wrapper pro odeslání nové zprávy zprostředkovatelské službě, aby ji bylo možné v případě potřeby jednou upravit a škálovat výkon. Tento postup je jednoduchý obal kolem vytvoření nové konverzace a odeslání zprávy do ProcessingService .

-- Vytvořte proceduru wrapper pro odesílání zprávCREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XMLASBEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; ZAČÍT TRANSAKCI; ZAČNĚTE DIALOGOVOU KONVERZICI @conversation_handle ZE SLUŽBY @FromService TO SERVICE @ToService NA SMLUVU @Contract SE ŠIFROVÁNÍ =VYPNUTO; ODESLAT PŘI KONVERZICI @conversation_handle TYP ZPRÁVY @MessageType(@MessageBody); ZAKÁZAT TRANSAKCI;ENDGO

Pomocí uložené procedury wrapperu nyní můžeme odeslat testovací zprávu do ProcessingService abychom ověřili, že jsme zprostředkovatelské služby nastavili správně.

-- Odeslat žádostEXECUTE dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N'ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Zkontrolovat zprávu ve frontě zpracování SELECT CAST(body_message_body AS XML) FROM ProcessingQueue;GO

Zpracování zpráv

Zatímco jsme mohli ručně zpracovat zprávy z ProcessingQueue , pravděpodobně budeme chtít, aby se zprávy zpracovávaly automaticky při jejich odesílání do ProcessingService . Chcete-li to provést, je třeba vytvořit uloženou proceduru aktivace, kterou otestujeme a později navážeme na frontu, abychom automatizovali zpracování při aktivaci fronty. Ke zpracování zprávy potřebujeme RECEIVE zprávu z fronty v rámci transakce spolu s typem zprávy a popisovačem konverzace pro zprávu. Typ zprávy zajišťuje, že na zpracovávanou zprávu je aplikována příslušná logika, a popisovač konverzace umožňuje odeslání odpovědi zpět iniciační službě, když je zpráva zpracována.

RECEIVE umožňuje zpracování jedné zprávy nebo více zpráv v rámci stejného popisovače konverzace nebo skupiny v jediné transakci. Pro zpracování více zpráv je nutné použít proměnnou tabulky nebo pro zpracování jedné zprávy lze použít lokální proměnnou. Níže uvedený postup aktivace načte jednu zprávu z fronty, zkontroluje typ zprávy, aby zjistil, zda se jedná o AsyncRequest zprávu a poté provede dlouhodobý proces na základě přijatých informací o zprávě. Pokud neobdrží zprávu ve smyčce, počká až 5000 ms neboli 5 sekund, než do fronty vstoupí další zpráva, než smyčku opustí a ukončí její provádění. Po zpracování zprávy vytvoří AsyncResult zprávu a odešle ji zpět iniciátorovi na stejném popisovači konverzace, ze kterého byla zpráva přijata. Postup také zkontroluje typ zprávy, aby zjistil, zda EndDialog nebo Error byla přijata zpráva, aby se konverzace uklidila jejím ukončením.

-- Vytvořit proceduru zpracování pro zpracování frontyCREATE PROCEDURE dbo.ProcessingQueueActivationASBEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle =konverzace_handle, @message_body =CAST(tělo_zprávy AS XML), @message_type_name =message_type_name FROM ProcessingQueue), TIMEOUT 5000; IF (@@ROWCOUNT =0) ZAČÁTE TRANSAKCI VRÁCENÍ; PŘESTÁVKA; END IF @message_type_name =N'AsyncRequest' BEGIN -- Zde vyřešte složité dlouhé zpracování -- Pro ukázku vytáhneme číslo účtu a pošleme zpět pouze odpověď DECLARE @AccountNumber INT =@message_body.value('(AsyncRequest/AccountNumber) [1]“, „INT“); -- Vytvořte zprávu s odpovědí a odešlete ji zpět DECLARE @reply_message_body XML =N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; ODESLAT V KONVERZICI @conversation_handle TYP ZPRÁVY [AsyncResult] (@reply_message_body); END -- Pokud ukončíte dialogové okno, ukončete dialog ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; KONEC – Pokud se zobrazí chybová zpráva, zaprotokolujte a ukončete konverzaci ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' ZAČÁTEK – Zde zaznamenejte kód chyby a proveďte jakoukoli požadovanou manipulaci – Konec konverzace pro chybu END CONVERSATION @conversation_handle; UKONČIT TRANSAKCI ZÁVAZKU; ENDENDGO

RequestQueue bude také muset zpracovat zprávy, které jsou mu odeslány, takže další postup pro zpracování AsyncResult je třeba vytvořit zprávy vrácené procedurou ProcessingQueueActivation. Vzhledem k tomu, že víme, že zpráva AsnycResult znamená, že veškeré zpracování bylo dokončeno, konverzaci lze ukončit, jakmile tuto zprávu zpracujeme, což odešle zprávu EndDialog do ProcessingService, která bude následně zpracována její aktivační procedurou k ukončení konverzace vše vyčistit a vyhnout se požáru a zapomenout na problémy, ke kterým dochází, když jsou konverzace správně ukončeny.

-- Vytvořit proceduru pro zpracování odpovědí na frontu požadavkuCREATE PROCEDURE dbo.RequestQueueActivationASBEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle =konverzace_handle, @message_body =CAST(tělo_zprávy AS XML), @message_type_name =message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT =0) ZAČÁTE TRANSAKCI VRÁCENÍ; PŘESTÁVKA; END IF @message_type_name =N'AsyncResult' BEGIN -- V případě potřeby zde zpracujte zprávu s odpovědí DECLARE @AccountNumber INT =@message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Vzhledem k tomu, že se jedná o veškerou práci, kterou právě provádíte, ukončete konverzaci a odešlete zprávu EndDialog END CONVERSATION @conversation_handle; END -- Pokud ukončíte dialogové okno, ukončete dialog ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; KONEC -- Pokud se zobrazí chybová zpráva, zaprotokolujte a ukončete konverzaci ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' ZAČÁTEK KONVERZACE @conversation_handle; UKONČIT TRANSAKCI ZÁVAZKU; ENDENDGO

Testování postupů

Před automatizací zpracování front pro naše služby je důležité otestovat aktivační procedury, abyste se ujistili, že zpracovávají zprávy správně, a aby se zabránilo deaktivaci fronty v případě chyby, která není správně zpracována. Protože ve ProcessingQueue již zpráva existuje ProcessingQueueActivation Pro zpracování této zprávy lze provést proceduru. Mějte na paměti, že WAITFOR způsobí, že ukončení procedury bude trvat 5 sekund, i když je zpráva zpracována okamžitě z fronty. Po zpracování zprávy můžeme ověřit, že postup fungoval správně, dotazem na RequestQueue zjistit, zda AsyncResult zpráva existuje, a pak můžeme ověřit, že RequestQueueActivation procedura funguje správně jejím provedením.

-- Zpracovat zprávu z fronty zpracováníEXECUTE dbo.ProcessingQueueActivation;GO -- Zkontrolovat zprávu s odpovědí ve frontě požadavkuSELECT CAST(tělo_zprávy AS XML) FROM Fronta požadavků;GO -- Zpracovat zprávu z fronty požadavkůEXECUTE dbo.RequestQueueActivation; 

Automatizace zpracování

V tomto okamžiku jsou všechny komponenty kompletní, aby bylo možné plně automatizovat naše zpracování. Jediné, co zbývá, je svázat aktivační procedury s jejich příslušnými frontami a poté odeslat další testovací zprávu pro ověření, že je zpracována a ve frontách poté nic nezůstane.

-- Změňte frontu zpracování pro zadání interní aktivaceALTER QUEUE ProcessingQueue WITH ACTIVATION (STATUS =ON, PROCEDURE_NAME =dbo.ProcessingQueueActivation, MAX_QUEUE_READERS =10, EXECUTE AS SELF); GO -- Změňte interní frontu požadavku na frontu požadavku S AKTIVACÍ ( STATUS =ZAPNUTO, PROCEDURE_NAME =dbo.RequestQueueActivation, MAX_QUEUE_READERS =10, PROVÁDĚT JAKO SEBE );GO -- Testovat automatickou aktivaci-- Odeslat požadavek PROVEĎTE dbo.SendBrokerMessage @FromService =N'RequestService N', @To ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Zkontrolovat zprávu ve frontě zpracování -- nic tam není, protože byla zpracována automatickySELECT CAST(tělo_zprávy AS XML) FROM ProcessingQueue;GO -- Zkontrolovat zprávu s odpovědí ve frontě požadavku -- nic tam není, protože byla zpracována automatickySELECT CAST(tělo_zprávy AS XML) FROM RequestQueue;GO

Shrnutí

Základní komponenty pro automatizované asynchronní zpracování v SQL Server Service Broker lze nakonfigurovat v nastavení jediné databáze, aby bylo možné oddělené zpracování dlouho běžících úloh. To může být mocný nástroj pro zlepšení výkonu aplikace, ze zkušenosti koncového uživatele, oddělením zpracování od interakcí koncového uživatele s aplikací.


  1. PostgreSQL:Fulltextové vyhledávání - Jak hledat dílčí slova?

  2. Existuje v PostgreSQL něco jako funkce zip(), která kombinuje dvě pole?

  3. Předplatitelský obchodní datový model

  4. Je zaručeno zachování objednávky v dílčím dotazu?