DWB flush (코드분석 04)

개발새발·2021년 12월 31일
0
post-thumbnail

DWB flush (코드분석 04)

코드 위치 : https://github.com/CUBRID/cubrid

dwb_flush_block()

storage/double_write_buffer.c: 2169

/*
 * dwb_flush_block () - Flush pages from specified block.
 *
 * return   : Error code.
 * thread_p (in): Thread entry.
 * block(in): The block that needs flush.
 * file_sync_helper_can_flush(in): True, if file sync helper thread can flush.
 * current_position_with_flags(out): Current position with flags.
 *
 *  Note: The block pages can't be modified by others during flush.
 */
STATIC_INLINE int
dwb_flush_block(THREAD_ENTRY *thread_p, DWB_BLOCK *block, bool file_sync_helper_can_flush,
				UINT64 *current_position_with_flags)
{
	UINT64 local_current_position_with_flags, new_position_with_flags;	// reset_bit_position 문에서 쓰이는 변수
	int error_code = NO_ERROR;	// #define NO_ERROR 0
	DWB_SLOT *p_dwb_ordered_slots = NULL;	// 정렬된 slot들을 담을 구조체 변수
	unsigned int i, ordered_slots_length;	// index, 정렬된 slot들의 길이
	PERF_UTIME_TRACKER time_track;	// 시간 기록용 구조체 변수
	int num_pages;	// page 수
	unsigned int current_block_to_flush, next_block_to_flush;	// 현재 flush되는 block, 그 다음 flush되는 block
	int max_pages_to_sync;	// sync될 수 있는 최대 page 수
#if defined(SERVER_MODE)	// SERVER MODE로 실행됐을 경우
	bool flush = false;	// flush 유무
	PERF_UTIME_TRACKER time_track_file_sync_helper;	// 시간 기록용 구조체 변수
#endif
#if !defined(NDEBUG)	// DEBUG MODE로 실행됐을 경우
	DWB_BLOCK *saved_file_sync_helper_block = NULL;	// helper thread에 의해 동기화될 block이 저장될 DWB_BLOCK 포인터
	LOG_LSA nxio_lsa;	// log 주소 식별자
#endif

	assert(block != NULL && block->count_wb_pages > 0 && dwb_is_created());
	// flush될 block이 NULL이거나 write buffer page 수가 0이하이거나 dwb가 생성되지 않았으면 crash

	PERF_UTIME_TRACKER_START(thread_p, &time_track);
	// 시간 기록 시작

	/* Currently we allow only one block to be flushed. */
	// 하나의 block만 flush 허용
	ATOMIC_INC_32(&dwb_Global.blocks_flush_counter, 1);
	// &dwb_Global.blocks_flush_counter++;
	assert(dwb_Global.blocks_flush_counter <= 1);
	// flush counter가 1보다 크면 crash (한번에 하나의 블록만 flush 가능하므로 1보다 크면 crash)

	/* Order slots by VPID, to flush faster. */
	// 빠른 flush를 위해 slot들을 VPID순으로 정렬
	error_code = dwb_block_create_ordered_slots(block, &p_dwb_ordered_slots, &ordered_slots_length);
	// slot ordering 함수 block의 slot수 + 1 만큼 메모리 할당하고 memcpy 마지막 슬롯은 빈 slot 으로 초기화
	// qsort 로 오래된 것 부터 순서대로 정렬 p_dwb_ordered_slots에 정렬한 slot 배열을 받는다. slots_length = count_wb_pages + 1;
	// 정렬 기준 순서 vol 식별자, page, log page, log offset 순
	// Slot  ordering은 다음 두 가지 장점 때문에 진행한다.
	// 1. DB에 있는 volume들이 VIPD 순서로 정렬된다.
	// 2. 같은 page는 page LSA를 통해서 최신 버전만 Flush
	if (error_code != NO_ERROR)
	{
		error_code = ER_FAILED;		// #define ER_FAILED -1
		goto end;
	}

	/* Remove duplicates */
	// 같은 page를 중복 flush 하지 않기 위해서 중복되는 slot 제거
	for (i = 0; i < block->count_wb_pages - 1; i++)
	{
		DWB_SLOT *s1, *s2;

		s1 = &p_dwb_ordered_slots[i];
		s2 = &p_dwb_ordered_slots[i + 1];

		assert(s1->io_page->prv.p_reserve_2 == 0);

		if (!VPID_ISNULL(&s1->vpid) && VPID_EQ(&s1->vpid, &s2->vpid))
		// s1->vpid의 pageid가 NULL_PAGEID가 아니고, s1과 s2의 요소들이 모두 같다면
		{
		/* Next slot contains the same page, but that page is newer than the current one.
		 * Invalidate the VPID to avoid flushing the page twice.
		 * I'm sure that the current slot is not in hash.
		 */
			assert(LSA_LE(&s1->lsa, &s2->lsa));

			VPID_SET_NULL(&s1->vpid);
			// s2의 slot에 동일한 page가 포함되어 있고, 더 최신이므로 s1을 버림

			assert(s1->position_in_block < DWB_BLOCK_NUM_PAGES);
			VPID_SET_NULL(&(block->slots[s1->position_in_block].vpid));
			// 같은 page를 flush하지 않기 위해 s1의 VPID 무효화

			fileio_initialize_res(thread_p, s1->io_page, IO_PAGESIZE);
			// s1->io_page의 모든 요소를 초기화
		}
		// 정렬한 slot(p_dwb_ordered_slots)의 n과 n + 1의 vpid를 비교해서 같으면
		// n번째의 slot(이전 시점의 page LSA 가진 slot)을 초기화한다.

		/* Check for WAL protocol. */
		// WAL(write-ahead logging, 로그 선행 기입)을 사용하는 시스템에서 모든 수정은 적용 이전에 로그에 기록된다.
#if !defined(NDEBUG)	// DEBUG MODE로 실행됐을 경우
		if (s1->io_page->prv.pageid != NULL_PAGEID && logpb_need_wal(&s1->io_page->prv.lsa))
		// 로그 선행 기입이 되지 않아서 기입이 필요하다면
		{
			/* Need WAL. Check whether log buffer pool was destroyed. */
			// WAL 필요함, log buffer pool이 파괴되었는지 확인
			nxio_lsa = log_Gl.append.get_nxio_lsa();
			assert(LSA_ISNULL(&nxio_lsa));
		}
#endif
	}

	PERF_UTIME_TRACKER_TIME(thread_p, &time_track, PSTAT_DWB_FLUSH_BLOCK_SORT_TIME_COUNTERS);
	// slot 정렬에 걸린 시간 기록

#if !defined(NDEBUG)	// DEBUG MODE로 실행됐을 경우
	saved_file_sync_helper_block = (DWB_BLOCK *)dwb_Global.file_sync_helper_block;
	// file sync helper 시간 기록 시작
#endif

#if defined(SERVER_MODE)	// SERVER MODE로 실행됐을 경우
	PERF_UTIME_TRACKER_START(thread_p, &time_track_file_sync_helper);

	while (dwb_Global.file_sync_helper_block != NULL)
	// 선언 : DWB_BLOCK *volatile file_sync_helper_block; /* The block that will be sync by helper thread.
	{
		flush = true;

		/* Be sure that the previous block was written on disk, before writing the current block. */
		// 현재 block을 쓰기 전에 이전 block이 disk에 기록되었는지 확인해야함
		if (dwb_is_file_sync_helper_daemon_available())
		{
			/* Wait for file sync helper. */
			thread_sleep(1);
			// file sync helper를 기다림
		}
		else
		{
			/* Helper not available, flush the volumes from previous block. */
			// helper 사용이 불가능하다면, 이전 block에서 volume을 flush
			for (i = 0; i < dwb_Global.file_sync_helper_block->count_flush_volumes_info; i++)
			{
				assert(dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes != NULL_VOLDES);
				// flush_volumes_info의 vdes가 -1이면 crash
				// #define NULL_VOLDES   (-1)

				if (ATOMIC_INC_32(&(dwb_Global.file_sync_helper_block->flush_volumes_info[i].num_pages), 0) >= 0)
				{
					(void)fileio_synchronize(thread_p,
											 dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes, NULL,
											 FILEIO_SYNC_ONLY);
					// Database volume의 상태를 disk의 상태와 동기화

					dwb_log("dwb_flush_block: Synchronized volume %d\n",
							dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes);
					// 동기화 했다는 로그 남김
				}
			}
			(void)ATOMIC_TAS_ADDR(&dwb_Global.file_sync_helper_block, (DWB_BLOCK *)NULL);
			// &dwb_Global.file_sync_helper_block = (DWB_BLOCK *)NULL;
		}
	}

#if !defined(NDEBUG)	// DEBUG MODE로 실행됐을 경우
	if (saved_file_sync_helper_block)
	// 위에서 (DWB_BLOCK *)dwb_Global.file_sync_helper_block을 대입한 바 있음
	{
		for (i = 0; i < saved_file_sync_helper_block->count_flush_volumes_info; i++)
		{
			assert(saved_file_sync_helper_block->flush_volumes_info[i].all_pages_written == true && saved_file_sync_helper_block->flush_volumes_info[i].num_pages == 0);
			// 위 조건에 만족하지 않으면 crash
		}
	}
#endif

	if (flush == true)
	{
		PERF_UTIME_TRACKER_TIME(thread_p, &time_track_file_sync_helper, PSTAT_DWB_WAIT_FILE_SYNC_HELPER_TIME_COUNTERS);
		// file sync helper 걸린 시간 기록
	}
#endif /* SERVER_MODE */

	ATOMIC_TAS_32(&block->count_flush_volumes_info, 0);
	// count_flush_volumes_info = 0;
	block->all_pages_written = false;

	/* First, write and flush the double write file buffer. */
	// 먼저 DWB volume에 write, flush
	if (fileio_write_pages(thread_p, dwb_Global.vdes, block->write_buffer, 0, block->count_wb_pages,
						   IO_PAGESIZE, FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL)
	// system crash가 일어나기 전에 DWB volume으로 먼저 Flush한다.
	// 빠르게 Flush하기위해 한 개의 block을 한번에 write한다.
	// write_buffer를 사용해 slot의 순서대로 write를 진행한다.
	{
		/* Something wrong happened. */
		// 함수가 NULL을 반환했을 시 오류 감지
		assert(false);
		error_code = ER_FAILED;
		goto end;
	}

	/* Increment statistics after writing in double write volume. */
	// double write volume에 write 작업 후 통계 증가
	perfmon_add_stat(thread_p, PSTAT_PB_NUM_IOWRITES, block->count_wb_pages);
	// 통계량 축적 (Accumulate amount to statistic)

	if (fileio_synchronize(thread_p, dwb_Global.vdes, dwb_Volume_name, FILEIO_SYNC_ONLY) != dwb_Global.vdes)
	// Database volume의 상태를 disk의 상태와 동기화
	// fileio_synchronize()로 fsync()를 호출해 Flush를 마무리한다.
	{
		// 함수 반환값이 dwb_Global.ves와 일치하지 않을 시 오류 감지
		assert(false);
		/* Something wrong happened. */
		error_code = ER_FAILED;
		goto end;
	}
	dwb_log("dwb_flush_block: DWB synchronized\n");
	// 동기화 했다는 로그 남김

	/* Now, write and flush the original location. */
	// 이제 정렬된 slot들을 DB에다가 write, flush
	error_code =
		dwb_write_block(thread_p, block, p_dwb_ordered_slots, ordered_slots_length, file_sync_helper_can_flush, true);
	// DB에 해당하는 page마다 write() 함수를 호출하여 write를 진행한다.
	if (error_code != NO_ERROR)
	{
		// 오류 감지
		assert(false);
		goto end;
	}

	max_pages_to_sync = prm_get_integer_value(PRM_ID_PB_SYNC_ON_NFLUSH) / 2;
	// (enum param_id)PRM_ID_PB_SYNC_ON_NFLUSH = 74

	/* Now, flush only the volumes having pages in current block. */
	// 이제 현재 block에 page가 있는 volume만 flush
	for (i = 0; i < block->count_flush_volumes_info; i++)
	{
		assert(block->flush_volumes_info[i].vdes != NULL_VOLDES);
		// flush_volumes_info의 vdes가 -1이면 crash
		// #define NULL_VOLDES   (-1)

		num_pages = ATOMIC_INC_32(&block->flush_volumes_info[i].num_pages, 0);
		// num_pages에 &block->flush_volumes_info[i].num_pages 대입
		if (num_pages == 0)
		{
			/* Flushed by helper. */
			// helper에 의해 flush 완료됨
			continue;
		}

#if defined(SERVER_MODE)	// SERVER MODE로 실행됐을 경우
		if (file_sync_helper_can_flush == true)
		{
			if ((num_pages > max_pages_to_sync) && dwb_is_file_sync_helper_daemon_available())
			// page 수가 최대 동기화 page 수보다 크고 daemon이 사용가능하다면
			{
				/* Let the helper thread to flush volumes having many pages. */
				// helper thread가 많은 page를 가진 volume을 flush하도록 해줌
				assert(dwb_Global.file_sync_helper_block != NULL);
				// file_sync_helper_block이 NULL이면 crash
				continue;
			}
		}
		else
		{
			assert(dwb_Global.file_sync_helper_block == NULL);
			// file_sync_helper_block이 NULL이 아니면 crash
		}
#endif

		if (!ATOMIC_CAS_32(&block->flush_volumes_info[i].flushed_status, VOLUME_NOT_FLUSHED,
						   VOLUME_FLUSHED_BY_DWB_FLUSH_THREAD))
		// flush할 때 각 볼륨에 flush, 그 다음 동기화
		// CAS하는 이유는 flush한 볼륨만 동기화를 해주면 되기 때문
		// compare and swap은 첫번째, 두번째 인자가 같으면 세번째 인자를 첫번째 포인터에 대입하고 true 반환, 다르면 false 반환
		// (enum <unnamed>)VOLUME_NOT_FLUSHED = 0
		// (enum <unnamed>)VOLUME_FLUSHED_BY_DWB_FLUSH_THREAD = 2
		{
			/* Flushed by helper. */
			// helper에 의해 flush 완료됨
			continue;
			// dwb_write_block()에서 flush_volumes_info 사용할 때 VOLUME_NOT_FLUSHED로 초기화
			// helper가 처리중이거나 처리한 경우 continue;
		}

		num_pages = ATOMIC_TAS_32(&block->flush_volumes_info[i].num_pages, 0);
		// flush_volumes_info[i].num_pages에 0 대입하고 그 값을 num_pages에 대입
		assert(num_pages != 0);
		// num_pages가 0이면 crash

		(void)fileio_synchronize(thread_p, block->flush_volumes_info[i].vdes, NULL, FILEIO_SYNC_ONLY);
		// Database volume의 상태를 disk의 상태와 동기화
		// sync daemon을 호출 불가능할 경우 fileio_synchronized()로 fsync()를 직접 호출

		dwb_log("dwb_flush_block: Synchronized volume %d\n", block->flush_volumes_info[i].vdes);
		// 동기화 했다는 로그 남김
	}

	/* Allow to file sync helper thread to finish. */
	// file sync helper thread가 완료되도록 허용
	block->all_pages_written = true;

	// 이 부분은 그냥 통계를 위한 tracking 용도
	if (perfmon_is_perf_tracking_and_active(PERFMON_ACTIVATION_FLAG_FLUSHED_BLOCK_VOLUMES))
	// active thread가 있고 expanded statistic의 activation_flag가 active된 경우 true 반환
	// (enum <unnamed>)PERFMON_ACTIVATION_FLAG_FLUSHED_BLOCK_VOLUMES = 128
	{
		perfmon_db_flushed_block_volumes(thread_p, block->count_flush_volumes_info);
	}

	/* The block is full or there is only one thread that access DWB. */
	// block이 가득 찼거나 DWB에 접근하는 thread가 하나만 있음
	assert(block->count_wb_pages == DWB_BLOCK_NUM_PAGES || DWB_IS_MODIFYING_STRUCTURE(ATOMIC_INC_64(&dwb_Global.position_with_flags, 0LL)));

	ATOMIC_TAS_32(&block->count_wb_pages, 0);
	// &block->count_wb_pages = 0
	ATOMIC_INC_64(&block->version, 1ULL);
	// &block->version++;

	/* Reset block bit, since the block was flushed. */
	// block이 flush되었으므로 block bit 리셋
reset_bit_position:
	local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0LL);
	// local_current_position_with_flags = &dwb_Global.position_with_flags
	new_position_with_flags = DWB_ENDS_BLOCK_WRITING(local_current_position_with_flags, block->block_no);
	/*
	 *	Ends DWB block writing
	 *	#define DWB_ENDS_BLOCK_WRITING(position_with_flags, block_no)           \
	 *		(assert(DWB_IS_BLOCK_WRITE_STARTED(position_with_flags, block_no)), \
	 *		(position_with_flags) & ~(1ULL << (63 - (block_no))))
	 */

	if (!ATOMIC_CAS_64(&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags))
	// compare결과 다르면
	{
		/* The position was changed by others, try again. */
		// 다른 사용자가 위치를 변경했으니 다시 시도
		goto reset_bit_position;
	}

	/* Advance flushing to next block. */
	// flush 대상을 다음 block으로
	current_block_to_flush = dwb_Global.next_block_to_flush;
	next_block_to_flush = DWB_GET_NEXT_BLOCK_NO(current_block_to_flush);
	// 다음 DWB block num 구함

	if (!ATOMIC_CAS_32(&dwb_Global.next_block_to_flush, current_block_to_flush, next_block_to_flush))
	// compare and swap 결과가 0이면
	{
		/* I'm the only thread that can advance next block to flush. */
		// 지금 진행하고 있는 thread가 flush할 다음 블록을 지정할 수 있는 유일한 thread
		assert_release(false);
	}

	/* Release locked threads, if any. */
	// 잠긴 thread가 있는 경우 해제
	dwb_signal_block_completion(thread_p, block);
	// Signal double write buffer block completion
	// 잠긴 thread가 있는 경우 wait queue를 파괴하고 잠긴 thread 해제
	if (current_position_with_flags)
	{
		*current_position_with_flags = new_position_with_flags;
	}

end:
	ATOMIC_INC_32(&dwb_Global.blocks_flush_counter, -1);
	// &dwb_Global.blocks_flush_counter--;

	if (p_dwb_ordered_slots != NULL)
	{
		free_and_init(p_dwb_ordered_slots);
		/*
		 *	#define free_and_init(ptr) \
		 *			do { \
		 *				free ((void*) (ptr)); \
		 *				(ptr) = NULL; \
		 *			} while (0)
		 */
	}

	PERF_UTIME_TRACKER_TIME(thread_p, &time_track, PSTAT_DWB_FLUSH_BLOCK_TIME_COUNTERS);
	// flush 걸린 시간 기록

	return error_code;
}

 

