DWB 초기화 (코드분석 02)

개발새발·2021년 12월 29일
0
post-thumbnail

DWB 초기화 (코드분석 02)

코드 위치 : https://github.com/CUBRID/cubrid

dwb_create()

storage/double_write_buffer.c: 2820

/*
 * dwb_create () - Create the double write buffer (DWB) unless it already exists.
 *
 * return   : NO_ERROR on success (also when the DWB was already created), otherwise the error code
 *            returned by dwb_starts_structure_modification () or dwb_create_internal ().
 * thread_p (in): The thread entry.
 * dwb_path_p (in) : The double write buffer volume path.
 * db_name_p (in) : The database name.
 */
int dwb_create(THREAD_ENTRY *thread_p, const char *dwb_path_p, const char *db_name_p)
{
	UINT64 position_with_flags;
	int error_code;

	/* Enter structure-modification mode; on failure return without calling the matching "ends" function. */
	error_code = dwb_starts_structure_modification(thread_p, &position_with_flags);
	if (error_code != NO_ERROR)
	{
		dwb_log_error("Can't create DWB: error = %d\n", error_code);
		return error_code;
	}

	/* DWB structure modification started, no other transaction can modify the global position with flags. */
	if (!DWB_IS_CREATED(position_with_flags))
	{
		/*
		 * Fill the global volume name from path and database name.
		 * Presumably the result is "<dwb_path_p>/<db_name_p>_dwb" - TODO confirm against fileio_make_dwb_name ().
		 */
		fileio_make_dwb_name(dwb_Volume_name, dwb_path_p, db_name_p);

		/* The actual allocation/initialization of dwb_Global; it may update position_with_flags. */
		error_code = dwb_create_internal(thread_p, dwb_Volume_name, &position_with_flags);
		if (error_code != NO_ERROR)
		{
			dwb_log_error("Can't create DWB: error = %d\n", error_code);
		}
	}
	/* Otherwise the DWB is already created and only the modification flag has to be restored. */

	/* Ends the modification on every path that started it, allowing others to modify the global position. */
	dwb_ends_structure_modification(thread_p, position_with_flags);

	return error_code;
}

 

dwb_starts_structure_modification()

storage/double_write_buffer.c: 806

/*
 * dwb_starts_structure_modification () - Starts structure modifications.
 *
 * return   : NO_ERROR on success, ER_FAILED when another thread is already modifying the structure,
 *            or the error code of dwb_flush_block () when a pending block cannot be flushed.
 * thread_p (in): The thread entry.
 * current_position_with_flags(out): The current position with flags; written only on success.
 *
 *  Note: This function must be called before changing structure of DWB.
 */
STATIC_INLINE int
dwb_starts_structure_modification(THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags)
{
	UINT64 local_current_position_with_flags, new_position_with_flags, min_version;
	unsigned int block_no;
	int error_code = NO_ERROR;
	unsigned int start_block_no, blocks_count;
	DWB_BLOCK *file_sync_helper_block;

	assert(current_position_with_flags != NULL);

	do
	{
		/* Atomic add of 0: the value is not changed, the result is used as a snapshot of the global word. */
		local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0ULL);
		if (DWB_IS_MODIFYING_STRUCTURE(local_current_position_with_flags))
		{
			/* Only one thread can change the structure */
			return ER_FAILED;
		}

		new_position_with_flags = DWB_STARTS_MODIFYING_STRUCTURE(local_current_position_with_flags);
		/* Start structure modifications, the threads that want to flush afterwards, have to wait. */

		/*
		 * The CAS below installs the new value only if the global word still equals the snapshot.
		 * If it changed in between, the loop retries with a fresh snapshot; a thread that lost the race to
		 * another modifier then sees the modify flag and leaves through the ER_FAILED branch above.
		 */
	} while (!ATOMIC_CAS_64(&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags));

