sql >> Databáze >  >> RDS >> Sqlserver

Jak mohu vložit 10 milionů záznamů v co nejkratším čase?

Prosím ne vytvořte DataTable načíst přes BulkCopy. To je v pořádku řešení pro menší sady dat, ale není absolutně žádný důvod načíst všech 10 milionů řádků do paměti před voláním databáze.

Vaše nejlepší sázka (mimo BCP / BULK INSERT / OPENROWSET(BULK...) ) je streamování obsahu ze souboru do databáze prostřednictvím parametru s hodnotou tabulky (TVP). Pomocí TVP můžete otevřít soubor, přečíst řádek a odeslat řádek, dokud nebude hotový, a pak soubor zavřít. Tato metoda má paměťovou stopu pouze jednoho řádku. Napsal jsem článek Streamování dat do SQL Server 2008 z aplikace, který má příklad právě tohoto scénáře.

Zjednodušený přehled struktury je následující. Předpokládám stejnou importní tabulku a název pole jako v otázce výše.

Požadované databázové objekty:

-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO

-- Second: Use the UDTT as an input param to an import proc.
--         Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;

INSERT INTO dbo.DATAs (DatasField)
    SELECT  Field
    FROM    @ImportTable;

GO

Kód aplikace C# pro použití výše uvedených objektů SQL je uveden níže. Všimněte si, že namísto vyplnění objektu (např. DataTable) a následného provedení uložené procedury v této metodě zahájí čtení obsahu souboru provedení uložené procedury. Vstupní parametr uloženého procesu není proměnná; je to návratová hodnota metody GetFileContents . Tato metoda je volána při SqlCommand volá ExecuteNonQuery , který otevře soubor, přečte řádek a odešle řádek na SQL Server prostřednictvím IEnumerable<SqlDataRecord> a yield return konstrukty a poté soubor zavře. Uložená procedura vidí pouze proměnnou tabulky, @ImportTable, ke které lze přistupovat, jakmile data začnou přicházet (poznámka:data přetrvávají krátkou dobu, i když ne celý obsah, v tempdb ).

using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private static IEnumerable<SqlDataRecord> GetFileContents()
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
   };
   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
   StreamReader _FileReader = null;

   try
   {
      _FileReader = new StreamReader("{filePath}");

      // read a row, send a row
      while (!_FileReader.EndOfStream)
      {
         // You shouldn't need to call "_DataRecord = new SqlDataRecord" as
         // SQL Server already received the row when "yield return" was called.
         // Unlike BCP and BULK INSERT, you have the option here to create a string
         // call ReadLine() into the string, do manipulation(s) / validation(s) on
         // the string, then pass that string into SetString() or discard if invalid.
         _DataRecord.SetString(0, _FileReader.ReadLine());
         yield return _DataRecord;
      }
   }
   finally
   {
      _FileReader.Close();
   }
}

GetFileContents výše uvedená metoda se používá jako hodnota vstupního parametru pro uloženou proceduru, jak je uvedeno níže:

public static void test()
{
   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   SqlParameter _TVParam = new SqlParameter();
   _TVParam.ParameterName = "@ImportTable";
   _TVParam.TypeName = "dbo.ImportStructure";
   _TVParam.SqlDbType = SqlDbType.Structured;
   _TVParam.Value = GetFileContents(); // return value of the method is streamed data
   _Command.Parameters.Add(_TVParam);

   try
   {
      _Connection.Open();

      _Command.ExecuteNonQuery();
   }
   finally
   {
      _Connection.Close();
   }

   return;
}

Další poznámky:

  1. Po určitých úpravách lze výše uvedený kód C# upravit tak, aby dávkoval data.
  2. S drobnými úpravami lze výše uvedený kód C# upravit pro odesílání ve více polích (příklad zobrazený v článku „Steaming Data...“ odkazovaném výše obsahuje 2 pole).
  3. Hodnotu každého záznamu můžete také upravit v SELECT prohlášení v proc.
  4. Řádky můžete také odfiltrovat pomocí podmínky WHERE v procesu.
  5. Můžete přistupovat k TVP Table Variable vícekrát; je POUZE PRO ČTENÍ, ale ne „pouze dopředu“.
  6. Výhody oproti SqlBulkCopy :
    1. SqlBulkCopy je pouze INSERT, zatímco použití TVP umožňuje použití dat jakýmkoli způsobem:můžete volat MERGE; můžete DELETE na základě nějaké podmínky; data můžete rozdělit do více tabulek; a tak dále.
    2. Vzhledem k tomu, že TVP není pouze INSERT-only, nepotřebujete samostatnou pracovní tabulku pro výpis dat.
    3. Data z databáze můžete získat zpět voláním ExecuteReader místo ExecuteNonQuery . Například pokud existuje IDENTITY v poli DATAs import tabulky, můžete přidat OUTPUT klauzule do INSERT předat zpět INSERTED.[ID] (za předpokladu ID je název IDENTITY pole). Nebo můžete předat zpět výsledky úplně jiného dotazu nebo obojí, protože více sad výsledků lze odeslat a přistupovat k nim pomocí Reader.NextResult() . Získání informací zpět z databáze není možné při použití SqlBulkCopy přesto existuje několik otázek zde na S.O. lidí, kteří chtějí přesně to udělat (alespoň s ohledem na nově vytvořenou IDENTITY hodnoty).
    4. Další informace o tom, proč je někdy rychlejší pro celkový proces, i když je trochu pomalejší při získávání dat z disku na SQL Server, najdete v tomto dokumentu od týmu zákaznických poradců SQL Server:Maximalizace propustnosti pomocí TVP


  1. Odstraňování problémů s replikací serveru SQL

  2. Začínáme s SQL Server 2017 na Linuxu na Azure Portal

  3. merge update oracle nedokáže získat stabilní sadu řádků

  4. MySQL LIKE IN()?