dwb_block_create_ordered_slots()

storage/double_write_buffer.c: 1830

/*
 * dwb_block_create_ordered_slots () - Create ordered slots from block slots.
 *
 * return   : Error code.
 * block(in): The block.
 * p_dwb_ordered_slots(out): The ordered slots.
 * p_ordered_slots_length(out): The ordered slots array length.
 */
STATIC_INLINE int
dwb_block_create_ordered_slots(DWB_BLOCK *block, DWB_SLOT **p_dwb_ordered_slots,
							   unsigned int *p_ordered_slots_length)
{
	DWB_SLOT *p_local_dwb_ordered_slots = NULL;

	assert(block != NULL && p_dwb_ordered_slots != NULL);
	// block 과 p_dwb_ordered_slots의 포인터가 제대로 전해지지 않았으면 crash

	/* including sentinel */
	p_local_dwb_ordered_slots = (DWB_SLOT *)malloc((block->count_wb_pages + 1) * sizeof(DWB_SLOT));
	// write buffer에 쓰여진 page 수만큼의 slot을 할당한다.
	if (p_local_dwb_ordered_slots == NULL)
	{
		er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
			   (block->count_wb_pages + 1) * sizeof(DWB_SLOT));
		return ER_OUT_OF_VIRTUAL_MEMORY;
	}
	// malloc error
	memcpy(p_local_dwb_ordered_slots, block->slots, block->count_wb_pages * sizeof(DWB_SLOT));
	// block의 slots을 memcpy 한다.

	/* init sentinel slot */
	dwb_init_slot(&p_local_dwb_ordered_slots[block->count_wb_pages]);
	// 마지막 slot을 초기화한다. (memcpy가 안 된 마지막 slot이다.)
	// 아마 ordered slot 의 마지막이란 것을 나타내기 위해 쓰는 것 같다?(char *의 \0 같은)

	/* Order pages by (VPID, LSA) */
	qsort((void *)p_local_dwb_ordered_slots, block->count_wb_pages, sizeof(DWB_SLOT), dwb_compare_slots);
	// 복사해온 block의 slot을 qsort 한다. 값이 크면 1 (뒤로 밀린다.)
	// vpid.volid 순서대로 정렬 같으면 vpid.pageid, lsa.pageid, lsa.offset 순으로 정렬
	*p_dwb_ordered_slots = p_local_dwb_ordered_slots;
	*p_ordered_slots_length = block->count_wb_pages + 1;

	return NO_ERROR;
}

 

