Pertanyaan Hubungan antara penangan perintah, agregat, repositori dan penyimpanan peristiwa di CQRS


Saya ingin memahami beberapa detail hubungan antara penangan perintah, agregat, repositori, dan penyimpanan peristiwa dalam sistem berbasis CQRS.

Apa yang saya pahami sejauh ini:

  • Penangan perintah menerima perintah dari bus. Mereka bertanggung jawab untuk memuat agregat yang sesuai dari repositori dan memanggil logika domain pada agregat. Setelah selesai, mereka menghapus perintah dari bus.
  • Agregat memberikan perilaku dan keadaan internal. Negara tidak pernah menjadi publik. Satu-satunya cara untuk mengubah keadaan adalah dengan menggunakan perilaku. Metode yang memodelkan perilaku ini membuat acara dari properti perintah, dan menerapkan peristiwa ini ke agregat, yang pada gilirannya memanggil penangan kejadian yang menetapkan keadaan internal yang sesuai.
  • Repositori hanya memungkinkan pemuatan agregat pada ID yang diberikan, dan menambahkan agregat baru. Pada dasarnya, repositori menghubungkan domain ke event store.
  • Event store, last but not least, bertanggung jawab untuk menyimpan acara ke database (atau penyimpanan apa pun yang digunakan), dan memuat ulang peristiwa ini sebagai aliran acara yang disebut.

Sejauh ini bagus. Sekarang ada beberapa masalah yang belum saya dapatkan:

  • Jika pengendali perintah memanggil perilaku pada agregat yang sudah ada, semuanya cukup mudah. Command handler mendapat referensi ke repositori, memanggil metode loadById dan agregat dikembalikan. Tetapi apa yang dilakukan oleh pengendali perintah ketika belum ada agregat, tetapi yang harus dibuat? Dari pemahaman saya, kelompok agregasi selanjutnya harus dibangun kembali menggunakan peristiwa. Ini berarti bahwa pembuatan agregat dilakukan sebagai balasan atas peristiwa fooCreated. Tetapi untuk dapat menyimpan acara apa pun (termasuk yang foo dibuat), saya membutuhkan agregat. Jadi ini terlihat bagi saya seperti masalah ayam-dan-telur: Saya tidak dapat membuat agregat tanpa peristiwa, tetapi satu-satunya komponen yang harus membuat acara adalah agregat. Jadi pada dasarnya ini adalah: Bagaimana cara membuat agregat baru, siapa yang melakukan apa?
  • Ketika suatu agregat memicu suatu peristiwa, respon handler internal terhadapnya (biasanya dengan dipanggil melalui metode yang berlaku) dan mengubah status agregat. Bagaimana acara ini diserahkan ke repositori? Siapa yang memulai tindakan "tolong kirim peristiwa baru ke toko repositori / peristiwa store"? Agregat itu sendiri? Repositori dengan memperhatikan agregat? Orang lain yang berlangganan acara internal? ...?
  • Terakhir tetapi tidak paling tidak saya memiliki masalah memahami konsep aliran acara dengan benar: Dalam imajinasi saya, itu hanya sesuatu seperti daftar acara yang teratur. Apa yang penting adalah bahwa itu "dipesan". Apakah ini benar?

32
2017-09-11 04:10


asal


Jawaban:


Berikut ini didasarkan pada pengalaman saya sendiri dan eksperimen saya dengan berbagai kerangka kerja seperti Lokad.CQRS, NCQRS, dll. Saya yakin ada beberapa cara untuk menangani hal ini. Saya akan posting apa yang paling masuk akal bagi saya.

1. Penciptaan Agregat:

Setiap kali pengendali perintah membutuhkan agregat, ia menggunakan repositori. Repositori mengambil daftar masing-masing peristiwa dari event store dan memanggil konstruktor yang kelebihan beban, menyuntikkan peristiwa

var stream = eventStore.LoadStream(id)
var User = new User(stream)

Jika agregat tidak ada sebelumnya, aliran akan kosong dan objek yang baru dibuat akan berada dalam keadaan asli. Anda mungkin ingin memastikan bahwa dalam keadaan ini hanya beberapa perintah yang diizinkan untuk menghidupkan agregat, mis. User.Create().

2. Penyimpanan Acara baru

