SQL pracuje s tabulkovými daty a vrací je (nebo vztahy, chcete-li si to takto představit, ale ne všechny tabulky SQL jsou vztahy). To znamená, že vnořená tabulka, jak je znázorněna v otázce, není tak běžnou vlastností. Existují způsoby, jak něco takového vytvořit v Postgresql, například pomocí polí JSON nebo kompozitů, ale je zcela možné pouze načíst tabulková data a provést vnoření v aplikaci. Python má itertools.groupby()
, což se vzhledem k seřazeným datům docela hodí.
Chybový column "incoming.id" must appear in the GROUP BY clause...
říká, že neagregáty ve výběrovém seznamu, mající klauzuli atd. se musí objevit v GROUP BY
klauzule nebo být použity v agregaci, jinak nebudou mít případně neurčité hodnoty . Jinými slovy, hodnota by musela být vybrána pouze z některého řádku ve skupině, protože GROUP BY
kondenzuje seskupené řádky do jednoho řádku a každý by hádal, ze které řady byli vybráni. Implementace to možná umožňuje, jako to dělá SQLite a MySQL, ale standard SQL to zakazuje. Výjimkou z pravidla je situace, kdy existuje funkční závislost
; GROUP BY
klauzule určuje neagregáty. Představte si spojení mezi tabulkami A a B seskupeno podle A primární klíč uživatele. Bez ohledu na to, který řádek ve skupině by systém vybral hodnoty pro A 's, by byly stejné, protože seskupení bylo provedeno na základě primárního klíče.
Jedním ze způsobů, jak řešit 3-bodový obecný zamýšlený přístup, by bylo vybrat spojení příchozích a odchozích, seřazených podle jejich časových razítek. Protože neexistuje žádná hierarchie dědictví nastavení – – protože možná ani neexistuje, nejsem obeznámen s účetnictvím – – návrat k používání n-tic Core a plain result usnadňuje práci v tomto případě:
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)
Potom za účelem vytvoření vnořené struktury itertools.groupby()
se používá:
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Konečným výsledkem je seznam 2-tic data a seznam slovníků hesel ve vzestupném pořadí. Není to úplně řešení ORM, ale svou práci zvládne. Příklad:
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [57]: session.commit()
In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
...: where(Incoming.accountID == 1)
...:
...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
...: where(Outgoing.accountID == 1)
...:
...: all_entries = incoming.union(outgoing)
...: all_entries = all_entries.order_by(all_entries.c.timestamp)
...: all_entries = db_session.execute(all_entries)
In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]:
[(datetime.date(2019, 9, 1),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 5,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 2),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 3),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 2,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
'type': 'outgoing'}])]
Jak již bylo zmíněno, Postgresql může přinést téměř stejný výsledek jako použití pole JSON:
from sqlalchemy.dialects.postgresql import aggregate_order_by
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing).alias('all_entries')
day = func.date_trunc('day', all_entries.c.timestamp)
stmt = select([day,
func.array_agg(aggregate_order_by(
func.row_to_json(literal_column('all_entries.*')),
all_entries.c.timestamp))]).\
group_by(day).\
order_by(day)
db_session.execute(stmt).fetchall()
Pokud ve skutečnosti Incoming
a Outgoing
lze považovat za potomky společného základu, například Entry
, použití sjednocení lze poněkud automatizovat pomocí dědění konkrétní tabulky
:
from sqlalchemy.ext.declarative import AbstractConcreteBase
class Entry(AbstractConcreteBase, Base):
pass
class Incoming(Entry):
__tablename__ = 'incoming'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="incomings")
__mapper_args__ = {
'polymorphic_identity': 'incoming',
'concrete': True
}
class Outgoing(Entry):
__tablename__ = 'outgoing'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="outgoings")
__mapper_args__ = {
'polymorphic_identity': 'outgoing',
'concrete': True
}
Bohužel pomocí AbstractConcreteBase
vyžaduje ruční volání funkce configure_mappers()
když byly definovány všechny potřebné třídy; v tomto případě je první možnost po definování User
, protože Account
závisí na tom prostřednictvím vztahů:
from sqlalchemy.orm import configure_mappers
configure_mappers()
Poté, aby se načetly všechny Incoming
a Outgoing
v jediném polymorfním dotazu ORM použijte Entry
:
session.query(Entry).\
filter(Entry.accountID == accountID).\
order_by(Entry.timestamp).\
all()
a pokračujte pomocí itertools.groupby()
jako výše ve výsledném seznamu Incoming
a Outgoing
.