fileio_write_pages()

storage/file_io.c: 4314

/*
 * fileio_write_pages () - write the content of several contiguous pages to disk
 *   return: io_page_p on success, NULL on failure
 *   thread_p(in): Thread entry
 *   vol_fd(in): Volume descriptor
 *   io_page_p(in): In-memory address where the pages resides
 *   page_id(in): First page identifier
 *   num_pages(in): Number of pages to flush
 *   page_size(in): Page size
 *   write_mode(in): FILEIO_WRITE_NO_COMPENSATE_WRITE skips page flush
 */
void *
fileio_write_pages(THREAD_ENTRY *thread_p, int vol_fd, char *io_pages_p, PAGEID page_id, int num_pages,
				   size_t page_size, FILEIO_WRITE_MODE write_mode)
{
#if defined(EnableThreadMonitoring)
	TSC_TICKS start_tick, end_tick;
	TSCTIMEVAL elapsed_time;
#endif
	off_t offset;
	ssize_t nbytes_written;
	size_t nbytes_to_be_written;

	assert(num_pages > 0);
	offset = FILEIO_GET_FILE_SIZE(page_size, page_id);
	// page_size * page_id page_id 가 0 이므로 offset = 0
	nbytes_to_be_written = ((size_t)page_size) * ((size_t)num_pages);
	// 써야하는 byte 수
	while (nbytes_to_be_written > 0)
	{
		nbytes_written = fileio_os_write(thread_p, vol_fd, io_pages_p, nbytes_to_be_written, offset);
		// dwb volume 에 io_pages_p(write buffer) 를 offset 위치에서 (처음엔 0)(lseek 으로 파일 커서조정)
		// nbytes_to_be_written 만큼 write
		offset += nbytes_written;
		io_pages_p += nbytes_written;
		nbytes_to_be_written -= nbytes_written;
	} // 한번에 write가 안될수도 있으니 반복해서 시도
	return io_pages_p;
}

 

