-
-
Notifications
You must be signed in to change notification settings - Fork 137
/
Copy pathmormot.orm.server.pas
2494 lines (2385 loc) · 95 KB
/
mormot.orm.server.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/// ORM Types and Classes for the Server side
// - this unit is a part of the Open Source Synopse mORMot framework 2,
// licensed under a MPL/GPL/LGPL three license - see LICENSE.md
unit mormot.orm.server;
{
*****************************************************************************
Server-Side Object-Relational-Mapping (ORM) Process
- TRestOrmServer Abstract Server
- TRestOrmServerBatchSend TRestBach Server-Side Process
*****************************************************************************
}
interface
{$I ..\mormot.defines.inc}
uses
sysutils,
classes,
variants,
contnrs,
mormot.core.base,
mormot.core.os,
mormot.core.buffers,
mormot.core.unicode,
mormot.core.text,
mormot.core.datetime,
mormot.core.variants,
mormot.core.data,
mormot.core.rtti,
mormot.core.json,
mormot.core.threads,
mormot.crypt.core,
mormot.crypt.jwt,
mormot.core.perf,
mormot.crypt.secure,
mormot.core.log,
mormot.core.interfaces,
mormot.orm.base,
mormot.orm.core,
mormot.orm.rest,
mormot.orm.client,
mormot.soa.core,
mormot.soa.server,
mormot.db.core,
mormot.rest.core,
mormot.rest.client,
mormot.rest.server;
{ ************ TRestOrmServer Abstract Server }
type
/// per-table TRecordVersion, indexed over TOrmModel.Tables[]
TRecordVersionDynArray = array of TRecordVersion;
/// a generic REpresentational State Transfer (REST) ORM server
// - inherit to provide its main storage capabilities, e.g. our in-memory
// engine for TRestOrmServerFullMemory or SQlite3 for TRestOrmServerDB
// - is able to register and redirect some TOrm classes to their own
// dedicated TRestStorage
TRestOrmServer = class(TRestOrm, IRestOrmServer)
protected
fOwner: TRestServer;
/// will contain the in-memory representation of some static tables
// - this array has the same length as the associated Model.Tables[]
// - fStaticData[] will contain pure in-memory tables, not declared as
// SQLite3 virtual tables, therefore not available from joined SQL statements
fStaticData: TRestOrmDynArray;
/// map TRestStorageInMemory or TRestStorageExternal engines
// - this array has the same length as the associated Model.Tables[]
// - fStaticVirtualTable[] will contain in-memory or external tables declared
// as SQLite3 virtual tables, therefore available from joined SQL statements
// - the very same TRestStorage is handled in fStaticData
fStaticVirtualTable: TRestOrmDynArray;
fVirtualTableDirect: boolean;
fCreateMissingTablesOptions: TOrmInitializeTableOptions;
fRecordVersionMax: TRecordVersionDynArray;
fRecordVersionDeleteIgnore: boolean;
fOrmVersionDeleteTable: TOrmTableDeletedClass;
// TOrmHistory.ModifiedRecord handles up to 64 (=1 shl 6) tables
fTrackChangesHistoryTableIndex: TIntegerDynArray;
fTrackChangesHistoryTableIndexCount: integer;
fTrackChangesHistory: array of record
CurrentRow: integer;
MaxSentDataJsonRow: integer;
MaxRevisionJson: integer;
MaxUncompressedBlobSize: integer;
end;
function MaxUncompressedBlobSize(Table: TOrmClass): integer;
/// will retrieve the monotonic value of a TRecordVersion field from the DB
function InternalRecordVersionMaxFromExisting(TableIndex: integer;
RetrieveNext: boolean): TRecordVersion; virtual;
procedure InternalRecordVersionDelete(TableIndex: integer; ID: TID;
Batch: TRestBatch); virtual;
/// will compute the next monotonic value for a TRecordVersion field
// - you may override this method to customize the returned Int64 value
// (e.g. to support several synchronization nodes)
function InternalRecordVersionComputeNext(
TableIndex: PtrInt): TRecordVersion; virtual;
public
/// overridden methods which will perform CRUD operations
// - will call any static TRestStorage, or call MainEngine*() virtual methods
function EngineAdd(TableModelIndex: integer;
const SentData: RawUtf8): TID; override;
function EngineRetrieve(TableModelIndex: integer;
ID: TID): RawUtf8; override;
function EngineList(TableModelIndex: integer; const SQL: RawUtf8;
ForceAjax: boolean = false; ReturnedRowCount: PPtrInt = nil): RawUtf8; override;
function EngineUpdate(TableModelIndex: integer; ID: TID;
const SentData: RawUtf8): boolean; override;
function EngineDelete(TableModelIndex: integer; ID: TID): boolean; override;
function EngineDeleteWhere(TableModelIndex: integer; const SqlWhere: RawUtf8;
const IDs: TIDDynArray): boolean; override;
function EngineRetrieveBlob(TableModelIndex: integer; aID: TID;
BlobField: PRttiProp; out BlobData: RawBlob): boolean; override;
function EngineUpdateBlob(TableModelIndex: integer; aID: TID;
BlobField: PRttiProp; const BlobData: RawBlob): boolean; override;
function EngineUpdateField(TableModelIndex: integer; const SetFieldName,
SetValue, WhereFieldName, WhereValue: RawUtf8): boolean; override;
function EngineUpdateFieldIncrement(TableModelIndex: integer; ID: TID;
const FieldName: RawUtf8; Increment: Int64): boolean; override;
function EngineBatchSend(Table: TOrmClass; var Data: RawUtf8;
var Results: TIDDynArray; ExpectedResultsCount: integer): integer; override;
public
/// virtual abstract methods which will perform CRUD operations on the main DB
function MainEngineAdd(TableModelIndex: integer;
const SentData: RawUtf8): TID; virtual; abstract;
function MainEngineRetrieve(TableModelIndex: integer;
ID: TID): RawUtf8; virtual; abstract;
function MainEngineList(const SQL: RawUtf8; ForceAjax: boolean;
ReturnedRowCount: PPtrInt): RawUtf8; virtual; abstract;
function MainEngineUpdate(TableModelIndex: integer; ID: TID;
const SentData: RawUtf8): boolean; virtual; abstract;
function MainEngineDelete(TableModelIndex: integer;
ID: TID): boolean; virtual; abstract;
function MainEngineDeleteWhere(TableModelIndex: integer;
const SqlWhere: RawUtf8; const IDs: TIDDynArray): boolean; virtual; abstract;
function MainEngineRetrieveBlob(TableModelIndex: integer; aID: TID;
BlobField: PRttiProp; out BlobData: RawBlob): boolean; virtual; abstract;
function MainEngineUpdateBlob(TableModelIndex: integer; aID: TID;
BlobField: PRttiProp; const BlobData: RawBlob): boolean; virtual; abstract;
function MainEngineUpdateField(TableModelIndex: integer; const SetFieldName,
SetValue, WhereFieldName, WhereValue: RawUtf8): boolean; virtual; abstract;
function MainEngineUpdateFieldIncrement(TableModelIndex: integer; ID: TID;
const FieldName: RawUtf8; Increment: Int64): boolean; virtual; abstract;
/// this method is overridden for setting the NoAjaxJson field
// of all associated TRestStorage servers
procedure SetNoAjaxJson(const Value: boolean); virtual;
public
/// this integer property is incremented by the database engine when any SQL
// statement changes the database contents (i.e. on any not SELECT statement)
// - its value can be published to the client on every remote request
// - it may be used by client to avoid retrieve data only if necessary
// - if its value is 0, this feature is not activated on the server, and the
// client must ignore it and always retrieve the content
InternalState: cardinal;
/// a method can be specified here to trigger events after any table update
// - is called BEFORE deletion, and AFTER insertion or update
// - note that the aSentData parameter does not contain all record fields,
// but only transmitted information: e.g. if only one field is updated, only
// this single field (and the ID) is available
// - to be used only server-side, not to synchronize some clients: the framework
// is designed around a stateless RESTful architecture (like HTTP/1.1), in which
// clients ask the server for refresh (see TRestClientUri.UpdateFromServer)
OnUpdateEvent: TOnOrmEvent;
/// a method can be specified here to trigger events after any blob update
// - is called AFTER update of one or several blobs, never on delete nor insert
// - to be used only server-side, not to synchronize some clients: the framework
// is designed around a stateless RESTful architecture (like HTTP/1.1), in which
// clients ask the server for refresh (see TRestClientUri.UpdateFromServer)
OnBlobUpdateEvent: TOnOrmFieldEvent;
/// initialize the class, and associated to a TRest and its TOrmModel
constructor Create(aRest: TRest); override;
/// release memory and any existing associated resource
destructor Destroy; override;
/// ensure the current thread will be taken into account during process
// - this default implementation will call the BeginCurrentThread methods
// of all its internal TRestStorage instances
procedure BeginCurrentThread(Sender: TThread); override;
/// called when thread is finished to ensure
// - this default implementation will call the EndCurrentThread methods
// of all its internal TRestStorage instances
procedure EndCurrentThread(Sender: TThread); override;
/// missing tables are created if they don't exist yet for every TOrm
// class of the Database Model
// - you must call explicitly this before having called OrmMapInMemory()
// - all table description (even Unique feature) is retrieved from the Model
// - this method should also create additional fields, if the TOrm definition
// has been modified; only field adding is mandatory, field renaming or
// field deleting are not allowed in the FrameWork (in such cases, you must
// create a new TOrm type)
// - this virtual method do nothing by default - overridden versions should
// implement it as expected by the underlying storage engine (e.g. SQLite3
// or TRestServerFullInMemory)
// - you can tune some options transmitted to the TOrm.InitializeTable
// virtual methods, e.g. to avoid the automatic create of indexes
procedure CreateMissingTables(user_version: cardinal = 0;
options: TOrmInitializeTableOptions = []); virtual;
/// run the TOrm.InitializeTable methods for all void tables of the model
// - can be used instead of CreateMissingTables e.g. for MongoDB storage
// - you can specify the creation options, e.g. INITIALIZETABLE_NOINDEX
procedure InitializeTables(Options: TOrmInitializeTableOptions);
/// check on which storage instance a SQL SELECT statement is to be executed
// - returns nil if the main engine is to be used
// - or returns the target TRestStorage instance, with the adapted SQL
// statement, ready to be run on it
function InternalAdaptSql(TableIndex: integer; var SQL: RawUtf8): TRestOrm;
/// retrieve a list of members as JSON encoded data
// - used by OneFieldValue() and MultiFieldValue() methods
function InternalListRawUtf8(TableIndex: integer; const SQL: RawUtf8): RawUtf8;
/// virtual method called when a record is updated
// - default implementation will call the OnUpdateEvent/OnBlobUpdateEvent
// methods, if defined
// - will also handle TOrmHistory tables, as defined by TrackChanges()
// - returns true on success, false if an error occurred (but action must continue)
// - you can override this method to implement a server-wide notification,
// but be aware it may be the first step to break the stateless architecture
// of the framework
function InternalUpdateEvent(aEvent: TOrmEvent; aTableIndex: integer; aID: TID;
const aSentData: RawUtf8; aIsBlobFields: PFieldBits; aRec: TOrm): boolean; virtual;
/// this method is called internally after any successful deletion to
// ensure relational database coherency
// - reset all matching TRecordReference properties in the database Model,
// for database coherency, into 0
// - delete all records containing a matched TRecordReferenceToBeDeleted
// property value in the database Model (e.g. TOrmHistory)
// - reset all matching TOrm properties in the database Model,
// for database coherency, into 0
// - important notice: we don't use FOREIGN KEY constraints in this framework,
// and handle all integrity check within this method (it's therefore less
// error-prone, and more cross-database engine compatible)
function AfterDeleteForceCoherency(aTableIndex: integer; aID: TID): boolean; virtual;
/// call this method when the internal DB content is known to be invalid
// - by default, all REST/CRUD requests and direct SQL statements are
// scanned and identified as potentially able to change the internal SQL/JSON
// cache used at SQLite3 database level; but some virtual tables (e.g.
// TRestStorageExternal classes defined in mormot.orm.sql) could flush
// the database content without proper notification
// - this default implementation will just do nothing, but mormot.orm.sqlite3.pas
// unit will call TSqlDataBase.CacheFlush method
procedure FlushInternalDBCache; virtual;
/// called from STATE remote HTTP method
procedure RefreshInternalStateFromStatic;
/// assign a TRestOrm instance for a given slot
// - called e.g. by TOrmVirtualTable.Create, OrmMapMongoDB(), OrmMapInMemory()
// TRestStorageShardDB.Create or TRestOrmServer.RemoteDataCreate
procedure StaticTableSetup(aTableIndex: integer; aStatic: TRestOrm;
aKind: TRestServerKind);
/// fast get the associated static server or virtual table from its index, if any
// - returns nil if aTableIndex is invalid or is not assigned to a TRestOrm
function GetStaticTableIndex(aTableIndex: PtrInt): TRestOrm; overload;
{$ifdef HASINLINE}inline;{$endif}
/// fast get the associated static server or virtual table from its index, if any
// - returns nil if aTableIndex is invalid or is not assigned to a TRestOrm
function GetStaticTableIndex(aTableIndex: PtrInt;
out Kind: TRestServerKind): TRestOrm; overload;
{$ifdef HASINLINE}inline;{$endif}
/// create an external static redirection for a specific class
// - call it just after Create, before IRestOrmServer.CreateMissingTables;
// warning: if you don't call this method before CreateMissingTable method
// is called, the table will be created as a regular table by the main
// database engine, and won't be static
// - the specified TOrm class will have all its CRUD / ORM methods be
// redirected to aRemoteRest, which may be a TRestClient or another
// TRestServer instance (e.g. a fast SQLITE_MEMORY_DATABASE_NAME)
// - if aRemoteRest is a TRestClient, it should have been authenticated
// to the remote TRestServer, so that CRUD / ORM operations will pass
// - this will enable easy creation of proxies, or local servers, with they
// own cache and data model - e.g. a branch office server which may serve
// its local clients over Ethernet, but communicating to a main mORMot
// server via Internet, storing the corporate data in the main office server
// - you may also share some tables (e.g. TAuthUser and TAuthGroup)
// between TRestServer instances in a single service
// - returns a newly created TRestStorageRemote instance
function RemoteDataCreate(aClass: TOrmClass;
aRemoteRest: TRestOrmParent): TRestOrmParent; virtual;
/// initialize change tracking for the given tables
// - by default, it will use the TOrmHistory table to store the
// changes - you can specify a dedicated class as aTableHistory parameter
// - if aTableHistory is not already part of the TOrmModel, it will be added
// - note that this setting should be consistent in time: if you disable
// tracking for a while, or did not enable tracking before adding a record,
// then the content history won't be consistent (or disabled) for this record
// - at every change, aTableHistory.SentDataJson records will be added, up
// to aMaxHistoryRowBeforeBlob items - then aTableHistory.History will store
// a compressed version of all previous changes
// - aMaxHistoryRowBeforeBlob is the maximum number of JSON rows per Table
// before compression into BLOB is triggerred
// - aMaxHistoryRowPerRecord is the maximum number of JSON rows per record,
// above which the versions will be compressed as BLOB
// - aMaxUncompressedBlobSize is the maximum BLOB size per record
// - you can specify aMaxHistoryRowBeforeBlob=0 to disable change tracking
// - you should call this method after the CreateMissingTables call
// - note that change tracking may slow down the writing process, and
// may increase storage space a lot (even if BLOB maximum size can be set),
// so should be defined only when necessary
procedure TrackChanges(const aTable: array of TOrmClass;
aTableHistory: TOrmClass = nil;
aMaxHistoryRowBeforeBlob: integer = 1000;
aMaxHistoryRowPerRecord: integer = 10;
aMaxUncompressedBlobSize: integer = 64*1024); virtual;
/// force compression of all aTableHistory.SentDataJson into History BLOB
// - by default, this will take place in InternalUpdateEvent() when
// aMaxHistoryRowBeforeBlob - as set by TrackChanges() method - is reached
// - you can manually call this method to force History BLOB update, e.g.
// when the server is in Idle state, and ready for process
procedure TrackChangesFlush(aTableHistory: TOrmClass); virtual;
/// check if OnUpdateEvent or change tracked has been defined for this table
// - is used internally e.g. by TRestServerDB.MainEngineUpdateField to
// ensure that the updated ID fields will be computed as expected
function InternalUpdateEventNeeded(aEvent: TOrmEvent; aTableIndex: integer): boolean;
/// will compute the next monotonic value for a TRecordVersion field
function RecordVersionCompute(aTableIndex: integer): TRecordVersion;
/// read only access to the current monotonic value for a TRecordVersion field
// - only useful for testing purposes
function RecordVersionCurrent(aTableIndex: integer): TRecordVersion; overload;
/// read only access to the current monotonic value for a TRecordVersion field
function RecordVersionCurrent(aTable: TOrmClass): TRecordVersion; overload;
/// synchronous master/slave replication from a slave TRest
// - apply all the updates from another (distant) master TRestOrm for a given
// TOrm table, using its TRecordVersion field, to the calling slave
// - both remote Master and local slave TRestServer should have the supplied
// Table class in their data model (maybe in diverse order)
// - by default, all pending updates are retrieved, but you can define a value
// to ChunkRowLimit, so that the updates will be retrieved by smaller chunks
// - returns -1 on error, or the latest applied revision number (which may
// be 0 if there is no data in the table)
// - this method will use regular REST ORM commands, so will work with any
// communication channels: for real-time push synchronization, consider using
// RecordVersionSynchronizeMasterStart and RecordVersionSynchronizeSlaveStart
// over a bidirectionnal communication channel like WebSockets
// - you can use RecordVersionSynchronizeSlaveToBatch if your purpose is
// to access the updates before applying to the current slave storage
function RecordVersionSynchronizeSlave(Table: TOrmClass;
const Master: IRestOrm; ChunkRowLimit: integer = 0;
const OnWrite: TOnBatchWrite = nil): TRecordVersion;
/// synchronous master/slave replication from a slave TRest into a Batch
// - will retrieve all the updates from a (distant) master TRest for a
// given TOrm table, using its TRecordVersion field, and a supplied
// TRecordVersion monotonic value, into a TRestBatch instance
// - both remote Source and local TRestServer should have the supplied
// Table class in each of their data model
// - by default, all pending updates are retrieved, but you can define a value
// to MaxRowLimit, so that the updates will be retrieved by smaller chunks
// - returns nil if nothing new was found, or a TRestBatch instance
// containing all modifications since RecordVersion revision
// - when executing the returned TRestBatch on the database, you should
// set TRestServer.RecordVersionDeleteIgnore := true so that the
// TRecordVersion fields will be forced from the supplied value
// - usually, you should not need to use this method, but rather the more
// straightforward RecordVersionSynchronizeSlave()
function RecordVersionSynchronizeSlaveToBatch(Table: TOrmClass;
const Master: IRestOrm; var RecordVersion: TRecordVersion; MaxRowLimit: integer = 0;
const OnWrite: TOnBatchWrite = nil): TRestBatch; virtual;
/// retrieve the associated static server or virtual table, if any
// - same as a dual call to GetStaticStorage() + GetStaticVirtualTable()
function GetStorage(aClass: TOrmClass): TRestOrmParent;
{$ifdef HASINLINE}inline;{$endif}
/// retrieve the TRestStorage instance used to store and manage
// a specified TOrmClass in memory
// - raise an EModelException if aClass is not part of the database Model
// - returns nil if this TOrmClass is handled by the main engine
function GetStaticStorage(aClass: TOrmClass): TRestOrmParent;
/// retrieve a running TRestStorage virtual table
// - associated e.g. to a 'JSON' or 'Binary' virtual table module, or may
// return a TRestStorageExternal instance (as defined in mormot.orm.sql)
// - this property will return nil if there is no Virtual Table associated
// or if the corresponding module is not a TOrmVirtualTable; i.e.
// "pure" static tables registered by OrmMapInMemory() will be
// accessible only via GetStaticStorage(), not via GetVirtualStorage()
// - has been associated by the TOrmModel.VirtualTableRegister method or
// the OrmMapExternal() global function
function GetVirtualStorage(aClass: TOrmClass): TRestOrmParent;
/// initialize the RecordVersionMax[TableIndex] to the specified Value
procedure SetRecordVersionMax(TableIndex: integer; Value: TRecordVersion);
/// access to the associated TRestServer main instance
property Owner: TRestServer
read fOwner;
/// low-level value access to process TRecordVersion field for each table
// - may equal nil if not TRecordVersion field is defined
property RecordVersionMax: TRecordVersionDynArray
read fRecordVersionMax write fRecordVersionMax;
/// you can force this property to TRUE so that any Delete() will not
// write to the TOrmTableDelete table for TRecordVersion tables
// - to be used when applying a TRestBatch instance as returned by
// RecordVersionSynchronizeToBatch()
property RecordVersionDeleteIgnore: boolean
read fRecordVersionDeleteIgnore write fRecordVersionDeleteIgnore;
/// the options specified to TRestServer.CreateMissingTables
// - as expected by TOrm.InitializeTable methods
property CreateMissingTablesOptions: TOrmInitializeTableOptions
read fCreateMissingTablesOptions;
public
{ IRestOrm overriden methods }
/// implement Server-Side TRest deletion
// - uses internally EngineDelete() function for calling the database engine
// - call corresponding fStaticData[] if necessary
// - this record is also erased in all available TRecordReference properties
// in the database Model, for relational database coherency
function Delete(Table: TOrmClass; ID: TID): boolean; override;
/// implement Server-Side TRest deletion with a WHERE clause
// - will process all ORM-level validation, coherency checking and
// notifications together with a low-level SQL deletion work (if possible)
function Delete(Table: TOrmClass; const SqlWhere: RawUtf8): boolean; override;
/// overridden method for direct static class call (if any)
function TableRowCount(Table: TOrmClass): Int64; override;
/// overridden method for direct static class call (if any)
function TableHasRows(Table: TOrmClass): boolean; override;
/// overridden method for direct static class call (if any)
function MemberExists(Table: TOrmClass; ID: TID): boolean; override;
/// update all BLOB fields of the supplied Value
// - this overridden method will execute the direct static class, if any
function UpdateBlobFields(Value: TOrm): boolean; override;
/// get all BLOB fields of the supplied value from the remote server
// - this overridden method will execute the direct static class, if any
function RetrieveBlobFields(Value: TOrm): boolean; override;
/// implement Server-Side TRest unlocking
// - to be called e.g. after a Retrieve() with forupdate=TRUE
// - implements our custom UNLOCK REST-like verb
// - locking is handled by TRestOrmServer.Model
// - returns true on success
function UnLock(Table: TOrmClass; aID: TID): boolean; override;
/// end a transaction
// - implements REST END collection
// - write all pending TOrmVirtualTableJson data to the disk
procedure Commit(SessionID: cardinal; RaiseException: boolean); override;
public
{ IRestOrmServer methods }
/// create an index for the specific FieldName
// - will call CreateSqlMultiIndex() internally
function CreateSqlIndex(Table: TOrmClass;
const FieldName: RawUtf8; Unique: boolean;
const IndexName: RawUtf8 = ''): boolean; overload;
/// create one or multiple index(es) for the specific FieldName(s)
function CreateSqlIndex(Table: TOrmClass;
const FieldNames: array of RawUtf8; Unique: boolean): boolean; overload;
/// create one index for all specific FieldNames at once
// - will call any static engine for the index creation of such tables, or
// execute a CREATE INDEX IF NOT EXISTS on the main engine
// - note that with SQLite3, your database schema should never contain two
// indices where one index is a prefix of the other, e.g. if you defined:
// ! aServer.CreateSqlMultiIndex(TEmails, ['Email','GroupID'], true);
// Then the following index is not mandatory for SQLite3:
// ! aServer.CreateSqlIndex(TEmails, 'Email', false);
// see "1.6 Multi-Column Indices" in @http://www.sqlite.org/queryplanner.html
function CreateSqlMultiIndex(Table: TOrmClass;
const FieldNames: array of RawUtf8;
Unique: boolean; IndexName: RawUtf8 = ''): boolean; virtual;
/// check if the supplied TOrm is not a virtual or static table
function IsInternalSQLite3Table(aTableIndex: integer): boolean;
/// returns true if the server will handle per-user authentication and
// access right management
function HandleAuthentication: boolean;
/// this property can be left to its TRUE default value, to handle any
// TOrmVirtualTableJson static tables (module JSON or BINARY) with direct
// calls to the storage instance
procedure SetStaticVirtualTableDirect(direct: boolean);
published
/// this property can be left to its TRUE default value, to handle any
// TOrmVirtualTableJson static tables (module JSON or BINARY) with direct
// calls to the storage instance
// - see also IRestOrmServer.SetStaticVirtualTableDirect
// - is set to TRUE by default to enable faster Direct mode
// - in Direct mode, GET/POST/PUT/DELETE of individual records (or BLOB fields)
// from Uri() will call directly the corresponding TRestStorage
// instance, for better speed for most used RESTful operations; but complex
// SQL requests (e.g. joined SELECT) will rely on the main SQL engine
// - if set to false, will use the main SQLite3 engine for all statements
// (should not to be used normally, because it will add unnecessary overhead)
property StaticVirtualTableDirect: boolean read fVirtualTableDirect
write fVirtualTableDirect;
end;
{ ************ TRestOrmServerBatchSend TRestBach Server-Side Process }
type
/// internal state machine used by TRestOrmServer.EngineBatchSend
// - this code is so complex/optimized that it needed its own class
TRestOrmServerBatchSend = class
protected
fParse: TGetJsonField;
fCommand: PUtf8Char;
fValue: RawUtf8;
fValueID: TID;
fValueDirect: PUtf8Char;
fOrm: TRestOrmServer;
fTable: TOrmClass;
fBatchOptions: TRestBatchOptions;
fEncoding, fCommandEncoding, fRunningBatchEncoding: TRestBatchEncoding;
fCommandDirectSupport: TRestOrmBatchDirect;
fCommandDirectFormat: TSaveFieldsAsObject;
fFlags: set of (
fNeedAcquireExecutionWrite,
fAcquiredExecutionWrite,
fRunMainTrans);
fRunningBatchRest: TRestOrm;
fRunningRest: TRestOrm;
fRunStatic: TRestOrm;
fRunTableTrans: array of TRestOrm;
fRunTable, fRunningBatchTable: TOrmClass;
fRunTableIndex, fMainTableIndex: integer;
fRowCountPerTrans, fRowCountPerCurrTrans: cardinal;
fUriContext: TRestServerUriContext;
fResults: TIDDynArray;
fData: RawUtf8;
fCount, fErrors: integer;
fLog: ISynLog;
fValueDirectFields: TFieldBits;
fCounts: array[TRestBatchEncoding] of cardinal;
fTimer: TPrecisionTimer;
fErrorMessage: RawUtf8;
procedure AutomaticTransactionBegin;
procedure AutomaticCommit;
procedure ExecuteValueCheckIfRestChange;
function IsNotAllowed: boolean;
{$ifdef FPC} inline; {$endif}
procedure ParseHeader;
procedure ParseCommand;
procedure ParseValue;
function ExecuteValue: boolean;
procedure ParseEnding;
procedure OnError(E: Exception);
procedure DoLog;
public
/// intialize the TRestBatch server-side processing
constructor Create(aRest: TRestOrmServer; aTable: TOrmClass;
var aData: RawUtf8; aExpectedResultsCount: integer); reintroduce;
/// execute the TRestBatch server-side processing
procedure ParseAndExecute;
/// the ParseAndExecute results
property Results: TIDDynArray
read fResults;
end;
implementation
uses
mormot.orm.storage;
{ ************ TRestOrmServer Abstract Server}
{ TRestOrmServer }
constructor TRestOrmServer.Create(aRest: TRest);
var
t: PtrInt;
begin
if aRest <> nil then
fOwner := aRest as TRestServer;
// set fRest+fModel
inherited Create(aRest);
// faster direct Static call by default
fVirtualTableDirect := true;
// initialize TrackChanges() associated tables
if fModel.Tables <> nil then
begin
fTrackChangesHistoryTableIndexCount := length(fModel.Tables);
SetLength(fTrackChangesHistory, fTrackChangesHistoryTableIndexCount);
if fTrackChangesHistoryTableIndexCount > 64 then
// rows are identified as RecordRef
fTrackChangesHistoryTableIndexCount := 64;
SetLength(fTrackChangesHistoryTableIndex, fTrackChangesHistoryTableIndexCount);
for t := 0 to fTrackChangesHistoryTableIndexCount - 1 do
fTrackChangesHistoryTableIndex[t] := -1;
fOrmVersionDeleteTable := TOrmTableDeleted;
for t := 0 to high(fModel.Tables) do
if fModel.Tables[t].OrmProps.RecordVersionField <> nil then
begin
fOrmVersionDeleteTable := fModel.AddTableInherited(TOrmTableDeleted);
break;
end;
end;
end;
destructor TRestOrmServer.Destroy;
var
i: PtrInt;
orm: TRestOrm;
begin
// free all virtual TRestStorage instances
for i := 0 to high(fStaticVirtualTable) do
if fStaticVirtualTable[i] <> nil then
begin
if fStaticVirtualTable[i].RefCount <> 1 then
ERestStorage.RaiseUtf8('%.Destroy: static virtual % refcnt=%',
[self, fStaticVirtualTable[i], fStaticVirtualTable[i].RefCount]);
IInterface(fStaticVirtualTable[i])._Release;
if fStaticData <> nil then
// free once as fStaticVirtualTable[i], just clear reference here
fStaticData[i] := nil;
end;
// free lasting TRestStorage instances and update file if necessary
for i := 0 to high(fStaticData) do
begin
orm := fStaticData[i];
if orm <> nil then
begin
if orm.RefCount <> 1 then
ERestStorage.RaiseUtf8('%.Destroy: static % refcnt=%',
[self, orm, orm.RefCount]);
IInterface(orm)._Release;
end;
end;
inherited Destroy; // fCache.Free
end;
procedure TRestOrmServer.BeginCurrentThread(Sender: TThread);
var
i: PtrInt;
begin
for i := 0 to length(fStaticVirtualTable) - 1 do
if fStaticVirtualTable[i] <> nil then
fStaticVirtualTable[i].BeginCurrentThread(Sender);
end;
procedure TRestOrmServer.EndCurrentThread(Sender: TThread);
var
i: PtrInt;
begin
for i := 0 to length(fStaticVirtualTable) - 1 do
if fStaticVirtualTable[i] <> nil then
fStaticVirtualTable[i].EndCurrentThread(Sender);
end;
procedure TRestOrmServer.CreateMissingTables(user_version: cardinal;
options: TOrmInitializeTableOptions);
begin
fCreateMissingTablesOptions := options;
end;
procedure TRestOrmServer.InitializeTables(Options: TOrmInitializeTableOptions);
var
t: PtrInt;
begin
if (self <> nil) and
(fModel <> nil) then
for t := 0 to fModel.TablesMax do
if not TableHasRows(fModel.Tables[t]) then
fModel.Tables[t].InitializeTable(self, '', Options);
end;
procedure TRestOrmServer.SetNoAjaxJson(const Value: boolean);
begin
// do nothing at this level
end;
function TRestOrmServer.GetStaticStorage(aClass: TOrmClass): TRestOrmParent;
var
i: PtrInt;
begin
if (self <> nil) and
(fStaticData <> nil) then
begin
i := fModel.GetTableIndexExisting(aClass);
if i < length(fStaticData) then
result := fStaticData[i] // no IRestOrm refcnt involved here
else
result := nil;
end
else
result := nil;
end;
function TRestOrmServer.GetVirtualStorage(aClass: TOrmClass): TRestOrmParent;
var
i: PtrInt;
begin
result := nil;
if fStaticVirtualTable <> nil then
begin
i := fModel.GetTableIndexExisting(aClass);
if (i >= 0) and
(fModel.TableProps[i].Kind in IS_CUSTOM_VIRTUAL) then
result := fStaticVirtualTable[i]; // no IRestOrm refcnt involved here
end;
end;
function TRestOrmServer.GetStorage(aClass: TOrmClass): TRestOrmParent;
begin
if (aClass = nil) or
((fStaticData = nil) and
(fStaticVirtualTable = nil)) then
result := nil
else
result := GetStaticTableIndex(fModel.GetTableIndexExisting(aClass));
end;
function TRestOrmServer.GetStaticTableIndex(aTableIndex: PtrInt): TRestOrm;
begin
result := nil;
if aTableIndex < 0 then
exit;
if aTableIndex < length(fStaticData) then
result := fStaticData[aTableIndex]; // no IRestOrm refcnt here
if result = nil then
if fVirtualTableDirect and
(fStaticVirtualTable <> nil) then
result := fStaticVirtualTable[aTableIndex];
end;
function TRestOrmServer.GetStaticTableIndex(aTableIndex: PtrInt;
out Kind: TRestServerKind): TRestOrm;
begin
result := nil;
Kind := sMainEngine;
if aTableIndex < 0 then
exit;
if aTableIndex < length(fStaticData) then
begin
result := fStaticData[aTableIndex]; // no IRestOrm refcnt here
if result <> nil then
begin
Kind := sStaticDataTable;
exit;
end;
end;
if fVirtualTableDirect and
(fStaticVirtualTable <> nil) then
begin
result := fStaticVirtualTable[aTableIndex]; // no IRestOrm refcnt here
if result <> nil then
Kind := sVirtualTable;
end;
end;
function TRestOrmServer.RemoteDataCreate(aClass: TOrmClass;
aRemoteRest: TRestOrmParent): TRestOrmParent;
var
t: PtrInt;
existing: TRestOrm;
begin
t := Model.GetTableIndexExisting(aClass);
existing := GetStaticTableIndex(t);
if existing <> nil then
ERestStorage.RaiseUtf8('Duplicated %.RemoteDataCreate(%) as %',
[self, aClass, existing]);
result := TRestStorageRemote.Create(aClass, self, aRemoteRest as TRestOrm);
StaticTableSetup(t, result as TRestOrm, sStaticDataTable);
end;
function TRestOrmServer.MaxUncompressedBlobSize(Table: TOrmClass): integer;
var
i: PtrInt;
begin
i := fModel.GetTableIndexExisting(Table);
if (i >= 0) and
(i < length(fTrackChangesHistory)) then
result := fTrackChangesHistory[i].MaxUncompressedBlobSize
else
result := 0;
end;
procedure TRestOrmServer.SetRecordVersionMax(
TableIndex: integer; Value: TRecordVersion);
begin
if cardinal(TableIndex) >= cardinal(Model.TablesMax) then
ERestStorage.RaiseUtf8('%.SetRecordVersionMax(%)', [self, TableIndex]);
fRest.AcquireExecution[execOrmWrite].Safe.Lock;
try
if high(fRecordVersionMax) <> Model.TablesMax then
SetLength(fRecordVersionMax, Model.TablesMax + 1);
fRecordVersionMax[TableIndex] := Value;
finally
fRest.AcquireExecution[execOrmWrite].Safe.UnLock;
end;
end;
function TRestOrmServer.InternalRecordVersionMaxFromExisting(
TableIndex: integer; RetrieveNext: boolean): TRecordVersion;
var
T: TOrmClass;
field: TOrmPropInfoRttiRecordVersion;
max, mDeleted: Int64;
begin
if cardinal(TableIndex) > cardinal(fModel.TablesMax) then
ERestStorage.RaiseUtf8('%.RecordVersionMaxFromExisting(%)', [self, TableIndex]);
fRest.AcquireExecution[execOrmWrite].Safe.Lock;
try
if high(fRecordVersionMax) <> fModel.TablesMax then // checked within lock
SetLength(fRecordVersionMax, fModel.TablesMax + 1);
result := fRecordVersionMax[TableIndex];
if result = 0 then
begin
// need to retrieve the current TRecordVersion of this table from DB
T := fModel.Tables[TableIndex];
field := T.OrmProps.RecordVersionField;
if field = nil then
ERestStorage.RaiseUtf8('% has no RecordVersion', [T]);
if OneFieldValue(T, 'max(' + field.Name + ')', '', [], [], max) then
if max > result then
result := max;
mDeleted := Int64(TableIndex) shl ORMVERSION_DELETEID_SHIFT;
if OneFieldValue(fOrmVersionDeleteTable, 'max(ID)', 'ID>? and ID<?',
[], [mDeleted, mDeleted + ORMVERSION_DELETEID_RANGE], max) then
begin
max := max and pred(ORMVERSION_DELETEID_RANGE);
if max > result then
result := max;
end;
end;
if RetrieveNext then
inc(result);
fRecordVersionMax[TableIndex] := result;
finally
fRest.AcquireExecution[execOrmWrite].Safe.UnLock;
end;
end;
procedure TRestOrmServer.InternalRecordVersionDelete(TableIndex: integer;
ID: TID; Batch: TRestBatch);
var
deleted: TOrmTableDeleted;
revision: TRecordVersion;
begin
if fRecordVersionDeleteIgnore then
exit;
deleted := fOrmVersionDeleteTable.Create;
try
revision := RecordVersionCompute(TableIndex);
deleted.IDValue := revision +
Int64(TableIndex) shl ORMVERSION_DELETEID_SHIFT;
deleted.Deleted := ID;
if Batch <> nil then
Batch.Add(deleted, true, true)
else
Add(deleted, true, true);
if (fOwner <>nil) and
(fOwner.Services <> nil) then
(fOwner.Services as TServiceContainerServer).
RecordVersionNotifyDelete(TableIndex, ID, revision);
finally
deleted.Free;
end;
end;
function TRestOrmServer.InternalRecordVersionComputeNext(
TableIndex: PtrInt): TRecordVersion;
begin
if (PtrUInt(length(fRecordVersionMax)) <= PtrUInt(TableIndex)) or
(fRecordVersionMax[TableIndex] = 0) then
// need to initialize fRecordVersionMax[] and/or access the DB
result := InternalRecordVersionMaxFromExisting(TableIndex, {next=}true)
else
begin
// quick compute the TRecordVersion of this table within the write lock
fRest.AcquireExecution[execOrmWrite].Safe.Lock;
inc(fRecordVersionMax[TableIndex]);
result := fRecordVersionMax[TableIndex];
fRest.AcquireExecution[execOrmWrite].Safe.UnLock;
end;
end;
function TRestOrmServer.RecordVersionCompute(aTableIndex: integer): TRecordVersion;
begin
result := InternalRecordVersionComputeNext(aTableIndex);
if result >= ORMVERSION_DELETEID_RANGE then
EOrmException.RaiseUtf8(
'%.InternalRecordVersionCompute=% overflow: %.ID should be < 2^%)',
[self, result, fOrmVersionDeleteTable, ORMVERSION_DELETEID_SHIFT]);
end;
function TRestOrmServer.RecordVersionCurrent(aTableIndex: integer): TRecordVersion;
begin
if self = nil then
result := 0
else if (cardinal(length(fRecordVersionMax)) <= cardinal(aTableIndex)) or
(fRecordVersionMax[aTableIndex] = 0) then
// need to initialize fRecordVersionMax[] and/or access the DB
result := InternalRecordVersionMaxFromExisting(aTableIndex, {next=}false)
else
result := fRecordVersionMax[aTableIndex];
end;
function TRestOrmServer.RecordVersionCurrent(aTable: TOrmClass): TRecordVersion;
begin
if self = nil then
result := 0
else
result := RecordVersionCurrent(fModel.GetTableIndexExisting(aTable));
end;
function TRestOrmServer.RecordVersionSynchronizeSlave(
Table: TOrmClass; const Master: IRestOrm; ChunkRowLimit: integer;
const OnWrite: TOnBatchWrite): TRecordVersion;
var
t: PtrUInt;
batch: TRestBatch;
ids: TIDDynArray;
status: integer;
{%H-}log: ISynLog;
begin
log := fRest.LogClass.Enter('RecordVersionSynchronizeSlave %', [Table], self);
t := fModel.GetTableIndexExisting(Table);
result := -1; // error
if (PtrUInt(length(fRecordVersionMax)) <= t) or
(fRecordVersionMax[t] = 0) then
// need to initialize fRecordVersionMax[] and/or access the DB
InternalRecordVersionMaxFromExisting(t, {next=}false);
repeat
batch := RecordVersionSynchronizeSlaveToBatch(Table, Master,
fRecordVersionMax[t], ChunkRowLimit, OnWrite);
if batch = nil then
// error
exit;
if batch.Count = 0 then
begin
// nothing new (e.g. reached last chunk)
result := fRecordVersionMax[t];
batch.Free;
break;
end;
try
fRest.AcquireExecution[execOrmWrite].Safe.Lock;
fRecordVersionDeleteIgnore := true;
status := BatchSend(batch, ids);
if status = HTTP_SUCCESS then
begin
if sllDebug in fRest.LogLevel then
fRest.InternalLog(
'RecordVersionSynchronize(%) Added=% Updated=% Deleted=% on %',
[Table, batch.AddCount, batch.UpdateCount, batch.DeleteCount,
Master], sllDebug);
if ChunkRowLimit = 0 then
begin
result := fRecordVersionMax[t];
break;
end;
end
else
begin
fRest.InternalLog('RecordVersionSynchronize(%) BatchSend=%',
[Table, status], sllError);
fRecordVersionMax[t] := 0; // force recompute the maximum from DB
break;
end;
finally
fRecordVersionDeleteIgnore := false;
fRest.AcquireExecution[execOrmWrite].Safe.UnLock;
batch.Free;
end;
until false; // continue synch until nothing new is found
end;
function TRestOrmServer.RecordVersionSynchronizeSlaveToBatch(Table: TOrmClass;
const Master: IRestOrm; var RecordVersion: TRecordVersion;
MaxRowLimit: integer; const OnWrite: TOnBatchWrite): TRestBatch;
var
tableindex, sourcetableindex, updatedrow, deletedrow: integer;
props: TOrmProperties;
where: RawUtf8;
updatedversion, deletedversion: TRecordVersion;
listupdated, listdeleted: TOrmTable;
rec: TOrm;
deletedminid: TID;
deleted: TOrmTableDeleted;
opt: TRestBatchOptions;
{%H-}log: ISynLog;
begin
log := fRest.LogClass.Enter(
'RecordVersionSynchronizeSlaveToBatch % vers=% maxrow=%',
[Table, RecordVersion, MaxRowLimit], self);
result := nil;
if Master = nil then
EOrmException.RaiseUtf8(
'%.RecordVersionSynchronizeSlaveToBatch(Master=nil)', [self]);
tableindex := Model.GetTableIndexExisting(Table);
sourcetableindex := Master.Model.GetTableIndexExisting(Table); // <>tableindex?
props := Model.TableProps[tableindex].props;
if props.RecordVersionField = nil then
EOrmException.RaiseUtf8(
'%.RecordVersionSynchronizeSlaveToBatch(%) with no TRecordVersion field',
[self, Table]);
fRest.AcquireExecution[execOrmWrite].Safe.Lock;
try
where := '%>? order by %';
if MaxRowLimit > 0 then
where := FormatUtf8('% limit %', [where, MaxRowLimit]);
listupdated := Master.MultiFieldValues(Table, '*', where,
[props.RecordVersionField.Name, props.RecordVersionField.Name],
[RecordVersion]);
if listupdated = nil then
exit; // DB error
listdeleted := nil;
try
deletedminid := Int64(sourcetableindex) shl ORMVERSION_DELETEID_SHIFT;
where := 'ID>? and ID<? order by ID';
if MaxRowLimit > 0 then
where := FormatUtf8('% limit %', [where, MaxRowLimit]);
listdeleted := Master.MultiFieldValues(fOrmVersionDeleteTable,
'ID,Deleted', where, [deletedminid + RecordVersion,
deletedminid + ORMVERSION_DELETEID_RANGE]);
if listdeleted = nil then