#if defined(SERVER_MODE)
	/* Poll until no block flush is counted and neither the flush-block nor the file-sync-helper daemon runs. */
	while ((ATOMIC_INC_32(&dwb_Global.blocks_flush_counter, 0) > 0) || dwb_flush_block_daemon_is_running() || dwb_file_sync_helper_daemon_is_running())
	{
		/* Can't modify structure while flush thread can access DWB. */
		thread_sleep(20);
	}
#endif

	/* Since we set the modify structure flag, I'm the only thread that access the DWB. */
	file_sync_helper_block = dwb_Global.file_sync_helper_block;
	if (file_sync_helper_block != NULL)
	{
		/* All remaining blocks are flushed by me. */
		(void)ATOMIC_TAS_ADDR(&dwb_Global.file_sync_helper_block, (DWB_BLOCK *)NULL);
		dwb_log("Structure modification, needs to flush DWB block = %d having version %lld\n",
				file_sync_helper_block->block_no, file_sync_helper_block->version);
	}

	local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0ULL);

	/* Need to flush incomplete blocks, ordered by version. */
	start_block_no = DWB_NUM_TOTAL_BLOCKS;
	/* Start from the largest UINT64 so that any block version compares lower or equal. */
	min_version = 0xffffffffffffffff;
	blocks_count = 0;
	/* First pass: count the blocks whose write has started and remember the one with the lowest version. */
	for (block_no = 0; block_no < DWB_NUM_TOTAL_BLOCKS; block_no++)
	{
		if (DWB_IS_BLOCK_WRITE_STARTED(local_current_position_with_flags, block_no))
		{
			if (dwb_Global.blocks[block_no].version < min_version)
			{
				min_version = dwb_Global.blocks[block_no].version;
				start_block_no = block_no;
			}
			blocks_count++;
		}
	}

	/*
	 * Second pass: starting at the lowest-version block, walk the blocks circularly and flush each one
	 * whose write has started, until all counted blocks are flushed.
	 */
	block_no = start_block_no;
	while (blocks_count > 0)
	{
		if (DWB_IS_BLOCK_WRITE_STARTED(local_current_position_with_flags, block_no))
		{
			/* Flush all pages from current block. I must flush all remaining data. */
			error_code =
				dwb_flush_block(thread_p, &dwb_Global.blocks[block_no], false, &local_current_position_with_flags);
			if (error_code != NO_ERROR)
			{
				/* Something wrong happened. */
				dwb_log_error("Can't flush block = %d having version %lld\n", block_no,
							  dwb_Global.blocks[block_no].version);

				/*
				 * NOTE(review): this path returns without undoing the modify-structure flag installed by the
				 * CAS above, and the out parameter is left unwritten - confirm that callers/recovery expect this.
				 */
				return error_code;
			}

			/* Informational message about a successful flush, so it is not reported through dwb_log_error. */
			dwb_log("DWB flushed %d block having version %lld\n", block_no, dwb_Global.blocks[block_no].version);
			blocks_count--;
		}

		/* Advance circularly: after the last block continue with block 0. */
		block_no = (block_no + 1) % DWB_NUM_TOTAL_BLOCKS;
	}

	/* Re-read the global word; after the flushes above no block status is expected to remain set. */
	local_current_position_with_flags = ATOMIC_INC_64(&dwb_Global.position_with_flags, 0ULL);
	assert(DWB_GET_BLOCK_STATUS(local_current_position_with_flags) == 0);

	*current_position_with_flags = local_current_position_with_flags;

	return NO_ERROR;
}

 

dwb_ends_structure_modification()

storage/double_write_buffer.c: 908

/*
 * dwb_ends_structure_modification () - Ends structure modifications.
 *
 * return   : void
 * thread_p (in): The thread entry.
 * current_position_with_flags(in): The current position with flags; asserted to equal the global value.
 */