dwb_write_block()

storage/double_write_buffer.c: 1990

/*
 * dwb_write_block () - Write block pages in specified order.
 *
 * return   : Error code.
 * thread_p (in): The thread entry.
 * block(in): The block that is written.
 * p_dwb_ordered_slots(in): The slots that gives the pages flush order.
 * ordered_slots_length(in): The ordered slots array length.
 * remove_from_hash(in): True, if needs to remove entries from hash.
 * file_sync_helper_can_flush(in): True, if helper can flush.
 *
 *  Note: This function fills to_flush_vdes array with the volumes that must be flushed.
 */
STATIC_INLINE int
dwb_write_block(THREAD_ENTRY *thread_p, DWB_BLOCK *block, DWB_SLOT *p_dwb_ordered_slots,
				unsigned int ordered_slots_length, bool file_sync_helper_can_flush, bool remove_from_hash)
{
	VOLID last_written_volid;
	int last_written_vol_fd, vol_fd;
	int count_writes = 0, num_pages_to_sync;
	FLUSH_VOLUME_INFO *current_flush_volume_info = NULL;
	bool can_flush_volume = false;

	assert(block != NULL && p_dwb_ordered_slots != NULL);
	/*
	 * Write the whole slots data first and then remove it from hash. Is better to do in this way.
	 * Thus, the fileio_write may be slow. While the current transaction has delays caused by fileio_write,
	 * the concurrent transaction still can access the data from memory instead disk
	 */
	assert(block->count_wb_pages < ordered_slots_length);
	assert(block->count_flush_volumes_info == 0);

	num_pages_to_sync = prm_get_integer_value(PRM_ID_PB_SYNC_ON_NFLUSH);
	// 74
	last_written_volid = NULL_VOLID;
	last_written_vol_fd = NULL_VOLDES;
	// page를 vpid 순으로 정렬했기때문에 같은 volume 일때는 vol_fd를 다시 구하지 않게
	// last_written_volid 와 last_wrtitten_vol_fd 를 들고다닌다

	for (i = 0; i < block->count_wb_pages; i++)
	{
		vpid = &p_dwb_ordered_slots[i].vpid;
		if (VPID_ISNULL(vpid))
			continue;
		// 중복 슬롯이라 제거됐으면, continue
		assert(VPID_ISNULL(&p_dwb_ordered_slots[i + 1].vpid) || VPID_LT(vpid, &p_dwb_ordered_slots[i + 1].vpid));
		// 다음 슬롯의 vpid 가 NULL 이 아니고 현재 슬롯의 vpid 보다 크거나 같지 않으면 crash (정렬이 안 됐다는 것이다)
		if (last_written_volid != vpid->volid)
		// 현재 slot 의 vpid 와 같지 않으면 새로운 volume 이니 fd를 얻어온다.
		{
			/* Get the volume descriptor. */
			if (current_flush_volume_info != NULL)
			{
				assert_release(current_flush_volume_info->vdes == last_written_vol_fd);
				current_flush_volume_info->all_pages_written = true;
				can_flush_volume = true;
				current_flush_volume_info = NULL; /* reset */
			}
			// write가 완료된 volume의 flush_volume_info에 모든 pages 가 write 됐고
			// flush 해도 된다는 flag 를 주고 current_flush_volume_info 를 초기화
			vol_fd = fileio_get_volume_descriptor(vpid->volid);
			// 현재 슬롯의 vpid 의 fd 를 구해온다.
			if (vol_fd == NULL_VOLDES)
			{
				/* probably it was removed meanwhile. skip it! */
				continue;
			}
			// 삭제된 볼륨이니 skip
			last_written_volid = vpid->volid;
			last_written_vol_fd = vol_fd;
			// 현재 volume의 volid, vol_fd로 바꿔준다.
			current_flush_volume_info = dwb_add_volume_to_block_flush_area(thread_p, block, last_written_vol_fd);
			// 현재 volume 에 flush 해야하니 새로운 flush_volume_info 받아옴
			// flush 한 볼륨의 수만큼 사용
		}
		assert(last_written_vol_fd != NULL_VOLDES);
		assert(p_dwb_ordered_slots[i].io_page->prv.p_reserve_2 == 0);
		assert(p_dwb_ordered_slots[i].vpid.pageid == p_dwb_ordered_slots[i].io_page->prv.pageid && p_dwb_ordered_slots[i].vpid.volid == p_dwb_ordered_slots[i].io_page->prv.volid);
		/* Write the data. */
		if (fileio_write(thread_p, last_written_vol_fd, p_dwb_ordered_slots[i].io_page, vpid->pageid, IO_PAGESIZE,
						 FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL)
		// db volume 에 slot의 page 를 offset(page_id * page_size) 위치에서 page_size만큼 write.
		// dwb volume 에 write 할때 사용한 fileio_write_pages와 차이는
		// write_pages 는 반복문으로 written 된 offset 에서 다시 write 하지만
		// write 는 page를 다 쓰지 못했을시 page를 다시 write 한다.
		{
			ASSERT_ERROR();
			dwb_log_error("DWB write page VPID=(%d, %d) LSA=(%lld,%d) with %d error: \n",
						  vpid->volid, vpid->pageid, p_dwb_ordered_slots[i].io_page->prv.lsa.pageid,
						  (int)p_dwb_ordered_slots[i].io_page->prv.lsa.offset, er_errid());
			assert(false);
			/* Something wrong happened. */
			return ER_FAILED;
		}
		dwb_log("dwb_write_block: written page = (%d,%d) LSA=(%lld,%d)\n",
				vpid->volid, vpid->pageid, p_dwb_ordered_slots[i].io_page->prv.lsa.pageid,
				(int)p_dwb_ordered_slots[i].io_page->prv.lsa.offset);
#if defined(SERVER_MODE)
		assert(current_flush_volume_info != NULL);
		ATOMIC_INC_32(&current_flush_volume_info->num_pages, 1);
		// num_pages : 해당 volume 에 write 수
		count_writes++;
		// sync daemon을 사용할 수 있을 때 현재 volume의 write 수가 num_pages_to_sync 파라미터 값(74)
		// 보다 클시 daemon을 통해 flush 하기위해 사용
		if (file_sync_helper_can_flush && (count_writes >= num_pages_to_sync || can_flush_volume == true) && dwb_is_file_sync_helper_daemon_available())
		// file_sync_helper가 flush 할 수 있고(and), count_writes 가 num_pages_to_sync
		// 보다 크거나(or) can_flush_volume 이 true이고(위에서 다른 vpid 일때, 즉 volume 이 변한경우)
		// sync daemon이 사용 가능할 경우
		{
			if (ATOMIC_CAS_ADDR(&dwb_Global.file_sync_helper_block, (DWB_BLOCK *)NULL, block))
			{
				dwb_file_sync_helper_daemon->wakeup();
			}
			// Write가 끝난 뒤, sync daemon을 사용할 수 있다는 전제하에,
			// 전역변수 ‘file_sync_helper_block’에 현재 Flush하려는 Block을 참조 시킨다.
			// Daemon을 호출해도 되는 이유는 이미 DWB volume에 flush가 되었기 때문에
			// 해당 Flush가 급한 작업이 아니기 때문이다.(system crash가 발생해도 recovery 가능)
			/* Add statistics. */
			perfmon_add_stat(thread_p, PSTAT_PB_NUM_IOWRITES, count_writes);
			count_writes = 0;
			can_flush_volume = false;
		}
#endif
	} //for문 끝

	/* the last written volume */
	if (current_flush_volume_info != NULL)
	{
		current_flush_volume_info->all_pages_written = true;
	}
// for문에서 volid 가 다를경우 if 문으로 all_pages_wriiten = true 로 바꿔줬는데
// 마지막 volume은 처리를 못해주니 true 로
#if !defined(NDEBUG)
	for (i = 0; i < block->count_flush_volumes_info; i++)
	{
		assert(block->flush_volumes_info[i].all_pages_written == true);
		assert(block->flush_volumes_info[i].vdes != NULL_VOLDES);
	}
#endif

#if defined(SERVER_MODE)
	if (file_sync_helper_can_flush && (dwb_Global.file_sync_helper_block == NULL) && (block->count_flush_volumes_info > 0))
	{
		/* If file_sync_helper_block is NULL, it means that the file sync helper thread does not run and was not woken yet. */
		if (dwb_is_file_sync_helper_daemon_available() && ATOMIC_CAS_ADDR(&dwb_Global.file_sync_helper_block, (DWB_BLOCK *)NULL, block))
		{
			dwb_file_sync_helper_daemon->wakeup();
		}
	}
#endif
	// 반복문안에서 sync daemon 호출하지 못한 경우 호출한다. (volume 이 1개고 write한 page 수가 적은 경우)
	/* Add statistics. */
	perfmon_add_stat(thread_p, PSTAT_PB_NUM_IOWRITES, count_writes);
	/* Remove the corresponding entries from hash. */
	if (remove_from_hash)
	{
		PERF_UTIME_TRACKER time_track;
		PERF_UTIME_TRACKER_START(thread_p, &time_track);
		for (i = 0; i < block->count_wb_pages; i++)
		{
			vpid = &p_dwb_ordered_slots[i].vpid;
			if (VPID_ISNULL(vpid))
			{
				continue;
			}
			assert(p_dwb_ordered_slots[i].position_in_block < DWB_BLOCK_NUM_PAGES);
			error_code = dwb_slots_hash_delete(thread_p, &block->slots[p_dwb_ordered_slots[i].position_in_block]);
			if (error_code != NO_ERROR)
			{
				return error_code;
			}
		}
		PERF_UTIME_TRACKER_TIME(thread_p, &time_track, PSTAT_DWB_DECACHE_PAGES_AFTER_WRITE);
	}
	return NO_ERROR;
}

 

본 시리즈의 글들은 CUBRID DB엔진 오픈 스터디를 진행하며 팀원들과 함께 공부한 내용을 정리한 것입니다.
Github 링크

profile
블록체인 개발 어때요

1개의 댓글

comment-user-thumbnail
2024년 3월 13일

Embark on a journey of discovery with our cutting-edge tool designed to revolutionize the way you browse Instagram Stories. Our online Stories Agent tool empowers you to access Stories from any public profile without any limitations, providing you with a seamless and worry-free browsing experience. By leveraging our secure accounts for browsing, Instagram Story Viewer you can explore your favorite content discreetly and without leaving any traces. Plus, with the ability to download photo and video Stories directly to your device, you can enjoy them offline at your leisure. Say hello to hassle-free browsing with our Stories Agent tool today!

답글 달기