sql >> Databáze >  >> RDS >> Database

Co jsou sekvenční vs. paralelní streamy v Javě?

Java může paralelizovat streamovací operace a využívat vícejádrové systémy. Tento článek poskytuje perspektivu a ukazuje, jak může paralelní stream zlepšit výkon, pomocí vhodných příkladů.

Streamy v Javě

stream v Javě je posloupnost objektů reprezentovaná jako kanál dat. Obvykle má zdroj kde se data nacházejí a cíl kde se to přenáší. Všimněte si, že proud není úložiště; místo toho pracuje se zdrojem dat, jako je pole nebo kolekce. Mezi bity v pasáži se ve skutečnosti říká proud. Během procesu přenosu tok obvykle prochází jednou nebo více možnými transformacemi, jako je filtrování nebo třídění, nebo to může být jakýkoli jiný proces pracující s daty. Tím se původní data upraví do jiné podoby, obvykle podle potřeby programátora. Proto se vytvoří nový proud podle operace, která je na něm použita. Například, když je stream setříděn, výsledkem je nový proud, který produkuje výsledek, který je pak seřazen. To znamená, že nová data jsou transformovanou kopií originálu, nikoli v původní podobě.

Sekvenční stream

Jakákoli operace streamu v Javě, pokud není výslovně uvedena jako paralelní, je zpracovávána postupně. Jsou to v podstatě neparalelní proudy, které ke zpracování svého potrubí používají jediné vlákno. Sekvenční toky nikdy nevyužívají výhody vícejádrového systému, i když základní systém může podporovat paralelní provádění. Co se stane, když například použijeme multithreading pro zpracování streamu? I tehdy funguje na jednom jádru najednou. Může však přeskakovat z jednoho jádra na druhé, pokud není výslovně připojeno ke konkrétnímu jádru. Například zpracování ve čtyřech různých vláknech oproti čtyřem různým jádrům je zjevně odlišné, když první se neshoduje s druhým. Je docela možné spustit více vláken v prostředí s jedním jádrem, ale paralelní zpracování je úplně jiný žánr. Program musí být navržen jako základ pro paralelní programování, kromě provádění v prostředí, které jej podporuje. To je důvod, proč je paralelní programování složitou oblastí.

Zkusme příklad, který myšlenku dále ilustruje.

package org.mano.example;

import java.util.Arrays;
import java.util.List;

public class Main2 {
   public static oid main(String[] args) {
      List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);
      list.stream().forEach(System.out::println);
      System.out.println();
      list.parallelStream().forEach(System.out::println);
   }
}

Výstup

123456789
685973214

Tento příklad je ilustrací q sekvenčního proudu i q paralelního proudu v provozu. list.stream() pracuje postupně na jednom vláknu pomocí println() úkon. list.parallelStream() , na druhé straně, je zpracováván paralelně, přičemž plně využívá základní vícejádrové prostředí. Zajímavý aspekt je ve výstupu z předchozího programu. V případě sekvenčního streamu se obsah seznamu tiskne v uspořádaném pořadí. Výstup paralelního proudu je naproti tomu neuspořádaný a sekvence se mění při každém spuštění programu. To znamená alespoň jednu věc:vyvolání list.parallelStream() metoda vytvoří println příkaz funguje ve více vláknech, něco, co list.stream() dělá v jediném vláknu.

Paralelní stream

Primární motivací pro použití paralelního toku je učinit zpracování toku součástí paralelního programování, i když celý program nemusí být paralelizován. Paralelní stream využívá vícejádrové procesory, což má za následek podstatné zvýšení výkonu. Na rozdíl od jakéhokoli paralelního programování jsou složité a náchylné k chybám. Knihovna Java stream však poskytuje možnost to udělat snadno a spolehlivým způsobem. Celý program nemusí být paralelizován. ale alespoň část, která zpracovává proud, může být paralelizována. Jsou vlastně docela jednoduché v tom smyslu, že můžeme vyvolat několik metod a o zbytek je postaráno. Existuje několik způsobů, jak to udělat. Jedním takovým způsobem je získat paralelní proud vyvoláním parallelStream() metoda definovaná pomocí Kolekce . Dalším způsobem je vyvolat parallel() metoda definovaná BaseStream v sekvenčním proudu. Sekvenční proud je paralelizován vyvoláním. Všimněte si, že základní platforma musí podporovat paralelní programování, jako je například vícejádrový systém. Jinak invokace nemá smysl. Proud by byl v takovém případě zpracován postupně, i když jsme provedli vyvolání. Pokud je vyvolání provedeno na již paralelním streamu, neudělá nic a jednoduše proud vrátí.

