:p
atchew
Login
Based on top of my next-8.0 branch. - rebased on top of latest upstream - lots of minor fixes - start support for atomic counters * we need to move ram_limit_used/max to migration.c * that means fixing rdma.c * and test-vmstate. So I am donig that right now. Juan Quintela (11): migration: Update atomic stats out of the mutex migration: Make multifd_bytes atomic multifd: We already account for this packet on the multifd thread multifd: Count the number of bytes sent correctly migration: Make ram_save_target_page() a pointer multifd: Make flags field thread local multifd: Prepare to send a packet without the mutex held multifd: Add capability to enable/disable zero_page multifd: Support for zero pages transmission multifd: Zero pages transmission So we use multifd to transmit zero pages. qapi/migration.json | 8 ++- migration/migration.h | 1 + migration/multifd.h | 36 ++++++++++-- migration/ram.h | 1 + hw/core/machine.c | 1 + migration/migration.c | 16 +++++- migration/multifd.c | 123 +++++++++++++++++++++++++++++++---------- migration/ram.c | 51 +++++++++++++++-- migration/trace-events | 8 +-- 9 files changed, 197 insertions(+), 48 deletions(-) -- 2.38.1
Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; qemu_file_acct_rate_limit(f, transferred); ram_counters.multifd_bytes += transferred; + qemu_mutex_unlock(&p->mutex); stat64_add(&ram_atomic_counters.transferred, transferred); - qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); return 1; @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) p->pending_job++; qemu_file_acct_rate_limit(f, p->packet_len); ram_counters.multifd_bytes += p->packet_len; + qemu_mutex_unlock(&p->mutex); stat64_add(&ram_atomic_counters.transferred, p->packet_len); - qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { -- 2.38.1
In the spirit of: commit 394d323bc3451e4d07f13341cb8817fac8dfbadd Author: Peter Xu <peterx@redhat.com> Date: Tue Oct 11 17:55:51 2022 -0400 migration: Use atomic ops properly for page accountings Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.h | 1 + migration/migration.c | 4 ++-- migration/multifd.c | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/migration/ram.h b/migration/ram.h index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -XXX,XX +XXX,XX @@ typedef struct { Stat64 duplicate; Stat64 normal; Stat64 postcopy_bytes; + Stat64 multifd_bytes; } MigrationAtomicStats; extern MigrationAtomicStats ram_atomic_counters; diff --git a/migration/migration.c b/migration/migration.c index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -XXX,XX +XXX,XX @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s) ram_counters.dirty_sync_missed_zero_copy; info->ram->postcopy_requests = ram_counters.postcopy_requests; info->ram->page_size = page_size; - info->ram->multifd_bytes = ram_counters.multifd_bytes; + info->ram->multifd_bytes = stat64_get(&ram_atomic_counters.multifd_bytes); info->ram->pages_per_second = s->pages_per_second; info->ram->precopy_bytes = ram_counters.precopy_bytes; info->ram->downtime_bytes = ram_counters.downtime_bytes; @@ -XXX,XX +XXX,XX @@ static MigThrError migration_detect_error(MigrationState *s) static uint64_t migration_total_bytes(MigrationState *s) { return qemu_file_total_transferred(s->to_dst_file) + - ram_counters.multifd_bytes; + stat64_get(&ram_atomic_counters.multifd_bytes); } static void migration_calculate_complete(MigrationState *s) diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) p->pages = pages; transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; qemu_file_acct_rate_limit(f, transferred); - ram_counters.multifd_bytes += transferred; qemu_mutex_unlock(&p->mutex); + stat64_add(&ram_atomic_counters.multifd_bytes, transferred); stat64_add(&ram_atomic_counters.transferred, transferred); qemu_sem_post(&p->sem); @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; qemu_file_acct_rate_limit(f, p->packet_len); - ram_counters.multifd_bytes += p->packet_len; qemu_mutex_unlock(&p->mutex); + stat64_add(&ram_atomic_counters.multifd_bytes, p->packet_len); stat64_add(&ram_atomic_counters.transferred, p->packet_len); qemu_sem_post(&p->sem); -- 2.38.1
Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; - qemu_file_acct_rate_limit(f, p->packet_len); qemu_mutex_unlock(&p->mutex); - stat64_add(&ram_atomic_counters.multifd_bytes, p->packet_len); - stat64_add(&ram_atomic_counters.transferred, p->packet_len); qemu_sem_post(&p->sem); if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { -- 2.38.1
Current code asumes that all pages are whole. That is not true for example for compression already. Fix it for creating a new field ->sent_bytes that includes it. All ram_counters are used only from the migration thread, so we have two options: - put a mutex and fill everything when we sent it (not only ram_counters, also qemu_file->xfer_bytes). - Create a local variable that implements how much has been sent through each channel. And when we push another packet, we "add" the previous stats. I choose two due to less changes overall. On the previous code we increase transferred and then we sent. Current code goes the other way around. It sents the data, and after the fact, it updates the counters. Notice that each channel can have a maximum of half a megabyte of data without counting, so it is not very important. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.h | 2 ++ migration/multifd.c | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; + /* How many bytes have we sent on the last packet */ + uint64_t sent_bytes; /* thread has work to do */ int pending_job; /* array of pages to sent. diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) static int next_channel; MultiFDSendParams *p = NULL; /* make happy gcc */ MultiFDPages_t *pages = multifd_send_state->pages; - uint64_t transferred; if (qatomic_read(&multifd_send_state->exiting)) { return -1; @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; multifd_send_state->pages = p->pages; p->pages = pages; - transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; + uint64_t transferred = p->sent_bytes; + p->sent_bytes = 0; qemu_file_acct_rate_limit(f, transferred); qemu_mutex_unlock(&p->mutex); stat64_add(&ram_atomic_counters.multifd_bytes, transferred); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } qemu_mutex_lock(&p->mutex); + p->sent_bytes += p->packet_len; + p->sent_bytes += p->next_packet_size; p->pending_job--; qemu_mutex_unlock(&p->mutex); -- 2.38.1
We are going to create a new function for multifd latest in the series. Signed-off-by: Juan Quintela <quintela@redhat.com> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com> --- migration/ram.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -XXX,XX +XXX,XX @@ void dirty_sync_missed_zero_copy(void) ram_counters.dirty_sync_missed_zero_copy++; } +struct MigrationOps { + int (*ram_save_target_page)(RAMState *rs, PageSearchStatus *pss); +}; +typedef struct MigrationOps MigrationOps; + +MigrationOps *migration_ops; + CompressionStats compression_counters; struct CompressParam { @@ -XXX,XX +XXX,XX @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss, } /** - * ram_save_target_page: save one target page + * ram_save_target_page_legacy: save one target page * * Returns the number of pages written * * @rs: current RAM state * @pss: data about the page we want to send */ -static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss) +static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss) { RAMBlock *block = pss->block; ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS; @@ -XXX,XX +XXX,XX @@ static int ram_save_host_page_urgent(PageSearchStatus *pss) if (page_dirty) { /* Be strict to return code; it must be 1, or what else? */ - if (ram_save_target_page(rs, pss) != 1) { + if (migration_ops->ram_save_target_page(rs, pss) != 1) { error_report_once("%s: ram_save_target_page failed", __func__); ret = -1; goto out; @@ -XXX,XX +XXX,XX @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss) if (preempt_active) { qemu_mutex_unlock(&rs->bitmap_mutex); } - tmppages = ram_save_target_page(rs, pss); + tmppages = migration_ops->ram_save_target_page(rs, pss); if (tmppages >= 0) { pages += tmppages; /* @@ -XXX,XX +XXX,XX @@ static void ram_save_cleanup(void *opaque) xbzrle_cleanup(); compress_threads_save_cleanup(); ram_state_cleanup(rsp); + g_free(migration_ops); + migration_ops = NULL; } static void ram_state_reset(RAMState *rs) @@ -XXX,XX +XXX,XX @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); + migration_ops = g_malloc0(sizeof(MigrationOps)); + migration_ops->ram_save_target_page = ram_save_target_page_legacy; ret = multifd_send_sync_main(f); if (ret < 0) { return ret; -- 2.38.1
Use of flags with respect to locking was incensistant. For the sending side: - it was set to 0 with mutex held on the multifd channel. - MULTIFD_FLAG_SYNC was set with mutex held on the migration thread. - Everything else was done without the mutex held on the multifd channel. On the reception side, it is not used on the migration thread, only on the multifd channels threads. So we move it to the multifd channels thread only variables, and we introduce a new bool sync_needed on the send side to pass that information. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.h | 10 ++++++---- migration/multifd.c | 23 +++++++++++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { bool running; /* should this thread finish */ bool quit; - /* multifd flags for each packet */ - uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; /* How many bytes have we sent on the last packet */ uint64_t sent_bytes; + /* Do we need to do an iteration sync */ + bool sync_needed; /* thread has work to do */ int pending_job; /* array of pages to sent. @@ -XXX,XX +XXX,XX @@ typedef struct { /* pointer to the packet */ MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ @@ -XXX,XX +XXX,XX @@ typedef struct { bool running; /* should this thread finish */ bool quit; - /* multifd flags for each packet */ - uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; @@ -XXX,XX +XXX,XX @@ typedef struct { /* pointer to the packet */ MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) } p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; + p->sync_needed = true; p->pending_job++; qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) if (p->pending_job) { uint64_t packet_num = p->packet_num; - uint32_t flags = p->flags; + p->flags = 0; + if (p->sync_needed) { + p->flags |= MULTIFD_FLAG_SYNC; + p->sync_needed = false; + } p->normal_num = 0; if (use_zero_copy_send) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } } multifd_send_fill_packet(p); - p->flags = 0; p->num_packets++; p->total_normal_pages += p->normal_num; p->pages->num = 0; p->pages->block = NULL; qemu_mutex_unlock(&p->mutex); - trace_multifd_send(p->id, packet_num, p->normal_num, flags, + trace_multifd_send(p->id, packet_num, p->normal_num, p->flags, p->next_packet_size); if (use_zero_copy_send) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) p->pending_job--; qemu_mutex_unlock(&p->mutex); - if (flags & MULTIFD_FLAG_SYNC) { + if (p->flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&p->sem_sync); } qemu_sem_post(&multifd_send_state->channels_ready); @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) rcu_register_thread(); while (true) { - uint32_t flags; + bool sync_needed = false; if (p->quit) { break; @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) break; } - flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, + p->next_packet_size); + sync_needed = p->flags & MULTIFD_FLAG_SYNC; /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; - trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags, - p->next_packet_size); p->num_packets++; p->total_normal_pages += p->normal_num; qemu_mutex_unlock(&p->mutex); @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) } } - if (flags & MULTIFD_FLAG_SYNC) { + if (sync_needed) { qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); } -- 2.38.1
We do the send_prepare() and the fill of the head packet without the mutex held. It will help a lot for compression and later in the series for zero pages. Notice that we can use p->pages without holding p->mutex because p->pending_job == 1. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.h | 2 ++ migration/multifd.c | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { /* array of pages to sent. * The owner of 'pages' depends of 'pending_job' value: * pending_job == 0 -> migration_thread can use it. + * No need for mutex lock. * pending_job != 0 -> multifd_channel can use it. + * No need for mutex lock. */ MultiFDPages_t *pages; diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) p->flags |= MULTIFD_FLAG_SYNC; p->sync_needed = false; } + qemu_mutex_unlock(&p->mutex); + p->normal_num = 0; if (use_zero_copy_send) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) if (p->normal_num) { ret = multifd_send_state->ops->send_prepare(p, &local_err); if (ret != 0) { - qemu_mutex_unlock(&p->mutex); break; } } multifd_send_fill_packet(p); - p->num_packets++; - p->total_normal_pages += p->normal_num; - p->pages->num = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); trace_multifd_send(p->id, packet_num, p->normal_num, p->flags, p->next_packet_size); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } qemu_mutex_lock(&p->mutex); + p->num_packets++; + p->total_normal_pages += p->normal_num; + p->pages->num = 0; + p->pages->block = NULL; p->sent_bytes += p->packet_len; p->sent_bytes += p->next_packet_size; p->pending_job--; -- 2.38.1
We have to enable it by default until we introduce the new code. Signed-off-by: Juan Quintela <quintela@redhat.com> --- Change it to a capability. As capabilities are off by default, have to change MULTIFD_ZERO_PAGE to MAIN_ZERO_PAGE, so it is false for default, and true for older versions. --- qapi/migration.json | 8 +++++++- migration/migration.h | 1 + hw/core/machine.c | 1 + migration/migration.c | 13 ++++++++++++- 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/qapi/migration.json b/qapi/migration.json index XXXXXXX..XXXXXXX 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -XXX,XX +XXX,XX @@ # Requires that QEMU be permitted to use locked memory # for guest RAM pages. # (since 7.1) +# # @postcopy-preempt: If enabled, the migration process will allow postcopy # requests to preempt precopy stream, so postcopy requests # will be handled faster. This is a performance feature and # should not affect the correctness of postcopy migration. # (since 7.1) # +# @main-zero-page: If enabled, the detection of zero pages will be +# done on the main thread. Otherwise it is done on +# the multifd threads. +# (since 8.0) +# # Features: # @unstable: Members @x-colo and @x-ignore-shared are experimental. # @@ -XXX,XX +XXX,XX @@ 'dirty-bitmaps', 'postcopy-blocktime', 'late-block-activate', { 'name': 'x-ignore-shared', 'features': [ 'unstable' ] }, 'validate-uuid', 'background-snapshot', - 'zero-copy-send', 'postcopy-preempt'] } + 'zero-copy-send', 'postcopy-preempt', 'main-zero-page'] } ## # @MigrationCapabilityStatus: diff --git a/migration/migration.h b/migration/migration.h index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -XXX,XX +XXX,XX @@ int migrate_multifd_channels(void); MultiFDCompression migrate_multifd_compression(void); int migrate_multifd_zlib_level(void); int migrate_multifd_zstd_level(void); +bool migrate_use_main_zero_page(void); #ifdef CONFIG_LINUX bool migrate_use_zero_copy_send(void); diff --git a/hw/core/machine.c b/hw/core/machine.c index XXXXXXX..XXXXXXX 100644 --- a/hw/core/machine.c +++ b/hw/core/machine.c @@ -XXX,XX +XXX,XX @@ const size_t hw_compat_7_1_len = G_N_ELEMENTS(hw_compat_7_1); GlobalProperty hw_compat_7_0[] = { { "arm-gicv3-common", "force-8-bit-prio", "on" }, { "nvme-ns", "eui64-default", "on"}, + { "migration", "main-zero-page", "true" }, }; const size_t hw_compat_7_0_len = G_N_ELEMENTS(hw_compat_7_0); diff --git a/migration/migration.c b/migration/migration.c index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -XXX,XX +XXX,XX @@ INITIALIZE_MIGRATE_CAPS_SET(check_caps_background_snapshot, MIGRATION_CAPABILITY_XBZRLE, MIGRATION_CAPABILITY_X_COLO, MIGRATION_CAPABILITY_VALIDATE_UUID, - MIGRATION_CAPABILITY_ZERO_COPY_SEND); + MIGRATION_CAPABILITY_ZERO_COPY_SEND, + MIGRATION_CAPABILITY_MAIN_ZERO_PAGE); /* When we add fault tolerance, we could have several migrations at once. For now we don't need to add @@ -XXX,XX +XXX,XX @@ bool migrate_use_multifd(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_MULTIFD]; } +bool migrate_use_main_zero_page(void) +{ + MigrationState *s = migrate_get_current(); + + /* We will enable this when we add the right code. */ + return true || s->enabled_capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]; +} + bool migrate_pause_before_switchover(void) { MigrationState *s; @@ -XXX,XX +XXX,XX @@ static Property migration_properties[] = { DEFINE_PROP_MIG_CAP("x-zero-copy-send", MIGRATION_CAPABILITY_ZERO_COPY_SEND), #endif + DEFINE_PROP_MIG_CAP("main-zero-page", + MIGRATION_CAPABILITY_MAIN_ZERO_PAGE), DEFINE_PROP_END_OF_LIST(), }; -- 2.38.1
This patch adds counters and similar. Logic will be added on the following patch. Signed-off-by: Juan Quintela <quintela@redhat.com> --- Added counters for duplicated/non duplicated pages. Removed reviewed by from David. Add total_zero_pages --- migration/multifd.h | 17 ++++++++++++++++- migration/multifd.c | 36 +++++++++++++++++++++++++++++------- migration/ram.c | 2 -- migration/trace-events | 8 ++++---- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { /* size of the next packet that contains pages */ uint32_t next_packet_size; uint64_t packet_num; - uint64_t unused[4]; /* Reserved for future use */ + /* zero pages */ + uint32_t zero_pages; + uint32_t unused32[1]; /* Reserved for future use */ + uint64_t unused64[3]; /* Reserved for future use */ char ramblock[256]; uint64_t offset[]; } __attribute__((packed)) MultiFDPacket_t; @@ -XXX,XX +XXX,XX @@ typedef struct { uint64_t num_packets; /* non zero pages sent through this channel */ uint64_t total_normal_pages; + /* zero pages sent through this channel */ + uint64_t total_zero_pages; /* buffers to send */ struct iovec *iov; /* number of iovs used */ @@ -XXX,XX +XXX,XX @@ typedef struct { ram_addr_t *normal; /* num of non zero pages */ uint32_t normal_num; + /* Pages that are zero */ + ram_addr_t *zero; + /* num of zero pages */ + uint32_t zero_num; /* used for compression methods */ void *data; } MultiFDSendParams; @@ -XXX,XX +XXX,XX @@ typedef struct { uint8_t *host; /* non zero pages recv through this channel */ uint64_t total_normal_pages; + /* zero pages recv through this channel */ + uint64_t total_zero_pages; /* buffers to recv */ struct iovec *iov; /* Pages that are not zero */ ram_addr_t *normal; /* num of non zero pages */ uint32_t normal_num; + /* Pages that are zero */ + ram_addr_t *zero; + /* num of zero pages */ + uint32_t zero_num; /* used for de-compression methods */ void *data; } MultiFDRecvParams; diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static void multifd_send_fill_packet(MultiFDSendParams *p) packet->normal_pages = cpu_to_be32(p->normal_num); packet->next_packet_size = cpu_to_be32(p->next_packet_size); packet->packet_num = cpu_to_be64(p->packet_num); + packet->zero_pages = cpu_to_be32(p->zero_num); if (p->pages->block) { strncpy(packet->ramblock, p->pages->block->idstr, 256); @@ -XXX,XX +XXX,XX @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) p->next_packet_size = be32_to_cpu(packet->next_packet_size); p->packet_num = be64_to_cpu(packet->packet_num); - if (p->normal_num == 0) { + p->zero_num = be32_to_cpu(packet->zero_pages); + if (p->zero_num > packet->pages_alloc - p->normal_num) { + error_setg(errp, "multifd: received packet " + "with %u zero pages and expected maximum pages are %u", + p->zero_num, packet->pages_alloc - p->normal_num) ; + return -1; + } + + if (p->normal_num == 0 && p->zero_num == 0) { return 0; } @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; multifd_send_state->pages = p->pages; p->pages = pages; + stat64_add(&ram_atomic_counters.normal, p->normal_num); + stat64_add(&ram_atomic_counters.duplicate, p->zero_num); uint64_t transferred = p->sent_bytes; p->sent_bytes = 0; qemu_file_acct_rate_limit(f, transferred); @@ -XXX,XX +XXX,XX @@ void multifd_save_cleanup(void) p->iov = NULL; g_free(p->normal); p->normal = NULL; + g_free(p->zero); + p->zero = NULL; multifd_send_state->ops->send_cleanup(p, &local_err); if (local_err) { migrate_set_error(migrate_get_current(), local_err); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) qemu_mutex_unlock(&p->mutex); p->normal_num = 0; + p->zero_num = 0; if (use_zero_copy_send) { p->iovs_num = 0; @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } multifd_send_fill_packet(p); - trace_multifd_send(p->id, packet_num, p->normal_num, p->flags, - p->next_packet_size); + trace_multifd_send(p->id, packet_num, p->normal_num, p->zero_num, + p->flags, p->next_packet_size); if (use_zero_copy_send) { /* Send header first, without zerocopy */ @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); p->num_packets++; p->total_normal_pages += p->normal_num; + p->total_zero_pages += p->zero_num; p->pages->num = 0; p->pages->block = NULL; p->sent_bytes += p->packet_len; @@ -XXX,XX +XXX,XX @@ out: qemu_mutex_unlock(&p->mutex); rcu_unregister_thread(); - trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages); + trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages, + p->total_zero_pages); return NULL; } @@ -XXX,XX +XXX,XX @@ int multifd_save_setup(Error **errp) p->normal = g_new0(ram_addr_t, page_count); p->page_size = qemu_target_page_size(); p->page_count = page_count; + p->zero = g_new0(ram_addr_t, page_count); if (migrate_use_zero_copy_send()) { p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; @@ -XXX,XX +XXX,XX @@ int multifd_load_cleanup(Error **errp) p->iov = NULL; g_free(p->normal); p->normal = NULL; + g_free(p->zero); + p->zero = NULL; multifd_recv_state->ops->recv_cleanup(p); } qemu_sem_destroy(&multifd_recv_state->sem_sync); @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) break; } - trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, - p->next_packet_size); + trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num, + p->flags, p->next_packet_size); sync_needed = p->flags & MULTIFD_FLAG_SYNC; /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; p->num_packets++; p->total_normal_pages += p->normal_num; + p->total_normal_pages += p->zero_num; qemu_mutex_unlock(&p->mutex); if (p->normal_num) { @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) qemu_mutex_unlock(&p->mutex); rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages); + trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages, + p->total_zero_pages); return NULL; } @@ -XXX,XX +XXX,XX @@ int multifd_load_setup(Error **errp) p->normal = g_new0(ram_addr_t, page_count); p->page_count = page_count; p->page_size = qemu_target_page_size(); + p->zero = g_new0(ram_addr_t, page_count); } for (i = 0; i < thread_count; i++) { diff --git a/migration/ram.c b/migration/ram.c index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -XXX,XX +XXX,XX @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block, if (multifd_queue_page(file, block, offset) < 0) { return -1; } - stat64_add(&ram_atomic_counters.normal, 1); - return 1; } diff --git a/migration/trace-events b/migration/trace-events index XXXXXXX..XXXXXXX 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -XXX,XX +XXX,XX @@ postcopy_preempt_reset_channel(void) "" # multifd.c multifd_new_send_channel_async(uint8_t id) "channel %u" -multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u" +multifd_recv(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t zero, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u" multifd_recv_new_channel(uint8_t id) "channel %u" multifd_recv_sync_main(long packet_num) "packet num %ld" multifd_recv_sync_main_signal(uint8_t id) "channel %u" multifd_recv_sync_main_wait(uint8_t id) "channel %u" multifd_recv_terminate_threads(bool error) "error %d" -multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64 +multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64 multifd_recv_thread_start(uint8_t id) "%u" -multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u flags 0x%x next packet size %u" +multifd_send(uint8_t id, uint64_t packet_num, uint32_t normalpages, uint32_t zero_pages, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u" multifd_send_error(uint8_t id) "channel %u" multifd_send_sync_main(long packet_num) "packet num %ld" multifd_send_sync_main_signal(uint8_t id) "channel %u" multifd_send_sync_main_wait(uint8_t id) "channel %u" multifd_send_terminate_threads(bool error) "error %d" -multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 +multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64 multifd_send_thread_start(uint8_t id) "%u" multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s" multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s" -- 2.38.1
This implements the zero page dection and handling. Signed-off-by: Juan Quintela <quintela@redhat.com> --- Add comment for offset (dave) Use local variables for offset/block to have shorter lines --- migration/multifd.h | 5 +++++ migration/multifd.c | 45 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { uint32_t unused32[1]; /* Reserved for future use */ uint64_t unused64[3]; /* Reserved for future use */ char ramblock[256]; + /* + * This array contains the pointers to: + * - normal pages (initial normal_pages entries) + * - zero pages (following zero_pages entries) + */ uint64_t offset[]; } __attribute__((packed)) MultiFDPacket_t; diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ */ #include "qemu/osdep.h" +#include "qemu/cutils.h" #include "qemu/rcu.h" #include "exec/target_page.h" #include "sysemu/sysemu.h" @@ -XXX,XX +XXX,XX @@ static void multifd_send_fill_packet(MultiFDSendParams *p) packet->offset[i] = cpu_to_be64(temp); } + for (i = 0; i < p->zero_num; i++) { + /* there are architectures where ram_addr_t is 32 bit */ + uint64_t temp = p->zero[i]; + + packet->offset[p->normal_num + i] = cpu_to_be64(temp); + } } static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) @@ -XXX,XX +XXX,XX @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) p->normal[i] = offset; } + for (i = 0; i < p->zero_num; i++) { + uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]); + + if (offset > (block->used_length - p->page_size)) { + error_setg(errp, "multifd: offset too long %" PRIu64 + " (max " RAM_ADDR_FMT ")", + offset, block->used_length); + return -1; + } + p->zero[i] = offset; + } + return 0; } @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) { MultiFDSendParams *p = opaque; Error *local_err = NULL; + /* + * older qemu don't understand zero page on multifd channel. To + * have capabilities "false" by default, we need to name it this + * way. + */ + bool use_multifd_zero_page = !migrate_use_main_zero_page(); int ret = 0; bool use_zero_copy_send = migrate_use_zero_copy_send(); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); if (p->pending_job) { + RAMBlock *rb = p->pages->block; uint64_t packet_num = p->packet_num; p->flags = 0; if (p->sync_needed) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } for (int i = 0; i < p->pages->num; i++) { - p->normal[p->normal_num] = p->pages->offset[i]; - p->normal_num++; + uint64_t offset = p->pages->offset[i]; + if (use_multifd_zero_page && + buffer_is_zero(rb->host + offset, p->page_size)) { + p->zero[p->zero_num] = offset; + p->zero_num++; + ram_release_page(rb->idstr, offset); + } else { + p->normal[p->normal_num] = offset; + p->normal_num++; + } } if (p->normal_num) { @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) } } + for (int i = 0; i < p->zero_num; i++) { + void *page = p->host + p->zero[i]; + if (!buffer_is_zero(page, p->page_size)) { + memset(page, 0, p->page_size); + } + } + if (sync_needed) { qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); -- 2.38.1
Signed-off-by: Juan Quintela <quintela@redhat.com> --- - Check zero_page property before using new code (Dave) --- migration/migration.c | 3 +-- migration/ram.c | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -XXX,XX +XXX,XX @@ bool migrate_use_main_zero_page(void) { MigrationState *s = migrate_get_current(); - /* We will enable this when we add the right code. */ - return true || s->enabled_capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]; + return s->enabled_capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]; } bool migrate_pause_before_switchover(void) diff --git a/migration/ram.c b/migration/ram.c index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -XXX,XX +XXX,XX @@ out: return ret; } +/** + * ram_save_target_page_multifd: save one target page + * + * Returns the number of pages written + * + * @rs: current RAM state + * @pss: data about the page we want to send + */ +static int ram_save_target_page_multifd(RAMState *rs, PageSearchStatus *pss) +{ + RAMBlock *block = pss->block; + ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS; + + if (!migration_in_postcopy()) { + return ram_save_multifd_page(pss->pss_channel, block, offset); + } + + int res = save_zero_page(pss, block, offset); + if (res > 0) { + return res; + } + + return ram_save_page(rs, pss); +} + /** * ram_save_host_page: save a whole host page * @@ -XXX,XX +XXX,XX @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_SETUP); migration_ops = g_malloc0(sizeof(MigrationOps)); - migration_ops->ram_save_target_page = ram_save_target_page_legacy; + if (migrate_use_multifd() && !migrate_use_main_zero_page()) { + migration_ops->ram_save_target_page = ram_save_target_page_multifd; + } else { + migration_ops->ram_save_target_page = ram_save_target_page_legacy; + } + ret = multifd_send_sync_main(f); if (ret < 0) { return ret; -- 2.38.1
Based on top of my next branch. - Rebased on top of latest upstream - Redo a lot of the packet accounting still not completely perfect, but much better than what is upstream Still working continuing on that. Please review. [v2] - rebased on top of latest upstream - lots of minor fixes - start support for atomic counters * we need to move ram_limit_used/max to migration.c * that means fixing rdma.c * and test-vmstate. So I am donig that right now. Juan Quintela (11): migration: Update atomic stats out of the mutex migration: Make multifd_bytes atomic multifd: We already account for this packet on the multifd thread multifd: Count the number of bytes sent correctly migration: Make ram_save_target_page() a pointer multifd: Make flags field thread local multifd: Prepare to send a packet without the mutex held multifd: Add capability to enable/disable zero_page multifd: Support for zero pages transmission multifd: Zero pages transmission So we use multifd to transmit zero pages. qapi/migration.json | 8 ++- migration/migration.h | 1 + migration/multifd.h | 36 ++++++++++-- migration/ram.h | 1 + hw/core/machine.c | 1 + migration/migration.c | 16 +++++- migration/multifd.c | 123 +++++++++++++++++++++++++++++++---------- migration/ram.c | 51 +++++++++++++++-- migration/trace-events | 8 +-- 9 files changed, 197 insertions(+), 48 deletions(-) -- 2.39.1
Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; qemu_file_acct_rate_limit(f, transferred); ram_counters.multifd_bytes += transferred; + qemu_mutex_unlock(&p->mutex); stat64_add(&ram_atomic_counters.transferred, transferred); - qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); return 1; @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) p->pending_job++; qemu_file_acct_rate_limit(f, p->packet_len); ram_counters.multifd_bytes += p->packet_len; + qemu_mutex_unlock(&p->mutex); stat64_add(&ram_atomic_counters.transferred, p->packet_len); - qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { -- 2.39.1
In the spirit of: commit 394d323bc3451e4d07f13341cb8817fac8dfbadd Author: Peter Xu <peterx@redhat.com> Date: Tue Oct 11 17:55:51 2022 -0400 migration: Use atomic ops properly for page accountings Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.h | 1 + migration/migration.c | 4 ++-- migration/multifd.c | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/migration/ram.h b/migration/ram.h index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -XXX,XX +XXX,XX @@ typedef struct { Stat64 duplicate; Stat64 normal; Stat64 postcopy_bytes; + Stat64 multifd_bytes; } MigrationAtomicStats; extern MigrationAtomicStats ram_atomic_counters; diff --git a/migration/migration.c b/migration/migration.c index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -XXX,XX +XXX,XX @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s) ram_counters.dirty_sync_missed_zero_copy; info->ram->postcopy_requests = ram_counters.postcopy_requests; info->ram->page_size = page_size; - info->ram->multifd_bytes = ram_counters.multifd_bytes; + info->ram->multifd_bytes = stat64_get(&ram_atomic_counters.multifd_bytes); info->ram->pages_per_second = s->pages_per_second; info->ram->precopy_bytes = ram_counters.precopy_bytes; info->ram->downtime_bytes = ram_counters.downtime_bytes; @@ -XXX,XX +XXX,XX @@ static MigThrError migration_detect_error(MigrationState *s) static uint64_t migration_total_bytes(MigrationState *s) { return qemu_file_total_transferred(s->to_dst_file) + - ram_counters.multifd_bytes; + stat64_get(&ram_atomic_counters.multifd_bytes); } static void migration_calculate_complete(MigrationState *s) diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) p->pages = pages; transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; qemu_file_acct_rate_limit(f, transferred); - ram_counters.multifd_bytes += transferred; qemu_mutex_unlock(&p->mutex); + stat64_add(&ram_atomic_counters.multifd_bytes, transferred); stat64_add(&ram_atomic_counters.transferred, transferred); qemu_sem_post(&p->sem); @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; qemu_file_acct_rate_limit(f, p->packet_len); - ram_counters.multifd_bytes += p->packet_len; qemu_mutex_unlock(&p->mutex); + stat64_add(&ram_atomic_counters.multifd_bytes, p->packet_len); stat64_add(&ram_atomic_counters.transferred, p->packet_len); qemu_sem_post(&p->sem); -- 2.39.1
Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; - qemu_file_acct_rate_limit(f, p->packet_len); qemu_mutex_unlock(&p->mutex); - stat64_add(&ram_atomic_counters.multifd_bytes, p->packet_len); - stat64_add(&ram_atomic_counters.transferred, p->packet_len); qemu_sem_post(&p->sem); if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { -- 2.39.1
Current code asumes that all pages are whole. That is not true for example for compression already. Fix it for creating a new field ->sent_bytes that includes it. All ram_counters are used only from the migration thread, so we have two options: - put a mutex and fill everything when we sent it (not only ram_counters, also qemu_file->xfer_bytes). - Create a local variable that implements how much has been sent through each channel. And when we push another packet, we "add" the previous stats. I choose two due to less changes overall. On the previous code we increase transferred and then we sent. Current code goes the other way around. It sents the data, and after the fact, it updates the counters. Notice that each channel can have a maximum of half a megabyte of data without counting, so it is not very important. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.h | 2 ++ migration/multifd.c | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; + /* How many bytes have we sent on the last packet */ + uint64_t sent_bytes; /* thread has work to do */ int pending_job; /* array of pages to sent. diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) static int next_channel; MultiFDSendParams *p = NULL; /* make happy gcc */ MultiFDPages_t *pages = multifd_send_state->pages; - uint64_t transferred; if (qatomic_read(&multifd_send_state->exiting)) { return -1; @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; multifd_send_state->pages = p->pages; p->pages = pages; - transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; + uint64_t transferred = p->sent_bytes; + p->sent_bytes = 0; qemu_file_acct_rate_limit(f, transferred); qemu_mutex_unlock(&p->mutex); stat64_add(&ram_atomic_counters.multifd_bytes, transferred); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } qemu_mutex_lock(&p->mutex); + p->sent_bytes += p->packet_len; + p->sent_bytes += p->next_packet_size; p->pending_job--; qemu_mutex_unlock(&p->mutex); -- 2.39.1
We are going to create a new function for multifd latest in the series. Signed-off-by: Juan Quintela <quintela@redhat.com> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com> --- migration/ram.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -XXX,XX +XXX,XX @@ void dirty_sync_missed_zero_copy(void) ram_counters.dirty_sync_missed_zero_copy++; } +struct MigrationOps { + int (*ram_save_target_page)(RAMState *rs, PageSearchStatus *pss); +}; +typedef struct MigrationOps MigrationOps; + +MigrationOps *migration_ops; + CompressionStats compression_counters; struct CompressParam { @@ -XXX,XX +XXX,XX @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss, } /** - * ram_save_target_page: save one target page + * ram_save_target_page_legacy: save one target page * * Returns the number of pages written * * @rs: current RAM state * @pss: data about the page we want to send */ -static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss) +static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss) { RAMBlock *block = pss->block; ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS; @@ -XXX,XX +XXX,XX @@ static int ram_save_host_page_urgent(PageSearchStatus *pss) if (page_dirty) { /* Be strict to return code; it must be 1, or what else? */ - if (ram_save_target_page(rs, pss) != 1) { + if (migration_ops->ram_save_target_page(rs, pss) != 1) { error_report_once("%s: ram_save_target_page failed", __func__); ret = -1; goto out; @@ -XXX,XX +XXX,XX @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss) if (preempt_active) { qemu_mutex_unlock(&rs->bitmap_mutex); } - tmppages = ram_save_target_page(rs, pss); + tmppages = migration_ops->ram_save_target_page(rs, pss); if (tmppages >= 0) { pages += tmppages; /* @@ -XXX,XX +XXX,XX @@ static void ram_save_cleanup(void *opaque) xbzrle_cleanup(); compress_threads_save_cleanup(); ram_state_cleanup(rsp); + g_free(migration_ops); + migration_ops = NULL; } static void ram_state_reset(RAMState *rs) @@ -XXX,XX +XXX,XX @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); + migration_ops = g_malloc0(sizeof(MigrationOps)); + migration_ops->ram_save_target_page = ram_save_target_page_legacy; ret = multifd_send_sync_main(f); if (ret < 0) { return ret; -- 2.39.1
Use of flags with respect to locking was incensistant. For the sending side: - it was set to 0 with mutex held on the multifd channel. - MULTIFD_FLAG_SYNC was set with mutex held on the migration thread. - Everything else was done without the mutex held on the multifd channel. On the reception side, it is not used on the migration thread, only on the multifd channels threads. So we move it to the multifd channels thread only variables, and we introduce a new bool sync_needed on the send side to pass that information. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.h | 10 ++++++---- migration/multifd.c | 23 +++++++++++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { bool running; /* should this thread finish */ bool quit; - /* multifd flags for each packet */ - uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; /* How many bytes have we sent on the last packet */ uint64_t sent_bytes; + /* Do we need to do an iteration sync */ + bool sync_needed; /* thread has work to do */ int pending_job; /* array of pages to sent. @@ -XXX,XX +XXX,XX @@ typedef struct { /* pointer to the packet */ MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ @@ -XXX,XX +XXX,XX @@ typedef struct { bool running; /* should this thread finish */ bool quit; - /* multifd flags for each packet */ - uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; @@ -XXX,XX +XXX,XX @@ typedef struct { /* pointer to the packet */ MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ int multifd_send_sync_main(QEMUFile *f) } p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; + p->sync_needed = true; p->pending_job++; qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) if (p->pending_job) { uint64_t packet_num = p->packet_num; - uint32_t flags = p->flags; + p->flags = 0; + if (p->sync_needed) { + p->flags |= MULTIFD_FLAG_SYNC; + p->sync_needed = false; + } p->normal_num = 0; if (use_zero_copy_send) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } } multifd_send_fill_packet(p); - p->flags = 0; p->num_packets++; p->total_normal_pages += p->normal_num; p->pages->num = 0; p->pages->block = NULL; qemu_mutex_unlock(&p->mutex); - trace_multifd_send(p->id, packet_num, p->normal_num, flags, + trace_multifd_send(p->id, packet_num, p->normal_num, p->flags, p->next_packet_size); if (use_zero_copy_send) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) p->pending_job--; qemu_mutex_unlock(&p->mutex); - if (flags & MULTIFD_FLAG_SYNC) { + if (p->flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&p->sem_sync); } qemu_sem_post(&multifd_send_state->channels_ready); @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) rcu_register_thread(); while (true) { - uint32_t flags; + bool sync_needed = false; if (p->quit) { break; @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) break; } - flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, + p->next_packet_size); + sync_needed = p->flags & MULTIFD_FLAG_SYNC; /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; - trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags, - p->next_packet_size); p->num_packets++; p->total_normal_pages += p->normal_num; qemu_mutex_unlock(&p->mutex); @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) } } - if (flags & MULTIFD_FLAG_SYNC) { + if (sync_needed) { qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); } -- 2.39.1
We do the send_prepare() and the fill of the head packet without the mutex held. It will help a lot for compression and later in the series for zero pages. Notice that we can use p->pages without holding p->mutex because p->pending_job == 1. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/multifd.h | 2 ++ migration/multifd.c | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { /* array of pages to sent. * The owner of 'pages' depends of 'pending_job' value: * pending_job == 0 -> migration_thread can use it. + * No need for mutex lock. * pending_job != 0 -> multifd_channel can use it. + * No need for mutex lock. */ MultiFDPages_t *pages; diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) p->flags |= MULTIFD_FLAG_SYNC; p->sync_needed = false; } + qemu_mutex_unlock(&p->mutex); + p->normal_num = 0; if (use_zero_copy_send) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) if (p->normal_num) { ret = multifd_send_state->ops->send_prepare(p, &local_err); if (ret != 0) { - qemu_mutex_unlock(&p->mutex); break; } } multifd_send_fill_packet(p); - p->num_packets++; - p->total_normal_pages += p->normal_num; - p->pages->num = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); trace_multifd_send(p->id, packet_num, p->normal_num, p->flags, p->next_packet_size); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } qemu_mutex_lock(&p->mutex); + p->num_packets++; + p->total_normal_pages += p->normal_num; + p->pages->num = 0; + p->pages->block = NULL; p->sent_bytes += p->packet_len; p->sent_bytes += p->next_packet_size; p->pending_job--; -- 2.39.1
We have to enable it by default until we introduce the new code. Signed-off-by: Juan Quintela <quintela@redhat.com> --- Change it to a capability. As capabilities are off by default, have to change MULTIFD_ZERO_PAGE to MAIN_ZERO_PAGE, so it is false for default, and true for older versions. --- qapi/migration.json | 8 +++++++- migration/migration.h | 1 + hw/core/machine.c | 1 + migration/migration.c | 13 ++++++++++++- 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/qapi/migration.json b/qapi/migration.json index XXXXXXX..XXXXXXX 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -XXX,XX +XXX,XX @@ # Requires that QEMU be permitted to use locked memory # for guest RAM pages. # (since 7.1) +# # @postcopy-preempt: If enabled, the migration process will allow postcopy # requests to preempt precopy stream, so postcopy requests # will be handled faster. This is a performance feature and # should not affect the correctness of postcopy migration. # (since 7.1) # +# @main-zero-page: If enabled, the detection of zero pages will be +# done on the main thread. Otherwise it is done on +# the multifd threads. +# (since 8.0) +# # Features: # @unstable: Members @x-colo and @x-ignore-shared are experimental. # @@ -XXX,XX +XXX,XX @@ 'dirty-bitmaps', 'postcopy-blocktime', 'late-block-activate', { 'name': 'x-ignore-shared', 'features': [ 'unstable' ] }, 'validate-uuid', 'background-snapshot', - 'zero-copy-send', 'postcopy-preempt'] } + 'zero-copy-send', 'postcopy-preempt', 'main-zero-page'] } ## # @MigrationCapabilityStatus: diff --git a/migration/migration.h b/migration/migration.h index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -XXX,XX +XXX,XX @@ int migrate_multifd_channels(void); MultiFDCompression migrate_multifd_compression(void); int migrate_multifd_zlib_level(void); int migrate_multifd_zstd_level(void); +bool migrate_use_main_zero_page(void); #ifdef CONFIG_LINUX bool migrate_use_zero_copy_send(void); diff --git a/hw/core/machine.c b/hw/core/machine.c index XXXXXXX..XXXXXXX 100644 --- a/hw/core/machine.c +++ b/hw/core/machine.c @@ -XXX,XX +XXX,XX @@ const size_t hw_compat_7_1_len = G_N_ELEMENTS(hw_compat_7_1); GlobalProperty hw_compat_7_0[] = { { "arm-gicv3-common", "force-8-bit-prio", "on" }, { "nvme-ns", "eui64-default", "on"}, + { "migration", "main-zero-page", "true" }, }; const size_t hw_compat_7_0_len = G_N_ELEMENTS(hw_compat_7_0); diff --git a/migration/migration.c b/migration/migration.c index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -XXX,XX +XXX,XX @@ INITIALIZE_MIGRATE_CAPS_SET(check_caps_background_snapshot, MIGRATION_CAPABILITY_XBZRLE, MIGRATION_CAPABILITY_X_COLO, MIGRATION_CAPABILITY_VALIDATE_UUID, - MIGRATION_CAPABILITY_ZERO_COPY_SEND); + MIGRATION_CAPABILITY_ZERO_COPY_SEND, + MIGRATION_CAPABILITY_MAIN_ZERO_PAGE); /* When we add fault tolerance, we could have several migrations at once. For now we don't need to add @@ -XXX,XX +XXX,XX @@ bool migrate_use_multifd(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_MULTIFD]; } +bool migrate_use_main_zero_page(void) +{ + MigrationState *s = migrate_get_current(); + + /* We will enable this when we add the right code. */ + return true || s->enabled_capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]; +} + bool migrate_pause_before_switchover(void) { MigrationState *s; @@ -XXX,XX +XXX,XX @@ static Property migration_properties[] = { DEFINE_PROP_MIG_CAP("x-zero-copy-send", MIGRATION_CAPABILITY_ZERO_COPY_SEND), #endif + DEFINE_PROP_MIG_CAP("main-zero-page", + MIGRATION_CAPABILITY_MAIN_ZERO_PAGE), DEFINE_PROP_END_OF_LIST(), }; -- 2.39.1
This patch adds counters and similar. Logic will be added on the following patch. Signed-off-by: Juan Quintela <quintela@redhat.com> --- Added counters for duplicated/non duplicated pages. Removed reviewed by from David. Add total_zero_pages --- migration/multifd.h | 17 ++++++++++++++++- migration/multifd.c | 36 +++++++++++++++++++++++++++++------- migration/ram.c | 2 -- migration/trace-events | 8 ++++---- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { /* size of the next packet that contains pages */ uint32_t next_packet_size; uint64_t packet_num; - uint64_t unused[4]; /* Reserved for future use */ + /* zero pages */ + uint32_t zero_pages; + uint32_t unused32[1]; /* Reserved for future use */ + uint64_t unused64[3]; /* Reserved for future use */ char ramblock[256]; uint64_t offset[]; } __attribute__((packed)) MultiFDPacket_t; @@ -XXX,XX +XXX,XX @@ typedef struct { uint64_t num_packets; /* non zero pages sent through this channel */ uint64_t total_normal_pages; + /* zero pages sent through this channel */ + uint64_t total_zero_pages; /* buffers to send */ struct iovec *iov; /* number of iovs used */ @@ -XXX,XX +XXX,XX @@ typedef struct { ram_addr_t *normal; /* num of non zero pages */ uint32_t normal_num; + /* Pages that are zero */ + ram_addr_t *zero; + /* num of zero pages */ + uint32_t zero_num; /* used for compression methods */ void *data; } MultiFDSendParams; @@ -XXX,XX +XXX,XX @@ typedef struct { uint8_t *host; /* non zero pages recv through this channel */ uint64_t total_normal_pages; + /* zero pages recv through this channel */ + uint64_t total_zero_pages; /* buffers to recv */ struct iovec *iov; /* Pages that are not zero */ ram_addr_t *normal; /* num of non zero pages */ uint32_t normal_num; + /* Pages that are zero */ + ram_addr_t *zero; + /* num of zero pages */ + uint32_t zero_num; /* used for de-compression methods */ void *data; } MultiFDRecvParams; diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ static void multifd_send_fill_packet(MultiFDSendParams *p) packet->normal_pages = cpu_to_be32(p->normal_num); packet->next_packet_size = cpu_to_be32(p->next_packet_size); packet->packet_num = cpu_to_be64(p->packet_num); + packet->zero_pages = cpu_to_be32(p->zero_num); if (p->pages->block) { strncpy(packet->ramblock, p->pages->block->idstr, 256); @@ -XXX,XX +XXX,XX @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) p->next_packet_size = be32_to_cpu(packet->next_packet_size); p->packet_num = be64_to_cpu(packet->packet_num); - if (p->normal_num == 0) { + p->zero_num = be32_to_cpu(packet->zero_pages); + if (p->zero_num > packet->pages_alloc - p->normal_num) { + error_setg(errp, "multifd: received packet " + "with %u zero pages and expected maximum pages are %u", + p->zero_num, packet->pages_alloc - p->normal_num) ; + return -1; + } + + if (p->normal_num == 0 && p->zero_num == 0) { return 0; } @@ -XXX,XX +XXX,XX @@ static int multifd_send_pages(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; multifd_send_state->pages = p->pages; p->pages = pages; + stat64_add(&ram_atomic_counters.normal, p->normal_num); + stat64_add(&ram_atomic_counters.duplicate, p->zero_num); uint64_t transferred = p->sent_bytes; p->sent_bytes = 0; qemu_file_acct_rate_limit(f, transferred); @@ -XXX,XX +XXX,XX @@ void multifd_save_cleanup(void) p->iov = NULL; g_free(p->normal); p->normal = NULL; + g_free(p->zero); + p->zero = NULL; multifd_send_state->ops->send_cleanup(p, &local_err); if (local_err) { migrate_set_error(migrate_get_current(), local_err); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) qemu_mutex_unlock(&p->mutex); p->normal_num = 0; + p->zero_num = 0; if (use_zero_copy_send) { p->iovs_num = 0; @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } multifd_send_fill_packet(p); - trace_multifd_send(p->id, packet_num, p->normal_num, p->flags, - p->next_packet_size); + trace_multifd_send(p->id, packet_num, p->normal_num, p->zero_num, + p->flags, p->next_packet_size); if (use_zero_copy_send) { /* Send header first, without zerocopy */ @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); p->num_packets++; p->total_normal_pages += p->normal_num; + p->total_zero_pages += p->zero_num; p->pages->num = 0; p->pages->block = NULL; p->sent_bytes += p->packet_len; @@ -XXX,XX +XXX,XX @@ out: qemu_mutex_unlock(&p->mutex); rcu_unregister_thread(); - trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages); + trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages, + p->total_zero_pages); return NULL; } @@ -XXX,XX +XXX,XX @@ int multifd_save_setup(Error **errp) p->normal = g_new0(ram_addr_t, page_count); p->page_size = qemu_target_page_size(); p->page_count = page_count; + p->zero = g_new0(ram_addr_t, page_count); if (migrate_use_zero_copy_send()) { p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; @@ -XXX,XX +XXX,XX @@ int multifd_load_cleanup(Error **errp) p->iov = NULL; g_free(p->normal); p->normal = NULL; + g_free(p->zero); + p->zero = NULL; multifd_recv_state->ops->recv_cleanup(p); } qemu_sem_destroy(&multifd_recv_state->sem_sync); @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) break; } - trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, - p->next_packet_size); + trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num, + p->flags, p->next_packet_size); sync_needed = p->flags & MULTIFD_FLAG_SYNC; /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; p->num_packets++; p->total_normal_pages += p->normal_num; + p->total_normal_pages += p->zero_num; qemu_mutex_unlock(&p->mutex); if (p->normal_num) { @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) qemu_mutex_unlock(&p->mutex); rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages); + trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages, + p->total_zero_pages); return NULL; } @@ -XXX,XX +XXX,XX @@ int multifd_load_setup(Error **errp) p->normal = g_new0(ram_addr_t, page_count); p->page_count = page_count; p->page_size = qemu_target_page_size(); + p->zero = g_new0(ram_addr_t, page_count); } for (i = 0; i < thread_count; i++) { diff --git a/migration/ram.c b/migration/ram.c index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -XXX,XX +XXX,XX @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block, if (multifd_queue_page(file, block, offset) < 0) { return -1; } - stat64_add(&ram_atomic_counters.normal, 1); - return 1; } diff --git a/migration/trace-events b/migration/trace-events index XXXXXXX..XXXXXXX 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -XXX,XX +XXX,XX @@ postcopy_preempt_reset_channel(void) "" # multifd.c multifd_new_send_channel_async(uint8_t id) "channel %u" -multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u" +multifd_recv(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t zero, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u" multifd_recv_new_channel(uint8_t id) "channel %u" multifd_recv_sync_main(long packet_num) "packet num %ld" multifd_recv_sync_main_signal(uint8_t id) "channel %u" multifd_recv_sync_main_wait(uint8_t id) "channel %u" multifd_recv_terminate_threads(bool error) "error %d" -multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64 +multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64 multifd_recv_thread_start(uint8_t id) "%u" -multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u flags 0x%x next packet size %u" +multifd_send(uint8_t id, uint64_t packet_num, uint32_t normalpages, uint32_t zero_pages, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u" multifd_send_error(uint8_t id) "channel %u" multifd_send_sync_main(long packet_num) "packet num %ld" multifd_send_sync_main_signal(uint8_t id) "channel %u" multifd_send_sync_main_wait(uint8_t id) "channel %u" multifd_send_terminate_threads(bool error) "error %d" -multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 +multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64 multifd_send_thread_start(uint8_t id) "%u" multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s" multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s" -- 2.39.1
This implements the zero page dection and handling. Signed-off-by: Juan Quintela <quintela@redhat.com> --- Add comment for offset (dave) Use local variables for offset/block to have shorter lines --- migration/multifd.h | 5 +++++ migration/multifd.c | 45 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/migration/multifd.h b/migration/multifd.h index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -XXX,XX +XXX,XX @@ typedef struct { uint32_t unused32[1]; /* Reserved for future use */ uint64_t unused64[3]; /* Reserved for future use */ char ramblock[256]; + /* + * This array contains the pointers to: + * - normal pages (initial normal_pages entries) + * - zero pages (following zero_pages entries) + */ uint64_t offset[]; } __attribute__((packed)) MultiFDPacket_t; diff --git a/migration/multifd.c b/migration/multifd.c index XXXXXXX..XXXXXXX 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -XXX,XX +XXX,XX @@ */ #include "qemu/osdep.h" +#include "qemu/cutils.h" #include "qemu/rcu.h" #include "exec/target_page.h" #include "sysemu/sysemu.h" @@ -XXX,XX +XXX,XX @@ static void multifd_send_fill_packet(MultiFDSendParams *p) packet->offset[i] = cpu_to_be64(temp); } + for (i = 0; i < p->zero_num; i++) { + /* there are architectures where ram_addr_t is 32 bit */ + uint64_t temp = p->zero[i]; + + packet->offset[p->normal_num + i] = cpu_to_be64(temp); + } } static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) @@ -XXX,XX +XXX,XX @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) p->normal[i] = offset; } + for (i = 0; i < p->zero_num; i++) { + uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]); + + if (offset > (block->used_length - p->page_size)) { + error_setg(errp, "multifd: offset too long %" PRIu64 + " (max " RAM_ADDR_FMT ")", + offset, block->used_length); + return -1; + } + p->zero[i] = offset; + } + return 0; } @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) { MultiFDSendParams *p = opaque; Error *local_err = NULL; + /* + * older qemu don't understand zero page on multifd channel. To + * have capabilities "false" by default, we need to name it this + * way. + */ + bool use_multifd_zero_page = !migrate_use_main_zero_page(); int ret = 0; bool use_zero_copy_send = migrate_use_zero_copy_send(); @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); if (p->pending_job) { + RAMBlock *rb = p->pages->block; uint64_t packet_num = p->packet_num; p->flags = 0; if (p->sync_needed) { @@ -XXX,XX +XXX,XX @@ static void *multifd_send_thread(void *opaque) } for (int i = 0; i < p->pages->num; i++) { - p->normal[p->normal_num] = p->pages->offset[i]; - p->normal_num++; + uint64_t offset = p->pages->offset[i]; + if (use_multifd_zero_page && + buffer_is_zero(rb->host + offset, p->page_size)) { + p->zero[p->zero_num] = offset; + p->zero_num++; + ram_release_page(rb->idstr, offset); + } else { + p->normal[p->normal_num] = offset; + p->normal_num++; + } } if (p->normal_num) { @@ -XXX,XX +XXX,XX @@ static void *multifd_recv_thread(void *opaque) } } + for (int i = 0; i < p->zero_num; i++) { + void *page = p->host + p->zero[i]; + if (!buffer_is_zero(page, p->page_size)) { + memset(page, 0, p->page_size); + } + } + if (sync_needed) { qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); -- 2.39.1
Signed-off-by: Juan Quintela <quintela@redhat.com> --- - Check zero_page property before using new code (Dave) --- migration/migration.c | 3 +-- migration/ram.c | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index XXXXXXX..XXXXXXX 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -XXX,XX +XXX,XX @@ bool migrate_use_main_zero_page(void) { MigrationState *s = migrate_get_current(); - /* We will enable this when we add the right code. */ - return true || s->enabled_capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]; + return s->enabled_capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]; } bool migrate_pause_before_switchover(void) diff --git a/migration/ram.c b/migration/ram.c index XXXXXXX..XXXXXXX 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -XXX,XX +XXX,XX @@ out: return ret; } +/** + * ram_save_target_page_multifd: save one target page + * + * Returns the number of pages written + * + * @rs: current RAM state + * @pss: data about the page we want to send + */ +static int ram_save_target_page_multifd(RAMState *rs, PageSearchStatus *pss) +{ + RAMBlock *block = pss->block; + ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS; + + if (!migration_in_postcopy()) { + return ram_save_multifd_page(pss->pss_channel, block, offset); + } + + int res = save_zero_page(pss, block, offset); + if (res > 0) { + return res; + } + + return ram_save_page(rs, pss); +} + /** * ram_save_host_page: save a whole host page * @@ -XXX,XX +XXX,XX @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_SETUP); migration_ops = g_malloc0(sizeof(MigrationOps)); - migration_ops->ram_save_target_page = ram_save_target_page_legacy; + if (migrate_use_multifd() && !migrate_use_main_zero_page()) { + migration_ops->ram_save_target_page = ram_save_target_page_multifd; + } else { + migration_ops->ram_save_target_page = ram_save_target_page_legacy; + } + ret = multifd_send_sync_main(f); if (ret < 0) { return ret; -- 2.39.1