Penanganan perintah terjadi di dalam a Satuan Kerja. Selama eksekusi perintah setiap acara yang dihasilkan akan ditambahkan ke daftar di dalam agregat (User.Changes). Setelah eksekusi selesai, perubahan akan ditambahkan ke event store. Pada contoh di bawah ini terjadi di baris berikut:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

3. Urutan Acara

Bayangkan saja apa yang akan terjadi, jika dua berikutnya CustomerMoved acara diputar ulang dalam urutan yang salah.

Sebuah contoh

Saya akan mencoba untuk mengilustrasikan dengan sepotong pseudo-code (saya dengan sengaja meninggalkan masalah repositori di dalam command handler untuk menunjukkan apa yang akan terjadi di belakang layar):

Layanan Aplikasi:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

Agregat:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        foreach (event in eventStream)
            this.Apply(event)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        this.Apply(new UserCreated(...))

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        this.Apply(new UserBlocked(...))

    Apply(userCreatedEvent)
        this.created = true
        this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent)
        this.blocked = true
        this.Changes.Add(userBlockedEvent)

Memperbarui:

Sebagai catatan sampingan: Jawaban Yves mengingatkan saya pada artikel yang menarik oleh Udi Dahan dari beberapa tahun yang lalu:


30
2017-09-11 08:04



Variasi kecil pada jawaban Dennis yang sangat baik:

  • Ketika berhadapan dengan kasus penggunaan "kreasi" (yaitu harus memisahkan agregat baru), cobalah mencari agregat atau pabrik lain yang dapat Anda pindahkan tanggung jawab itu. Ini tidak bertentangan dengan memiliki ctor yang mengambil peristiwa untuk menghidrasi (atau mekanisme lain untuk rehidrasi dalam hal ini). Kadang-kadang pabrik hanya metode statis (baik untuk "konteks" / "maksud" menangkap), kadang-kadang itu adalah metode contoh dari agregat lain (tempat yang baik untuk "data" warisan), kadang-kadang itu adalah objek pabrik eksplisit (tempat yang baik untuk " "logika penciptaan" yang kompleks.
  • Saya suka memberikan metode GetChanges () eksplisit pada agregat saya yang mengembalikan daftar internal sebagai larik. Jika agregat saya tinggal di memori di luar satu eksekusi, saya juga menambahkan metode AcceptChanges () untuk menunjukkan daftar internal harus dihapus (biasanya dipanggil setelah hal-hal memerah ke event store). Anda dapat menggunakan model tarikan (GetChanges / Changes) atau dorong (pikirkan .net atau IObservable) di sini. Banyak tergantung pada semantik transaksional, teknologi, kebutuhan, dll ...
  • Arus acara Anda adalah daftar tertaut. Setiap revisi (event / changeset) menunjuk ke yang sebelumnya (a.k.a. the parent). Arus acara Anda adalah urutan peristiwa / perubahan yang terjadi pada agregat tertentu. Urutan hanya dijamin dalam batas agregat.

9
2017-09-11 08:26



saya hampir setuju dengan yves-reynhout dan dennis-traub tapi saya ingin menunjukkan kepada Anda bagaimana saya melakukan ini. Saya ingin menghapus agregat saya dari tanggung jawab untuk menerapkan peristiwa pada diri mereka sendiri atau untuk menghidrasi kembali diri mereka; jika tidak ada banyak duplikasi kode: setiap konstruktor agregat akan terlihat sama:

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)

Tanggung jawab itu bisa diserahkan kepada petugas operator. Perintah ditangani langsung oleh agregat.

Command dispatcher class

    dispatchCommand(command) method:
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents()
         this.applyEventsOnAggregate(aggregate, priorEvents)

     saveAggregate(aggregateId, aggregate, newEvents)
        this.applyEventsOnAggregate(aggregate, newEvents)
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

Perlu diingat bahwa ini adalah kode semu yang diproyeksikan dari aplikasi PHP; kode sebenarnya harus memiliki hal-hal yang disuntikkan dan tanggung jawab lain disempurnakan di kelas lain. Ideanya adalah menjaga agregat sebersih mungkin dan menghindari duplikasi kode.

Beberapa aspek penting tentang agregat:

  1. penangan perintah tidak boleh mengubah keadaan; mereka menghasilkan acara atau melempar pengecualian
  2. acara berlaku seharusnya tidak membuang pengecualian apapun dan tidak boleh mengembalikan apa pun; mereka hanya mengubah keadaan internal

Implementasi PHP sumber terbuka ini dapat ditemukan sini.


0
2017-09-23 07:28