#include "service_common.h"
#include <date.h>
static inline std::size_t aligned_to(const std::size_t v, const std::size_t alignment) noexcept {
return (v + alignment - 1) & (-alignment);
}
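// Illustrative note: aligned_to() assumes `alignment` is a power of two; on an unsigned
// type, (-alignment) == ~(alignment - 1), so the expression rounds `v` up to the next
// multiple of `alignment`, e.g:
//   aligned_to(69, 16) == 80
//   aligned_to(64, 16) == 64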
#ifndef HWM_UPDATE_BASED_ON_ACKS
void Service::rebuild_partition_tracked_isrs(topic_partition *const partition) {
enum {
trace = false,
};
TANK_EXPECT(partition);
const auto self = cluster_state.local_node.ref;
partition->cluster.isr.tracker.size = 0;
for (const auto ptr : partition->cluster.isr.list) {
const auto isr_e = containerof(isr_entry, partition_ll, ptr);
TANK_EXPECT(isr_e->partition() == partition);
const auto node = isr_e->node();
if (node == self) {
continue;
} else if (isr_e->last_msg_lsn == 0) {
// hasn't requested anything yet
continue;
}
auto &it = partition->cluster.isr.tracker.data[partition->cluster.isr.tracker.size++];
it.nid = node->id;
it.lsn = isr_e->last_msg_lsn;
}
std::sort(partition->cluster.isr.tracker.data, partition->cluster.isr.tracker.data + partition->cluster.isr.tracker.size,
[](const auto &a, const auto &b) noexcept {
return a.lsn < b.lsn;
});
if (trace) {
SLog("After rebuilding\n");
for (unsigned i = 0; i < partition->cluster.isr.tracker.size; ++i) {
const auto &it = partition->cluster.isr.tracker.data[i];
SLog(">> Node ", it.nid, " ", it.lsn, "\n");
}
}
#ifdef TANK_RUNTIME_CHECKS
for (unsigned i = 1; i < partition->cluster.isr.tracker.size; ++i) {
TANK_EXPECT(partition->cluster.isr.tracker.data[i].lsn >= partition->cluster.isr.tracker.data[i - 1].lsn);
}
#endif
}
// TODO: this is *not* optimal
bool topic_partition::Cluster::ISR::Tracker::confirmed(const uint8_t k, const uint64_t seqnum) const TANK_NOEXCEPT_IF_NORUNTIME_CHECKS {
enum {
trace = false,
};
if (trace) {
SLog("confirmed() for k = ", k, ", seqnum = ", seqnum, ", size = ", size, "\n");
}
if (k > size) {
// we need to special-case for this
// see consider_pending_client_produce_responses() impl
// where we check for (required_acks + 1 > isr_size) { }
if (trace) {
SLog("k > size\n");
}
return true;
} else if (!k) {
if (trace) {
SLog("k == 0\n");
}
return false;
}
// Producers get an ACK _regardless_ of HWM advancement
// i.e. if a producer P produces a message with LSN 100, and acks = 2 (which requires an implicit ack from the leader, and 2 acks from peers in the ISR)
// then as soon as 2+ different peers issue a ConsumePeer for LSN > 100, that produce req. can get ack-ed.
//
// e.g for seqnum = 8, k = 2
// we may have data[0].lsn = 7, data[1] = 8
for (unsigned i = 0; i < k; ++i) {
const auto v = data[i].lsn;
if (trace) {
SLog("Considering ", seqnum, " / ", v, "\n");
}
if (v < seqnum) {
if (trace) {
SLog("NOPE\n");
}
return false;
}
}
if (trace) {
SLog("*CONFIRMED*\n");
}
return true;
}
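// Illustrative example of confirmed(): tracker.data is sorted by ascending LSN, and the
// loop above walks the k *lowest* tracked LSNs, so e.g with data = [{n3, lsn 7}, {n1, lsn 9}]:
//   confirmed(2, 8) -> false (data[0].lsn == 7 < 8)
//   confirmed(2, 7) -> true  (both tracked LSNs are >= 7)
//   confirmed(3, 8) -> true  (k > size special case)
//   confirmed(0, 8) -> false (k == 0 special case)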
#endif
// See TANKUtil::produce_request_acks()
uint8_t topic::compute_required_peers_acks(const topic_partition *part, const uint8_t required_acks) const {
// see apply_cluster_state_updates() DEFERRED_REPAIRS comments
TANK_EXPECT(part->cluster.isr.size()); // can't possibly have an empty ISR
// It's possible for the number of nodes in the ISR to be higher than the number
// of nodes in the replica set for a few milliseconds -- this is an edge case and it rarely manifests,
// but it is important to account for it here
const auto effective_isr_size = std::min<uint8_t>(part->owner->cluster.rf_, part->cluster.isr.size());
// the ISR always includes the local node as well,
// but we care about acknowledgements from peers, so we compute that count here
const auto peers_in_isr = effective_isr_size - 1;
if (required_acks == 0) {
// magic: require acks from all peers
// so we need to return (effective_isr_size - 1)
// because the ISR always includes the local node as well
// TODO: we need a special > 200 value, because 0 is an acceptable value for acks
return peers_in_isr;
} else if (required_acks == 255) {
// magic: quorum
return peers_in_isr / 2 + 1;
} else {
return required_acks;
}
}
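// Worked example (illustrative): for a partition with rf_ = 3 and all 3 replicas in the
// ISR, effective_isr_size = 3 and peers_in_isr = 2, so:
//   required_acks == 0   (all peers) -> 2
//   required_acks == 255 (quorum)    -> 2 / 2 + 1 == 2
//   required_acks == 1               -> 1 (used as-is)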
// aborts replication of partition `p` from node `src`
// if there is an active replication stream for that node, it will be closed
// if no other partitions are to be replicated from that peer later
void Service::try_abort_replication(topic_partition *p, cluster_node *src, const uint32_t ref) {
enum {
trace = false,
};
TANK_EXPECT(p);
TANK_EXPECT(src);
TANK_EXPECT(cluster_aware());
if (src == cluster_state.local_node.ref) {
// we were definitely not replicating from us
return;
}
if (trace) {
SLog("Attempting to abort replication of ", p->owner->name(), "/", p->idx, " from ", src->id, "@", src->ep, ", ref = ", ref, "\n");
}
// just in case
abort_retry_consume_from(src);
if (auto stream = p->cluster.rs) {
if (trace) {
SLog("Replication stream available for partition\n");
}
if (!stream->src or stream->src != src) {
// this is fine
// instead of checking if `src` is the leader of `p` before invoking this method, we can cheaply
// guard against that here
if (trace) {
SLog("Stream active for partition ", p->owner->name(), "/", p->idx,
", but streaming from ", stream->src->ep, " not from ", src->ep, "\n");
}
return;
}
if (auto c = stream->ch.get()) {
// we no longer wish to receive any content for (peer, partition)
// we may have multiple other partitions that depend on streams from that node
// or that node may be pulling from us now, so we don't want to close the connection unless we need to
TANK_EXPECT(stream->src == src);
TANK_EXPECT(c == src->consume_conn.ch.get());
if (trace) {
SLog("Stream connection available; will shut it down\n");
}
stream->ch.reset(); // this stream is no longer bound to replica's connection
did_abort_repl_stream(src);
}
// in case it wasn't detached
stream->repl_streams_ll.try_detach();
put_repl_stream(stream);
p->cluster.rs = nullptr;
} else if (trace) {
SLog("No replication stream for partition\n");
}
}
bool Service::any_partitions_to_replicate_from(cluster_node *n) const {
enum {
trace = false,
};
TANK_EXPECT(n);
TANK_EXPECT(cluster_aware());
auto self = cluster_state.local_node.ref;
if (not can_accept_any_messages()) {
return false;
}
if (not n->leadership.dirty) {
// fast-path
#ifdef TANK_RUNTIME_CHECKS
for (auto p : *n->leadership.local_replication_list) {
TANK_EXPECT(self->is_replica_for(p));
}
#endif
if (0 == partitions_io_failed_cnt) {
// fast-path
return false == n->leadership.local_replication_list->empty();
} else {
for (auto p : *n->leadership.local_replication_list) {
if (can_accept_messages(p)) {
return true;
}
}
return false;
}
}
for (auto it : n->leadership.list) {
const auto p = containerof(topic_partition, cluster.leader.leadership_ll, it);
if (self->is_replica_for(p) and can_accept_messages(p)) {
return true;
}
}
return false;
}
// returns a list of all partitions whose leader is `n` and for which we
// are a replica (i.e. partitions to replicate from it)
const std::vector<topic_partition *> *Service::partitions_to_replicate_from(cluster_node *const n) {
TANK_EXPECT(n);
TANK_EXPECT(cluster_aware());
enum {
trace = false,
};
auto self = cluster_state.local_node.ref;
TANK_EXPECT(self);
if (n->leadership.list.empty()) {
if (trace) {
SLog("Node ", n->id, "@", n->ep, " is not a leader for any partitions\n");
}
return nullptr;
}
if (not n->leadership.local_replication_list) {
if (trace) {
SLog("Will create local_replication_list\n");
}
n->leadership.local_replication_list.reset(new std::vector<topic_partition *>());
n->leadership.dirty = true; // force rebuild
}
auto v = n->leadership.local_replication_list.get();
if (not can_accept_any_messages()) {
v->clear();
return v;
}
if (n->leadership.dirty) {
v->clear();
// for each partition that node is currently the leader of
for (auto it : n->leadership.list) {
auto p = containerof(topic_partition, cluster.leader.leadership_ll, it);
if (self->is_replica_for(p) and can_accept_messages(p)) {
v->emplace_back(p);
}
}
// sort it so that replicate_from() can use this
std::sort(v->begin(), v->end(), [](const auto a, const auto b) noexcept {
return a->owner < b->owner or (a->owner == b->owner and a->idx < b->idx);
});
n->leadership.dirty = false;
if (trace) {
SLog("Leadership list dirty for node ", n->id, '@', n->ep, " updated => ", v->size(), " [", values_repr_with_lambda(v->data(), v->size(), [](const auto it) noexcept {
return Buffer{}.append(it->owner->name(), '/', it->idx);
}),
"\n");
}
} else if (trace) {
SLog("Not Dirty => ", v->size(), "\n");
#ifdef TANK_RUNTIME_CHECKS
for (auto p : *v) {
TANK_EXPECT(self->is_replica_for(p));
}
#endif
}
return v;
}
// returns true if we tried to send the response to the client and the client connection was shut down
// invoked by complete_deferred_produce_response()
bool Service::try_generate_produce_response(produce_response *pr) {
enum {
trace = false,
};
TANK_EXPECT(pr);
if (trace) {
SLog("Attempting to generate PRODUCE response\n");
}
TANK_EXPECT(pr->deferred.expiration.ll.empty());
if (pr->client_ctx.connection_ll.try_detach_and_reset()) {
// detach from client connection
if (trace) {
SLog("Now detached from client\n");
}
}
auto c = pr->client_ctx.ch.get();
if (!c or -1 == c->fd) {
// client connection went away already
// will check for (-1 == c->fd) because this may have been invoked in cleanup_connection()
if (trace) {
SLog("Connection gone away\n");
}
put_produce_response(pr);
return true;
}
const auto res = generate_produce_response(pr);
put_produce_response(pr);
return res;
}
// mostly serves as a trampoline for try_generate_produce_response()
// invoked by confirm_deferred_produce_resp_partition()
void Service::complete_deferred_produce_response(produce_response *dpr) {
enum {
trace = false,
};
TANK_EXPECT(dpr);
if (trace) {
SLog("Completing DPR for ", dpr->gen, ", pending_partitions not ack. ", dpr->deferred.pending_partitions, "\n");
}
if (!dpr->deferred.expiration.ll.empty()) {
// unlink from global tracker
dpr->deferred.expiration.ll.detach_and_reset();
deferred_produce_responses_next_expiration = deferred_produce_responses_expiration_list.empty()
? std::numeric_limits<uint64_t>::max()
: switch_list_entry(produce_response,
deferred.expiration.ll,
deferred_produce_responses_expiration_list.prev)
->deferred.expiration.ts;
}
try_generate_produce_response(dpr);
}
// an update associated with a DPR (deferred produce response) has been acknowledged
void Service::confirm_deferred_produce_resp_partition(produce_response *dpr, [[maybe_unused]] topic_partition *p, [[maybe_unused]] const uint8_t pr_participant_idx) {
enum {
trace = false,
};
TANK_EXPECT(dpr);
TANK_EXPECT(cluster_aware());
TANK_EXPECT(p);
TANK_EXPECT(dpr->deferred.pending_partitions);
if (trace) {
SLog("Confirmed DPR ", dpr->gen, " for ", p->owner->name(), "/", p->idx,
" pending_partitions = ", dpr->deferred.pending_partitions, "\n");
}
if (--(dpr->deferred.pending_partitions)) {
// more partitions in the DPR pending ack.
return;
}
complete_deferred_produce_response(dpr);
}
#ifdef HWM_UPDATE_BASED_ON_ACKS
// We need to respect the ordering invariant:
// We can't bump the high water mark to X if there are 1+ produce requests pending ack.
// whose (bundle_last_msg_seq_num < X).
//
// Even if a produce request with required_acks = 1 is acknowledged, we can't
// update_hwmark() by its bundle_last_msg_seq_num if there are other previously submitted (i.e. in
// pending_client_produce_acks_tracker.pending) produce requests pending ack.
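// Worked example (illustrative): say `pending` holds two produce requests, in submission
// order, with bundle last_msg_seqnum 100 and 110. If the one ending at 110 collects its
// acks first, it is pushed into `acknowledged` (a prio. queue, presumably min-ordered on
// last_msg_seqnum), but the HWM is *not* bumped to 110 yet: `next` (the head of `pending`)
// is 100, so the loop below pops nothing until the request ending at 100 is acknowledged too.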
void Service::consider_acknowledged_product_reqs(topic_partition *p) {
enum {
trace = false,
};
TANK_EXPECT(p);
const auto &q = p->cluster.pending_client_produce_acks_tracker.pending;
auto &pq = p->cluster.pending_client_produce_acks_tracker.acknowledged;
topic_partition::Cluster::pending_ack_bundle_desc target{.last_msg_seqnum = 0};
uint64_t next;
if (!q.empty()) {
// there is at least one produce request pending
const auto &pa = *q.begin();
next = pa.bundle_desc.last_msg_seqnum;
} else {
next = std::numeric_limits<uint64_t>::max();
}
if (trace) {
SLog(ansifmt::bold, ansifmt::color_green, "q.size = ", q.size(), ", pq.size = ", pq.size(), ", next = ", next, ansifmt::reset, "\n");
}
// go through all acknowledged produce requests in the PQ
while (!pq.empty()) {
const auto &bd = pq.top();
const auto top = bd.last_msg_seqnum;
if (trace) {
SLog("Consider top(", top, ")\n");
}
if (top < next) {
target = bd;
pq.pop();
} else {
break;
}
}
if (target.last_msg_seqnum) {
update_hwmark(p, target);
}
}
#endif
void Service::consider_pending_client_produce_responses(topic_partition *p, cluster_node *peer, const uint64_t seq_num) {
enum {
trace = false,
};
TANK_EXPECT(p);
TANK_EXPECT(peer);
auto *const isr_e = p->cluster.isr.find(peer);
if (!isr_e) {
// peer is not in this partition's ISR
// we shouldn't have allowed this to begin with
//
// XXX:
// assume a client is consuming from EOF in a 3-node configuration: if you keep publishing while the last node is stopped,
// then kill it, and later restart it,
// it will commence consumption from a past sequence number, and q.size() will be 0
// why?
// XXX: is this still an issue?
if (trace) {
SLog("ODD: unable to isr.find() for ", p->owner->name(), "/", p->idx, " for ", peer->id, "@", peer->ep, "\n");
}
return;
}
consider_pending_client_produce_responses(isr_e, p, peer, seq_num);
}
// invoked by peer_consumed_local_partition() and isr_dispose()
// `seq_num` is the sequence number the peer asked to consume from, i.e. (the LSN of the last message it persisted locally + 1)
//
// See also Amazon Aurora:
// 1. https://atscaleconference.com/videos/scale-2018-amazon-aurora-design-considerations-for-high-throughput-cloud-native-relational-databases
// 2. https://www.allthingsdistributed.com/files/p1041-verbitski.pdf
// 3. https://www.dropbox.com/s/47xbjrkni9bx0g3/aurora2.pdf?dl=0
// where they also employ the same idea (no commit protocol): a write is not considered committed until enough peers advertise that their local LSN
// is > the LSN the produce request requires to be confirmed (i.e. for the coordinator to bump the high-water mark).
void Service::consider_pending_client_produce_responses(isr_entry *const isr_e, topic_partition *p, cluster_node *peer, const uint64_t seq_num) {
// https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
// See connection::As::Tank::pending_produce_reqs_acks
enum {
trace = false,
};
TANK_EXPECT(p);
TANK_EXPECT(peer);
TANK_EXPECT(isr_e);
if (trace) {
SLog(ansifmt::color_magenta, ansifmt::inverse, ansifmt::bold, "Considering pending client produce responses for ",
p->owner->name(), "/", p->idx, " from peer ", peer->id, "@", peer->ep, ansifmt::reset, "\n");
}
auto &q = p->cluster.pending_client_produce_acks_tracker.pending; // this is an std::deque<>
#ifndef HWM_UPDATE_BASED_ON_ACKS
if (trace) {
SLog("Considering q of size ", q.size(), ", cluster.isr.tracker.size = ", p->cluster.isr.tracker.size, ", p->cluster.isr.tracker.data[0].lsn = ", p->cluster.isr.tracker.data[0].lsn, "\n");
}
for (auto it = q.begin(); it != q.end();) {
auto &pa = *it;
auto dpr = pa.deferred_resp;
TANK_EXPECT(dpr);
const auto dpr_available = dpr->gen == pa.deferred_resp_gen;
const auto bundle_desc = pa.bundle_desc;
const auto bundle_last_msg_seq_num = bundle_desc.last_msg_seqnum;
if (trace) {
SLog("For pending, bundle_last_msg_seq_num = ", bundle_last_msg_seq_num, ", dpr_available = ", dpr_available, "\n");
}
if (seq_num <= bundle_last_msg_seq_num) {
break;
}
if (dpr_available) {
dpr->participants[pa.pr_participant_idx].res = produce_response::participant::OpRes::OK;
}
const auto required_acks = pa.required_acks;
if (trace) {
SLog("required_acks = ", required_acks, ", confirmed() = ",
p->cluster.isr.tracker.confirmed(required_acks, bundle_last_msg_seq_num), "\n");
}
if (!p->cluster.isr.tracker.confirmed(required_acks, bundle_last_msg_seq_num)) {
++it;
continue;
}
if (dpr_available) {
// trampoline to try_generate_produce_response()
confirm_deferred_produce_resp_partition(dpr, p, pa.pr_participant_idx);
}
it = q.erase(it);
}
#else
using mask_t = decltype(topic_partition::Cluster::PendingClientProduceAcks::pending_ack::isr_nodes_acknowledged_bm);
const auto partition_isr_node_id_mask = static_cast<mask_t>(1) << isr_e->partition_isr_node_id;
if (trace) {
SLog("q.size() = ", q.size(), " for ", ptr_repr(&q), "\n");
}
// don't e.g use for (auto it = q.begin(), end = q.end(); it != end;)
// because we erase() within the loop, which invalidates the cached end iterator
for (auto it = q.begin(); it != q.end();) {
auto &pa = *it;
auto dpr = pa.deferred_resp; // deferred pending response
TANK_EXPECT(dpr);
const auto dpr_available = dpr->gen == pa.deferred_resp_gen; // cookie check
const auto bundle_desc = pa.bundle_desc;
const auto bundle_last_msg_seq_num = bundle_desc.last_msg_seqnum;
// even if (false == dpr_available) or the client connection (dpr->client_ctx)
// has gone away, we still need to respect the invariant semantics:
// we will only update the high-water mark once we have the required acks.
if (seq_num <= bundle_last_msg_seq_num) {
// because all pending_acks in `q` are ordered in ascending order
// by bundle_last_msg_seq_num -- because of how produce() works --
// we know we should stop here
if (trace) {
SLog(seq_num, "<=", bundle_last_msg_seq_num, "\n");
}
break;
}
if (pa.isr_nodes_acknowledged_bm & partition_isr_node_id_mask) {
// already got an ack. earlier from that ISR node
if (trace) {
SLog("Already got ack from that ISR node\n");
}
++it;
continue;
}
if (dpr_available) {
// Set status to OK for the PR candidate
dpr->participants[pa.pr_participant_idx].res = produce_response::participant::OpRes::OK;
}
pa.isr_nodes_acknowledged_bm |= partition_isr_node_id_mask;
const auto required_acks = pa.required_acks;
if (trace) {
SLog("dpr.gen:", dpr->gen,
"(dpr_available=", dpr_available,
"): Pending: required_acks = ", required_acks,
", so far:", pa.ack_count(), " (including this)\n");
}
if (pa.ack_count() < required_acks) {
// we need more ISR nodes to ack. this partition
if (trace) {
SLog("Need more ISR nodes acks(peer acks so far:", pa.ack_count(), ", required_acks:", required_acks, ")\n");
}
++it;
continue;
}
if (trace) {
SLog("Can confirm DPR, dpr_available = ", dpr_available, "\n");
}
if (dpr_available) {
// trampoline to try_generate_produce_response()
confirm_deferred_produce_resp_partition(dpr, p, pa.pr_participant_idx);
}
// push into the prio.queue
// consider_acknowledged_product_reqs() will take care of that pq
p->cluster.pending_client_produce_acks_tracker.acknowledged.push(bundle_desc);
it = q.erase(it);
}
consider_acknowledged_product_reqs(p);
#endif
}
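// Illustrative sketch (assumed details) of the bitmap bookkeeping above: each ISR member
// is assigned a small partition-local id, an ack from that member sets the corresponding
// bit in pa.isr_nodes_acknowledged_bm, and pa.ack_count() is then presumably a popcount:
//   mask_t bm = 0;
//   bm |= mask_t(1) << node_a_id; // first ack
//   bm |= mask_t(1) << node_a_id; // duplicate ack from the same node is a no-op
//   bm |= mask_t(1) << node_b_id; // second distinct ack
//   // std::popcount(bm) == 2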
void Service::consider_pending_client_produce_responses(topic_partition *const p) {
// this is invoked when the ISR of a partition has changed
// either because a node has been added or removed from it
// and we get a chance to consider all pending client produce responses
// for which we may now be able to satisfy the invariant
enum {
trace = false,
};
TANK_EXPECT(p);
if (trace) {
SLog(ansifmt::color_magenta, ansifmt::bgcolor_green, "Considering pending produce responses from ",
p->owner->name(), "/", p->idx, " because ISR likely updated", ansifmt::reset, "\n");
}
auto &q = p->cluster.pending_client_produce_acks_tracker.pending;
const auto isr_size = p->cluster.isr.size();
if (trace) {
SLog("q.size() = ", q.size(), ", isr_size = ", isr_size, "\n");
}
#ifndef HWM_UPDATE_BASED_ON_ACKS
for (auto it = q.begin(); it != q.end();) {
auto &pa = *it;
auto dpr = pa.deferred_resp;
const auto dpr_available = dpr->gen == pa.deferred_resp_gen;
const auto required_acks = pa.required_acks;
const auto bundle_desc = pa.bundle_desc;
const auto bundle_last_msg_seq_num = bundle_desc.last_msg_seqnum;
if (!p->cluster.isr.tracker.confirmed(required_acks, bundle_last_msg_seq_num)) {
++it;
continue;
}
if (dpr_available) {
// trampoline to try_generate_produce_response()
confirm_deferred_produce_resp_partition(dpr, p, pa.pr_participant_idx);
}
it = q.erase(it);
}
#else
// this is potentially expensive, but
// we likely won't need to do it frequently
for (auto it = q.begin(); it != q.end();) {
auto &pa = *it;
auto dpr = pa.deferred_resp;
const auto dpr_available = dpr->gen == pa.deferred_resp_gen; // cookie check
const auto required_acks = pa.required_acks;
const auto acknowledged = pa.ack_count();
if (trace) {
SLog("Considering pending ack, required_acks = ", required_acks,
", dpr_available = ", dpr_available,
", acknowledged by peers = ", acknowledged,
", required_acks = ", required_acks, "\n");
}
if (required_acks + 1 /* +1 for local */ > isr_size) {
// We need (required_acks) acks. from _PEERS_
// and we already have an implicit ack from the local node
// so if (required_acks + 1 > isr_size), where the ISR always includes the local node,
// it means we have insufficient nodes in the ISR, so we need to ACK now
if (trace) {
SLog("Insufficient number of nodes in the ISR(", isr_size, "), will ack now\n");
}
} else if (acknowledged < required_acks) {
// not done yet
if (trace) {
SLog("Not ready, acknowledged(", acknowledged, "), required_acks(", required_acks, ")\n");
}
++it;
continue;
}
const auto bundle_desc = pa.bundle_desc;
if (trace) {
SLog("Ready to ack\n");
}
if (dpr_available) {
// trampoline to try_generate_produce_response()
confirm_deferred_produce_resp_partition(dpr, p, pa.pr_participant_idx);
}
// We first push into the prio. queue and
// eventually consider_acknowledged_product_reqs() will pop from that pq
p->cluster.pending_client_produce_acks_tracker.acknowledged.push(bundle_desc);
it = q.erase(it);
}
consider_acknowledged_product_reqs(p);
#endif
}
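// Illustrative example of the insufficient-ISR rule above: with isr_size == 2 (the local
// leader plus one peer) and required_acks == 2, (2 + 1 > 2) holds, so the request is
// acknowledged immediately rather than waiting for a second peer that isn't in the ISR.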
// called when a peer has issued a `ConsumePeer` request for (p, seq_num)
//
// This is important for:
// - ISR tracking
// - dealing with pending produce requests pending ack (based on RF)
void Service::peer_consumed_local_partition(topic_partition *p, cluster_node *peer, const uint64_t seq_num) {
enum {
trace = false,
};
TANK_EXPECT(p);
TANK_EXPECT(peer);
auto log = partition_log(p);
const auto last = log->lastAssignedSeqNum;
if (not cluster_aware()) {
// we shouldn't invoke this to begin with
if (trace) {
SLog("Not cluster-aware, will not proceed\n");
}
return;
}
if (p->cluster.leader.node != cluster_state.local_node.ref) {
// well, it's not us anymore
if (trace) {
SLog("Leader of partition is no longer us\n");
}
return;
}
if (trace) {
SLog(ansifmt::inverse, ansifmt::bold, ansifmt::color_brown,
"Peer ", peer->id, "@", peer->ep, " consumed content from ", p->owner->name(), "/", p->idx, " at ", seq_num, ", last = ", last, ansifmt::reset, "\n");
}
if (not p->cluster.replicas.count(peer->id)) {
// peer who consumed is not a replica for this partition
if (trace) {
SLog("Unexpected CONSUME from node that is not in the partition's ISR\n");
}
return;
}
isr_entry *isr_e{nullptr};
// manage this partition's ISR
// it's important that we first deal with ISR and then with pending produce responses
if (seq_num <= last) {
// `peer` has not caught up yet
//
// once the peer attempts to read past the last committed sequence number, we know it
// has all partition content, and thus we can consider it for ISR/membership
if (trace) {
SLog("Peer hasn't caught up yet because (seq_num(", seq_num, ") <= last(", last, "))\n");
}
// 2022-08-18: now just returning instead of goto l1;
// WAS: goto l1;
return;
}
if ((isr_e = p->cluster.isr.find(peer))) {
// peer is already in this partition's ISR
TANK_EXPECT(isr_e->partition() == p);
if (trace) {
SLog("Peer already in ISR\n");
}
if (isr_e->pending_next_ack_ll.try_detach_and_reset()) {
// remove from the pend.ack timers
if (trace) {
SLog("Will stop the timer\n");
}
isr_pending_ack_list_next = isr_pending_ack_list.empty()
? std::numeric_limits<uint64_t>::max()
: switch_list_entry(isr_entry, pending_next_ack_ll, isr_pending_ack_list.prev)->pending_next_ack_timeout;
}
#ifndef HWM_UPDATE_BASED_ON_ACKS
// notice that we use (seq_num - 1)
// that is because we track the LSN of the last known acknowledged message
isr_touch(isr_e, seq_num - 1);
#endif
} else {
// add peer to this partition's ISR
#ifdef HWM_UPDATE_BASED_ON_ACKS
isr_e = isr_bind(p, peer, p->cluster.isr.next_partition_isr_node_id(), __LINE__);
#else
isr_e = isr_bind(p, peer, __LINE__);
#endif
TANK_EXPECT(isr_e);
persist_isr(p, __LINE__);
#ifndef HWM_UPDATE_BASED_ON_ACKS
// notice that we use (seq_num - 1)
// that is because we track the LSN of the last known acknowledged message
isr_touch(isr_e, seq_num - 1);
#endif
}
l1:
// deal with any pending produce responses
TANK_EXPECT(isr_e);
consider_pending_client_produce_responses(isr_e, p, peer, seq_num);
}
static uint8_t choose_compression_codec(const topic_partition::msg *msgs, const size_t size) {
if (size > 512) {
return 1;
}
size_t sum{0};
for (size_t i{0}; i < size; ++i) {
sum += msgs[i].data.size() + msgs[i].key.size();
if (sum > 1024) {
return 1;
}
}
return 0;
}
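// Note: the heuristic above opts into compression (codec 1) for large batches only --
// more than 512 messages, or as soon as the running payload size (keys + values) exceeds
// 1KB; smaller batches are left uncompressed (codec 0), presumably because compressing
// tiny payloads wouldn't pay off.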
// content consumed from peer; commit locally
void Service::persist_peer_partitions_content(topic_partition *const partition, const std::vector<topic_partition::msg> &partition_msgs, const bool first_sparse) {
enum {
trace = false,
};
static thread_local IOBuffer cb_tls;
const auto cnt = partition_msgs.size();
auto &cb{cb_tls};
TANK_EXPECT(partition);
if (trace) {
SLog("Persist ", dotnotation_repr(cnt), " msgs for ", partition->owner->name(), "/", partition->idx, "\n");
}
if (0 == cnt) {
return;
}
auto b = get_buf();
DEFER({
put_buf(b);
});
// we'll be packing up to 128 messages per bundle
auto log = partition_log(partition);
uint64_t next_expected;
if (first_sparse) {
// force generate sparse
next_expected = 0;
} else {
next_expected = partition_msgs.front().seqNum;
}
for (const auto *p = partition_msgs.data(), *const e = p + cnt; p < e;) {
// determine batch size
const auto upto = std::min(e, p + 128);
const auto msgset_first_seq_num = p->seqNum;
const auto msgset_msgs_cnt = std::distance(p, upto);
const auto first_msg = p;
const auto last_msg = upto - 1;
const auto msgset_last_seq_num = last_msg->seqNum;
uint8_t bundle_flags = 0;
bool as_sparse = false;
const auto codec = choose_compression_codec(p, std::distance(p, upto));
// is this going to be a sparse batch?
for (const auto *it = p; it < upto; ++it) {
if (const auto seq_num = it->seqNum; seq_num != next_expected) {
as_sparse = true;
break;
} else {
next_expected = seq_num + 1; // the run is still dense; expect the next LSN
}
}
next_expected = upto[-1].seqNum + 1;
b->clear();
// BEGIN: bundle header
// encode the bundle header
// we need to consider whether compression makes sense, whether this is going to be a sparse bundle, etc.
bundle_flags |= codec & 0b111;
if (as_sparse) {
bundle_flags |= (1u << 6);
}
if (msgset_msgs_cnt < 16) {
bundle_flags |= msgset_msgs_cnt << 2;
b->pack(bundle_flags);
} else {
b->pack(bundle_flags);
b->encode_varuint32(msgset_msgs_cnt);
}
if (as_sparse) {
b->pack(msgset_first_seq_num);
if (msgset_msgs_cnt != 1) {
b->encode_varuint32((msgset_last_seq_num - msgset_first_seq_num) - 1);
}
}
// END: bundle header
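// Illustrative layout of the flags byte as encoded above (assumed to match the TANK
// bundle format): the codec occupies the low bits (only 0 or 1 is produced here),
// bit 6 marks a sparse bundle, and bits 2..5 carry the message count when it is < 16;
// larger counts are instead encoded as a varint immediately after the flags byte.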
// BEGIN: messages set
if (codec) {
cb.clear();
do {
const auto &m = *p;
uint8_t flags = m.key ? static_cast<uint8_t>(TankFlags::BundleMsgFlags::HaveKey) : 0;
const auto use_last_specified_ts = p != first_msg and m.ts == p[-1].ts;
uint32_t delta;
if (use_last_specified_ts) {
flags |= static_cast<uint8_t>(TankFlags::BundleMsgFlags::UseLastSpecifiedTS);
}
if (as_sparse and p != first_msg and p != last_msg) {
delta = m.seqNum - p[-1].seqNum - 1;
if (!delta) {
flags |= static_cast<uint8_t>(TankFlags::BundleMsgFlags::SeqNumPrevPlusOne);
}
} else {