Commit 8025dc7a authored by Jondy Zhao's avatar Jondy Zhao

Implement function: netuse.mapDrive

parent e14b3d1a
...@@ -70,87 +70,134 @@ wchar2mchar(wchar_t *ws, char *buffer, size_t size) ...@@ -70,87 +70,134 @@ wchar2mchar(wchar_t *ws, char *buffer, size_t size)
static PyObject * static PyObject *
netuse_user_info(PyObject *self, PyObject *args) netuse_user_info(PyObject *self, PyObject *args)
{ {
DWORD dwLevel = 1; DWORD dwLevel = 1;
LPWKSTA_USER_INFO_1 pBuf = NULL; LPWKSTA_USER_INFO_1 pBuf = NULL;
NET_API_STATUS nStatus; NET_API_STATUS nStatus;
// //
// Call the NetWkstaUserGetInfo function; // Call the NetWkstaUserGetInfo function;
// specify level 1. // specify level 1.
// //
nStatus = NetWkstaUserGetInfo(NULL, nStatus = NetWkstaUserGetInfo(NULL,
dwLevel, dwLevel,
(LPBYTE *)&pBuf); (LPBYTE *)&pBuf);
// //
// If the call succeeds, print the information // If the call succeeds, print the information
// about the logged-on user. // about the logged-on user.
// //
if (nStatus == NERR_Success) { if (nStatus == NERR_Success) {
if (pBuf != NULL) { if (pBuf != NULL) {
size_t size = MAX_USERBUFFER_SIZE; size_t size = MAX_USERBUFFER_SIZE;
size_t len; size_t len;
logonuser = userinfo; logonuser = userinfo;
len = wchar2mchar(pBuf->wkui1_username, logonuser, size); len = wchar2mchar(pBuf->wkui1_username, logonuser, size);
if (len == -1) { if (len == -1) {
PyErr_SetString(PyExc_RuntimeError, "Unicode convertion error"); PyErr_SetString(PyExc_RuntimeError, "Unicode convertion error");
return NULL; return NULL;
} }
size -= len; size -= len;
logondomain = logonuser + len; logondomain = logonuser + len;
len = wchar2mchar(pBuf->wkui1_logon_domain, logondomain, size); len = wchar2mchar(pBuf->wkui1_logon_domain, logondomain, size);
if (len == -1) { if (len == -1) {
PyErr_SetString(PyExc_RuntimeError, "Unicode convertion error"); PyErr_SetString(PyExc_RuntimeError, "Unicode convertion error");
return NULL; return NULL;
} }
size -= len; size -= len;
logonserver = logondomain + len; logonserver = logondomain + len;
len = wchar2mchar(pBuf->wkui1_logon_server, logonserver, size); len = wchar2mchar(pBuf->wkui1_logon_server, logonserver, size);
if (len == -1) { if (len == -1) {
PyErr_SetString(PyExc_RuntimeError, "Unicode convertion error"); PyErr_SetString(PyExc_RuntimeError, "Unicode convertion error");
return NULL; return NULL;
} }
} }
} }
// Otherwise, print the system error. // Otherwise, print the system error.
// //
else { else {
PyErr_Format(PyExc_RuntimeError, PyErr_Format(PyExc_RuntimeError,
"A system error has occurred: %ld", "A system error has occurred: %ld",
nStatus nStatus
); );
return NULL; return NULL;
} }
// //
// Free the allocated memory. // Free the allocated memory.
// //
if (pBuf != NULL) { if (pBuf != NULL) {
NetApiBufferFree(pBuf); NetApiBufferFree(pBuf);
return Py_BuildValue("sss", logonserver, logondomain, logonuser); return Py_BuildValue("sss", logonserver, logondomain, logonuser);
} }
PyErr_SetString(PyExc_RuntimeError, "No logon user information"); PyErr_SetString(PyExc_RuntimeError, "No logon user information");
return NULL; return NULL;
}
static char
get_free_drive_letter()
{
DWORD bitmasks = GetLogicalDrives();
char ch = 'A';
while (bitmasks) {
if ((bitmasks & 1L) == 0)
return ch;
++ ch;
bitmasks >>= 1;
}
return 0;
} }
static PyObject * static PyObject *
netuse_map_drive(PyObject *self, PyObject *args) netuse_map_drive(PyObject *self, PyObject *args)
{ {
PyErr_SetString(PyExc_RuntimeError, "Not Implemented"); DWORD dwRetVal;
NETRESOURCE nr;
DWORD dwFlags;
char *remote = NULL;
char drive[] = { 0, ':', 0 };
char *user = NULL;
char *password = NULL;
if (! PyArg_ParseTuple(args, "s", &remote)) {
return NULL;
}
drive[0] = get_free_drive_letter();
if (!drive[0]) {
PyErr_SetString(PyExc_RuntimeError,
"Add net drive faild: no available drive letter."
);
return NULL;
}
memset(&nr, 0, sizeof (NETRESOURCE));
nr.dwType = RESOURCETYPE_DISK;
nr.lpLocalName = drive;
nr.lpRemoteName = remote;
nr.lpProvider = NULL;
dwFlags = CONNECT_UPDATE_PROFILE;
dwRetVal = WNetAddConnection2(&nr, password, user, dwFlags);
if (dwRetVal == NO_ERROR)
return PyString_FromString(drive);
PyErr_Format(PyExc_RuntimeError,
"WNetAddConnection2 failed with error: %lu\n",
dwRetVal
);
return NULL; return NULL;
} }
/* /*
* Travel all the mapped drive to check whether there is duplicated * Travel all the mapped drive to check whether there is duplicated
* shared folder: * shared folder:
* *
* Return 1 if current share folder is same or sub-folder of the * Return 1 if current share folder is same or sub-folder of the
* mapped folder; * mapped folder;
* *
* Return -1 if unknown exception occurs; * Return -1 if unknown exception occurs;
* *
* Remove mapped item from list if the mapped folder is sub-folder * Remove mapped item from list if the mapped folder is sub-folder
* of current share folder. * of current share folder.
* *
* Return 0 if it's new share folder. * Return 0 if it's new share folder.
* *
*/ */
static int static int
check_duplicate_shared_folder(PyObject *retvalue, const char *folder) check_duplicate_shared_folder(PyObject *retvalue, const char *folder)
...@@ -163,7 +210,7 @@ check_duplicate_shared_folder(PyObject *retvalue, const char *folder) ...@@ -163,7 +210,7 @@ check_duplicate_shared_folder(PyObject *retvalue, const char *folder)
int len2; int len2;
PyObject *item; PyObject *item;
char * s; char * s;
while (size > 0) { while (size > 0) {
size --; size --;
item = PySequence_GetItem(retvalue, size); item = PySequence_GetItem(retvalue, size);
...@@ -173,7 +220,7 @@ check_duplicate_shared_folder(PyObject *retvalue, const char *folder) ...@@ -173,7 +220,7 @@ check_duplicate_shared_folder(PyObject *retvalue, const char *folder)
if (s == NULL) if (s == NULL)
return -1; return -1;
len2 = strlen(s); len2 = strlen(s);
if (strncmp(folder, s, len > len2 ? len : len2) == 0) { if (strncmp(folder, s, len > len2 ? len : len2) == 0) {
if (len2 > len) { if (len2 > len) {
if (PySequence_DelItem(retvalue, size) == -1) if (PySequence_DelItem(retvalue, size) == -1)
...@@ -198,7 +245,7 @@ netuse_usage_report(PyObject *self, PyObject *args) ...@@ -198,7 +245,7 @@ netuse_usage_report(PyObject *self, PyObject *args)
char drivename[] = { 'A', ':', 0 }; char drivename[] = { 'A', ':', 0 };
ULARGE_INTEGER lFreeBytesAvailable; ULARGE_INTEGER lFreeBytesAvailable;
ULARGE_INTEGER lTotalNumberOfBytes; ULARGE_INTEGER lTotalNumberOfBytes;
ULARGE_INTEGER lTotalNumberOfFreeBytes; /* ULARGE_INTEGER lTotalNumberOfFreeBytes; */
char szRemoteName[MAX_PATH]; char szRemoteName[MAX_PATH];
DWORD dwResult, cchBuff = MAX_PATH; DWORD dwResult, cchBuff = MAX_PATH;
...@@ -208,7 +255,7 @@ netuse_usage_report(PyObject *self, PyObject *args) ...@@ -208,7 +255,7 @@ netuse_usage_report(PyObject *self, PyObject *args)
return NULL; return NULL;
} }
if (servername) if (servername)
serverlen = strlen(servername); serverlen = strlen(servername);
bitmasks = GetLogicalDrives(); bitmasks = GetLogicalDrives();
...@@ -302,7 +349,7 @@ netuse_usage_report(PyObject *self, PyObject *args) ...@@ -302,7 +349,7 @@ netuse_usage_report(PyObject *self, PyObject *args)
return NULL; return NULL;
} }
} }
else { else {
PyErr_Format(PyExc_RuntimeError, PyErr_Format(PyExc_RuntimeError,
"A system error has occurred in GetDiskFreeSpaceEx(%s): %ld", "A system error has occurred in GetDiskFreeSpaceEx(%s): %ld",
drivepath, drivepath,
...@@ -331,8 +378,9 @@ static PyMethodDef NetUseMethods[] = { ...@@ -331,8 +378,9 @@ static PyMethodDef NetUseMethods[] = {
netuse_map_drive, netuse_map_drive,
METH_VARARGS, METH_VARARGS,
( (
"mapDrive()\n\n" "mapDrive(sharefolder)\n\n"
"Create mapped drive from server shared folder\n" "Create mapped drive from shared folder, it uses the default user\n"
"name. (provided by the user context for the process.) \n"
) )
}, },
{ {
......
...@@ -22,6 +22,10 @@ class NetUseTests(unittest.TestCase): ...@@ -22,6 +22,10 @@ class NetUseTests(unittest.TestCase):
self.assertEquals(len(r), 3) self.assertEquals(len(r), 3)
self.assertEquals(r, ('JONDY', 'JONDY', 'Administrator')) self.assertEquals(r, ('JONDY', 'JONDY', 'Administrator'))
def test_map_drive(self):
r = real_netuse.mapDrive(r'\\server\path')
self.assertEquals(r, 'X:')
def test_usage_report(self): def test_usage_report(self):
r = real_netuse.usageReport() r = real_netuse.usageReport()
self.assertEquals(len(r), 0) self.assertEquals(len(r), 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment