sql >> Databáze >  >> RDS >> PostgreSQL

SQLAlchemy:seskupení po dnech přes více tabulek

SQL pracuje s tabulkovými daty a vrací je (nebo vztahy, chcete-li si to takto představit, ale ne všechny tabulky SQL jsou vztahy). To znamená, že vnořená tabulka, jak je znázorněna v otázce, není tak běžnou vlastností. Existují způsoby, jak něco takového vytvořit v Postgresql, například pomocí polí JSON nebo kompozitů, ale je zcela možné pouze načíst tabulková data a provést vnoření v aplikaci. Python má itertools.groupby() , což se vzhledem k seřazeným datům docela hodí.

Chybový column "incoming.id" must appear in the GROUP BY clause... říká, že neagregáty ve výběrovém seznamu, mající klauzuli atd. se musí objevit v GROUP BY klauzule nebo být použity v agregaci, jinak nebudou mít případně neurčité hodnoty . Jinými slovy, hodnota by musela být vybrána pouze z některého řádku ve skupině, protože GROUP BY kondenzuje seskupené řádky do jednoho řádku a každý by hádal, ze které řady byli vybráni. Implementace to možná umožňuje, jako to dělá SQLite a MySQL, ale standard SQL to zakazuje. Výjimkou z pravidla je situace, kdy existuje funkční závislost ; GROUP BY klauzule určuje neagregáty. Představte si spojení mezi tabulkami A a B seskupeno podle A primární klíč uživatele. Bez ohledu na to, který řádek ve skupině by systém vybral hodnoty pro A 's, by byly stejné, protože seskupení bylo provedeno na základě primárního klíče.

Jedním ze způsobů, jak řešit 3-bodový obecný zamýšlený přístup, by bylo vybrat spojení příchozích a odchozích, seřazených podle jejich časových razítek. Protože neexistuje žádná hierarchie dědictví nastavení – – protože možná ani neexistuje, nejsem obeznámen s účetnictvím – – návrat k používání n-tic Core a plain result usnadňuje práci v tomto případě:

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)

Potom za účelem vytvoření vnořené struktury itertools.groupby() se používá:

date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]

Konečným výsledkem je seznam 2-tic data a seznam slovníků hesel ve vzestupném pořadí. Není to úplně řešení ORM, ale svou práci zvládne. Příklad:

In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]

Jak již bylo zmíněno, Postgresql může přinést téměř stejný výsledek jako použití pole JSON:

from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()

Pokud ve skutečnosti Incoming a Outgoing lze považovat za potomky společného základu, například Entry , použití sjednocení lze poněkud automatizovat pomocí dědění konkrétní tabulky :

from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

Bohužel pomocí AbstractConcreteBase vyžaduje ruční volání funkce configure_mappers() když byly definovány všechny potřebné třídy; v tomto případě je první možnost po definování User , protože Account závisí na tom prostřednictvím vztahů:

from sqlalchemy.orm import configure_mappers
configure_mappers()

Poté, aby se načetly všechny Incoming a Outgoing v jediném polymorfním dotazu ORM použijte Entry :

session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

a pokračujte pomocí itertools.groupby() jako výše ve výsledném seznamu Incoming a Outgoing .



  1. Jak nastavit navigační formulář jako výchozí formulář v aplikaci Access

  2. Existuje příkaz SQL, který rozdělí to, co by byly 2 dlouhé sloupce, na několik párů sloupců?

  3. PDOStatement::execute():SQLSTATE[HY093]:Neplatné číslo parametru:počet vázaných proměnných neodpovídá počtu tokenů

  4. Jak vložit atribut img src kódovaný base64 do tabulky v Oracle a poté jej zobrazit na stránce v Oracle apex