Aby bylo zajištěno, že výsledek paralelního zpracování aplikovaného na stream je stejný jako při sekvenčním zpracování, musí být paralelní proudy bezstavové, neinterferující a asociativní.

Rychlý příklad

package org.mano.example;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class Main {

   public static void main(String[] args) {
      List<Employee> employees = Arrays.asList(
         new Employee(1276, "FFF",2000.00),
         new Employee(7865, "AAA",1200.00),
         new Employee(4975, "DDD",3000.00),
         new Employee(4499, "CCC",1500.00),
         new Employee(9937, "GGG",2800.00),
         new Employee(5634, "HHH",1100.00),
         new Employee(9276, "BBB",3200.00),
         new Employee(6852, "EEE",3400.00));

      System.out.println("Original List");
      printList(employees);

      // Using sequential stream
      long start = System.currentTimeMillis();
      List<Employee> sortedItems = employees.stream()
         .sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      long end = System.currentTimeMillis();

      System.out.println("sorted using sequential stream");
      printList(sortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");

      // Using parallel stream
      start = System.currentTimeMillis();
      List<Employee> anotherSortedItems = employees
         .parallelStream().sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      end = System.currentTimeMillis();

      System.out.println("sorted using parallel stream");
      printList(anotherSortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");


      double totsal=employees.parallelStream()
         .map(e->e.getSalary())
         .reduce(0.00,(a1,a2)->a1+a2);
      System.out.println("Total Salary expense: "+totsal);
      Optional<Employee> maxSal=employees.parallelStream()
         .reduce((Employee e1, Employee e2)->
         e1.getSalary()<e2.getSalary()?e2:e1);
      if(maxSal.isPresent())
         System.out.println(maxSal.get().toString());
   }

   public static void printList(List<Employee> list) {
      for (Employee e : list)
         System.out.println(e.toString());
   }
}


package org.mano.example;

public class Employee {
   private int empid;
   private String name;
   private double salary;

   public Employee() {
      super();
   }

   public Employee(int empid, String name,
         double salary) {
      super();
      this.empid = empid;
      this.name = name;
      this.salary = salary;
   }

   public int getEmpid() {
      return empid;
   }

   public void setEmpid(int empid) {
      this.empid = empid;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   public double getSalary() {
      return salary;
   }

   public void setSalary(double salary) {
      this.salary = salary;
   }

   @Override
   public String toString() {
      return "Employee [empid=" + empid + ", name="
         + name + ", salary=" + salary + "]";
   }
}

V předchozím kódu si všimněte, jak jsme aplikovali řazení na stream jeden pomocí sekvenčního spouštění.

List<Employee> sortedItems = employees.stream()
               .sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

a paralelního provádění je dosaženo mírnou změnou kódu.

List<Employee> anotherSortedItems = employees
               .parallelStream().sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

Porovnáme také systémový čas, abychom získali představu, která část kódu zabere více času. Paralelní operace začíná, jakmile je paralelní stream explicitně získán pomocí parallelStream() metoda. Existuje další zajímavá metoda, nazvaná reduce() . Když použijeme tuto metodu na paralelní proud, operace může probíhat v různých vláknech.

Vždy však můžeme přepínat mezi paralelním a sekvenčním podle potřeby. Pokud chceme změnit paralelní stream na sekvenční, můžeme tak učinit vyvoláním sequential() metoda určená BaseStream . Jak jsme viděli v našem prvním programu, operace prováděná na streamu může být seřazená nebo neuspořádaná podle pořadí prvků. To znamená, že pořadí závisí na zdroji dat. To však neplatí v případě paralelních toků. Pro zvýšení výkonu jsou zpracovávány paralelně. Protože se to děje bez jakékoli sekvence, kde je každý oddíl proudu zpracováván nezávisle na ostatních oddílech bez jakékoli koordinace, důsledek je nepředvídatelně neuspořádaný. Pokud však chceme konkrétně provést operaci s každým prvkem v paralelním proudu, který má být uspořádán, můžeme zvážit forEachOrdered() metoda, která je alternativou k forEach() metoda.

Závěr

Rozhraní API pro streamování jsou součástí Javy již dlouhou dobu, ale přidání vychytávky paralelního zpracování je velmi vítaná a zároveň docela zajímavá funkce. To platí zejména proto, že moderní stroje jsou vícejádrové a existuje stigma, že návrh paralelního programování je složitý. Rozhraní API poskytovaná Javou poskytují schopnost začlenit nádech paralelního programování do programu Java, který má celkový design sekvenčního spouštění. To je možná nejlepší část této funkce.


  1. MySql aktualizuje dvě tabulky najednou

  2. Spustit odložený trigger pouze jednou na řádek v PostgreSQL

  3. MAX() – Najděte maximální hodnotu ve sloupci v MySQL

  4. Django a postgresql schémata