STATIC_INLINE void
dwb_ends_structure_modification(THREAD_ENTRY *thread_p, UINT64 current_position_with_flags)
{
	UINT64 released_position_with_flags;

	/* The caller's view must still match the global position with flags. */
	assert(dwb_Global.position_with_flags == current_position_with_flags);

	/* Ends structure modifications: derive the value marking the modification as finished and store it. */
	released_position_with_flags = DWB_ENDS_MODIFYING_STRUCTURE(current_position_with_flags);
	ATOMIC_TAS_64(&dwb_Global.position_with_flags, released_position_with_flags);

	/* Signal the other threads. */
	dwb_signal_structure_modificated(thread_p);
}

 

dwb_create_internal()

storage/double_write_buffer.c: 1147

/*
 * dwb_create_internal () - Create double write buffer.
 *
 * return   : NO_ERROR on success or when the DWB is disabled (size or block count parameter is 0),
 *            ER_FAILED when the DWB volume cannot be formatted, otherwise the error code of
 *            dwb_create_blocks ().
 * thread_p (in): The thread entry.
 * dwb_volume_name (in) : The double write buffer volume name.
 * current_position_with_flags (in/out): Current position with flags; updated only on successful creation.
 *
 *  Note: Is user responsibility to ensure that no other transaction can access DWB structure, during creation.
 */
STATIC_INLINE int
dwb_create_internal(THREAD_ENTRY *thread_p, const char *dwb_volume_name, UINT64 *current_position_with_flags)
{
	int error_code = NO_ERROR;
	unsigned int double_write_buffer_size, num_blocks = 0;
	unsigned int i, num_pages, num_block_pages;
	unsigned int log2_num_block_pages, remaining_pages;
	int vdes = NULL_VOLDES;
	DWB_BLOCK *blocks = NULL;
	UINT64 new_position_with_flags;

	const int freelist_block_count = 2;
	const int freelist_block_size = DWB_SLOTS_FREE_LIST_SIZE;

	assert(dwb_volume_name != NULL && current_position_with_flags != NULL);

	/* Total buffer size and number of blocks come from the system parameters. */
	double_write_buffer_size = prm_get_integer_value(PRM_ID_DWB_SIZE);
	num_blocks = prm_get_integer_value(PRM_ID_DWB_BLOCKS);
	if (double_write_buffer_size == 0 || num_blocks == 0)
	{
		/* Do not use double write buffer. */
		return NO_ERROR;
	}

	/* May change both values; the asserts below expect powers of two afterwards. */
	dwb_adjust_write_buffer_values(&double_write_buffer_size, &num_blocks);

	num_pages = double_write_buffer_size / IO_PAGESIZE;
	num_block_pages = num_pages / num_blocks;

	assert(IS_POWER_OF_2(num_blocks));
	assert(IS_POWER_OF_2(num_pages));
	assert(IS_POWER_OF_2(num_block_pages));
	assert(num_blocks <= DWB_MAX_BLOCKS);

	/* Create and open DWB volume first */
	vdes = fileio_format(thread_p, boot_db_full_name(), dwb_volume_name, LOG_DBDWB_VOLID, num_block_pages, true,
						 false, false, IO_PAGESIZE, 0, false);
	if (vdes == NULL_VOLDES)
	{
		/* Report the failure; without this the function would return NO_ERROR although nothing was created. */
		error_code = ER_FAILED;
		goto exit_on_error;
	}

	/* Needs to flush dirty page before activating DWB. */
	fileio_synchronize_all(thread_p, false);

	/* Create DWB blocks */
	error_code = dwb_create_blocks(thread_p, num_blocks, num_block_pages, &blocks);
	if (error_code != NO_ERROR)
	{
		goto exit_on_error;
	}

	/* Exact integer log2 (floor); avoids truncating a rounded floating-point log () quotient. */
	log2_num_block_pages = 0;
	for (remaining_pages = num_block_pages; remaining_pages > 1; remaining_pages >>= 1)
	{
		log2_num_block_pages++;
	}

	dwb_Global.blocks = blocks;
	dwb_Global.num_blocks = num_blocks;
	dwb_Global.num_pages = num_pages;
	dwb_Global.num_block_pages = num_block_pages;
	dwb_Global.log2_num_block_pages = log2_num_block_pages;
	dwb_Global.blocks_flush_counter = 0;
	dwb_Global.next_block_to_flush = 0;
	pthread_mutex_init(&dwb_Global.mutex, NULL);
	dwb_init_wait_queue(&dwb_Global.wait_queue);
	dwb_Global.vdes = vdes;
	dwb_Global.file_sync_helper_block = NULL;

	dwb_Global.slots_hashmap.init(dwb_slots_Ts, THREAD_TS_DWB_SLOTS, DWB_SLOTS_HASH_SIZE, freelist_block_size,
								  freelist_block_count, slots_entry_Descriptor);

	/* Set creation flag: reset the position part, then mark the DWB as created. */
	new_position_with_flags = DWB_RESET_POSITION(*current_position_with_flags);
	new_position_with_flags = DWB_STARTS_CREATION(new_position_with_flags);
	if (!ATOMIC_CAS_64(&dwb_Global.position_with_flags, *current_position_with_flags, new_position_with_flags))
	{
		/* Impossible. */
		assert(false);
	}
	*current_position_with_flags = new_position_with_flags;

	return NO_ERROR;

exit_on_error:
	/* Undo the volume creation, if it got that far. */
	if (vdes != NULL_VOLDES)
	{
		fileio_dismount(thread_p, vdes);
		fileio_unformat(NULL, dwb_volume_name);
	}

	/* Release any blocks that were created. */
	if (blocks != NULL)
	{
		for (i = 0; i < num_blocks; i++)
		{
			dwb_finalize_block(&blocks[i]);
		}
		free_and_init(blocks);
	}

	return error_code;
}

 

dwb_create_blocks()

storage/double_write_buffer.c: 994

/*
 * dwb_create_blocks () - Allocate and initialize the DWB blocks together with their per-block resources.
 *
 * return   : NO_ERROR on success, ER_OUT_OF_VIRTUAL_MEMORY when any allocation fails.
 * thread_p (in) : The thread entry.
 * num_blocks(in): The number of blocks; must not exceed DWB_MAX_BLOCKS.
 * num_block_pages(in): The number of block pages.
 * p_blocks(out): The created blocks; set to NULL on failure.
 */
STATIC_INLINE int
dwb_create_blocks(THREAD_ENTRY *thread_p, unsigned int num_blocks, unsigned int num_block_pages,
				  DWB_BLOCK **p_blocks)
{
	/* Per-block resources; unused entries stay NULL so the cleanup path can scan the whole arrays. */
	char *blocks_write_buffer[DWB_MAX_BLOCKS] = { NULL };
	FLUSH_VOLUME_INFO *flush_volumes_info[DWB_MAX_BLOCKS] = { NULL };
	DWB_SLOT *slots[DWB_MAX_BLOCKS] = { NULL };
	DWB_BLOCK *blocks = NULL;
	const size_t blocks_bytes = num_blocks * sizeof(DWB_BLOCK);
	const size_t slots_bytes = num_block_pages * sizeof(DWB_SLOT);
	const size_t flush_info_bytes = num_block_pages * sizeof(FLUSH_VOLUME_INFO);
	unsigned int block_buffer_size, block_idx, page_idx;
	size_t buffer_bytes;
	int error_code = NO_ERROR;
	FILEIO_PAGE *io_page;

	assert(num_blocks <= DWB_MAX_BLOCKS);

	*p_blocks = NULL;

	/* The zero-filled array of block descriptors. */
	blocks = (DWB_BLOCK *)malloc(blocks_bytes);
	if (blocks == NULL)
	{
		er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, blocks_bytes);
		error_code = ER_OUT_OF_VIRTUAL_MEMORY;
		goto exit_on_error;
	}
	memset(blocks, 0, blocks_bytes);

	/* One zero-filled write buffer per block, large enough for num_block_pages pages of IO_PAGESIZE bytes. */
	block_buffer_size = num_block_pages * IO_PAGESIZE;
	buffer_bytes = block_buffer_size * sizeof(char);
	for (block_idx = 0; block_idx < num_blocks; block_idx++)
	{
		blocks_write_buffer[block_idx] = (char *)malloc(buffer_bytes);
		if (blocks_write_buffer[block_idx] == NULL)
		{
			er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, buffer_bytes);
			error_code = ER_OUT_OF_VIRTUAL_MEMORY;
			goto exit_on_error;
		}
		memset(blocks_write_buffer[block_idx], 0, buffer_bytes);
	}

	/* One zero-filled slot array per block, with one slot per page of the block. */
	for (block_idx = 0; block_idx < num_blocks; block_idx++)
	{
		slots[block_idx] = (DWB_SLOT *)malloc(slots_bytes);
		if (slots[block_idx] == NULL)
		{
			er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, slots_bytes);
			error_code = ER_OUT_OF_VIRTUAL_MEMORY;
			goto exit_on_error;
		}
		memset(slots[block_idx], 0, slots_bytes);
	}

	/* One zero-filled FLUSH_VOLUME_INFO array per block, again sized by the pages of a block. */
	for (block_idx = 0; block_idx < num_blocks; block_idx++)
	{
		flush_volumes_info[block_idx] = (FLUSH_VOLUME_INFO *)malloc(flush_info_bytes);
		if (flush_volumes_info[block_idx] == NULL)
		{
			er_set(ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, flush_info_bytes);
			error_code = ER_OUT_OF_VIRTUAL_MEMORY;
			goto exit_on_error;
		}
		memset(flush_volumes_info[block_idx], 0, flush_info_bytes);
	}

	/* Wire everything together: every slot points at its page inside the write buffer of its block. */
	for (block_idx = 0; block_idx < num_blocks; block_idx++)
	{
		/* No need to initialize FILEIO_PAGE header here, since is overwritten before flushing */
		for (page_idx = 0; page_idx < num_block_pages; page_idx++)
		{
			/* Address of page number page_idx within this block's write buffer. */
			io_page = (FILEIO_PAGE *)(blocks_write_buffer[block_idx] + page_idx * IO_PAGESIZE);

			fileio_initialize_res(thread_p, io_page, IO_PAGESIZE);
			/* The slot receives its page, its position inside the block and the block number. */
			dwb_initialize_slot(&slots[block_idx][page_idx], io_page, page_idx, block_idx);
		}

		/*
		 * Hand the buffer, slots and flush info of this block over to the block descriptor.
		 * The meaning of the two 0 arguments and of num_block_pages is defined by dwb_initialize_block ().
		 */
		dwb_initialize_block(&blocks[block_idx], block_idx, 0, blocks_write_buffer[block_idx], slots[block_idx],
							 flush_volumes_info[block_idx], 0, num_block_pages);
	}

	*p_blocks = blocks;

	return NO_ERROR;

exit_on_error:
	/* Free whatever was allocated so far; entries never allocated are still NULL. */
	for (block_idx = 0; block_idx < DWB_MAX_BLOCKS; block_idx++)
	{
		if (slots[block_idx] != NULL)
		{
			free_and_init(slots[block_idx]);
		}

		if (blocks_write_buffer[block_idx] != NULL)
		{
			free_and_init(blocks_write_buffer[block_idx]);
		}

		if (flush_volumes_info[block_idx] != NULL)
		{
			free_and_init(flush_volumes_info[block_idx]);
		}
	}

	if (blocks != NULL)
	{
		free_and_init(blocks);
	}

	return error_code;
}

 

본 시리즈의 글들은 CUBRID DB엔진 오픈 스터디를 진행하며 팀원들과 함께 공부한 내용을 정리한 것입니다.
Github 링크

profile
블록체인 개발 어때요

0개의 댓글